diff options
author | Mission Liao <mission.liao@dexon.org> | 2019-01-11 12:58:30 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-11 12:58:30 +0800 |
commit | 809e8def862fdfa792061a448f952747f1af4d3c (patch) | |
tree | bd038971e65a09bc9bb399f03a37b194ce67ae3c /core/syncer | |
parent | fa25817354d5b7d40f5911004232392acfe7fe53 (diff) | |
download | dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.tar dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.tar.gz dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.tar.bz2 dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.tar.lz dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.tar.xz dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.tar.zst dexon-consensus-809e8def862fdfa792061a448f952747f1af4d3c.zip |
syncer: fix issues when switching to core.Consensus (#418)
- when confirmed blocks passed to core.Consensus
aren't continuous in position in some chain, the
pulling would skip those missing blocks.
- fix: when some block is missing, avoid adding it
and all blocks after it to core.Consensus.
- we need to avoid the receive channel of network
module full.
- fix: during switching to core.Consensus, we
need to launch a dummy receiver to receive
from receive channel of network module.
- fix: between the period during core.Consensus
created and before running, a dummy receiver is
also required to receive from receive channel of
network module.
Diffstat (limited to 'core/syncer')
-rw-r--r-- | core/syncer/agreement.go | 2 | ||||
-rw-r--r-- | core/syncer/consensus.go | 87 |
2 files changed, 54 insertions, 35 deletions
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go index eaad860..acc4f1c 100644 --- a/core/syncer/agreement.go +++ b/core/syncer/agreement.go @@ -1,4 +1,4 @@ -// Copyright 2018 The dexon-consensus-core Authors +// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus-core library. // // The dexon-consensus-core library is free software: you can redistribute it diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 306017f..75c1067 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -39,7 +39,8 @@ var ( ErrNotSynced = fmt.Errorf("not synced yet") // ErrGenesisBlockReached is reported when genesis block reached. ErrGenesisBlockReached = fmt.Errorf("genesis block reached") - // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. + // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered + // blocks. ErrInvalidBlockOrder = fmt.Errorf("invalid block order") // ErrMismatchBlockHashSequence means the delivering sequence is not // correct, compared to finalized blocks. @@ -83,6 +84,9 @@ type Consensus struct { ctxCancel context.CancelFunc syncedLastBlock *types.Block syncedConsensus *core.Consensus + dummyCancel context.CancelFunc + dummyFinished <-chan struct{} + dummyMsgBuffer []interface{} } // NewConsensus creates an instance for Consensus (syncer consensus). @@ -286,12 +290,17 @@ func (con *Consensus) findLatticeSyncBlock( blocks []*types.Block) (*types.Block, error) { lastBlock := blocks[len(blocks)-1] round := lastBlock.Position.Round + isConfigChanged := func(prev, cur *types.Config) bool { + return prev.K != cur.K || + prev.NumChains != cur.NumChains || + prev.PhiRatio != cur.PhiRatio + } for { // Find round r which r-1, r, r+1 are all in same total ordering config. for { - sameAsPrevRound := round == 0 || !con.isConfigChanged( + sameAsPrevRound := round == 0 || !isConfigChanged( con.configs[round-1], con.configs[round]) - sameAsNextRound := !con.isConfigChanged( + sameAsNextRound := !isConfigChanged( con.configs[round], con.configs[round+1]) if sameAsPrevRound && sameAsNextRound { break @@ -314,8 +323,9 @@ func (con *Consensus) findLatticeSyncBlock( lastBlock = &b } // Find the deliver set by hash for two times. Blocks in a deliver set - // returned by total ordering is sorted by hash. If a block's parent hash - // is greater than its hash means there is a cut between deliver sets. + // returned by total ordering is sorted by hash. If a block's parent + // hash is greater than its hash means there is a cut between deliver + // sets. var curBlock, prevBlock *types.Block var deliverSetFirstBlock, deliverSetLastBlock *types.Block curBlock = lastBlock @@ -464,8 +474,8 @@ func (con *Consensus) SyncBlocks( } } if latest && con.lattice == nil { - // New Lattice and find the deliver set of total ordering when "latest" is - // true for first time. Deliver set is found by block hashes. + // New Lattice and find the deliver set of total ordering when "latest" + // is true for first time. Deliver set is found by block hashes. var syncBlock *types.Block syncBlock, err = con.findLatticeSyncBlock(blocks) if err != nil { @@ -482,7 +492,8 @@ func (con *Consensus) SyncBlocks( con.setupConfigs(blocks) // Process blocks from syncBlock to blocks' last block. b := blocks[len(blocks)-1] - blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1 + blocksCount := + b.Finalization.Height - syncBlock.Finalization.Height + 1 blocksToProcess := make([]*types.Block, blocksCount) for { blocksToProcess[blocksCount-1] = b @@ -512,8 +523,13 @@ func (con *Consensus) SyncBlocks( if err = con.Stop(); err != nil { return } + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + context.Background(), con.network.ReceiveChan(), + func(msg interface{}) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) con.syncedLastBlock = blocks[len(blocks)-1] - synced = con.syncedLastBlock != nil + synced = true } } return @@ -521,6 +537,8 @@ func (con *Consensus) SyncBlocks( // GetSyncedConsensus returns the core.Consensus instance after synced. func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { + con.lock.Lock() + defer con.lock.Unlock() if con.syncedConsensus != nil { return con.syncedConsensus, nil } @@ -529,18 +547,16 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { } // flush all blocks in con.blocks into core.Consensus, and build // core.Consensus from syncer. - confirmedBlocks := []*types.Block{} + confirmedBlocks := make([][]*types.Block, len(con.blocks)) + for i, bs := range con.blocks { + confirmedBlocks[i] = []*types.Block(bs) + } randomnessResults := []*types.BlockRandomnessResult{} - func() { - con.lock.Lock() - defer con.lock.Unlock() - for _, bs := range con.blocks { - confirmedBlocks = append(confirmedBlocks, bs...) - } - for _, r := range con.randomnessResults { - randomnessResults = append(randomnessResults, r) - } - }() + for _, r := range con.randomnessResults { + randomnessResults = append(randomnessResults, r) + } + con.dummyCancel() + <-con.dummyFinished var err error con.syncedConsensus, err = core.NewConsensusFromSyncer( con.syncedLastBlock, @@ -553,6 +569,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { con.lattice, confirmedBlocks, randomnessResults, + con.dummyMsgBuffer, con.logger) return con.syncedConsensus, err } @@ -560,7 +577,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { // Stop the syncer. // // This method is mainly for caller to stop the syncer before synced, the syncer -// would call this method automatically after synced. +// would call this method automatically after being synced. func (con *Consensus) Stop() error { con.logger.Trace("syncer is about to stop") // Stop network and CRS routines, wait until they are all stoped. @@ -654,7 +671,8 @@ func (con *Consensus) resizeByNumChains(numChains uint32) { // Resize the pool of blocks. con.blocks = append(con.blocks, types.ByPosition{}) // Resize agreement modules. - a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) + a := newAgreement( + con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) con.agreements = append(con.agreements, a) con.agreementWaitGroup.Add(1) go func() { @@ -679,8 +697,8 @@ func (con *Consensus) startAgreement() { func() { con.lock.Lock() defer con.lock.Unlock() - // If round is cut in agreements, do not add blocks with round less - // then cut round. + // If round is cut in agreements, do not add blocks with + // round less then cut round. if b.Position.Round < con.agreementRoundCut { return } @@ -764,15 +782,22 @@ func (con *Consensus) startNetwork() { default: continue Loop } - func() { + if func() bool { con.lock.RLock() defer con.lock.RUnlock() if pos.ChainID >= uint32(len(con.agreements)) { - con.logger.Error("Unknown chainID message received (syncer)", + // This error might be easily encountered when the + // "latest" parameter of SyncBlocks is turned on too + // early. + con.logger.Error( + "Unknown chainID message received (syncer)", "position", &pos) + return false } - }() - con.agreements[pos.ChainID].inputChan <- val + return true + }() { + con.agreements[pos.ChainID].inputChan <- val + } case <-con.ctx.Done(): return } @@ -849,9 +874,3 @@ func (con *Consensus) stopAgreement() { close(con.receiveChan) close(con.pullChan) } - -func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool { - return prev.K != cur.K || - prev.NumChains != cur.NumChains || - prev.PhiRatio != cur.PhiRatio -} |