From 338bf8676563a103cc78bbacef75fbaaac4293d7 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Wed, 12 Dec 2018 16:55:19 +0800 Subject: syncer: fix stuffs (#366) * return delivered blocks when processing finalized blocks * check deliver sequence when processing finalized blocks * skip delivery of finalized blocks * remove duplicated calls to BlockConfirmed * add numChains change in test scenario * fix the bug that restartNotary is triggered by older block than current aID. --- core/agreement-mgr.go | 24 +++++++--- core/consensus.go | 76 ++++++++++++++++++++------------ core/consensus_test.go | 11 ++--- core/lattice.go | 12 ++--- core/syncer/consensus.go | 89 ++++++++++++++++++++++++++++---------- core/total-ordering.go | 13 ++++-- integration_test/consensus_test.go | 8 +++- 7 files changed, 160 insertions(+), 73 deletions(-) diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index c95f913..4cb47b1 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -20,6 +20,7 @@ package core import ( "context" "errors" + "math" "sync" "time" @@ -174,7 +175,7 @@ func (mgr *agreementMgr) appendConfig( recv := &consensusBAReceiver{ consensus: mgr.con, chainID: i, - restartNotary: make(chan bool, 1), + restartNotary: make(chan types.Position, 1), } agrModule := newAgreement( mgr.con.ID, @@ -252,7 +253,9 @@ func (mgr *agreementMgr) processAgreementResult( int(mgr.gov.Configuration(result.Position.Round).NotarySetSize), types.NewNotarySetTarget(crs, result.Position.ChainID)) for key := range result.Votes { - agreement.processVote(&result.Votes[key]) + if err := agreement.processVote(&result.Votes[key]); err != nil { + return err + } } agreement.restart(nIDs, result.Position, crs) } @@ -388,7 +391,7 @@ Loop: // Run BA for this round. recv.round = currentRound recv.changeNotaryTime = roundEndTime - recv.restartNotary <- false + recv.restartNotary <- types.Position{ChainID: math.MaxUint32} if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, @@ -412,10 +415,17 @@ Loop: default: } select { - case newNotary := <-recv.restartNotary: - if newNotary { - // This round is finished. - break Loop + case restartPos := <-recv.restartNotary: + if !isStop(restartPos) { + if restartPos.Round > oldPos.Round { + // This round is finished. + break Loop + } + if restartPos.Older(&oldPos) { + // The restartNotary event is triggered by 'BlockConfirmed' + // of some older block. + break + } } var nextHeight uint64 for { diff --git a/core/consensus.go b/core/consensus.go index 11df5d4..3f4443f 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -71,7 +71,7 @@ type consensusBAReceiver struct { changeNotaryTime time.Time round uint64 isNotary bool - restartNotary chan bool + restartNotary chan types.Position } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { @@ -145,6 +145,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block = <-ch recv.consensus.logger.Info("Receive unknown block", "hash", hash.String()[:6], + "position", &block.Position, "chainID", recv.chainID) recv.agreementModule.addCandidateBlock(block) recv.agreementModule.lock.Lock() @@ -161,8 +162,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( parentHash := hash for { recv.consensus.logger.Warn("Parent block not confirmed", - "hash", parentHash, - "chainID", recv.chainID) + "parent-hash", parentHash.String()[:6], + "cur-position", &block.Position) ch := make(chan *types.Block) if !func() bool { recv.consensus.lock.Lock() @@ -188,7 +189,8 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.logger.Info("Receive parent block", - "hash", block.ParentHash.String()[:6], + "parent-hash", block.ParentHash.String()[:6], + "cur-position", &block.Position, "chainID", recv.chainID) recv.consensus.ccModule.registerBlock(block) if err := recv.consensus.processBlock(block); err != nil { @@ -238,12 +240,12 @@ CleanChannelLoop: break CleanChannelLoop } } + newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { recv.round++ - recv.restartNotary <- true - } else { - recv.restartNotary <- false + newPos.Round++ } + recv.restartNotary <- newPos } func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { @@ -365,15 +367,16 @@ type Consensus struct { network Network // Misc. - dMoment time.Time - nodeSetCache *utils.NodeSetCache - round uint64 - roundToNotify uint64 - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc - event *common.Event - logger common.Logger + dMoment time.Time + nodeSetCache *utils.NodeSetCache + round uint64 + roundToNotify uint64 + lock sync.RWMutex + ctx context.Context + ctxCancel context.CancelFunc + event *common.Event + logger common.Logger + nonFinalizedBlockDelivered bool } // NewConsensus construct an Consensus instance. @@ -515,7 +518,6 @@ func NewConsensusFromSyncer( } // Dump all BA-confirmed blocks to the consensus instance. for _, b := range blocks { - con.app.BlockConfirmed(*b) con.ccModule.registerBlock(b) if err := con.processBlock(b); err != nil { return nil, err @@ -523,7 +525,7 @@ func NewConsensusFromSyncer( } // Dump all randomness result to the consensus instance. for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r); err != nil { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { con.logger.Error("failed to process randomness result when syncing", "result", r) continue @@ -815,27 +817,30 @@ MessageLoop: // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", + "block", val, "error", err) } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", + "block", val, "error", err) } } case *types.Vote: if err := con.ProcessVote(val); err != nil { con.logger.Error("Failed to process vote", - "error", err, - "vote", val) + "vote", val, + "error", err) } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { con.logger.Error("Failed to process agreement result", + "result", val, "error", err) } case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val); err != nil { + if err := con.ProcessBlockRandomnessResult(val, true); err != nil { con.logger.Error("Failed to process block randomness result", "hash", val.BlockHash.String()[:6], "position", &val.Position, @@ -952,7 +957,7 @@ func (con *Consensus) ProcessAgreementResult( Position: rand.Position, Randomness: tsig.Signature, } - if err := con.ProcessBlockRandomnessResult(result); err != nil { + if err := con.ProcessBlockRandomnessResult(result, true); err != nil { con.logger.Error("Failed to process randomness result", "error", err) return @@ -963,7 +968,7 @@ func (con *Consensus) ProcessAgreementResult( // ProcessBlockRandomnessResult processes the randomness result. func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult) error { + rand *types.BlockRandomnessResult, needBroadcast bool) error { if rand.Position.Round == 0 { return nil } @@ -974,11 +979,13 @@ func (con *Consensus) ProcessBlockRandomnessResult( return err } } - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash.String()[:6], - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) - con.network.BroadcastRandomnessResult(rand) + if needBroadcast { + con.logger.Debug("Calling Network.BroadcastRandomnessResult", + "hash", rand.BlockHash.String()[:6], + "position", &rand.Position, + "randomness", hex.EncodeToString(rand.Randomness)) + con.network.BroadcastRandomnessResult(rand) + } return nil } @@ -1039,6 +1046,19 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { } // Pass delivered blocks to compaction chain. for _, b := range deliveredBlocks { + if b.IsFinalized() { + if con.nonFinalizedBlockDelivered { + panic(fmt.Errorf("attempting to skip finalized block: %s", b)) + } + con.logger.Info("skip delivery of finalized block", + "block", b, + "finalization-height", b.Finalization.Height) + continue + } else { + // Mark that some non-finalized block delivered. After this flag + // turned on, it's not allowed to deliver finalized blocks anymore. + con.nonFinalizedBlockDelivered = true + } if err = con.ccModule.processBlock(b); err != nil { return } diff --git a/core/consensus_test.go b/core/consensus_test.go index 5fb804b..c1cdca8 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -155,7 +155,7 @@ func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) { case *types.AgreementResult: err = con.ProcessAgreementResult(val) case *types.BlockRandomnessResult: - err = con.ProcessBlockRandomnessResult(val) + err = con.ProcessBlockRandomnessResult(val, true) case *typesDKG.PrivateShare: err = con.cfgModule.processPrivateShare(val) case *typesDKG.PartialSignature: @@ -267,11 +267,12 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // It's a helper function to emit one block // to all core.Consensus objects. broadcast := func(b *types.Block) { + h := common.NewRandomHash() + b.Finalization.Randomness = h[:] for _, obj := range objs { - h := common.NewRandomHash() - b.Finalization.Randomness = h[:] - obj.con.ccModule.registerBlock(b) - req.Nil(obj.con.processBlock(b)) + copied := b.Clone() + obj.con.ccModule.registerBlock(copied) + req.Nil(obj.con.processBlock(copied)) } } // Genesis blocks diff --git a/core/lattice.go b/core/lattice.go index e578e3f..6ea5f8b 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -304,21 +304,21 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { } // ProcessFinalizedBlock is used for syncing lattice data. -func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { +func (l *Lattice) ProcessFinalizedBlock(b *types.Block) ([]*types.Block, error) { l.lock.Lock() defer l.lock.Unlock() // Syncing state for core.latticeData module. if err := l.data.addFinalizedBlock(b); err != nil { - panic(err) + return nil, err } l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) // Syncing state for core.totalOrdering module. toDelivered, deliveredMode, err := l.toModule.processBlock(b) if err != nil { - panic(err) + return nil, err } if len(toDelivered) == 0 { - return + return nil, nil } hashes := make(common.Hashes, len(toDelivered)) for idx := range toDelivered { @@ -329,7 +329,7 @@ func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { } // Sync core.consensusTimestamp module. if err = l.ctModule.processBlocks(toDelivered); err != nil { - panic(err) + return nil, err } - return + return toDelivered, nil } diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 176673b..dca6112 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -39,6 +39,9 @@ var ( ErrGenesisBlockReached = fmt.Errorf("genesis block reached") // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. ErrInvalidBlockOrder = fmt.Errorf("invalid block order") + // ErrMismatchBlockHashSequence means the delivering sequence is not + // correct, compared to finalized blocks. + ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence") ) // Consensus is for syncing consensus module. @@ -52,14 +55,16 @@ type Consensus struct { network core.Network nodeSetCache *utils.NodeSetCache - lattice *core.Lattice - latticeLastRound uint64 - randomnessResults []*types.BlockRandomnessResult - blocks []types.ByPosition - agreements []*agreement - configs []*types.Config - roundBeginTimes []time.Time - agreementRoundCut uint64 + lattice *core.Lattice + validatedChains map[uint32]struct{} + finalizedBlockHashes common.Hashes + latticeLastRound uint64 + randomnessResults []*types.BlockRandomnessResult + blocks []types.ByPosition + agreements []*agreement + configs []*types.Config + roundBeginTimes []time.Time + agreementRoundCut uint64 // lock for accessing all fields. lock sync.RWMutex @@ -92,6 +97,7 @@ func NewConsensus( prv: prv, logger: logger, isSynced: false, + validatedChains: make(map[uint32]struct{}), configs: []*types.Config{gov.Configuration(0)}, roundBeginTimes: []time.Time{dMoment}, receiveChan: make(chan *types.Block, 1000), @@ -125,7 +131,27 @@ func (con *Consensus) initConsensusObj(initBlock *types.Block) { con.startCRSMonitor() } -func (con *Consensus) checkIfSynced(blocks []*types.Block) { +func (con *Consensus) checkIfValidated() bool { + con.lock.RLock() + defer con.lock.RUnlock() + var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + var validatedChainCount uint32 + // Make sure we validate some block in all chains. + for chainID := range con.validatedChains { + if chainID < numChains { + validatedChainCount++ + } + } + if validatedChainCount == numChains { + return true + } + con.logger.Info("not validated yet", "validated-chain", validatedChainCount) + return false +} + +func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { + con.lock.RLock() + defer con.lock.RUnlock() var ( numChains = con.configs[con.blocks[0][0].Position.Round].NumChains compactionTips = make([]*types.Block, numChains) @@ -142,7 +168,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) { } } if (b.Finalization.ParentHash == common.Hash{}) { - return + return false } b1, err := con.db.Get(b.Finalization.ParentHash) if err != nil { @@ -153,8 +179,6 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) { // Check if chain tips of compaction chain and current cached confirmed // blocks are overlapped on each chain, numChains is decided by the round // of last block we seen on compaction chain. - con.lock.RLock() - defer con.lock.RUnlock() for chainID, b := range compactionTips { if len(con.blocks[chainID]) > 0 { if !b.Position.Older(&con.blocks[chainID][0].Position) { @@ -163,13 +187,13 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) { } } if overlapCount == numChains { - con.isSynced = true - } else { - con.logger.Info("not overlap yet", - "overlap-count", overlapCount, - "num-chain", numChains, - "last-block", blocks[len(blocks)-1]) + return true } + con.logger.Info("not synced yet", + "overlap-count", overlapCount, + "num-chain", numChains, + "last-block", blocks[len(blocks)-1]) + return false } // ensureAgreementOverlapRound ensures the oldest blocks in each chain in @@ -333,6 +357,25 @@ func (con *Consensus) findLatticeSyncBlock( } } +func (con *Consensus) processFinalizedBlock(block *types.Block) error { + if con.lattice == nil { + return nil + } + con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) + delivered, err := con.lattice.ProcessFinalizedBlock(block) + if err != nil { + return err + } + for idx, b := range delivered { + if con.finalizedBlockHashes[idx] != b.Hash { + return ErrMismatchBlockHashSequence + } + con.validatedChains[b.Position.ChainID] = struct{}{} + } + con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):] + return nil +} + // SyncBlocks syncs blocks from compaction chain, latest is true if the caller // regards the blocks are the latest ones. Notice that latest can be true for // many times. @@ -365,8 +408,8 @@ func (con *Consensus) SyncBlocks( return nil, err } } - if con.lattice != nil { - con.lattice.ProcessFinalizedBlock(b) + if err := con.processFinalizedBlock(b); err != nil { + return nil, err } } if latest && con.lattice == nil { @@ -397,7 +440,9 @@ func (con *Consensus) SyncBlocks( b = &b1 } for _, b := range blocksToProcess { - con.lattice.ProcessFinalizedBlock(b) + if err := con.processFinalizedBlock(b); err != nil { + return nil, err + } } } } @@ -405,7 +450,7 @@ func (con *Consensus) SyncBlocks( // Check if compaction and agreements' blocks are overlapped. The // overlapping of compaction chain and BA's oldest blocks means the // syncing is done. - con.checkIfSynced(blocks) + con.isSynced = con.checkIfValidated() && con.checkIfSynced(blocks) } if con.isSynced { // Stop network and CRS routines, wait until they are all stoped. diff --git a/core/total-ordering.go b/core/total-ordering.go index 52f9270..3bf6946 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -535,11 +535,13 @@ type totalOrderingGlobalVector struct { cachedCandidateInfo *totalOrderingCandidateInfo } -func newTotalOrderingGlobalVector(numChains uint32) *totalOrderingGlobalVector { +func newTotalOrderingGlobalVector( + initRound uint64, numChains uint32) *totalOrderingGlobalVector { return &totalOrderingGlobalVector{ blocks: make([][]*types.Block, numChains), tips: make([]*types.Block, numChains), breakpoints: make([][]*totalOrderingBreakpoint, numChains), + curRound: initRound, } } @@ -792,14 +794,14 @@ type totalOrdering struct { } // newTotalOrdering constructs an totalOrdering instance. -func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering { +func newTotalOrdering( + dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering { config := &totalOrderingConfig{} config.fromConfig(round, cfg) config.setRoundBeginTime(dMoment) candidates := make([]*totalOrderingCandidateInfo, config.numChains) to := &totalOrdering{ pendings: make(map[common.Hash]*types.Block), - globalVector: newTotalOrderingGlobalVector(config.numChains), dirtyChainIDs: make([]int, 0, config.numChains), acked: make(map[common.Hash]map[common.Hash]struct{}), objCache: newTotalOrderingObjectCache(config.numChains), @@ -807,6 +809,8 @@ func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *total candidates: candidates, candidateChainIDs: make([]uint32, 0, config.numChains), curRound: config.roundID, + globalVector: newTotalOrderingGlobalVector( + config.roundID, config.numChains), } to.configs = []*totalOrderingConfig{config} return to @@ -898,7 +902,8 @@ func (to *totalOrdering) clean(b *types.Block) { } // updateVectors is a helper function to update all cached vectors. -func (to *totalOrdering) updateVectors(b *types.Block) (isOldest bool, err error) { +func (to *totalOrdering) updateVectors( + b *types.Block) (isOldest bool, err error) { var ( candidateHash common.Hash chainID uint32 diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index 16cfa8f..6d693c8 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -322,6 +322,12 @@ func (s *ConsensusTestSuite) TestSync() { req.NoError(err) req.NoError(seedGov.State().RequestChange( test.StateChangeRoundInterval, 50*time.Second)) + req.NoError(seedGov.State().RequestChange( + test.StateChangeNumChains, uint32(5))) + seedGov.CatchUpWithRound(0) + req.NoError(seedGov.State().RequestChange( + test.StateChangeNumChains, uint32(4))) + seedGov.CatchUpWithRound(1) // A short round interval. nodes := s.setupNodes(dMoment, prvKeys, seedGov) // Choose the first node as "syncNode" that its consensus' Run() is called @@ -444,7 +450,7 @@ ReachAlive: // Stop a node, we should still be able to proceed. stoppedNode.con.Stop() stoppedNode.con = nil - fmt.Println("one node stopped") + fmt.Println("one node stopped", stoppedNode.ID) // Initiate a dummy routine to consume the receive channel. go func() { for { -- cgit v1.2.3