diff options
author | Mission Liao <mission.liao@dexon.org> | 2019-03-16 10:07:08 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-16 10:07:08 +0800 |
commit | 50c9419ba04ed44854fdb1afc1ef2e865be9876f (patch) | |
tree | 2acafd84bdb61d54da5621c29c3914986df45b65 | |
parent | b02fa5ee430cff9dafc9d9c399099a88d554a083 (diff) | |
download | dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.gz dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.bz2 dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.lz dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.xz dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.zst dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.zip |
core, syncer: integrate utils.RoundEvent (#490)
-rw-r--r-- | core/agreement-mgr.go | 84 | ||||
-rw-r--r-- | core/blockchain.go | 69 | ||||
-rw-r--r-- | core/blockchain_test.go | 74 | ||||
-rw-r--r-- | core/consensus.go | 331 | ||||
-rw-r--r-- | core/consensus_test.go | 2 | ||||
-rw-r--r-- | core/interfaces.go | 8 | ||||
-rw-r--r-- | core/syncer/consensus.go | 137 | ||||
-rw-r--r-- | core/test/app.go | 6 | ||||
-rw-r--r-- | core/utils.go | 6 | ||||
-rw-r--r-- | core/utils/round-event.go | 70 | ||||
-rw-r--r-- | core/utils/utils.go | 6 | ||||
-rw-r--r-- | integration_test/consensus_test.go | 10 | ||||
-rw-r--r-- | integration_test/round-event_test.go | 29 |
13 files changed, 478 insertions, 354 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 7b5effb..0e39fa5 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -90,8 +90,6 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, type baRoundSetting struct { notarySet map[types.NodeID]struct{} - agr *agreement - recv *consensusBAReceiver ticker Ticker crs common.Hash } @@ -111,6 +109,7 @@ type agreementMgr struct { initRound uint64 configs []agreementMgrConfig baModule *agreement + recv *consensusBAReceiver processedBAResult map[types.Position]struct{} voteFilter *utils.VoteFilter waitGroup sync.WaitGroup @@ -136,15 +135,17 @@ func newAgreementMgr(con *Consensus, initRound uint64, configs: []agreementMgrConfig{initConfig}, voteFilter: utils.NewVoteFilter(), } - recv := &consensusBAReceiver{ - consensus: con, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, + mgr.recv = &consensusBAReceiver{ + consensus: con, + restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, + changeNotaryHeightValue: &atomic.Value{}, } - recv.roundValue.Store(uint64(0)) + mgr.recv.roundValue.Store(uint64(0)) + mgr.recv.changeNotaryHeightValue.Store(uint64(0)) agr := newAgreement( mgr.ID, - recv, + mgr.recv, newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer, mgr.logger) @@ -156,7 +157,7 @@ func newAgreementMgr(con *Consensus, initRound uint64, agr.notarySet = nodes.GetSubSet( int(initConfig.notarySetSize), types.NewNotarySetTarget(initConfig.crs)) // Hacky way to make agreement module self contained. - recv.agreementModule = agr + mgr.recv.agreementModule = agr mgr.baModule = agr return } @@ -188,15 +189,43 @@ func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig { return &mgr.configs[roundIndex] } -func (mgr *agreementMgr) appendConfig( - round uint64, config *types.Config, crs common.Hash) (err error) { +func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { mgr.lock.Lock() defer mgr.lock.Unlock() - if round != uint64(len(mgr.configs))+mgr.initRound { - return ErrRoundNotIncreasing + apply := func(e utils.RoundEventParam) error { + if len(mgr.configs) > 0 { + lastCfg := mgr.configs[len(mgr.configs)-1] + if e.BeginHeight != lastCfg.RoundEndHeight() { + return ErrInvalidBlockHeight + } + if lastCfg.RoundID() == e.Round { + mgr.configs[len(mgr.configs)-1].ExtendLength() + // It's not an atomic operation to update an atomic value based + // on another. However, it's the best way so far to extend + // length of round without refactoring. + if mgr.recv.round() == e.Round { + mgr.recv.changeNotaryHeightValue.Store( + mgr.configs[len(mgr.configs)-1].RoundEndHeight()) + } + } else if lastCfg.RoundID()+1 == e.Round { + mgr.configs = append(mgr.configs, newAgreementMgrConfig( + lastCfg, e.Config, e.CRS)) + } else { + return ErrInvalidRoundID + } + } else { + c := agreementMgrConfig{} + c.from(e.Round, e.Config, e.CRS) + c.SetRoundBeginHeight(e.BeginHeight) + mgr.configs = append(mgr.configs, c) + } + return nil + } + for _, e := range evts { + if err := apply(e); err != nil { + return err + } } - mgr.configs = append(mgr.configs, newAgreementMgrConfig( - mgr.configs[len(mgr.configs)-1], config, crs)) return nil } @@ -252,7 +281,7 @@ func (mgr *agreementMgr) processAgreementResult( } } else if result.Position.Newer(aID) { mgr.logger.Info("Fast syncing BA", "position", result.Position) - nodes, err := mgr.cache.GetNodeSet(result.Position.Round) + nIDs, err := mgr.cache.GetNotarySet(result.Position.Round) if err != nil { return err } @@ -261,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult( mgr.network.PullBlocks(common.Hashes{result.BlockHash}) mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) - nIDs := nodes.GetSubSet( - int(utils.GetConfigWithPanic( - mgr.gov, result.Position.Round, mgr.logger).NotarySetSize), - types.NewNotarySetTarget(crs)) for key := range result.Votes { if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err @@ -296,10 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) { currentRound uint64 nextRound = initRound curConfig = mgr.config(initRound) - setting = baRoundSetting{ - agr: mgr.baModule, - recv: mgr.baModule.data.recv.(*consensusBAReceiver), - } + setting = baRoundSetting{} tickDuration time.Duration ) @@ -353,12 +375,12 @@ Loop: break Loop default: } - setting.recv.isNotary = checkRound() + mgr.recv.isNotary = checkRound() // Run BA for this round. - setting.recv.roundValue.Store(currentRound) - setting.recv.changeNotaryHeight = curConfig.RoundEndHeight() - setting.recv.restartNotary <- types.Position{ - Round: setting.recv.round(), + mgr.recv.roundValue.Store(currentRound) + mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight()) + mgr.recv.restartNotary <- types.Position{ + Round: mgr.recv.round(), Height: math.MaxUint64, } mgr.voteFilter = utils.NewVoteFilter() @@ -373,8 +395,8 @@ Loop: func (mgr *agreementMgr) baRoutineForOneRound( setting *baRoundSetting) (err error) { - agr := setting.agr - recv := setting.recv + agr := mgr.baModule + recv := mgr.recv oldPos := agr.agreementID() restart := func(restartPos types.Position) (breakLoop bool, err error) { if !isStop(restartPos) { diff --git a/core/blockchain.go b/core/blockchain.go index 9263f67..c5a22b6 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -41,7 +41,6 @@ var ( ErrNotFollowTipPosition = errors.New("not follow tip position") ErrDuplicatedPendingBlock = errors.New("duplicated pending block") ErrRetrySanityCheckLater = errors.New("retry sanity check later") - ErrRoundNotIncreasing = errors.New("round not increasing") ErrRoundNotSwitch = errors.New("round not switch") ErrIncorrectBlockRandomnessResult = errors.New( "incorrect block randomness result") @@ -142,19 +141,8 @@ type blockChain struct { } func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, - initConfig blockChainConfig, app Application, vGetter tsigVerifierGetter, - signer *utils.Signer, logger common.Logger) *blockChain { - if initBlock != nil { - if initConfig.RoundID() != initBlock.Position.Round { - panic(fmt.Errorf("incompatible config/block %s %d", - initBlock, initConfig.RoundID())) - } - } else { - if initConfig.RoundID() != 0 { - panic(fmt.Errorf("genesis config should from round 0 %d", - initConfig.RoundID())) - } - } + app Application, vGetter tsigVerifierGetter, signer *utils.Signer, + logger common.Logger) *blockChain { return &blockChain{ ID: nID, lastConfirmed: initBlock, @@ -163,23 +151,58 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, vGetter: vGetter, app: app, logger: logger, - configs: []blockChainConfig{initConfig}, dMoment: dMoment, pendingRandomnesses: make( map[types.Position]*types.BlockRandomnessResult), } } -func (bc *blockChain) appendConfig(round uint64, config *types.Config) error { - expectedRound := uint64(len(bc.configs)) - if bc.lastConfirmed != nil { - expectedRound += bc.lastConfirmed.Position.Round +func (bc *blockChain) notifyRoundEvents(evts []utils.RoundEventParam) error { + bc.lock.Lock() + defer bc.lock.Unlock() + apply := func(e utils.RoundEventParam) error { + if len(bc.configs) > 0 { + lastCfg := bc.configs[len(bc.configs)-1] + if e.BeginHeight != lastCfg.RoundEndHeight() { + return ErrInvalidBlockHeight + } + if lastCfg.RoundID() == e.Round { + bc.configs[len(bc.configs)-1].ExtendLength() + } else if lastCfg.RoundID()+1 == e.Round { + bc.configs = append(bc.configs, newBlockChainConfig( + lastCfg, e.Config)) + } else { + return ErrInvalidRoundID + } + } else { + c := blockChainConfig{} + c.fromConfig(e.Round, e.Config) + c.SetRoundBeginHeight(e.BeginHeight) + if bc.lastConfirmed == nil { + if c.RoundID() != 0 { + panic(fmt.Errorf("genesis config should from round 0 %d", + c.RoundID())) + } + } else { + if c.RoundID() != bc.lastConfirmed.Position.Round { + panic(fmt.Errorf("incompatible config/block %s %d", + bc.lastConfirmed, c.RoundID())) + } + if !c.Contains(bc.lastConfirmed.Position.Height) { + panic(fmt.Errorf( + "unmatched round-event with block %s %d %d %d", + bc.lastConfirmed, e.Round, e.Reset, e.BeginHeight)) + } + } + bc.configs = append(bc.configs, c) + } + return nil } - if round != expectedRound { - return ErrRoundNotIncreasing + for _, e := range evts { + if err := apply(e); err != nil { + return err + } } - bc.configs = append(bc.configs, newBlockChainConfig( - bc.configs[len(bc.configs)-1], config)) return nil } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 1996fbb..3fe7697 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -128,24 +128,27 @@ func (s *BlockChainTestSuite) newRandomnessFromBlock( } func (s *BlockChainTestSuite) newBlockChain(initB *types.Block, - roundLength uint64) *blockChain { + roundLength uint64) (bc *blockChain) { initRound := uint64(0) if initB != nil { initRound = initB.Position.Round } - initConfig := blockChainConfig{} - initConfig.fromConfig(initRound, &types.Config{ - MinBlockInterval: s.blockInterval, - RoundLength: roundLength, - }) + initHeight := uint64(0) if initB != nil { - initConfig.SetRoundBeginHeight(initB.Position.Height) - } else { - initConfig.SetRoundBeginHeight(0) + initHeight = initB.Position.Height } - return newBlockChain(s.nID, s.dMoment, initB, initConfig, - test.NewApp(0, nil, nil), &testTSigVerifierGetter{}, s.signer, - &common.NullLogger{}) + bc = newBlockChain(s.nID, s.dMoment, initB, test.NewApp(0, nil, nil), + &testTSigVerifierGetter{}, s.signer, &common.NullLogger{}) + s.Require().NoError(bc.notifyRoundEvents([]utils.RoundEventParam{ + utils.RoundEventParam{ + Round: initRound, + Reset: 0, + BeginHeight: initHeight, + Config: &types.Config{ + MinBlockInterval: s.blockInterval, + RoundLength: roundLength, + }}})) + return } func (s *BlockChainTestSuite) newRoundOneInitBlock() *types.Block { @@ -330,13 +333,32 @@ func (s *BlockChainTestSuite) TestSanityCheck() { s.Require().NoError(bc.sanityCheck(b4)) } -func (s *BlockChainTestSuite) TestAppendConfig() { - bc := s.newBlockChain(nil, 10) - s.Require().Equal(ErrRoundNotIncreasing.Error(), - bc.appendConfig(0, &types.Config{}).Error()) - s.Require().Equal(ErrRoundNotIncreasing.Error(), - bc.appendConfig(2, &types.Config{}).Error()) - s.Require().NoError(bc.appendConfig(1, &types.Config{})) +func (s *BlockChainTestSuite) TestNotifyRoundEvents() { + roundLength := uint64(10) + bc := s.newBlockChain(nil, roundLength) + newEvent := func(round, reset, height uint64) []utils.RoundEventParam { + return []utils.RoundEventParam{ + utils.RoundEventParam{ + Round: round, + Reset: reset, + BeginHeight: height, + CRS: common.Hash{}, + Config: &types.Config{RoundLength: roundLength}, + }} + } + s.Require().Equal(ErrInvalidRoundID.Error(), + bc.notifyRoundEvents(newEvent(2, 0, roundLength)).Error()) + s.Require().NoError(bc.notifyRoundEvents(newEvent(1, 0, roundLength))) + // Make sure new config is appended when new round is ready. + s.Require().Len(bc.configs, 2) + s.Require().Equal(ErrInvalidRoundID.Error(), + bc.notifyRoundEvents(newEvent(3, 1, roundLength*2)).Error()) + s.Require().Equal(ErrInvalidBlockHeight.Error(), + bc.notifyRoundEvents(newEvent(1, 1, roundLength)).Error()) + s.Require().NoError(bc.notifyRoundEvents(newEvent(1, 1, roundLength*2))) + // Make sure roundEndHeight is extended when DKG reset. + s.Require().Equal(bc.configs[len(bc.configs)-1].RoundEndHeight(), + roundLength*3) } func (s *BlockChainTestSuite) TestConfirmed() { @@ -354,10 +376,16 @@ func (s *BlockChainTestSuite) TestConfirmed() { func (s *BlockChainTestSuite) TestNextBlockAndTipRound() { var roundLength uint64 = 3 bc := s.newBlockChain(nil, roundLength) - s.Require().NoError(bc.appendConfig(1, &types.Config{ - MinBlockInterval: s.blockInterval, - RoundLength: roundLength, - })) + s.Require().NoError(bc.notifyRoundEvents([]utils.RoundEventParam{ + utils.RoundEventParam{ + Round: 1, + Reset: 0, + BeginHeight: roundLength, + CRS: common.Hash{}, + Config: &types.Config{ + MinBlockInterval: s.blockInterval, + RoundLength: roundLength, + }}})) blocks := s.newBlocks(3, nil) nextH, nextT := bc.nextBlock() s.Require().Equal(nextH, uint64(0)) diff --git a/core/consensus.go b/core/consensus.go index 5ee64c2..8529e40 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -56,18 +56,22 @@ var ( // consensusBAReceiver implements agreementReceiver. type consensusBAReceiver struct { // TODO(mission): consensus would be replaced by blockChain and network. - consensus *Consensus - agreementModule *agreement - changeNotaryHeight uint64 - roundValue *atomic.Value - isNotary bool - restartNotary chan types.Position + consensus *Consensus + agreementModule *agreement + changeNotaryHeightValue *atomic.Value + roundValue *atomic.Value + isNotary bool + restartNotary chan types.Position } func (recv *consensusBAReceiver) round() uint64 { return recv.roundValue.Load().(uint64) } +func (recv *consensusBAReceiver) changeNotaryHeight() uint64 { + return recv.changeNotaryHeightValue.Load().(uint64) +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return @@ -247,16 +251,17 @@ CleanChannelLoop: } } newPos := block.Position - if block.Position.Height+1 == recv.changeNotaryHeight { + if block.Position.Height+1 == recv.changeNotaryHeight() { newPos.Round++ recv.roundValue.Store(newPos.Round) } currentRound := recv.round() - if block.Position.Height > recv.changeNotaryHeight && + changeNotaryHeight := recv.changeNotaryHeight() + if block.Position.Height > changeNotaryHeight && block.Position.Round <= currentRound { panic(fmt.Errorf( "round not switch when confirmig: %s, %d, should switch at %d", - block, currentRound, recv.changeNotaryHeight)) + block, currentRound, changeNotaryHeight)) } recv.restartNotary <- newPos } @@ -396,11 +401,11 @@ type Consensus struct { bcModule *blockChain dMoment time.Time nodeSetCache *utils.NodeSetCache - roundForNewConfig uint64 lock sync.RWMutex ctx context.Context ctxCancel context.CancelFunc event *common.Event + roundEvent *utils.RoundEvent logger common.Logger resetRandomnessTicker chan struct{} resetDeliveryGuardTicker chan struct{} @@ -540,8 +545,10 @@ func newConsensusForRound( } // Get configuration for bootstrap round. initRound := uint64(0) + initBlockHeight := uint64(0) if initBlock != nil { initRound = initBlock.Position.Round + initBlockHeight = initBlock.Position.Height } initConfig := utils.GetConfigWithPanic(gov, initRound, logger) initCRS := utils.GetCRSWithPanic(gov, initRound, logger) @@ -566,10 +573,7 @@ func newConsensusForRound( if usingNonBlocking { appModule = newNonBlocking(app, debugApp) } - bcConfig := blockChainConfig{} - bcConfig.fromConfig(initRound, initConfig) - bcConfig.SetRoundBeginHeight(initRoundBeginHeight) - bcModule := newBlockChain(ID, dMoment, initBlock, bcConfig, appModule, + bcModule := newBlockChain(ID, dMoment, initBlock, appModule, NewTSigVerifierCache(gov, 7), signer, logger) // Construct Consensus instance. con := &Consensus{ @@ -594,6 +598,10 @@ func newConsensusForRound( processBlockChan: make(chan *types.Block, 1024), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + if con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initRound, + initRoundBeginHeight, initBlockHeight, ConfigRoundShift); err != nil { + panic(err) + } baConfig := agreementMgrConfig{} baConfig.from(initRound, initConfig, initCRS) baConfig.SetRoundBeginHeight(initRoundBeginHeight) @@ -613,26 +621,139 @@ func newConsensusForRound( // - the last finalized block func (con *Consensus) prepare( initRoundBeginHeight uint64, initBlock *types.Block) (err error) { + // Trigger the round validation method for the next round of the first + // round. // The block past from full node should be delivered already or known by // full node. We don't have to notify it. initRound := uint64(0) if initBlock != nil { initRound = initBlock.Position.Round } - // Setup blockChain module. - con.roundForNewConfig = initRound + 1 - initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger) - initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger) - if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil { - return - } if initRound == 0 { if DKGDelayRound == 0 { panic("not implemented yet") } } - // Register events. - con.initialRound(initRoundBeginHeight, initRound, initConfig) + // Register round event handler to update BA and BC modules. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + // Always updates newer configs to the later modules first in the flow. + if err := con.bcModule.notifyRoundEvents(evts); err != nil { + panic(err) + } + // The init config is provided to baModule when construction. + if evts[len(evts)-1].BeginHeight != initRoundBeginHeight { + if err := con.baMgr.notifyRoundEvents(evts); err != nil { + panic(err) + } + } + }) + // Register round event handler to propose new CRS. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + // We don't have to propose new CRS during DKG reset, the reset of DKG + // would be done by the DKG set in previous round. + e := evts[len(evts)-1] + if e.Reset != 0 || e.Round < DKGDelayRound { + return + } + if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil { + con.logger.Error("Error getting DKG set when proposing CRS", + "round", e.Round, + "error", err) + } else { + if _, exist := curDkgSet[con.ID]; !exist { + return + } + con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) { + con.logger.Debug( + "Calling Governance.CRS to check if already proposed", + "round", e.Round+1) + if (con.gov.CRS(e.Round+1) != common.Hash{}) { + con.logger.Debug("CRS already proposed", "round", e.Round+1) + return + } + con.runCRS(e.Round, e.CRS) + }) + } + }) + // Touch nodeSetCache for next round. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + if e.Reset != 0 { + return + } + con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) { + if err := con.nodeSetCache.Touch(e.Round + 1); err != nil { + con.logger.Warn("Failed to update nodeSetCache", + "round", e.Round+1, + "error", err) + } + }) + }) + // checkCRS is a generator of checker to check if CRS for that round is + // ready or not. + checkCRS := func(round uint64) func() bool { + return func() bool { + nextCRS := con.gov.CRS(round) + if (nextCRS != common.Hash{}) { + return true + } + con.logger.Debug("CRS is not ready yet. Try again later...", + "nodeID", con.ID, + "round", round) + return false + } + } + // Trigger round validation method for next period. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + // Register a routine to trigger round events. + con.event.RegisterHeight(e.NextRoundValidationHeight(), func( + blockHeight uint64) { + con.roundEvent.ValidateNextRound(blockHeight) + }) + // Register a routine to register next DKG. + con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) { + nextRound := e.Round + 1 + if nextRound < DKGDelayRound { + con.logger.Info("Skip runDKG for round", "round", nextRound) + return + } + // Normally, gov.CRS would return non-nil. Use this for in case of + // unexpected network fluctuation and ensure the robustness. + if !checkWithCancel( + con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { + con.logger.Debug("unable to prepare CRS for DKG set", + "round", nextRound) + return + } + nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) + if err != nil { + con.logger.Error("Error getting DKG set for next round", + "round", nextRound, + "error", err) + return + } + if _, exist := nextDkgSet[con.ID]; !exist { + con.logger.Info("Not selected as DKG set", "round", nextRound) + return + } + con.logger.Info("Selected as DKG set", "round", nextRound) + nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, + con.logger) + con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold( + nextConfig)) + con.event.RegisterHeight(e.NextDKGPreparationHeight(), + func(uint64) { + func() { + con.dkgReady.L.Lock() + defer con.dkgReady.L.Unlock() + con.dkgRunning = 0 + }() + con.runDKG(nextRound, nextConfig) + }) + }) + }) + con.roundEvent.TriggerInitEvent() return } @@ -704,27 +825,9 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) { }() } -func (con *Consensus) runCRS(round uint64) { - for { - con.logger.Debug("Calling Governance.CRS to check if already proposed", - "round", round+1) - if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Debug("CRS already proposed", "round", round+1) - return - } - con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", - "round", round) - if con.cfgModule.isDKGFinal(round) { - break - } - con.logger.Debug("DKG is not ready for running CRS. Retry later...", - "round", round) - time.Sleep(500 * time.Millisecond) - } +func (con *Consensus) runCRS(round uint64, hash common.Hash) { // Start running next round CRS. - con.logger.Debug("Calling Governance.CRS", "round", round) - psig, err := con.cfgModule.preparePartialSignature( - round, utils.GetCRSWithPanic(con.gov, round, con.logger)) + psig, err := con.cfgModule.preparePartialSignature(round, hash) if err != nil { con.logger.Error("Failed to prepare partial signature", "error", err) } else if err = con.signer.SignDKGPartialSignature(psig); err != nil { @@ -751,130 +854,6 @@ func (con *Consensus) runCRS(round uint64) { } } -func (con *Consensus) initialRound( - startHeight uint64, round uint64, config *types.Config) { - select { - case <-con.ctx.Done(): - return - default: - } - if round >= DKGDelayRound { - curDkgSet, err := con.nodeSetCache.GetDKGSet(round) - if err != nil { - con.logger.Error("Error getting DKG set", "round", round, "error", err) - curDkgSet = make(map[types.NodeID]struct{}) - } - // Initiate CRS routine. - if _, exist := curDkgSet[con.ID]; exist { - con.event.RegisterHeight( - startHeight+config.RoundLength/2, - func(uint64) { - go func() { - con.runCRS(round) - }() - }) - } - } - // checkCRS is a generator of checker to check if CRS for that round is - // ready or not. - checkCRS := func(round uint64) func() bool { - return func() bool { - nextCRS := con.gov.CRS(round) - if (nextCRS != common.Hash{}) { - return true - } - con.logger.Debug("CRS is not ready yet. Try again later...", - "nodeID", con.ID, - "round", round) - return false - } - } - // Initiate BA modules. - con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) { - go func(nextRound uint64) { - if !checkWithCancel( - con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Debug("unable to prepare CRS for baMgr", - "round", nextRound) - return - } - // Notify BA for new round. - nextConfig := utils.GetConfigWithPanic( - con.gov, nextRound, con.logger) - nextCRS := utils.GetCRSWithPanic( - con.gov, nextRound, con.logger) - con.logger.Info("appendConfig for baMgr", "round", nextRound) - if err := con.baMgr.appendConfig( - nextRound, nextConfig, nextCRS); err != nil { - panic(err) - } - }(round + 1) - }) - // Initiate DKG for this round. - con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) { - go func(nextRound uint64) { - if nextRound < DKGDelayRound { - con.logger.Info("Skip runDKG for round", "round", nextRound) - return - } - // Normally, gov.CRS would return non-nil. Use this for in case of - // unexpected network fluctuation and ensure the robustness. - if !checkWithCancel( - con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Debug("unable to prepare CRS for DKG set", - "round", nextRound) - return - } - nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) - if err != nil { - con.logger.Error("Error getting DKG set", - "round", nextRound, - "error", err) - return - } - if _, exist := nextDkgSet[con.ID]; !exist { - return - } - con.logger.Info("Selected as DKG set", "round", nextRound) - con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold(config)) - con.event.RegisterHeight(startHeight+config.RoundLength*2/3, - func(uint64) { - func() { - con.dkgReady.L.Lock() - defer con.dkgReady.L.Unlock() - con.dkgRunning = 0 - }() - nextConfig := utils.GetConfigWithPanic( - con.gov, nextRound, con.logger) - con.runDKG(nextRound, nextConfig) - }) - }(round + 1) - }) - // Prepare blockChain module for next round and next "initialRound" routine. - con.event.RegisterHeight(startHeight+config.RoundLength, func(uint64) { - // Change round. - // Get configuration for next round. - nextRound := round + 1 - nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger) - con.initialRound( - startHeight+config.RoundLength, nextRound, nextConfig) - }) - // Touch nodeSetCache for next round. - con.event.RegisterHeight(startHeight+config.RoundLength*9/10, func(uint64) { - go func() { - // TODO(jimmy): check DKGResetCount and do not touch if nextRound is reset. - if err := con.nodeSetCache.Touch(round + 1); err != nil { - con.logger.Warn("Failed to update nodeSetCache", - "round", round+1, "error", err) - } - if _, _, err := con.bcModule.vGetter.UpdateAndGet(round + 1); err != nil { - con.logger.Warn("Failed to update tsigVerifierCache", - "round", round+1, "error", err) - } - }() - }) -} - // Stop the Consensus core. func (con *Consensus) Stop() { con.ctxCancel() @@ -1195,24 +1174,6 @@ func (con *Consensus) deliverBlock(b *types.Block) { con.cfgModule.untouchTSigHash(b.Hash) con.logger.Debug("Calling Application.BlockDelivered", "block", b) con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone()) - if b.Position.Round == con.roundForNewConfig { - // Get configuration for the round next to next round. Configuration - // for that round should be ready at this moment and is required for - // blockChain module. This logic is related to: - // - roundShift - // - notifyGenesisRound - futureRound := con.roundForNewConfig + 1 - futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger) - con.logger.Debug("Append Config", "round", futureRound) - if err := con.bcModule.appendConfig( - futureRound, futureConfig); err != nil { - con.logger.Debug("Unable to append config", - "round", futureRound, - "error", err) - panic(err) - } - con.roundForNewConfig++ - } if con.debugApp != nil { con.debugApp.BlockReady(b.Hash) } diff --git a/core/consensus_test.go b/core/consensus_test.go index 8ebfb02..75f5422 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -283,7 +283,7 @@ func (s *ConsensusTestSuite) TestDKGCRS() { crsFinish := make(chan struct{}) for _, con := range cons { go func(con *Consensus) { - con.runCRS(0) + con.runCRS(0, gov.CRS(0)) crsFinish <- struct{}{} }(con) } diff --git a/core/interfaces.go b/core/interfaces.go index 7accac2..ddd6c3b 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -101,8 +101,12 @@ type Governance interface { // Return the genesis configuration if round == 0. Configuration(round uint64) *types.Config - // CRS returns the CRS for a given round. - // Return the genesis CRS if round == 0. + // CRS returns the CRS for a given round. Return the genesis CRS if + // round == 0. + // + // The CRS returned is the proposed or latest reseted one, it would be + // changed later if corresponding DKG set failed to generate group public + // key. CRS(round uint64) common.Hash // Propose a CRS of round. diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index fd48793..f2f8f9e 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -69,12 +69,14 @@ type Consensus struct { configs []*types.Config roundBeginHeights []uint64 agreementRoundCut uint64 + heightEvt *common.Event + roundEvt *utils.RoundEvent // lock for accessing all fields. lock sync.RWMutex duringBuffering bool latestCRSRound uint64 - moduleWaitGroup sync.WaitGroup + waitGroup sync.WaitGroup agreementWaitGroup sync.WaitGroup pullChan chan common.Hash receiveChan chan *types.Block @@ -116,6 +118,7 @@ func NewConsensus( receiveChan: make(chan *types.Block, 1000), pullChan: make(chan common.Hash, 1000), randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult), + heightEvt: common.NewEvent(), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) _, con.initChainTipHeight = db.GetCompactionChainTipInfo() @@ -143,9 +146,73 @@ func (con *Consensus) assureBuffering() { return } con.duringBuffering = true + // Get latest block to prepare utils.RoundEvent. + var ( + err error + blockHash, height = con.db.GetCompactionChainTipInfo() + ) + if height == 0 { + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger, + uint64(0), uint64(0), uint64(0), core.ConfigRoundShift) + } else { + var b types.Block + if b, err = con.db.GetBlock(blockHash); err == nil { + beginHeight := con.roundBeginHeights[b.Position.Round] + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, + con.logger, b.Position.Round, beginHeight, beginHeight, + core.ConfigRoundShift) + } + } + if err != nil { + panic(err) + } + // Make sure con.roundEvt stopped before stopping con.agreementModule. + con.waitGroup.Add(1) + // Register a round event handler to notify CRS to agreementModule. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + con.waitGroup.Add(1) + go func() { + defer con.waitGroup.Done() + for _, e := range evts { + select { + case <-con.ctx.Done(): + return + default: + } + for func() bool { + select { + case <-con.ctx.Done(): + return false + case con.agreementModule.inputChan <- e.Round: + return false + case <-time.After(500 * time.Millisecond): + con.logger.Warn( + "agreement input channel is full when putting CRS", + "round", e.Round, + ) + return true + } + }() { + } + } + }() + }) + // Register a round event handler to validate next round. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func( + blockHeight uint64) { + select { + case <-con.ctx.Done(): + return + default: + } + con.roundEvt.ValidateNextRound(blockHeight) + }) + }) + con.roundEvt.TriggerInitEvent() con.startAgreement() con.startNetwork() - con.startCRSMonitor() } func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { @@ -265,6 +332,7 @@ func (con *Consensus) SyncBlocks( b.Hash, b.Finalization.Height); err != nil { return } + go con.heightEvt.NotifyHeight(b.Finalization.Height) } if latest { con.assureBuffering() @@ -346,7 +414,10 @@ func (con *Consensus) stopBuffering() { return } con.logger.Trace("stop syncer modules") - con.moduleWaitGroup.Wait() + con.roundEvt.Stop() + con.waitGroup.Done() + // Wait for all routines depends on con.agreementModule stopped. + con.waitGroup.Wait() // Since there is no one waiting for the receive channel of fullnode, we // need to launch a dummy receiver right away. con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( @@ -492,9 +563,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { // startNetwork starts network for receiving blocks and agreement results. func (con *Consensus) startNetwork() { - con.moduleWaitGroup.Add(1) + con.waitGroup.Add(1) go func() { - defer con.moduleWaitGroup.Done() + defer con.waitGroup.Done() loop: for { select { @@ -522,62 +593,6 @@ func (con *Consensus) startNetwork() { }() } -// startCRSMonitor is the dummiest way to verify if the CRS for one round -// is ready or not. -func (con *Consensus) startCRSMonitor() { - var lastNotifiedRound uint64 - // Notify all agreements for new CRS. - notifyNewCRS := func(round uint64) { - con.setupConfigsUntilRound(round) - if round == lastNotifiedRound { - return - } - con.logger.Debug("CRS is ready", "round", round) - lastNotifiedRound = round - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.latestCRSRound = round - }() - for func() bool { - select { - case <-con.ctx.Done(): - return false - case con.agreementModule.inputChan <- round: - return false - case <-time.After(500 * time.Millisecond): - con.logger.Debug( - "agreement input channel is full when putting CRS", - "round", round, - ) - return true - } - }() { - } - } - con.moduleWaitGroup.Add(1) - go func() { - defer con.moduleWaitGroup.Done() - for { - select { - case <-con.ctx.Done(): - return - case <-time.After(500 * time.Millisecond): - } - // Notify agreement modules for the latest round that CRS is - // available if the round is not notified yet. - checked := lastNotifiedRound + 1 - for (con.gov.CRS(checked) != common.Hash{}) { - checked++ - } - checked-- - if checked > lastNotifiedRound { - notifyNewCRS(checked) - } - } - }() -} - func (con *Consensus) stopAgreement() { if con.agreementModule.inputChan != nil { close(con.agreementModule.inputChan) diff --git a/core/test/app.go b/core/test/app.go index 4cfd580..df58135 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -109,11 +109,13 @@ func NewApp(initRound uint64, gov *Governance, rEvt *utils.RoundEvent) ( } if rEvt != nil { app.hEvt.RegisterHeight( - utils.GetNextRoundCheckpoint(rEvt.LastPeriod()), func(h uint64) { + utils.GetNextRoundValidationHeight(rEvt.LastPeriod()), + func(h uint64) { rEvt.ValidateNextRound(h) }) rEvt.Register(func(evts []utils.RoundEventParam) { - app.hEvt.RegisterHeight(evts[len(evts)-1].NextRoundCheckpoint(), + app.hEvt.RegisterHeight( + evts[len(evts)-1].NextRoundValidationHeight(), func(h uint64) { rEvt.ValidateNextRound(h) }) diff --git a/core/utils.go b/core/utils.go index f7dee75..5742d11 100644 --- a/core/utils.go +++ b/core/utils.go @@ -236,14 +236,14 @@ func checkWithCancel(parentCtx context.Context, interval time.Duration, defer cancel() Loop: for { + if ret = checker(); ret { + return + } select { case <-ctx.Done(): break Loop case <-time.After(interval): } - if ret = checker(); ret { - return - } } return } diff --git a/core/utils/round-event.go b/core/utils/round-event.go index bab1d32..1ce877d 100644 --- a/core/utils/round-event.go +++ b/core/utils/round-event.go @@ -28,15 +28,15 @@ import ( typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" ) -// ErrUnmatchedBlockHeightWithGov is for invalid parameters for NewRoundEvent. -type ErrUnmatchedBlockHeightWithGov struct { +// ErrUnmatchedBlockHeightWithConfig is for invalid parameters for NewRoundEvent. +type ErrUnmatchedBlockHeightWithConfig struct { round uint64 reset uint64 blockHeight uint64 } -func (e ErrUnmatchedBlockHeightWithGov) Error() string { - return fmt.Sprintf("unsynced block height and gov: round:%d reset:%d h:%d", +func (e ErrUnmatchedBlockHeightWithConfig) Error() string { + return fmt.Sprintf("unsynced block height and cfg: round:%d reset:%d h:%d", e.round, e.reset, e.blockHeight) } @@ -56,11 +56,43 @@ type RoundEventParam struct { CRS common.Hash } -// NextRoundCheckpoint returns the height to check if the next round is ready. -func (e RoundEventParam) NextRoundCheckpoint() uint64 { +// NextRoundValidationHeight returns the height to check if the next round is +// ready. +func (e RoundEventParam) NextRoundValidationHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength*9/10 +} + +// NextCRSProposingHeight returns the height to propose CRS for next round. +func (e RoundEventParam) NextCRSProposingHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength/2 +} + +// NextDKGPreparationHeight returns the height to prepare DKG set for next +// round. +func (e RoundEventParam) NextDKGPreparationHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength*2/3 +} + +// NextRoundHeight returns the height of the beginning of next round. +func (e RoundEventParam) NextRoundHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength +} + +// NextTouchNodeSetCacheHeight returns the height to touch the node set cache. +func (e RoundEventParam) NextTouchNodeSetCacheHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength*9/10 +} + +// NextDKGResetHeight returns the height to reset DKG for next period. +func (e RoundEventParam) NextDKGResetHeight() uint64 { return e.BeginHeight + e.Config.RoundLength*8/10 } +// NextDKGRegisterHeight returns the height to register DKG. +func (e RoundEventParam) NextDKGRegisterHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength/2 +} + // roundEventFn defines the fingerprint of handlers of round events. type roundEventFn func([]RoundEventParam) @@ -131,7 +163,7 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor, e.config.ExtendLength() } if !e.config.Contains(initBlockHeight) { - return nil, ErrUnmatchedBlockHeightWithGov{ + return nil, ErrUnmatchedBlockHeightWithConfig{ round: initRound, reset: resetCount, blockHeight: initBlockHeight, @@ -149,6 +181,22 @@ func (e *RoundEvent) Register(h roundEventFn) { e.handlers = append(e.handlers, h) } +// TriggerInitEvent triggers event from the initial setting. +func (e *RoundEvent) TriggerInitEvent() { + e.lock.Lock() + defer e.lock.Unlock() + events := []RoundEventParam{RoundEventParam{ + Round: e.lastTriggeredRound, + Reset: e.lastTriggeredResetCount, + BeginHeight: e.config.LastPeriodBeginHeight(), + CRS: GetCRSWithPanic(e.gov, e.lastTriggeredRound, e.logger), + Config: GetConfigWithPanic(e.gov, e.lastTriggeredRound, e.logger), + }} + for _, h := range e.handlers { + h(events) + } +} + // ValidateNextRound validate if the DKG set for next round is ready to go or // failed to setup, all registered handlers would be called once some decision // is made on chain. @@ -225,14 +273,6 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) ( "crs", param.CRS.String()[:6], ) }() - // Make sure current last config covers the blockHeight. - if !e.config.Contains(blockHeight) { - panic(ErrUnmatchedBlockHeightWithGov{ - round: e.lastTriggeredRound, - reset: e.lastTriggeredResetCount, - blockHeight: blockHeight, - }) - } nextRound := e.lastTriggeredRound + 1 if nextRound >= startRound+e.roundShift { // Avoid access configuration newer than last confirmed one over diff --git a/core/utils/utils.go b/core/utils/utils.go index d714068..14687d6 100644 --- a/core/utils/utils.go +++ b/core/utils/utils.go @@ -139,8 +139,8 @@ func GetDKGThreshold(config *types.Config) int { return int(config.DKGSetSize/3) + 1 } -// GetNextRoundCheckpoint returns the block height to check if the next round -// is ready. -func GetNextRoundCheckpoint(begin, length uint64) uint64 { +// GetNextRoundValidationHeight returns the block height to check if the next +// round is ready. +func GetNextRoundValidationHeight(begin, length uint64) uint64 { return begin + length*9/10 } diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index 5131465..70e6c1f 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -180,7 +180,7 @@ func (s *ConsensusTestSuite) syncBlocksWithSomeNode( } syncerHeight = b.Finalization.Height + 1 compactionChainBlocks = append(compactionChainBlocks, &b) - if len(compactionChainBlocks) >= 100 { + if len(compactionChainBlocks) >= 20 { if syncBlocks() { break } @@ -340,9 +340,9 @@ func (s *ConsensusTestSuite) TestSync() { req = s.Require() peerCount = 4 dMoment = time.Now().UTC() - untilRound = uint64(5) - stopRound = uint64(3) - aliveRound = uint64(1) + untilRound = uint64(7) + stopRound = uint64(5) + aliveRound = uint64(4) errChan = make(chan error, 100) ) prvKeys, pubKeys, err := test.NewKeys(peerCount) @@ -442,7 +442,7 @@ ReachAlive: select { case <-runnerCtx.Done(): break SyncLoop - case <-time.After(2 * time.Second): + case <-time.After(4 * time.Second): } } }() diff --git a/integration_test/round-event_test.go b/integration_test/round-event_test.go index f8cac26..8201bfe 100644 --- a/integration_test/round-event_test.go +++ b/integration_test/round-event_test.go @@ -221,6 +221,35 @@ func (s *RoundEventTestSuite) TestLastPeriod() { s.Require().Equal(length, uint64(200)) } +func (s *RoundEventTestSuite) TestTriggerInitEvent() { + gov := s.prepareGov() + s.Require().NoError(gov.State().RequestChange(test.StateChangeRoundLength, + uint64(100))) + gov.CatchUpWithRound(0) + s.Require().NoError(gov.State().RequestChange(test.StateChangeRoundLength, + uint64(200))) + gov.CatchUpWithRound(1) + // Prepare utils.RoundEvent, starts from genesis. + rEvt, err := utils.NewRoundEvent( + context.Background(), gov, s.logger, 0, 0, 0, core.ConfigRoundShift) + s.Require().NoError(err) + // Register a handler to collects triggered events. + var evts []evtParamToCheck + rEvt.Register(func(params []utils.RoundEventParam) { + for _, p := range params { + evts = append(evts, evtParamToCheck{ + round: p.Round, + reset: p.Reset, + height: p.BeginHeight, + crs: p.CRS, + }) + } + }) + rEvt.TriggerInitEvent() + s.Require().Len(evts, 1) + s.Require().Equal(evts[0], evtParamToCheck{0, 0, 0, gov.CRS(0)}) +} + func TestRoundEvent(t *testing.T) { suite.Run(t, new(RoundEventTestSuite)) } |