From dce509a13ef5873b9cae3c1cabdb97e219b6fb7d Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Mon, 24 Dec 2018 10:29:31 +0800 Subject: core: fix issues found when testing syncing. (#379) * Avoid panic when stopping multiple times. * Fix syncer panic when round switching * Add getCurrentConfig to total-ordering, and panic with more info * Avoid infinite loop. --- core/consensus.go | 42 ++++++++++++++++++++++++++++++------------ core/syncer/consensus.go | 24 +++++++++++++----------- core/total-ordering.go | 20 +++++++++++++++----- core/utils.go | 20 ++++++++++++++++++++ 4 files changed, 78 insertions(+), 28 deletions(-) (limited to 'core') diff --git a/core/consensus.go b/core/consensus.go index 44ce43f..0d4a38a 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -601,7 +601,11 @@ func (con *Consensus) prepare(initBlock *types.Block) error { // Setup lattice module. initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger) if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil { - return err + if err == ErrRoundNotIncreasing { + err = nil + } else { + return err + } } // Register events. dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) @@ -732,23 +736,37 @@ func (con *Consensus) initialRound( }() }) } + // checkCRS is a generator of checker to check if CRS for that round is + // ready or not. + checkCRS := func(round uint64) func() bool { + return func() bool { + nextCRS := con.gov.CRS(round) + if (nextCRS != common.Hash{}) { + return true + } + con.logger.Info("CRS is not ready yet. Try again later...", + "nodeID", con.ID, + "round", round) + return false + } + } // Initiate BA modules. con.event.RegisterTime( startTime.Add(config.RoundInterval/2+config.LambdaDKG), func(time.Time) { go func(nextRound uint64) { - for (con.gov.CRS(nextRound) == common.Hash{}) { - con.logger.Info("CRS is not ready yet. Try again later...", - "nodeID", con.ID, + if !checkWithCancel( + con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { + con.logger.Info("unable to prepare CRS for baMgr", "round", nextRound) - time.Sleep(500 * time.Millisecond) + return } // Notify BA for new round. nextConfig := utils.GetConfigWithPanic( con.gov, nextRound, con.logger) - con.logger.Debug("Calling Governance.CRS", - "round", nextRound) - nextCRS := utils.GetCRSWithPanic(con.gov, nextRound, con.logger) + nextCRS := utils.GetCRSWithPanic( + con.gov, nextRound, con.logger) + con.logger.Info("appendConfig for baMgr", "round", nextRound) if err := con.baMgr.appendConfig( nextRound, nextConfig, nextCRS); err != nil { panic(err) @@ -761,11 +779,11 @@ func (con *Consensus) initialRound( go func(nextRound uint64) { // Normally, gov.CRS would return non-nil. Use this for in case of // unexpected network fluctuation and ensure the robustness. - for (con.gov.CRS(nextRound) == common.Hash{}) { - con.logger.Info("CRS is not ready yet. Try again later...", - "nodeID", con.ID, + if !checkWithCancel( + con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { + con.logger.Info("unable to prepare CRS for DKG set", "round", nextRound) - time.Sleep(500 * time.Millisecond) + return } nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) if err != nil { diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index 43a9e21..32bbab3 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -446,9 +446,10 @@ func (con *Consensus) SyncBlocks( return false, err } if syncBlock != nil { - con.logger.Info("deliver set found", syncBlock) + con.logger.Info("deliver set found", "block", syncBlock) // New lattice with the round of syncBlock. con.initConsensusObj(syncBlock) + con.setupConfigs(blocks) // Process blocks from syncBlock to blocks' last block. b := blocks[len(blocks)-1] blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1 @@ -586,7 +587,7 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) { con.resizeByNumChains(curMaxNumChains) // Notify core.Lattice for new configs. if con.lattice != nil { - for con.latticeLastRound+1 <= maxRound { + for con.latticeLastRound+1 <= untilRound { con.latticeLastRound++ if err := con.lattice.AppendConfig( con.latticeLastRound, @@ -714,14 +715,12 @@ func (con *Consensus) startCRSMonitor() { select { case <-con.ctx.Done(): return - case <-time.After(1 * time.Second): - // Notify agreement modules for the latest round that CRS is - // available if the round is not notified yet. - var crsRound = lastNotifiedRound - for (con.gov.CRS(crsRound+1) != common.Hash{}) { - crsRound++ - } - notifyNewCRS(crsRound) + case <-time.After(500 * time.Millisecond): + } + // Notify agreement modules for the latest round that CRS is + // available if the round is not notified yet. + if (con.gov.CRS(lastNotifiedRound+1) != common.Hash{}) { + notifyNewCRS(lastNotifiedRound + 1) } } }() @@ -732,7 +731,10 @@ func (con *Consensus) stopAgreement() { con.lock.Lock() defer con.lock.Unlock() for _, a := range con.agreements { - close(a.inputChan) + if a.inputChan != nil { + close(a.inputChan) + a.inputChan = nil + } } }() con.agreementWaitGroup.Wait() diff --git a/core/total-ordering.go b/core/total-ordering.go index 3bf6946..744471a 100644 --- a/core/total-ordering.go +++ b/core/total-ordering.go @@ -19,6 +19,7 @@ package core import ( "errors" + "fmt" "math" "sort" "sync" @@ -1017,7 +1018,7 @@ func (to *totalOrdering) generateDeliverSet() ( chainID, otherChainID uint32 info, otherInfo *totalOrderingCandidateInfo precedings = make(map[uint32]struct{}) - cfg = to.configs[to.curRound-to.configs[0].roundID] + cfg = to.getCurrentConfig() ) mode = TotalOrderingModeNormal to.globalVector.updateCandidateInfo(to.dirtyChainIDs, to.objCache) @@ -1162,7 +1163,7 @@ func (to *totalOrdering) flushBlocks( b *types.Block) (flushed []*types.Block, mode uint32, err error) { mode = TotalOrderingModeFlush - cfg := to.configs[to.curRound-to.configs[0].roundID] + cfg := to.getCurrentConfig() if cfg.isLastBlock(b) { to.flushReadyChains[b.Position.ChainID] = struct{}{} } @@ -1216,7 +1217,7 @@ func (to *totalOrdering) flushBlocks( to.globalVector.cachedCandidateInfo = nil to.switchRound() // Force picking new candidates. - numChains := to.configs[to.curRound-to.configs[0].roundID].numChains + numChains := to.getCurrentConfig().numChains to.output(map[common.Hash]struct{}{}, numChains) return } @@ -1226,7 +1227,7 @@ func (to *totalOrdering) deliverBlocks() ( delivered []*types.Block, mode uint32, err error) { hashes, mode := to.generateDeliverSet() - cfg := to.configs[to.curRound-to.configs[0].roundID] + cfg := to.getCurrentConfig() // Output precedings. delivered = to.output(hashes, cfg.numChains) // Check if any block in delivered set is the last block in this round, if @@ -1271,12 +1272,21 @@ func (to *totalOrdering) deliverBlocks() ( return } +func (to *totalOrdering) getCurrentConfig() *totalOrderingConfig { + cfgIdx := to.curRound - to.configs[0].roundID + if cfgIdx >= uint64(len(to.configs)) { + panic(fmt.Errorf("total ordering config is not ready: %v, %v, %v", + to.curRound, to.configs[0].roundID, len(to.configs))) + } + return to.configs[cfgIdx] +} + // processBlock is the entry point of totalOrdering. func (to *totalOrdering) processBlock( b *types.Block) ([]*types.Block, uint32, error) { // NOTE: Block b is assumed to be in topologically sorted, i.e., all its // acking blocks are during or after total ordering stage. - cfg := to.configs[to.curRound-to.configs[0].roundID] + cfg := to.getCurrentConfig() to.pendings[b.Hash] = b to.buildBlockRelation(b) isOldest, err := to.updateVectors(b) diff --git a/core/utils.go b/core/utils.go index 838369c..0c2d155 100644 --- a/core/utils.go +++ b/core/utils.go @@ -18,6 +18,7 @@ package core import ( + "context" "errors" "fmt" "math/rand" @@ -214,3 +215,22 @@ func isTravisCI() bool { func getDKGThreshold(config *types.Config) int { return int(config.DKGSetSize/3) + 1 } + +// checkWithCancel is a helper to perform periodic checking with cancel. +func checkWithCancel(parentCtx context.Context, interval time.Duration, + checker func() bool) (ret bool) { + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() +Loop: + for { + select { + case <-ctx.Done(): + break Loop + case <-time.After(interval): + } + if ret = checker(); ret { + return + } + } + return +} -- cgit v1.2.3