author     Mission Liao <mission.liao@dexon.org>    2018-12-26 15:23:54 +0800
committer  GitHub <noreply@github.com>              2018-12-26 15:23:54 +0800
commit     39c02fe0f7c81491ea897fafcf32595d280bbdbe (patch)
tree       1ac3d002de42bb7471624656713e331db55aaea2
parent     00416c9df2fec5398389863fb6f885a1fe11a6cc (diff)
core: fix stuffs (#383)
* Merge core.Consensus constructors
* Downgrade severity of logs
* Refine logic to add blocks from pool to lattice
* Add test.LaunchDummyReceiver
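
The last item above replaces the hand-rolled drain goroutines in the integration test with a reusable helper. Below is a minimal usage sketch, assuming a `*test.Network` already prepared by the caller and the repository's import path; only the helper's signature comes from this commit:

```go
package example

import (
	"context"

	// Import path assumed from this repository's layout.
	"github.com/dexon-foundation/dexon-consensus/core/test"
)

// drainUntilCancelled keeps a test network's receive channel drained so that
// senders never block while the node under test is not consuming messages.
// networkModule is assumed to be a *test.Network set up elsewhere.
func drainUntilCancelled(networkModule *test.Network) context.CancelFunc {
	// LaunchDummyReceiver spawns the draining goroutine and returns a
	// CancelFunc; call it once a real consumer (e.g. a running Consensus
	// instance) takes over the channel.
	return test.LaunchDummyReceiver(context.Background(), networkModule)
}
```

Returning a `context.CancelFunc` is what lets `TestSync` in the diff below drop its two inlined receiver loops: the caller either cancels explicitly or passes a context already tied to the test's lifetime.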
-rw-r--r--  core/agreement-mgr.go                2
-rw-r--r--  core/configuration-chain.go          4
-rw-r--r--  core/consensus.go                  181
-rw-r--r--  core/lattice.go                     17
-rw-r--r--  core/syncer/agreement.go             4
-rw-r--r--  core/syncer/consensus.go            12
-rw-r--r--  core/test/utils.go                  19
-rw-r--r--  integration_test/consensus_test.go  70
8 files changed, 128 insertions, 181 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index a9fa21d..2b5c4bc 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -313,7 +313,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
 		if config = mgr.getConfig(nextRound); config != nil {
 			break
 		} else {
-			mgr.logger.Info("round is not ready", "round", nextRound)
+			mgr.logger.Debug("round is not ready", "round", nextRound)
 			time.Sleep(1 * time.Second)
 		}
 	}
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index ad24e44..5c389a7 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -135,7 +135,7 @@ func (cc *configurationChain) runDKG(round uint64) error {
 	}
 	cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round)
 	for !cc.gov.IsDKGMPKReady(round) {
-		cc.logger.Info("DKG MPKs are not ready yet. Try again later...",
+		cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
 			"nodeID", cc.ID)
 		cc.dkgLock.Unlock()
 		time.Sleep(500 * time.Millisecond)
@@ -206,7 +206,7 @@ func (cc *configurationChain) runDKG(round uint64) error {
 	// unexpected network fluctuation and ensure the robustness of DKG protocol.
 	cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
 	for !cc.gov.IsDKGFinal(round) {
-		cc.logger.Info("DKG is not ready yet. Try again later...",
+		cc.logger.Debug("DKG is not ready yet. Try again later...",
 			"nodeID", cc.ID)
 		time.Sleep(500 * time.Millisecond)
 	}
diff --git a/core/consensus.go b/core/consensus.go
index 7f77df9..cab28cb 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -399,8 +399,8 @@ func NewConsensus(
 	network Network,
 	prv crypto.PrivateKey,
 	logger common.Logger) *Consensus {
-
-	return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+	return newConsensusForRound(
+		&types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true)
 }
 
 // NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -413,19 +413,60 @@ func NewConsensusForSimulation(
 	network Network,
 	prv crypto.PrivateKey,
 	logger common.Logger) *Consensus {
+	return newConsensusForRound(
+		&types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false)
+}
-
-	return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+
+// NewConsensusFromSyncer constructs an Consensus instance from information
+// provided from syncer.
+//
+// You need to provide the initial block for this newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block you
+// delivered to syncer.
+func NewConsensusFromSyncer(
+	initBlock *types.Block,
+	initRoundBeginTime time.Time,
+	app Application,
+	gov Governance,
+	db db.Database,
+	networkModule Network,
+	prv crypto.PrivateKey,
+	latticeModule *Lattice,
+	blocks []*types.Block,
+	randomnessResults []*types.BlockRandomnessResult,
+	logger common.Logger) (*Consensus, error) {
+	// Setup Consensus instance.
+	con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
+		networkModule, prv, logger, latticeModule, true)
+	// Dump all BA-confirmed blocks to the consensus instance.
+	for _, b := range blocks {
+		con.ccModule.registerBlock(b)
+		if err := con.processBlock(b); err != nil {
+			return nil, err
+		}
+	}
+	// Dump all randomness result to the consensus instance.
+	for _, r := range randomnessResults {
+		if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
+			con.logger.Error("failed to process randomness result when syncing",
+				"result", r)
+			continue
+		}
+	}
+	return con, nil
 }
 
 // newConsensus creates a Consensus instance.
-func newConsensus(
-	dMoment time.Time,
+func newConsensusForRound(
+	initBlock *types.Block,
+	initRoundBeginTime time.Time,
 	app Application,
 	gov Governance,
 	db db.Database,
 	network Network,
 	prv crypto.PrivateKey,
 	logger common.Logger,
+	latticeModule *Lattice,
 	usingNonBlocking bool) *Consensus {
 
 	// TODO(w): load latest blockHeight from DB, and use config at that height.
@@ -437,12 +478,14 @@ func newConsensus(
 	if a, ok := app.(Debug); ok {
 		debugApp = a
 	}
-	// Get configuration for genesis round.
-	var round uint64
-	config := utils.GetConfigWithPanic(gov, round, logger)
+	// Get configuration for bootstrap round.
+	initRound := initBlock.Position.Round
+	initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
 	// Init lattice.
-	lattice := NewLattice(
-		dMoment, round, config, signer, app, debugApp, db, logger)
+	if latticeModule == nil {
+		latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig,
+			signer, app, debugApp, db, logger)
+	}
 	// Init configuration chain.
 	ID := types.NewNodeID(prv.PublicKey())
 	recv := &consensusDKGReceiver{
@@ -453,13 +496,7 @@ func newConsensus(
 		network:      network,
 		logger:       logger,
 	}
-	cfgModule := newConfigurationChain(
-		ID,
-		recv,
-		gov,
-		nodeSetCache,
-		db,
-		logger)
+	cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger)
 	recv.cfgModule = cfgModule
 	appModule := app
 	if usingNonBlocking {
@@ -467,86 +504,14 @@ func newConsensus(
 	}
 	// Construct Consensus instance.
 	con := &Consensus{
-		ID:                    ID,
-		ccModule:              newCompactionChain(gov),
-		lattice:               lattice,
-		app:                   appModule,
-		debugApp:              debugApp,
-		gov:                   gov,
-		db:                    db,
-		network:               network,
-		baConfirmedBlock:      make(map[common.Hash]chan<- *types.Block),
-		dkgReady:              sync.NewCond(&sync.Mutex{}),
-		cfgModule:             cfgModule,
-		dMoment:               dMoment,
-		nodeSetCache:          nodeSetCache,
-		signer:                signer,
-		event:                 common.NewEvent(),
-		logger:                logger,
-		resetRandomnessTicker: make(chan struct{}),
-	}
-	con.ctx, con.ctxCancel = context.WithCancel(context.Background())
-	con.baMgr = newAgreementMgr(con, round, dMoment)
-	if err := con.prepare(&types.Block{}); err != nil {
-		panic(err)
-	}
-	return con
-}
-
-// NewConsensusFromSyncer constructs an Consensus instance from information
-// provided from syncer.
-//
-// You need to provide the initial block for this newly created Consensus
-// instance to bootstrap with. A proper choice is the last finalized block you
-// delivered to syncer.
-func NewConsensusFromSyncer(
-	initBlock *types.Block,
-	initRoundBeginTime time.Time,
-	app Application,
-	gov Governance,
-	db db.Database,
-	networkModule Network,
-	prv crypto.PrivateKey,
-	latticeModule *Lattice,
-	blocks []*types.Block,
-	randomnessResults []*types.BlockRandomnessResult,
-	logger common.Logger) (*Consensus, error) {
-	// Setup the cache for node sets.
-	nodeSetCache := utils.NewNodeSetCache(gov)
-	// Setup signer module.
-	signer := utils.NewSigner(prv)
-	// Init configuration chain.
-	ID := types.NewNodeID(prv.PublicKey())
-	recv := &consensusDKGReceiver{
-		ID:           ID,
-		gov:          gov,
-		signer:       signer,
-		nodeSetCache: nodeSetCache,
-		network:      networkModule,
-		logger:       logger,
-	}
-	cfgModule := newConfigurationChain(
-		ID,
-		recv,
-		gov,
-		nodeSetCache,
-		db,
-		logger)
-	recv.cfgModule = cfgModule
-	// Check if the application implement Debug interface.
-	var debugApp Debug
-	if a, ok := app.(Debug); ok {
-		debugApp = a
-	}
-	// Setup Consensus instance.
-	con := &Consensus{
 		ID:               ID,
 		ccModule:         newCompactionChain(gov),
 		lattice:          latticeModule,
-		app:              newNonBlocking(app, debugApp),
+		app:              appModule,
+		debugApp:         debugApp,
 		gov:              gov,
 		db:               db,
-		network:          networkModule,
+		network:          network,
 		baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
 		dkgReady:         sync.NewCond(&sync.Mutex{}),
 		cfgModule:        cfgModule,
@@ -557,27 +522,11 @@ func NewConsensusFromSyncer(
 		logger:           logger,
 	}
 	con.ctx, con.ctxCancel = context.WithCancel(context.Background())
-	con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime)
-	// Bootstrap the consensus instance.
+	con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
 	if err := con.prepare(initBlock); err != nil {
-		return nil, err
-	}
-	// Dump all BA-confirmed blocks to the consensus instance.
-	for _, b := range blocks {
-		con.ccModule.registerBlock(b)
-		if err := con.processBlock(b); err != nil {
-			return nil, err
-		}
-	}
-	// Dump all randomness result to the consensus instance.
-	for _, r := range randomnessResults {
-		if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
-			con.logger.Error("failed to process randomness result when syncing",
-				"result", r)
-			continue
-		}
+		panic(err)
 	}
-	return con, nil
+	return con
 }
 
 // prepare the Consensus instance to be ready for blocks after 'initBlock'.
@@ -678,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) {
 	con.logger.Debug("Calling Governance.CRS to check if already proposed",
 		"round", round+1)
 	if (con.gov.CRS(round+1) != common.Hash{}) {
-		con.logger.Info("CRS already proposed", "round", round+1)
+		con.logger.Debug("CRS already proposed", "round", round+1)
 		return
 	}
 	con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
@@ -749,7 +698,7 @@ func (con *Consensus) initialRound(
 		if (nextCRS != common.Hash{}) {
 			return true
 		}
-		con.logger.Info("CRS is not ready yet. Try again later...",
+		con.logger.Debug("CRS is not ready yet. Try again later...",
 			"nodeID", con.ID,
 			"round", round)
 		return false
@@ -762,7 +711,7 @@ func (con *Consensus) initialRound(
 	go func(nextRound uint64) {
 		if !checkWithCancel(
 			con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
-			con.logger.Info("unable to prepare CRS for baMgr",
+			con.logger.Debug("unable to prepare CRS for baMgr",
 				"round", nextRound)
 			return
 		}
@@ -786,7 +735,7 @@ func (con *Consensus) initialRound(
 			// unexpected network fluctuation and ensure the robustness.
 			if !checkWithCancel(
 				con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
-				con.logger.Info("unable to prepare CRS for DKG set",
+				con.logger.Debug("unable to prepare CRS for DKG set",
 					"round", nextRound)
 				return
 			}
@@ -1158,7 +1107,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
 			if con.nonFinalizedBlockDelivered {
 				panic(fmt.Errorf("attempting to skip finalized block: %s", b))
 			}
-			con.logger.Info("skip delivery of finalized block",
+			con.logger.Debug("skip delivery of finalized block",
 				"block", b,
 				"finalization-height", b.Finalization.Height)
 			continue
diff --git a/core/lattice.go b/core/lattice.go
index db19cf9..591c63d 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -193,17 +193,28 @@ func (l *Lattice) addBlockToLattice(
 		if err == nil {
 			var output []*types.Block
 			if output, err = l.data.addBlock(tip); err != nil {
-				l.logger.Error("Sanity Check failed", "error", err)
-				continue
+				// We should be able to add this block once sanity check
+				// passed.
+				l.logger.Error("Failed to add sanity-checked block",
+					"block", tip, "error", err)
+				panic(err)
 			}
 			hasOutput = true
 			outputBlocks = append(outputBlocks, output...)
+			l.pool.removeTip(i)
+			continue
 		}
 		if _, ok := err.(*ErrAckingBlockNotExists); ok {
+			l.logger.Debug("Pending block for lattice",
+				"pending", tip,
+				"last", l.data.chains[tip.Position.ChainID])
 			err = nil
 			continue
+		} else {
+			l.logger.Error("Unexpected sanity check error",
+				"block", tip, "error", err)
+			panic(err)
 		}
-		l.pool.removeTip(i)
 	}
 	if !hasOutput {
 		break
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index fee4624..32ea654 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -106,7 +106,7 @@ func (a *agreement) processBlock(b *types.Block) {
 func (a *agreement) processAgreementResult(r *types.AgreementResult) {
 	// Cache those results that CRS is not ready yet.
 	if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
-		a.logger.Info("agreement result already confirmed", "result", r)
+		a.logger.Debug("agreement result already confirmed", "result", r)
 		return
 	}
 	if r.Position.Round > a.latestCRSRound {
@@ -116,7 +116,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
 			a.pendings[r.Position.Round] = pendingsForRound
 		}
 		pendingsForRound[r.BlockHash] = r
-		a.logger.Info("agreement result cached", "result", r)
+		a.logger.Debug("agreement result cached", "result", r)
 		return
 	}
 	if err := core.VerifyAgreementResult(r, a.cache); err != nil {
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 32bbab3..c767a6d 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -153,7 +153,7 @@ func (con *Consensus) checkIfValidated() bool {
 	if validatedChainCount == numChains {
 		return true
 	}
-	con.logger.Info("not validated yet", "validated-chain", validatedChainCount)
+	con.logger.Debug("not validated yet", "validated-chain", validatedChainCount)
 	return false
 }
 
@@ -197,7 +197,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
 	if overlapCount == numChains {
 		return true
 	}
-	con.logger.Info("not synced yet",
+	con.logger.Debug("not synced yet",
 		"overlap-count", overlapCount,
 		"num-chain", numChains,
 		"last-block", blocks[len(blocks)-1])
@@ -262,7 +262,7 @@ func (con *Consensus) ensureAgreementOverlapRound() bool {
 		}
 		if tipRoundMap[r] == con.configs[r].NumChains {
 			con.agreementRoundCut = r
-			con.logger.Info("agreement round cut found, round", r)
+			con.logger.Debug("agreement round cut found, round", r)
 			return true
 		}
 	}
@@ -411,7 +411,7 @@ func (con *Consensus) SyncBlocks(
 			"expected", tipHeight+1)
 		return false, ErrInvalidSyncingFinalizationHeight
 	}
-	con.logger.Info("syncBlocks",
+	con.logger.Debug("syncBlocks",
 		"position", &blocks[0].Position,
 		"final height", blocks[0].Finalization.Height,
 		"len", len(blocks),
@@ -446,7 +446,7 @@ func (con *Consensus) SyncBlocks(
 		return false, err
 	}
 	if syncBlock != nil {
-		con.logger.Info("deliver set found", "block", syncBlock)
+		con.logger.Debug("deliver set found", "block", syncBlock)
 		// New lattice with the round of syncBlock.
 		con.initConsensusObj(syncBlock)
 		con.setupConfigs(blocks)
@@ -700,7 +700,7 @@ func (con *Consensus) startCRSMonitor() {
 		if round == lastNotifiedRound {
 			return
 		}
-		con.logger.Info("CRS is ready", "round", round)
+		con.logger.Debug("CRS is ready", "round", round)
 		con.lock.RLock()
 		defer con.lock.RUnlock()
 		lastNotifiedRound = round
diff --git a/core/test/utils.go b/core/test/utils.go
index 84c90a9..8a14ebf 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -18,6 +18,7 @@ package test
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"math"
@@ -225,3 +226,21 @@ func VerifyDB(db db.Database) error {
 	}
 	return nil
 }
+
+// LaunchDummyReceiver launches a go routine to receive and drop messages from
+// a network module. An optional context could be passed to stop the go routine.
+func LaunchDummyReceiver(
+	ctx context.Context, networkModule *Network) context.CancelFunc {
+	dummyCtx, dummyCancel := context.WithCancel(ctx)
+	go func() {
+	loop:
+		for {
+			select {
+			case <-dummyCtx.Done():
+				break loop
+			case <-networkModule.ReceiveChan():
+			}
+		}
+	}()
+	return dummyCancel
+}
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index c1b812b..d70ae4f 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -353,53 +353,27 @@ func (s *ConsensusTestSuite) TestSync() {
 	}
 	// Clean syncNode's network receive channel, or it might exceed the limit
 	// and block other go routines.
-	dummyReceiverCtx, dummyReceiverCtxCancel := context.WithCancel(
-		context.Background())
-	go func() {
-	Loop:
-		for {
-			select {
-			case <-syncNode.network.ReceiveChan():
-			case <-dummyReceiverCtx.Done():
-				break Loop
-			}
-		}
-	}()
-	// Print status every 5 seconds so CI won't fail.
-	monitorCtx, monitorCancel := context.WithCancel(
-		context.Background())
-	defer monitorCancel()
-	go func() {
-		for {
-			select {
-			case <-time.After(5 * time.Second):
-				for _, n := range nodes {
-					pos := n.app.GetLatestDeliveredPosition()
-					fmt.Println("latestPos", n.ID, &pos)
-					break
-				}
-			case <-monitorCtx.Done():
-				return
-			}
-		}
-	}()
+	dummyReceiverCtxCancel := test.LaunchDummyReceiver(
+		context.Background(), syncNode.network)
 ReachAlive:
 	for {
+		// Check if any error happened or sleep for a period of time.
+		select {
+		case err := <-errChan:
+			req.NoError(err)
+		case <-time.After(5 * time.Second):
+		}
 		// If all nodes excepts syncNode have reached aliveRound, call syncNode's
 		// Run() and send it all blocks in one of normal node's compaction chain.
 		for id, n := range nodes {
 			if id == syncNode.ID {
 				continue
 			}
-			if n.app.GetLatestDeliveredPosition().Round < aliveRound {
+			pos := n.app.GetLatestDeliveredPosition()
+			if pos.Round < aliveRound {
+				fmt.Println("latestPos", n.ID, &pos)
 				continue ReachAlive
 			}
-			// Check if any error happened or sleep for a period of time.
-			select {
-			case err := <-errChan:
-				req.NoError(err)
-			case <-time.After(5 * time.Second):
-			}
 		}
 		dummyReceiverCtxCancel()
 		break
@@ -446,34 +420,28 @@ ReachAlive:
 	}()
 	// Wait until all nodes reach 'untilRound'.
 	go func() {
+		n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition()
 	ReachFinished:
 		for {
+			fmt.Println("latestPos", n.ID, &pos)
 			time.Sleep(5 * time.Second)
-			for _, n := range nodes {
+			for _, n = range nodes {
+				pos = n.app.GetLatestDeliveredPosition()
 				if n.ID == stoppedNode.ID {
 					if n.con == nil {
 						continue
 					}
-					if n.app.GetLatestDeliveredPosition().Round < stopRound {
-						continue
+					if pos.Round < stopRound {
+						continue ReachFinished
 					}
					// Stop a node, we should still be able to proceed.
 					stoppedNode.con.Stop()
 					stoppedNode.con = nil
 					fmt.Println("one node stopped", stoppedNode.ID)
-					// Initiate a dummy routine to consume the receive channel.
-					go func() {
-						for {
-							select {
-							case <-runnerCtx.Done():
-								return
-							case <-stoppedNode.network.ReceiveChan():
-							}
-						}
-					}()
+					test.LaunchDummyReceiver(runnerCtx, stoppedNode.network)
 					continue
 				}
-				if n.app.GetLatestDeliveredPosition().Round < untilRound {
+				if pos.Round < untilRound {
 					continue ReachFinished
 				}
 			}
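
For reference, the constructor that survives the merge on the syncer hand-off path is NewConsensusFromSyncer. The sketch below is a hypothetical wrapper that only mirrors the call shape visible in the diff above; the wrapper itself, the argument names, and the import paths are assumptions standing in for state a real syncer would provide:

```go
package handoff

import (
	"time"

	// Import paths assumed from this repository's layout.
	"github.com/dexon-foundation/dexon-consensus/common"
	"github.com/dexon-foundation/dexon-consensus/core"
	"github.com/dexon-foundation/dexon-consensus/core/crypto"
	"github.com/dexon-foundation/dexon-consensus/core/db"
	"github.com/dexon-foundation/dexon-consensus/core/types"
)

// buildConsensusFromSyncerState hands syncer state over to the constructor
// kept by this commit. Every argument is assumed to be produced by the
// caller's syncer; only the signature follows the diff.
func buildConsensusFromSyncerState(
	lastFinalized *types.Block, // last finalized block delivered to the syncer
	roundBeginTime time.Time, // begin time of lastFinalized's round
	app core.Application,
	gov core.Governance,
	database db.Database,
	network core.Network,
	prv crypto.PrivateKey,
	lattice *core.Lattice, // lattice module carried over from the syncer
	pendingBlocks []*types.Block, // BA-confirmed blocks not yet processed
	pendingRandomness []*types.BlockRandomnessResult,
	logger common.Logger) (*core.Consensus, error) {
	// NewConsensusFromSyncer builds the instance via newConsensusForRound and
	// then replays pendingBlocks and pendingRandomness into it, so the caller
	// only has to start the returned instance.
	return core.NewConsensusFromSyncer(lastFinalized, roundBeginTime, app, gov,
		database, network, prv, lattice, pendingBlocks, pendingRandomness,
		logger)
}
```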