aboutsummaryrefslogtreecommitdiffstats
path: root/core/agreement.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-09-04 17:39:05 +0800
committerGitHub <noreply@github.com>2018-09-04 17:39:05 +0800
commit04a63a22a24abaaa91b1d981e6d95260d80dadf4 (patch)
treed8e7335984a1b53f097f00f4e4956112d22aa673 /core/agreement.go
parent09393166791785ab6730b1c812b4a4fd07a92293 (diff)
downloaddexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar
dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.gz
dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.bz2
dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.lz
dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.xz
dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.zst
dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.zip
core: BA-based consensus core. (#93)
Diffstat (limited to 'core/agreement.go')
-rw-r--r--core/agreement.go169
1 files changed, 138 insertions, 31 deletions
diff --git a/core/agreement.go b/core/agreement.go
index 299971e..8fb2207 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -21,6 +21,7 @@ import (
"fmt"
"sync"
"sync/atomic"
+ "time"
"github.com/dexon-foundation/dexon-consensus-core/common"
"github.com/dexon-foundation/dexon-consensus-core/core/types"
@@ -62,6 +63,16 @@ type agreementReceiver interface {
confirmBlock(common.Hash)
}
+type pendingBlock struct {
+ block *types.Block
+ receivedTime time.Time
+}
+
+type pendingVote struct {
+ vote *types.Vote
+ receivedTime time.Time
+}
+
// agreementData is the data for agreementState.
type agreementData struct {
recv agreementReceiver
@@ -74,17 +85,22 @@ type agreementData struct {
votes map[uint64][]map[types.ValidatorID]*types.Vote
votesLock sync.RWMutex
blocks map[types.ValidatorID]*types.Block
+ blocksLock sync.Mutex
blockProposer blockProposerFn
}
// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
- state agreementState
- data *agreementData
- aID *atomic.Value
- validators map[types.ValidatorID]struct{}
- sigToPub SigToPubFn
- hasOutput bool
+ state agreementState
+ data *agreementData
+ aID *atomic.Value
+ validators map[types.ValidatorID]struct{}
+ sigToPub SigToPubFn
+ hasOutput bool
+ lock sync.RWMutex
+ pendingBlock []pendingBlock
+ pendingVote []pendingVote
+ candidateBlock map[common.Hash]*types.Block
}
// newAgreement creates a agreement instance.
@@ -102,10 +118,11 @@ func newAgreement(
leader: leader,
blockProposer: blockProposer,
},
- aID: &atomic.Value{},
- sigToPub: sigToPub,
+ aID: &atomic.Value{},
+ sigToPub: sigToPub,
+ candidateBlock: make(map[common.Hash]*types.Block),
}
- agreement.restart(validators)
+ agreement.restart(validators, types.Position{})
return agreement
}
@@ -117,19 +134,67 @@ func (a *agreement) terminate() {
}
// restart the agreement
-func (a *agreement) restart(validators types.ValidatorIDs) {
- a.data.votesLock.Lock()
- defer a.data.votesLock.Unlock()
- a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote)
- a.data.votes[1] = newVoteListMap()
- a.data.period = 1
- a.data.blocks = make(map[types.ValidatorID]*types.Block)
- a.data.requiredVote = len(validators)/3*2 + 1
- a.hasOutput = false
- a.state = newPrepareState(a.data)
- a.validators = make(map[types.ValidatorID]struct{})
- for _, v := range validators {
- a.validators[v] = struct{}{}
+func (a *agreement) restart(validators types.ValidatorIDs, aID types.Position) {
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ a.data.votesLock.Lock()
+ defer a.data.votesLock.Unlock()
+ a.data.blocksLock.Lock()
+ defer a.data.blocksLock.Unlock()
+ a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote)
+ a.data.votes[1] = newVoteListMap()
+ a.data.period = 1
+ a.data.blocks = make(map[types.ValidatorID]*types.Block)
+ a.data.requiredVote = len(validators)/3*2 + 1
+ a.data.leader.restart()
+ a.hasOutput = false
+ a.state = newPrepareState(a.data)
+ a.validators = make(map[types.ValidatorID]struct{})
+ for _, v := range validators {
+ a.validators[v] = struct{}{}
+ }
+ a.candidateBlock = make(map[common.Hash]*types.Block)
+ a.aID.Store(aID)
+ }()
+
+ expireTime := time.Now().Add(-10 * time.Second)
+ replayBlock := make([]*types.Block, 0)
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingBlock := make([]pendingBlock, 0)
+ for _, pending := range a.pendingBlock {
+ if pending.block.Position == aID {
+ replayBlock = append(replayBlock, pending.block)
+ } else if pending.receivedTime.After(expireTime) {
+ newPendingBlock = append(newPendingBlock, pending)
+ }
+ }
+ a.pendingBlock = newPendingBlock
+ }()
+
+ replayVote := make([]*types.Vote, 0)
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingVote := make([]pendingVote, 0)
+ for _, pending := range a.pendingVote {
+ if pending.vote.Position == aID {
+ replayVote = append(replayVote, pending.vote)
+ } else if pending.receivedTime.After(expireTime) {
+ newPendingVote = append(newPendingVote, pending)
+ }
+ }
+ a.pendingVote = newPendingVote
+ }()
+
+ for _, block := range replayBlock {
+ a.processBlock(block)
+ }
+
+ for _, vote := range replayVote {
+ a.processVote(vote)
}
}
@@ -143,11 +208,6 @@ func (a *agreement) agreementID() types.Position {
return a.aID.Load().(types.Position)
}
-// setAgreementID sets the current agreementID.
-func (a *agreement) setAgreementID(ID types.Position) {
- a.aID.Store(ID)
-}
-
// nextState is called at the spcifi clock time.
func (a *agreement) nextState() (err error) {
a.state, err = a.state.nextState()
@@ -155,7 +215,12 @@ func (a *agreement) nextState() (err error) {
}
func (a *agreement) sanityCheck(vote *types.Vote) error {
- if _, exist := a.validators[vote.ProposerID]; !exist {
+ if exist := func() bool {
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+ _, exist := a.validators[vote.ProposerID]
+ return exist
+ }(); !exist {
return ErrNotValidator
}
ok, err := verifyVoteSignature(vote, a.sigToPub)
@@ -165,7 +230,12 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
if !ok {
return ErrIncorrectVoteSignature
}
- if _, exist := a.data.votes[vote.Period]; exist {
+ if exist := func() bool {
+ a.data.votesLock.RLock()
+ defer a.data.votesLock.RUnlock()
+ _, exist := a.data.votes[vote.Period]
+ return exist
+ }(); exist {
if oldVote, exist :=
a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist {
if vote.BlockHash != oldVote.BlockHash {
@@ -179,6 +249,8 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
// prepareVote prepares a vote.
func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) (
err error) {
+ vote.ProposerID = a.data.ID
+ vote.Position = a.agreementID()
hash := hashVote(vote)
vote.Signature, err = prv.Sign(hash)
return
@@ -186,10 +258,16 @@ func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) (
// processVote is the entry point for processing Vote.
func (a *agreement) processVote(vote *types.Vote) error {
- vote = vote.Clone()
if err := a.sanityCheck(vote); err != nil {
return err
}
+ if vote.Position != a.agreementID() {
+ a.pendingVote = append(a.pendingVote, pendingVote{
+ vote: vote,
+ receivedTime: time.Now().UTC(),
+ })
+ return nil
+ }
if func() bool {
a.data.votesLock.Lock()
defer a.data.votesLock.Unlock()
@@ -211,21 +289,50 @@ func (a *agreement) processVote(vote *types.Vote) error {
return nil
}
+// prepareBlok prepares a block.
+func (a *agreement) prepareBlock(
+ block *types.Block, prv crypto.PrivateKey) error {
+ return a.data.leader.prepareBlock(block, prv)
+}
+
// processBlock is the entry point for processing Block.
func (a *agreement) processBlock(block *types.Block) error {
+ a.data.blocksLock.Lock()
+ defer a.data.blocksLock.Unlock()
if b, exist := a.data.blocks[block.ProposerID]; exist {
if b.Hash != block.Hash {
return &ErrFork{block.ProposerID, b.Hash, block.Hash}
}
return nil
}
- a.data.blocks[block.ProposerID] = block
+ if block.Position != a.agreementID() {
+ a.pendingBlock = append(a.pendingBlock, pendingBlock{
+ block: block,
+ receivedTime: time.Now().UTC(),
+ })
+ return nil
+ }
if err := a.data.leader.processBlock(block); err != nil {
return err
}
+ a.data.blocks[block.ProposerID] = block
+ a.addCandidateBlock(block)
return nil
}
+func (a *agreement) addCandidateBlock(block *types.Block) {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ a.candidateBlock[block.Hash] = block
+}
+
+func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) {
+ a.lock.RLock()
+ defer a.lock.RUnlock()
+ b, e := a.candidateBlock[hash]
+ return b, e
+}
+
func (a *agreementData) countVote(period uint64, voteType types.VoteType) (
blockHash common.Hash, ok bool) {
a.votesLock.RLock()