diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-09-04 17:39:05 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-04 17:39:05 +0800 |
commit | 04a63a22a24abaaa91b1d981e6d95260d80dadf4 (patch) | |
tree | d8e7335984a1b53f097f00f4e4956112d22aa673 /core/agreement.go | |
parent | 09393166791785ab6730b1c812b4a4fd07a92293 (diff) | |
download | dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.gz dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.bz2 dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.lz dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.xz dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.tar.zst dexon-consensus-04a63a22a24abaaa91b1d981e6d95260d80dadf4.zip |
core: BA-based consensus core. (#93)
Diffstat (limited to 'core/agreement.go')
-rw-r--r-- | core/agreement.go | 169 |
1 files changed, 138 insertions, 31 deletions
diff --git a/core/agreement.go b/core/agreement.go index 299971e..8fb2207 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/dexon-foundation/dexon-consensus-core/common" "github.com/dexon-foundation/dexon-consensus-core/core/types" @@ -62,6 +63,16 @@ type agreementReceiver interface { confirmBlock(common.Hash) } +type pendingBlock struct { + block *types.Block + receivedTime time.Time +} + +type pendingVote struct { + vote *types.Vote + receivedTime time.Time +} + // agreementData is the data for agreementState. type agreementData struct { recv agreementReceiver @@ -74,17 +85,22 @@ type agreementData struct { votes map[uint64][]map[types.ValidatorID]*types.Vote votesLock sync.RWMutex blocks map[types.ValidatorID]*types.Block + blocksLock sync.Mutex blockProposer blockProposerFn } // agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. type agreement struct { - state agreementState - data *agreementData - aID *atomic.Value - validators map[types.ValidatorID]struct{} - sigToPub SigToPubFn - hasOutput bool + state agreementState + data *agreementData + aID *atomic.Value + validators map[types.ValidatorID]struct{} + sigToPub SigToPubFn + hasOutput bool + lock sync.RWMutex + pendingBlock []pendingBlock + pendingVote []pendingVote + candidateBlock map[common.Hash]*types.Block } // newAgreement creates a agreement instance. @@ -102,10 +118,11 @@ func newAgreement( leader: leader, blockProposer: blockProposer, }, - aID: &atomic.Value{}, - sigToPub: sigToPub, + aID: &atomic.Value{}, + sigToPub: sigToPub, + candidateBlock: make(map[common.Hash]*types.Block), } - agreement.restart(validators) + agreement.restart(validators, types.Position{}) return agreement } @@ -117,19 +134,67 @@ func (a *agreement) terminate() { } // restart the agreement -func (a *agreement) restart(validators types.ValidatorIDs) { - a.data.votesLock.Lock() - defer a.data.votesLock.Unlock() - a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote) - a.data.votes[1] = newVoteListMap() - a.data.period = 1 - a.data.blocks = make(map[types.ValidatorID]*types.Block) - a.data.requiredVote = len(validators)/3*2 + 1 - a.hasOutput = false - a.state = newPrepareState(a.data) - a.validators = make(map[types.ValidatorID]struct{}) - for _, v := range validators { - a.validators[v] = struct{}{} +func (a *agreement) restart(validators types.ValidatorIDs, aID types.Position) { + func() { + a.lock.Lock() + defer a.lock.Unlock() + a.data.votesLock.Lock() + defer a.data.votesLock.Unlock() + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() + a.data.votes = make(map[uint64][]map[types.ValidatorID]*types.Vote) + a.data.votes[1] = newVoteListMap() + a.data.period = 1 + a.data.blocks = make(map[types.ValidatorID]*types.Block) + a.data.requiredVote = len(validators)/3*2 + 1 + a.data.leader.restart() + a.hasOutput = false + a.state = newPrepareState(a.data) + a.validators = make(map[types.ValidatorID]struct{}) + for _, v := range validators { + a.validators[v] = struct{}{} + } + a.candidateBlock = make(map[common.Hash]*types.Block) + a.aID.Store(aID) + }() + + expireTime := time.Now().Add(-10 * time.Second) + replayBlock := make([]*types.Block, 0) + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingBlock := make([]pendingBlock, 0) + for _, pending := range a.pendingBlock { + if pending.block.Position == aID { + replayBlock = append(replayBlock, pending.block) + } else if pending.receivedTime.After(expireTime) { + newPendingBlock = append(newPendingBlock, pending) + } + } + a.pendingBlock = newPendingBlock + }() + + replayVote := make([]*types.Vote, 0) + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingVote := make([]pendingVote, 0) + for _, pending := range a.pendingVote { + if pending.vote.Position == aID { + replayVote = append(replayVote, pending.vote) + } else if pending.receivedTime.After(expireTime) { + newPendingVote = append(newPendingVote, pending) + } + } + a.pendingVote = newPendingVote + }() + + for _, block := range replayBlock { + a.processBlock(block) + } + + for _, vote := range replayVote { + a.processVote(vote) } } @@ -143,11 +208,6 @@ func (a *agreement) agreementID() types.Position { return a.aID.Load().(types.Position) } -// setAgreementID sets the current agreementID. -func (a *agreement) setAgreementID(ID types.Position) { - a.aID.Store(ID) -} - // nextState is called at the spcifi clock time. func (a *agreement) nextState() (err error) { a.state, err = a.state.nextState() @@ -155,7 +215,12 @@ func (a *agreement) nextState() (err error) { } func (a *agreement) sanityCheck(vote *types.Vote) error { - if _, exist := a.validators[vote.ProposerID]; !exist { + if exist := func() bool { + a.lock.RLock() + defer a.lock.RUnlock() + _, exist := a.validators[vote.ProposerID] + return exist + }(); !exist { return ErrNotValidator } ok, err := verifyVoteSignature(vote, a.sigToPub) @@ -165,7 +230,12 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { if !ok { return ErrIncorrectVoteSignature } - if _, exist := a.data.votes[vote.Period]; exist { + if exist := func() bool { + a.data.votesLock.RLock() + defer a.data.votesLock.RUnlock() + _, exist := a.data.votes[vote.Period] + return exist + }(); exist { if oldVote, exist := a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist { if vote.BlockHash != oldVote.BlockHash { @@ -179,6 +249,8 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { // prepareVote prepares a vote. func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) ( err error) { + vote.ProposerID = a.data.ID + vote.Position = a.agreementID() hash := hashVote(vote) vote.Signature, err = prv.Sign(hash) return @@ -186,10 +258,16 @@ func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) ( // processVote is the entry point for processing Vote. func (a *agreement) processVote(vote *types.Vote) error { - vote = vote.Clone() if err := a.sanityCheck(vote); err != nil { return err } + if vote.Position != a.agreementID() { + a.pendingVote = append(a.pendingVote, pendingVote{ + vote: vote, + receivedTime: time.Now().UTC(), + }) + return nil + } if func() bool { a.data.votesLock.Lock() defer a.data.votesLock.Unlock() @@ -211,21 +289,50 @@ func (a *agreement) processVote(vote *types.Vote) error { return nil } +// prepareBlok prepares a block. +func (a *agreement) prepareBlock( + block *types.Block, prv crypto.PrivateKey) error { + return a.data.leader.prepareBlock(block, prv) +} + // processBlock is the entry point for processing Block. func (a *agreement) processBlock(block *types.Block) error { + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() if b, exist := a.data.blocks[block.ProposerID]; exist { if b.Hash != block.Hash { return &ErrFork{block.ProposerID, b.Hash, block.Hash} } return nil } - a.data.blocks[block.ProposerID] = block + if block.Position != a.agreementID() { + a.pendingBlock = append(a.pendingBlock, pendingBlock{ + block: block, + receivedTime: time.Now().UTC(), + }) + return nil + } if err := a.data.leader.processBlock(block); err != nil { return err } + a.data.blocks[block.ProposerID] = block + a.addCandidateBlock(block) return nil } +func (a *agreement) addCandidateBlock(block *types.Block) { + a.lock.Lock() + defer a.lock.Unlock() + a.candidateBlock[block.Hash] = block +} + +func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) { + a.lock.RLock() + defer a.lock.RUnlock() + b, e := a.candidateBlock[hash] + return b, e +} + func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( blockHash common.Hash, ok bool) { a.votesLock.RLock() |