diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-12-12 16:55:19 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-12 16:55:19 +0800 |
commit | 338bf8676563a103cc78bbacef75fbaaac4293d7 (patch) | |
tree | 33587f90c7d7b8d61c99bebeb4ffee9c0b69668f /core/consensus.go | |
parent | d60fedadb35d56ed873bad301cf3e5fd9a96410d (diff) | |
download | tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.gz tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.bz2 tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.lz tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.xz tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.tar.zst tangerine-consensus-338bf8676563a103cc78bbacef75fbaaac4293d7.zip |
syncer: fix stuffs (#366)
* return delivered blocks when processing finalized blocks
* check deliver sequence when processing finalized blocks
* skip delivery of finalized blocks
* remove duplicated calls to BlockConfirmed
* add numChains change in test scenario
* fix the bug that restartNotary is triggered by older block
than current aID.
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 76 |
1 files changed, 48 insertions, 28 deletions
diff --git a/core/consensus.go b/core/consensus.go index 11df5d4..3f4443f 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -71,7 +71,7 @@ type consensusBAReceiver struct { changeNotaryTime time.Time round uint64 isNotary bool - restartNotary chan bool + restartNotary chan types.Position } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { @@ -145,6 +145,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block = <-ch recv.consensus.logger.Info("Receive unknown block", "hash", hash.String()[:6], + "position", &block.Position, "chainID", recv.chainID) recv.agreementModule.addCandidateBlock(block) recv.agreementModule.lock.Lock() @@ -161,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( parentHash := hash for { recv.consensus.logger.Warn("Parent block not confirmed", - "hash", parentHash, - "chainID", recv.chainID) + "parent-hash", parentHash.String()[:6], + "cur-position", &block.Position) ch := make(chan *types.Block) if !func() bool { recv.consensus.lock.Lock() @@ -188,7 +189,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.logger.Info("Receive parent block", - "hash", block.ParentHash.String()[:6], + "parent-hash", block.ParentHash.String()[:6], + "cur-position", &block.Position, "chainID", recv.chainID) recv.consensus.ccModule.registerBlock(block) if err := recv.consensus.processBlock(block); err != nil { @@ -238,12 +240,12 @@ CleanChannelLoop: break CleanChannelLoop } } + newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { recv.round++ - recv.restartNotary <- true - } else { - recv.restartNotary <- false + newPos.Round++ } + recv.restartNotary <- newPos } func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { @@ -365,15 +367,16 @@ type Consensus struct { network Network // Misc. - dMoment time.Time - nodeSetCache *utils.NodeSetCache - round uint64 - roundToNotify uint64 - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc - event *common.Event - logger common.Logger + dMoment time.Time + nodeSetCache *utils.NodeSetCache + round uint64 + roundToNotify uint64 + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc + event *common.Event + logger common.Logger + nonFinalizedBlockDelivered bool } // NewConsensus construct an Consensus instance. @@ -515,7 +518,6 @@ func NewConsensusFromSyncer( } // Dump all BA-confirmed blocks to the consensus instance. for _, b := range blocks { - con.app.BlockConfirmed(*b) con.ccModule.registerBlock(b) if err := con.processBlock(b); err != nil { return nil, err @@ -523,7 +525,7 @@ func NewConsensusFromSyncer( } // Dump all randomness result to the consensus instance. for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r); err != nil { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { con.logger.Error("failed to process randomness result when syncing", "result", r) continue @@ -815,27 +817,30 @@ MessageLoop: // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", + "block", val, "error", err) } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", + "block", val, "error", err) } } case *types.Vote: if err := con.ProcessVote(val); err != nil { con.logger.Error("Failed to process vote", - "error", err, - "vote", val) + "vote", val, + "error", err) } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { con.logger.Error("Failed to process agreement result", + "result", val, "error", err) } case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val); err != nil { + if err := con.ProcessBlockRandomnessResult(val, true); err != nil { con.logger.Error("Failed to process block randomness result", "hash", val.BlockHash.String()[:6], "position", &val.Position, @@ -952,7 +957,7 @@ func (con *Consensus) ProcessAgreementResult( Position: rand.Position, Randomness: tsig.Signature, } - if err := con.ProcessBlockRandomnessResult(result); err != nil { + if err := con.ProcessBlockRandomnessResult(result, true); err != nil { con.logger.Error("Failed to process randomness result", "error", err) return @@ -963,7 +968,7 @@ func (con *Consensus) ProcessAgreementResult( // ProcessBlockRandomnessResult processes the randomness result. func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult) error { + rand *types.BlockRandomnessResult, needBroadcast bool) error { if rand.Position.Round == 0 { return nil } @@ -974,11 +979,13 @@ func (con *Consensus) ProcessBlockRandomnessResult( return err } } - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash.String()[:6], - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) - con.network.BroadcastRandomnessResult(rand) + if needBroadcast { + con.logger.Debug("Calling Network.BroadcastRandomnessResult", + "hash", rand.BlockHash.String()[:6], + "position", &rand.Position, + "randomness", hex.EncodeToString(rand.Randomness)) + con.network.BroadcastRandomnessResult(rand) + } return nil } @@ -1039,6 +1046,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } // Pass delivered blocks to compaction chain. for _, b := range deliveredBlocks { + if b.IsFinalized() { + if con.nonFinalizedBlockDelivered { + panic(fmt.Errorf("attempting to skip finalized block: %s", b)) + } + con.logger.Info("skip delivery of finalized block", + "block", b, + "finalization-height", b.Finalization.Height) + continue + } else { + // Mark that some non-finalized block delivered. After this flag + // turned on, it's not allowed to deliver finalized blocks anymore. + con.nonFinalizedBlockDelivered = true + } if err = con.ccModule.processBlock(b); err != nil { return } |