aboutsummaryrefslogtreecommitdiffstats
path: root/core/agreement.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-26 11:59:24 +0800
committerJimmy Hu <jimmy.hu@dexon.org>2019-03-27 15:25:10 +0800
commit7783bc4ba52bfc534d5b4d91e78abb2ddad7d078 (patch)
treef57ea6f4d8b2faa23ab95691717b071f507e621f /core/agreement.go
parentb8ced165b1fb03394f8758e08148b0e5d06aa07b (diff)
downloaddexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.gz
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.bz2
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.lz
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.xz
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.tar.zst
dexon-consensus-7783bc4ba52bfc534d5b4d91e78abb2ddad7d078.zip
core: bring back agreement result (#515)
* core: bring back agreement result * add logger * Fix * fixup
Diffstat (limited to 'core/agreement.go')
-rw-r--r--core/agreement.go102
1 files changed, 81 insertions, 21 deletions
diff --git a/core/agreement.go b/core/agreement.go
index 5e7b7de..d53440b 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -117,20 +117,21 @@ type agreementData struct {
// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
- state agreementState
- data *agreementData
- aID *atomic.Value
- doneChan chan struct{}
- notarySet map[types.NodeID]struct{}
- hasVoteFast bool
- hasOutput bool
- lock sync.RWMutex
- pendingBlock []pendingBlock
- pendingVote []pendingVote
- candidateBlock map[common.Hash]*types.Block
- fastForward chan uint64
- signer *utils.Signer
- logger common.Logger
+ state agreementState
+ data *agreementData
+ aID *atomic.Value
+ doneChan chan struct{}
+ notarySet map[types.NodeID]struct{}
+ hasVoteFast bool
+ hasOutput bool
+ lock sync.RWMutex
+ pendingBlock []pendingBlock
+ pendingVote []pendingVote
+ pendingAgreementResult map[types.Position]*types.AgreementResult
+ candidateBlock map[common.Hash]*types.Block
+ fastForward chan uint64
+ signer *utils.Signer
+ logger common.Logger
}
// newAgreement creates a agreement instance.
@@ -146,11 +147,12 @@ func newAgreement(
ID: ID,
leader: leader,
},
- aID: &atomic.Value{},
- candidateBlock: make(map[common.Hash]*types.Block),
- fastForward: make(chan uint64, 1),
- signer: signer,
- logger: logger,
+ aID: &atomic.Value{},
+ pendingAgreementResult: make(map[types.Position]*types.AgreementResult),
+ candidateBlock: make(map[common.Hash]*types.Block),
+ fastForward: make(chan uint64, 1),
+ signer: signer,
+ logger: logger,
}
agreement.stop()
return agreement
@@ -205,6 +207,22 @@ func (a *agreement) restart(
return
}
+ var result *types.AgreementResult
+ func() {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ newPendingAgreementResult := make(
+ map[types.Position]*types.AgreementResult)
+ for pos, agr := range a.pendingAgreementResult {
+ if pos.Newer(aID) {
+ newPendingAgreementResult[pos] = agr
+ } else if pos == aID {
+ result = agr
+ }
+ }
+ a.pendingAgreementResult = newPendingAgreementResult
+ }()
+
expireTime := time.Now().Add(-10 * time.Second)
replayBlock := make([]*types.Block, 0)
func() {
@@ -215,7 +233,11 @@ func (a *agreement) restart(
if aID.Newer(pending.block.Position) {
continue
} else if pending.block.Position == aID {
- replayBlock = append(replayBlock, pending.block)
+ if result == nil ||
+ result.Position.Round < DKGDelayRound ||
+ result.BlockHash == pending.block.Hash {
+ replayBlock = append(replayBlock, pending.block)
+ }
} else if pending.receivedTime.After(expireTime) {
newPendingBlock = append(newPendingBlock, pending)
}
@@ -232,7 +254,9 @@ func (a *agreement) restart(
if aID.Newer(pending.vote.Position) {
continue
} else if pending.vote.Position == aID {
- replayVote = append(replayVote, pending.vote)
+ if result == nil || result.Position.Round < DKGDelayRound {
+ replayVote = append(replayVote, pending.vote)
+ }
} else if pending.receivedTime.After(expireTime) {
newPendingVote = append(newPendingVote, pending)
}
@@ -247,6 +271,13 @@ func (a *agreement) restart(
}
}
+ if result != nil {
+ if err := a.processAgreementResult(result); err != nil {
+ a.logger.Error("Failed to process agreement result when retarting",
+ "result", result)
+ }
+ }
+
for _, vote := range replayVote {
if err := a.processVote(vote); err != nil {
a.logger.Error("Failed to process vote when restarting agreement",
@@ -526,6 +557,8 @@ func (a *agreement) processFinalizedBlock(block *types.Block) {
}
a.addCandidateBlockNoLock(block)
a.hasOutput = true
+ a.data.lock.Lock()
+ defer a.data.lock.Unlock()
a.data.recv.ConfirmBlock(block.Hash, nil)
if a.doneChan != nil {
close(a.doneChan)
@@ -533,6 +566,33 @@ func (a *agreement) processFinalizedBlock(block *types.Block) {
}
}
+func (a *agreement) processAgreementResult(result *types.AgreementResult) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+ aID := a.agreementID()
+ if result.Position.Older(aID) {
+ return nil
+ } else if result.Position.Newer(aID) {
+ a.pendingAgreementResult[result.Position] = result
+ return nil
+ }
+ if a.hasOutput {
+ return nil
+ }
+ a.data.lock.Lock()
+ defer a.data.lock.Unlock()
+ if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist {
+ a.data.recv.PullBlocks(common.Hashes{result.BlockHash})
+ }
+ a.hasOutput = true
+ a.data.recv.ConfirmBlock(result.BlockHash, nil)
+ if a.doneChan != nil {
+ close(a.doneChan)
+ a.doneChan = nil
+ }
+ return nil
+}
+
func (a *agreement) done() <-chan struct{} {
a.lock.Lock()
defer a.lock.Unlock()