diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-26 11:59:24 +0800 |
---|---|---|
committer | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-27 15:25:10 +0800 |
commit | 7783bc4ba52bfc534d5b4d91e78abb2ddad7d078 (patch) | |
tree | f57ea6f4d8b2faa23ab95691717b071f507e621f /core/agreement.go | |
parent | b8ced165b1fb03394f8758e08148b0e5d06aa07b (diff) | |
download | dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.gz dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.bz2 dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.lz dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.xz dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.zst dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.zip |
core: bring back agreement result (#515)
* core: bring back agreement result
* add logger
* Fix
* fixup
Diffstat (limited to 'core/agreement.go')
-rw-r--r-- | core/agreement.go | 102 |
1 files changed, 81 insertions, 21 deletions
diff --git a/core/agreement.go b/core/agreement.go index 5e7b7de..d53440b 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -117,20 +117,21 @@ type agreementData struct { // agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. type agreement struct { - state agreementState - data *agreementData - aID *atomic.Value - doneChan chan struct{} - notarySet map[types.NodeID]struct{} - hasVoteFast bool - hasOutput bool - lock sync.RWMutex - pendingBlock []pendingBlock - pendingVote []pendingVote - candidateBlock map[common.Hash]*types.Block - fastForward chan uint64 - signer *utils.Signer - logger common.Logger + state agreementState + data *agreementData + aID *atomic.Value + doneChan chan struct{} + notarySet map[types.NodeID]struct{} + hasVoteFast bool + hasOutput bool + lock sync.RWMutex + pendingBlock []pendingBlock + pendingVote []pendingVote + pendingAgreementResult map[types.Position]*types.AgreementResult + candidateBlock map[common.Hash]*types.Block + fastForward chan uint64 + signer *utils.Signer + logger common.Logger } // newAgreement creates a agreement instance. @@ -146,11 +147,12 @@ func newAgreement( ID: ID, leader: leader, }, - aID: &atomic.Value{}, - candidateBlock: make(map[common.Hash]*types.Block), - fastForward: make(chan uint64, 1), - signer: signer, - logger: logger, + aID: &atomic.Value{}, + pendingAgreementResult: make(map[types.Position]*types.AgreementResult), + candidateBlock: make(map[common.Hash]*types.Block), + fastForward: make(chan uint64, 1), + signer: signer, + logger: logger, } agreement.stop() return agreement @@ -205,6 +207,22 @@ func (a *agreement) restart( return } + var result *types.AgreementResult + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingAgreementResult := make( + map[types.Position]*types.AgreementResult) + for pos, agr := range a.pendingAgreementResult { + if pos.Newer(aID) { + newPendingAgreementResult[pos] = agr + } else if pos == aID { + result = agr + } + } + a.pendingAgreementResult = newPendingAgreementResult + }() + expireTime := time.Now().Add(-10 * time.Second) replayBlock := make([]*types.Block, 0) func() { @@ -215,7 +233,11 @@ func (a *agreement) restart( if aID.Newer(pending.block.Position) { continue } else if pending.block.Position == aID { - replayBlock = append(replayBlock, pending.block) + if result == nil || + result.Position.Round < DKGDelayRound || + result.BlockHash == pending.block.Hash { + replayBlock = append(replayBlock, pending.block) + } } else if pending.receivedTime.After(expireTime) { newPendingBlock = append(newPendingBlock, pending) } @@ -232,7 +254,9 @@ func (a *agreement) restart( if aID.Newer(pending.vote.Position) { continue } else if pending.vote.Position == aID { - replayVote = append(replayVote, pending.vote) + if result == nil || result.Position.Round < DKGDelayRound { + replayVote = append(replayVote, pending.vote) + } } else if pending.receivedTime.After(expireTime) { newPendingVote = append(newPendingVote, pending) } @@ -247,6 +271,13 @@ func (a *agreement) restart( } } + if result != nil { + if err := a.processAgreementResult(result); err != nil { + a.logger.Error("Failed to process agreement result when retarting", + "result", result) + } + } + for _, vote := range replayVote { if err := a.processVote(vote); err != nil { a.logger.Error("Failed to process vote when restarting agreement", @@ -526,6 +557,8 @@ func (a *agreement) processFinalizedBlock(block *types.Block) { } a.addCandidateBlockNoLock(block) a.hasOutput = true + a.data.lock.Lock() + defer a.data.lock.Unlock() a.data.recv.ConfirmBlock(block.Hash, nil) if a.doneChan != nil { close(a.doneChan) @@ -533,6 +566,33 @@ func (a *agreement) processFinalizedBlock(block *types.Block) { } } +func (a *agreement) processAgreementResult(result *types.AgreementResult) error { + a.lock.Lock() + defer a.lock.Unlock() + aID := a.agreementID() + if result.Position.Older(aID) { + return nil + } else if result.Position.Newer(aID) { + a.pendingAgreementResult[result.Position] = result + return nil + } + if a.hasOutput { + return nil + } + a.data.lock.Lock() + defer a.data.lock.Unlock() + if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist { + a.data.recv.PullBlocks(common.Hashes{result.BlockHash}) + } + a.hasOutput = true + a.data.recv.ConfirmBlock(result.BlockHash, nil) + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } + return nil +} + func (a *agreement) done() <-chan struct{} { a.lock.Lock() defer a.lock.Unlock() |