diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus')
9 files changed, 658 insertions, 383 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index 7b5effba8..0e39fa52a 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -90,8 +90,6 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, type baRoundSetting struct { notarySet map[types.NodeID]struct{} - agr *agreement - recv *consensusBAReceiver ticker Ticker crs common.Hash } @@ -111,6 +109,7 @@ type agreementMgr struct { initRound uint64 configs []agreementMgrConfig baModule *agreement + recv *consensusBAReceiver processedBAResult map[types.Position]struct{} voteFilter *utils.VoteFilter waitGroup sync.WaitGroup @@ -136,15 +135,17 @@ func newAgreementMgr(con *Consensus, initRound uint64, configs: []agreementMgrConfig{initConfig}, voteFilter: utils.NewVoteFilter(), } - recv := &consensusBAReceiver{ - consensus: con, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, + mgr.recv = &consensusBAReceiver{ + consensus: con, + restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, + changeNotaryHeightValue: &atomic.Value{}, } - recv.roundValue.Store(uint64(0)) + mgr.recv.roundValue.Store(uint64(0)) + mgr.recv.changeNotaryHeightValue.Store(uint64(0)) agr := newAgreement( mgr.ID, - recv, + mgr.recv, newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer, mgr.logger) @@ -156,7 +157,7 @@ func newAgreementMgr(con *Consensus, initRound uint64, agr.notarySet = nodes.GetSubSet( int(initConfig.notarySetSize), types.NewNotarySetTarget(initConfig.crs)) // Hacky way to make agreement module self contained. - recv.agreementModule = agr + mgr.recv.agreementModule = agr mgr.baModule = agr return } @@ -188,15 +189,43 @@ func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig { return &mgr.configs[roundIndex] } -func (mgr *agreementMgr) appendConfig( - round uint64, config *types.Config, crs common.Hash) (err error) { +func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { mgr.lock.Lock() defer mgr.lock.Unlock() - if round != uint64(len(mgr.configs))+mgr.initRound { - return ErrRoundNotIncreasing + apply := func(e utils.RoundEventParam) error { + if len(mgr.configs) > 0 { + lastCfg := mgr.configs[len(mgr.configs)-1] + if e.BeginHeight != lastCfg.RoundEndHeight() { + return ErrInvalidBlockHeight + } + if lastCfg.RoundID() == e.Round { + mgr.configs[len(mgr.configs)-1].ExtendLength() + // It's not an atomic operation to update an atomic value based + // on another. However, it's the best way so far to extend + // length of round without refactoring. + if mgr.recv.round() == e.Round { + mgr.recv.changeNotaryHeightValue.Store( + mgr.configs[len(mgr.configs)-1].RoundEndHeight()) + } + } else if lastCfg.RoundID()+1 == e.Round { + mgr.configs = append(mgr.configs, newAgreementMgrConfig( + lastCfg, e.Config, e.CRS)) + } else { + return ErrInvalidRoundID + } + } else { + c := agreementMgrConfig{} + c.from(e.Round, e.Config, e.CRS) + c.SetRoundBeginHeight(e.BeginHeight) + mgr.configs = append(mgr.configs, c) + } + return nil + } + for _, e := range evts { + if err := apply(e); err != nil { + return err + } } - mgr.configs = append(mgr.configs, newAgreementMgrConfig( - mgr.configs[len(mgr.configs)-1], config, crs)) return nil } @@ -252,7 +281,7 @@ func (mgr *agreementMgr) processAgreementResult( } } else if result.Position.Newer(aID) { mgr.logger.Info("Fast syncing BA", "position", result.Position) - nodes, err := mgr.cache.GetNodeSet(result.Position.Round) + nIDs, err := mgr.cache.GetNotarySet(result.Position.Round) if err != nil { return err } @@ -261,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult( mgr.network.PullBlocks(common.Hashes{result.BlockHash}) mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) - nIDs := nodes.GetSubSet( - int(utils.GetConfigWithPanic( - mgr.gov, result.Position.Round, mgr.logger).NotarySetSize), - types.NewNotarySetTarget(crs)) for key := range result.Votes { if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err @@ -296,10 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) { currentRound uint64 nextRound = initRound curConfig = mgr.config(initRound) - setting = baRoundSetting{ - agr: mgr.baModule, - recv: mgr.baModule.data.recv.(*consensusBAReceiver), - } + setting = baRoundSetting{} tickDuration time.Duration ) @@ -353,12 +375,12 @@ Loop: break Loop default: } - setting.recv.isNotary = checkRound() + mgr.recv.isNotary = checkRound() // Run BA for this round. - setting.recv.roundValue.Store(currentRound) - setting.recv.changeNotaryHeight = curConfig.RoundEndHeight() - setting.recv.restartNotary <- types.Position{ - Round: setting.recv.round(), + mgr.recv.roundValue.Store(currentRound) + mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight()) + mgr.recv.restartNotary <- types.Position{ + Round: mgr.recv.round(), Height: math.MaxUint64, } mgr.voteFilter = utils.NewVoteFilter() @@ -373,8 +395,8 @@ Loop: func (mgr *agreementMgr) baRoutineForOneRound( setting *baRoundSetting) (err error) { - agr := setting.agr - recv := setting.recv + agr := mgr.baModule + recv := mgr.recv oldPos := agr.agreementID() restart := func(restartPos types.Position) (breakLoop bool, err error) { if !isStop(restartPos) { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go index 19a580b4f..c5a22b628 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go @@ -41,7 +41,6 @@ var ( ErrNotFollowTipPosition = errors.New("not follow tip position") ErrDuplicatedPendingBlock = errors.New("duplicated pending block") ErrRetrySanityCheckLater = errors.New("retry sanity check later") - ErrRoundNotIncreasing = errors.New("round not increasing") ErrRoundNotSwitch = errors.New("round not switch") ErrIncorrectBlockRandomnessResult = errors.New( "incorrect block randomness result") @@ -142,19 +141,8 @@ type blockChain struct { } func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, - initConfig blockChainConfig, app Application, vGetter tsigVerifierGetter, - signer *utils.Signer, logger common.Logger) *blockChain { - if initBlock != nil { - if initConfig.RoundID() != initBlock.Position.Round { - panic(fmt.Errorf("incompatible config/block %s %d", - initBlock, initConfig.RoundID())) - } - } else { - if initConfig.RoundID() != 0 { - panic(fmt.Errorf("genesis config should from round 0 %d", - initConfig.RoundID())) - } - } + app Application, vGetter tsigVerifierGetter, signer *utils.Signer, + logger common.Logger) *blockChain { return &blockChain{ ID: nID, lastConfirmed: initBlock, @@ -163,23 +151,58 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block, vGetter: vGetter, app: app, logger: logger, - configs: []blockChainConfig{initConfig}, dMoment: dMoment, pendingRandomnesses: make( map[types.Position]*types.BlockRandomnessResult), } } -func (bc *blockChain) appendConfig(round uint64, config *types.Config) error { - expectedRound := uint64(len(bc.configs)) - if bc.lastConfirmed != nil { - expectedRound += bc.lastConfirmed.Position.Round +func (bc *blockChain) notifyRoundEvents(evts []utils.RoundEventParam) error { + bc.lock.Lock() + defer bc.lock.Unlock() + apply := func(e utils.RoundEventParam) error { + if len(bc.configs) > 0 { + lastCfg := bc.configs[len(bc.configs)-1] + if e.BeginHeight != lastCfg.RoundEndHeight() { + return ErrInvalidBlockHeight + } + if lastCfg.RoundID() == e.Round { + bc.configs[len(bc.configs)-1].ExtendLength() + } else if lastCfg.RoundID()+1 == e.Round { + bc.configs = append(bc.configs, newBlockChainConfig( + lastCfg, e.Config)) + } else { + return ErrInvalidRoundID + } + } else { + c := blockChainConfig{} + c.fromConfig(e.Round, e.Config) + c.SetRoundBeginHeight(e.BeginHeight) + if bc.lastConfirmed == nil { + if c.RoundID() != 0 { + panic(fmt.Errorf("genesis config should from round 0 %d", + c.RoundID())) + } + } else { + if c.RoundID() != bc.lastConfirmed.Position.Round { + panic(fmt.Errorf("incompatible config/block %s %d", + bc.lastConfirmed, c.RoundID())) + } + if !c.Contains(bc.lastConfirmed.Position.Height) { + panic(fmt.Errorf( + "unmatched round-event with block %s %d %d %d", + bc.lastConfirmed, e.Round, e.Reset, e.BeginHeight)) + } + } + bc.configs = append(bc.configs, c) + } + return nil } - if round != expectedRound { - return ErrRoundNotIncreasing + for _, e := range evts { + if err := apply(e); err != nil { + return err + } } - bc.configs = append(bc.configs, newBlockChainConfig( - bc.configs[len(bc.configs)-1], config)) return nil } @@ -558,8 +581,11 @@ func (bc *blockChain) prepareBlock(position types.Position, } if tipConfig.IsLastBlock(tip) { if tip.Position.Round+1 != position.Round { - b, err = nil, ErrRoundNotSwitch - return + if !empty { + b, err = nil, ErrRoundNotSwitch + return + } + b.Position.Round = tip.Position.Round + 1 } } else { if tip.Position.Round != position.Round { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 4201cbcc2..8529e4031 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -56,18 +56,22 @@ var ( // consensusBAReceiver implements agreementReceiver. type consensusBAReceiver struct { // TODO(mission): consensus would be replaced by blockChain and network. - consensus *Consensus - agreementModule *agreement - changeNotaryHeight uint64 - roundValue *atomic.Value - isNotary bool - restartNotary chan types.Position + consensus *Consensus + agreementModule *agreement + changeNotaryHeightValue *atomic.Value + roundValue *atomic.Value + isNotary bool + restartNotary chan types.Position } func (recv *consensusBAReceiver) round() uint64 { return recv.roundValue.Load().(uint64) } +func (recv *consensusBAReceiver) changeNotaryHeight() uint64 { + return recv.changeNotaryHeightValue.Load().(uint64) +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return @@ -247,16 +251,17 @@ CleanChannelLoop: } } newPos := block.Position - if block.Position.Height+1 == recv.changeNotaryHeight { + if block.Position.Height+1 == recv.changeNotaryHeight() { newPos.Round++ recv.roundValue.Store(newPos.Round) } currentRound := recv.round() - if block.Position.Height > recv.changeNotaryHeight && + changeNotaryHeight := recv.changeNotaryHeight() + if block.Position.Height > changeNotaryHeight && block.Position.Round <= currentRound { panic(fmt.Errorf( "round not switch when confirmig: %s, %d, should switch at %d", - block, currentRound, recv.changeNotaryHeight)) + block, currentRound, changeNotaryHeight)) } recv.restartNotary <- newPos } @@ -396,11 +401,11 @@ type Consensus struct { bcModule *blockChain dMoment time.Time nodeSetCache *utils.NodeSetCache - roundForNewConfig uint64 lock sync.RWMutex ctx context.Context ctxCancel context.CancelFunc event *common.Event + roundEvent *utils.RoundEvent logger common.Logger resetRandomnessTicker chan struct{} resetDeliveryGuardTicker chan struct{} @@ -453,6 +458,7 @@ func NewConsensusForSimulation( func NewConsensusFromSyncer( initBlock *types.Block, initRoundBeginHeight uint64, + startWithEmpty bool, dMoment time.Time, app Application, gov Governance, @@ -495,6 +501,23 @@ func NewConsensusFromSyncer( continue } } + if startWithEmpty { + pos := initBlock.Position + pos.Height++ + block, err := con.bcModule.addEmptyBlock(pos) + if err != nil { + panic(err) + } + con.processBlockChan <- block + if pos.Round >= DKGDelayRound { + rand := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + IsEmptyBlock: true, + } + go con.prepareRandomnessResult(rand) + } + } return con, nil } @@ -522,8 +545,10 @@ func newConsensusForRound( } // Get configuration for bootstrap round. initRound := uint64(0) + initBlockHeight := uint64(0) if initBlock != nil { initRound = initBlock.Position.Round + initBlockHeight = initBlock.Position.Height } initConfig := utils.GetConfigWithPanic(gov, initRound, logger) initCRS := utils.GetCRSWithPanic(gov, initRound, logger) @@ -548,10 +573,7 @@ func newConsensusForRound( if usingNonBlocking { appModule = newNonBlocking(app, debugApp) } - bcConfig := blockChainConfig{} - bcConfig.fromConfig(initRound, initConfig) - bcConfig.SetRoundBeginHeight(initRoundBeginHeight) - bcModule := newBlockChain(ID, dMoment, initBlock, bcConfig, appModule, + bcModule := newBlockChain(ID, dMoment, initBlock, appModule, NewTSigVerifierCache(gov, 7), signer, logger) // Construct Consensus instance. con := &Consensus{ @@ -576,6 +598,10 @@ func newConsensusForRound( processBlockChan: make(chan *types.Block, 1024), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + if con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initRound, + initRoundBeginHeight, initBlockHeight, ConfigRoundShift); err != nil { + panic(err) + } baConfig := agreementMgrConfig{} baConfig.from(initRound, initConfig, initCRS) baConfig.SetRoundBeginHeight(initRoundBeginHeight) @@ -595,26 +621,139 @@ func newConsensusForRound( // - the last finalized block func (con *Consensus) prepare( initRoundBeginHeight uint64, initBlock *types.Block) (err error) { + // Trigger the round validation method for the next round of the first + // round. // The block past from full node should be delivered already or known by // full node. We don't have to notify it. initRound := uint64(0) if initBlock != nil { initRound = initBlock.Position.Round } - // Setup blockChain module. - con.roundForNewConfig = initRound + 1 - initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger) - initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger) - if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil { - return - } if initRound == 0 { if DKGDelayRound == 0 { panic("not implemented yet") } } - // Register events. - con.initialRound(initRoundBeginHeight, initRound, initConfig) + // Register round event handler to update BA and BC modules. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + // Always updates newer configs to the later modules first in the flow. + if err := con.bcModule.notifyRoundEvents(evts); err != nil { + panic(err) + } + // The init config is provided to baModule when construction. + if evts[len(evts)-1].BeginHeight != initRoundBeginHeight { + if err := con.baMgr.notifyRoundEvents(evts); err != nil { + panic(err) + } + } + }) + // Register round event handler to propose new CRS. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + // We don't have to propose new CRS during DKG reset, the reset of DKG + // would be done by the DKG set in previous round. + e := evts[len(evts)-1] + if e.Reset != 0 || e.Round < DKGDelayRound { + return + } + if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil { + con.logger.Error("Error getting DKG set when proposing CRS", + "round", e.Round, + "error", err) + } else { + if _, exist := curDkgSet[con.ID]; !exist { + return + } + con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) { + con.logger.Debug( + "Calling Governance.CRS to check if already proposed", + "round", e.Round+1) + if (con.gov.CRS(e.Round+1) != common.Hash{}) { + con.logger.Debug("CRS already proposed", "round", e.Round+1) + return + } + con.runCRS(e.Round, e.CRS) + }) + } + }) + // Touch nodeSetCache for next round. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + if e.Reset != 0 { + return + } + con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) { + if err := con.nodeSetCache.Touch(e.Round + 1); err != nil { + con.logger.Warn("Failed to update nodeSetCache", + "round", e.Round+1, + "error", err) + } + }) + }) + // checkCRS is a generator of checker to check if CRS for that round is + // ready or not. + checkCRS := func(round uint64) func() bool { + return func() bool { + nextCRS := con.gov.CRS(round) + if (nextCRS != common.Hash{}) { + return true + } + con.logger.Debug("CRS is not ready yet. Try again later...", + "nodeID", con.ID, + "round", round) + return false + } + } + // Trigger round validation method for next period. + con.roundEvent.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + // Register a routine to trigger round events. + con.event.RegisterHeight(e.NextRoundValidationHeight(), func( + blockHeight uint64) { + con.roundEvent.ValidateNextRound(blockHeight) + }) + // Register a routine to register next DKG. + con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) { + nextRound := e.Round + 1 + if nextRound < DKGDelayRound { + con.logger.Info("Skip runDKG for round", "round", nextRound) + return + } + // Normally, gov.CRS would return non-nil. Use this for in case of + // unexpected network fluctuation and ensure the robustness. + if !checkWithCancel( + con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { + con.logger.Debug("unable to prepare CRS for DKG set", + "round", nextRound) + return + } + nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) + if err != nil { + con.logger.Error("Error getting DKG set for next round", + "round", nextRound, + "error", err) + return + } + if _, exist := nextDkgSet[con.ID]; !exist { + con.logger.Info("Not selected as DKG set", "round", nextRound) + return + } + con.logger.Info("Selected as DKG set", "round", nextRound) + nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, + con.logger) + con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold( + nextConfig)) + con.event.RegisterHeight(e.NextDKGPreparationHeight(), + func(uint64) { + func() { + con.dkgReady.L.Lock() + defer con.dkgReady.L.Unlock() + con.dkgRunning = 0 + }() + con.runDKG(nextRound, nextConfig) + }) + }) + }) + con.roundEvent.TriggerInitEvent() return } @@ -686,27 +825,9 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) { }() } -func (con *Consensus) runCRS(round uint64) { - for { - con.logger.Debug("Calling Governance.CRS to check if already proposed", - "round", round+1) - if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Debug("CRS already proposed", "round", round+1) - return - } - con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", - "round", round) - if con.cfgModule.isDKGFinal(round) { - break - } - con.logger.Debug("DKG is not ready for running CRS. Retry later...", - "round", round) - time.Sleep(500 * time.Millisecond) - } +func (con *Consensus) runCRS(round uint64, hash common.Hash) { // Start running next round CRS. - con.logger.Debug("Calling Governance.CRS", "round", round) - psig, err := con.cfgModule.preparePartialSignature( - round, utils.GetCRSWithPanic(con.gov, round, con.logger)) + psig, err := con.cfgModule.preparePartialSignature(round, hash) if err != nil { con.logger.Error("Failed to prepare partial signature", "error", err) } else if err = con.signer.SignDKGPartialSignature(psig); err != nil { @@ -733,136 +854,16 @@ func (con *Consensus) runCRS(round uint64) { } } -func (con *Consensus) initialRound( - startHeight uint64, round uint64, config *types.Config) { - select { - case <-con.ctx.Done(): - return - default: - } - if round >= DKGDelayRound { - curDkgSet, err := con.nodeSetCache.GetDKGSet(round) - if err != nil { - con.logger.Error("Error getting DKG set", "round", round, "error", err) - curDkgSet = make(map[types.NodeID]struct{}) - } - // Initiate CRS routine. - if _, exist := curDkgSet[con.ID]; exist { - con.event.RegisterHeight( - startHeight+config.RoundLength/2, - func(uint64) { - go func() { - con.runCRS(round) - }() - }) - } - } - // checkCRS is a generator of checker to check if CRS for that round is - // ready or not. - checkCRS := func(round uint64) func() bool { - return func() bool { - nextCRS := con.gov.CRS(round) - if (nextCRS != common.Hash{}) { - return true - } - con.logger.Debug("CRS is not ready yet. Try again later...", - "nodeID", con.ID, - "round", round) - return false - } - } - // Initiate BA modules. - con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) { - go func(nextRound uint64) { - if !checkWithCancel( - con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Debug("unable to prepare CRS for baMgr", - "round", nextRound) - return - } - // Notify BA for new round. - nextConfig := utils.GetConfigWithPanic( - con.gov, nextRound, con.logger) - nextCRS := utils.GetCRSWithPanic( - con.gov, nextRound, con.logger) - con.logger.Info("appendConfig for baMgr", "round", nextRound) - if err := con.baMgr.appendConfig( - nextRound, nextConfig, nextCRS); err != nil { - panic(err) - } - }(round + 1) - }) - // Initiate DKG for this round. - con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) { - go func(nextRound uint64) { - if nextRound < DKGDelayRound { - con.logger.Info("Skip runDKG for round", "round", nextRound) - return - } - // Normally, gov.CRS would return non-nil. Use this for in case of - // unexpected network fluctuation and ensure the robustness. - if !checkWithCancel( - con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Debug("unable to prepare CRS for DKG set", - "round", nextRound) - return - } - nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) - if err != nil { - con.logger.Error("Error getting DKG set", - "round", nextRound, - "error", err) - return - } - if _, exist := nextDkgSet[con.ID]; !exist { - return - } - con.logger.Info("Selected as DKG set", "round", nextRound) - con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold(config)) - con.event.RegisterHeight(startHeight+config.RoundLength*2/3, - func(uint64) { - func() { - con.dkgReady.L.Lock() - defer con.dkgReady.L.Unlock() - con.dkgRunning = 0 - }() - nextConfig := utils.GetConfigWithPanic( - con.gov, nextRound, con.logger) - con.runDKG(nextRound, nextConfig) - }) - }(round + 1) - }) - // Prepare blockChain module for next round and next "initialRound" routine. - con.event.RegisterHeight(startHeight+config.RoundLength, func(uint64) { - // Change round. - // Get configuration for next round. - nextRound := round + 1 - nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger) - con.initialRound( - startHeight+config.RoundLength, nextRound, nextConfig) - }) - // Touch nodeSetCache for next round. - con.event.RegisterHeight(startHeight+config.RoundLength*9/10, func(uint64) { - go func() { - // TODO(jimmy): check DKGResetCount and do not touch if nextRound is reset. - if err := con.nodeSetCache.Touch(round + 1); err != nil { - con.logger.Warn("Failed to update nodeSetCache", - "round", round+1, "error", err) - } - if _, _, err := con.bcModule.vGetter.UpdateAndGet(round + 1); err != nil { - con.logger.Warn("Failed to update tsigVerifierCache", - "round", round+1, "error", err) - } - }() - }) -} - // Stop the Consensus core. func (con *Consensus) Stop() { con.ctxCancel() con.baMgr.stop() con.event.Reset() con.waitGroup.Wait() + if nbApp, ok := con.app.(*nonBlocking); ok { + fmt.Println("Stopping nonBlocking App") + nbApp.wait() + } } func (con *Consensus) deliverNetworkMsg() { @@ -1014,62 +1015,64 @@ func (con *Consensus) ProcessAgreementResult( con.logger.Debug("Rebroadcast AgreementResult", "result", rand) con.network.BroadcastAgreementResult(rand) + go con.prepareRandomnessResult(rand) + return nil +} - go func() { - dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) - if err != nil { - con.logger.Error("Failed to get dkg set", - "round", rand.Position.Round, "error", err) - return - } - if _, exist := dkgSet[con.ID]; !exist { - return - } - psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) - if err != nil { - con.logger.Error("Failed to prepare psig", - "round", rand.Position.Round, - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - if err = con.signer.SignDKGPartialSignature(psig); err != nil { - con.logger.Error("Failed to sign psig", +func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) { + dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) + if err != nil { + con.logger.Error("Failed to get dkg set", + "round", rand.Position.Round, "error", err) + return + } + if _, exist := dkgSet[con.ID]; !exist { + return + } + con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash) + psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) + if err != nil { + con.logger.Error("Failed to prepare psig", + "round", rand.Position.Round, + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + if err = con.signer.SignDKGPartialSignature(psig); err != nil { + con.logger.Error("Failed to sign psig", + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + if err = con.cfgModule.processPartialSignature(psig); err != nil { + con.logger.Error("Failed process psig", + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", + "proposer", psig.ProposerID, + "round", psig.Round, + "hash", psig.Hash.String()[:6]) + con.network.BroadcastDKGPartialSignature(psig) + tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) + if err != nil { + if err != ErrTSigAlreadyRunning { + con.logger.Error("Failed to run TSIG", + "position", rand.Position, "hash", rand.BlockHash.String()[:6], "error", err) - return } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - con.logger.Error("Failed process psig", - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash.String()[:6]) - con.network.BroadcastDKGPartialSignature(psig) - tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) - if err != nil { - if err != ErrTSigAlreadyRunning { - con.logger.Error("Failed to run TSIG", - "position", rand.Position, - "hash", rand.BlockHash.String()[:6], - "error", err) - } - return - } - result := &types.BlockRandomnessResult{ - BlockHash: rand.BlockHash, - Position: rand.Position, - Randomness: tsig.Signature, - } - // ProcessBlockRandomnessResult is not thread-safe so we put the result in - // the message channnel to be processed in the main thread. - con.msgChan <- result - }() - return nil + return + } + result := &types.BlockRandomnessResult{ + BlockHash: rand.BlockHash, + Position: rand.Position, + Randomness: tsig.Signature, + } + // ProcessBlockRandomnessResult is not thread-safe so we put the result in + // the message channnel to be processed in the main thread. + con.msgChan <- result } // ProcessBlockRandomnessResult processes the randomness result. @@ -1094,14 +1097,6 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - var exist bool - exist, err = con.nodeSetCache.Exists(b.Position.Round, b.ProposerID) - if err != nil { - return - } - if !exist { - return ErrProposerNotInNodeSet - } err = con.baMgr.processBlock(b) if err == nil && con.debugApp != nil { con.debugApp.BlockReceived(b.Hash) @@ -1137,7 +1132,10 @@ func (con *Consensus) deliveryGuard() { defer con.waitGroup.Done() time.Sleep(con.dMoment.Sub(time.Now())) // Node takes time to start. - time.Sleep(60 * time.Second) + select { + case <-con.ctx.Done(): + case <-time.After(60 * time.Second): + } for { select { case <-con.ctx.Done(): @@ -1176,24 +1174,6 @@ func (con *Consensus) deliverBlock(b *types.Block) { con.cfgModule.untouchTSigHash(b.Hash) con.logger.Debug("Calling Application.BlockDelivered", "block", b) con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone()) - if b.Position.Round == con.roundForNewConfig { - // Get configuration for the round next to next round. Configuration - // for that round should be ready at this moment and is required for - // blockChain module. This logic is related to: - // - roundShift - // - notifyGenesisRound - futureRound := con.roundForNewConfig + 1 - futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger) - con.logger.Debug("Append Config", "round", futureRound) - if err := con.bcModule.appendConfig( - futureRound, futureConfig); err != nil { - con.logger.Debug("Unable to append config", - "round", futureRound, - "error", err) - panic(err) - } - con.roundForNewConfig++ - } if con.debugApp != nil { con.debugApp.BlockReady(b.Hash) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index 45a1fc7d5..ddd6c3bb9 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -101,8 +101,12 @@ type Governance interface { // Return the genesis configuration if round == 0. Configuration(round uint64) *types.Config - // CRS returns the CRS for a given round. - // Return the genesis CRS if round == 0. + // CRS returns the CRS for a given round. Return the genesis CRS if + // round == 0. + // + // The CRS returned is the proposed or latest reseted one, it would be + // changed later if corresponding DKG set failed to generate group public + // key. CRS(round uint64) common.Hash // Propose a CRS of round. @@ -162,3 +166,12 @@ type Ticker interface { // Retart the ticker and clear all internal data. Restart() } + +// Recovery interface for interacting with recovery information. +type Recovery interface { + // ProposeSkipBlock proposes a skip block. + ProposeSkipBlock(height uint64) error + + // Votes gets the number of votes of given height. + Votes(height uint64) (uint64, error) +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 25911ce5f..f2f8f9e66 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -69,12 +69,14 @@ type Consensus struct { configs []*types.Config roundBeginHeights []uint64 agreementRoundCut uint64 + heightEvt *common.Event + roundEvt *utils.RoundEvent // lock for accessing all fields. lock sync.RWMutex duringBuffering bool latestCRSRound uint64 - moduleWaitGroup sync.WaitGroup + waitGroup sync.WaitGroup agreementWaitGroup sync.WaitGroup pullChan chan common.Hash receiveChan chan *types.Block @@ -82,6 +84,7 @@ type Consensus struct { ctxCancel context.CancelFunc syncedLastBlock *types.Block syncedConsensus *core.Consensus + syncedSkipNext bool dummyCancel context.CancelFunc dummyFinished <-chan struct{} dummyMsgBuffer []interface{} @@ -115,6 +118,7 @@ func NewConsensus( receiveChan: make(chan *types.Block, 1000), pullChan: make(chan common.Hash, 1000), randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult), + heightEvt: common.NewEvent(), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) _, con.initChainTipHeight = db.GetCompactionChainTipInfo() @@ -142,9 +146,73 @@ func (con *Consensus) assureBuffering() { return } con.duringBuffering = true + // Get latest block to prepare utils.RoundEvent. + var ( + err error + blockHash, height = con.db.GetCompactionChainTipInfo() + ) + if height == 0 { + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger, + uint64(0), uint64(0), uint64(0), core.ConfigRoundShift) + } else { + var b types.Block + if b, err = con.db.GetBlock(blockHash); err == nil { + beginHeight := con.roundBeginHeights[b.Position.Round] + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, + con.logger, b.Position.Round, beginHeight, beginHeight, + core.ConfigRoundShift) + } + } + if err != nil { + panic(err) + } + // Make sure con.roundEvt stopped before stopping con.agreementModule. + con.waitGroup.Add(1) + // Register a round event handler to notify CRS to agreementModule. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + con.waitGroup.Add(1) + go func() { + defer con.waitGroup.Done() + for _, e := range evts { + select { + case <-con.ctx.Done(): + return + default: + } + for func() bool { + select { + case <-con.ctx.Done(): + return false + case con.agreementModule.inputChan <- e.Round: + return false + case <-time.After(500 * time.Millisecond): + con.logger.Warn( + "agreement input channel is full when putting CRS", + "round", e.Round, + ) + return true + } + }() { + } + } + }() + }) + // Register a round event handler to validate next round. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func( + blockHeight uint64) { + select { + case <-con.ctx.Done(): + return + default: + } + con.roundEvt.ValidateNextRound(blockHeight) + }) + }) + con.roundEvt.TriggerInitEvent() con.startAgreement() con.startNetwork() - con.startCRSMonitor() } func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { @@ -180,6 +248,29 @@ func (con *Consensus) buildAllEmptyBlocks() { } } +// ForceSync forces syncer to become synced. +func (con *Consensus) ForceSync(skip bool) { + if con.syncedLastBlock != nil { + return + } + hash, _ := con.db.GetCompactionChainTipInfo() + var block types.Block + block, err := con.db.GetBlock(hash) + if err != nil { + panic(err) + } + con.logger.Info("Force Sync", "block", &block) + con.setupConfigsUntilRound(block.Position.Round + core.ConfigRoundShift - 1) + con.syncedLastBlock = &block + con.stopBuffering() + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + context.Background(), con.network.ReceiveChan(), + func(msg interface{}) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) + con.syncedSkipNext = skip +} + // SyncBlocks syncs blocks from compaction chain, latest is true if the caller // regards the blocks are the latest ones. Notice that latest can be true for // many times. @@ -241,6 +332,7 @@ func (con *Consensus) SyncBlocks( b.Hash, b.Finalization.Height); err != nil { return } + go con.heightEvt.NotifyHeight(b.Finalization.Height) } if latest { con.assureBuffering() @@ -279,6 +371,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { con.syncedConsensus, err = core.NewConsensusFromSyncer( con.syncedLastBlock, con.roundBeginHeights[con.syncedLastBlock.Position.Round], + con.syncedSkipNext, con.dMoment, con.app, con.gov, @@ -321,7 +414,10 @@ func (con *Consensus) stopBuffering() { return } con.logger.Trace("stop syncer modules") - con.moduleWaitGroup.Wait() + con.roundEvt.Stop() + con.waitGroup.Done() + // Wait for all routines depends on con.agreementModule stopped. + con.waitGroup.Wait() // Since there is no one waiting for the receive channel of fullnode, we // need to launch a dummy receiver right away. con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( @@ -467,9 +563,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { // startNetwork starts network for receiving blocks and agreement results. func (con *Consensus) startNetwork() { - con.moduleWaitGroup.Add(1) + con.waitGroup.Add(1) go func() { - defer con.moduleWaitGroup.Done() + defer con.waitGroup.Done() loop: for { select { @@ -497,62 +593,6 @@ func (con *Consensus) startNetwork() { }() } -// startCRSMonitor is the dummiest way to verify if the CRS for one round -// is ready or not. -func (con *Consensus) startCRSMonitor() { - var lastNotifiedRound uint64 - // Notify all agreements for new CRS. - notifyNewCRS := func(round uint64) { - con.setupConfigsUntilRound(round) - if round == lastNotifiedRound { - return - } - con.logger.Debug("CRS is ready", "round", round) - lastNotifiedRound = round - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.latestCRSRound = round - }() - for func() bool { - select { - case <-con.ctx.Done(): - return false - case con.agreementModule.inputChan <- round: - return false - case <-time.After(500 * time.Millisecond): - con.logger.Debug( - "agreement input channel is full when putting CRS", - "round", round, - ) - return true - } - }() { - } - } - con.moduleWaitGroup.Add(1) - go func() { - defer con.moduleWaitGroup.Done() - for { - select { - case <-con.ctx.Done(): - return - case <-time.After(500 * time.Millisecond): - } - // Notify agreement modules for the latest round that CRS is - // available if the round is not notified yet. - checked := lastNotifiedRound + 1 - for (con.gov.CRS(checked) != common.Hash{}) { - checked++ - } - checked-- - if checked > lastNotifiedRound { - notifyNewCRS(checked) - } - } - }() -} - func (con *Consensus) stopAgreement() { if con.agreementModule.inputChan != nil { close(con.agreementModule.inputChan) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go new file mode 100644 index 000000000..d08bff9e9 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go @@ -0,0 +1,148 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +type configReader interface { + Configuration(round uint64) *types.Config +} + +// WatchCat is reponsible for signaling if syncer object should be terminated. +type WatchCat struct { + recovery core.Recovery + timeout time.Duration + configReader configReader + feed chan types.Position + polling time.Duration + ctx context.Context + cancel context.CancelFunc + logger common.Logger +} + +// NewWatchCat creats a new WatchCat 🐱 object. +func NewWatchCat( + recovery core.Recovery, + configReader configReader, + polling time.Duration, + timeout time.Duration, + logger common.Logger) *WatchCat { + wc := &WatchCat{ + recovery: recovery, + timeout: timeout, + configReader: configReader, + feed: make(chan types.Position), + polling: polling, + logger: logger, + } + return wc +} + +// Feed the WatchCat so it won't produce the termination signal. +func (wc *WatchCat) Feed(position types.Position) { + wc.feed <- position +} + +// Start the WatchCat. +func (wc *WatchCat) Start() { + wc.Stop() + wc.ctx, wc.cancel = context.WithCancel(context.Background()) + go func() { + var lastPos types.Position + MonitorLoop: + for { + select { + case <-wc.ctx.Done(): + return + default: + } + select { + case <-wc.ctx.Done(): + return + case pos := <-wc.feed: + if !pos.Newer(lastPos) { + wc.logger.Warn("Feed with older height", + "pos", pos, "lastPos", lastPos) + continue + } + lastPos = pos + case <-time.After(wc.timeout): + break MonitorLoop + } + } + go func() { + for { + select { + case <-wc.ctx.Done(): + return + case <-wc.feed: + } + } + }() + defer wc.cancel() + proposed := false + threshold := uint64( + utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger). + NotarySetSize / 2) + wc.logger.Info("Threshold for recovery", "votes", threshold) + ResetLoop: + for { + if !proposed { + wc.logger.Info("Calling Recovery.ProposeSkipBlock", + "height", lastPos.Height) + if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil { + wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err) + } else { + proposed = true + } + } + votes, err := wc.recovery.Votes(lastPos.Height) + if err != nil { + wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err) + } else if votes > threshold { + wc.logger.Info("Threshold for recovery reached!") + break ResetLoop + } + select { + case <-wc.ctx.Done(): + return + case <-time.After(wc.polling): + } + } + }() +} + +// Stop the WatchCat. +func (wc *WatchCat) Stop() { + if wc.cancel != nil { + wc.cancel() + } +} + +// Meow return a closed channel if syncer should be terminated. +func (wc *WatchCat) Meow() <-chan struct{} { + return wc.ctx.Done() +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go index f7dee757f..5742d113a 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go @@ -236,14 +236,14 @@ func checkWithCancel(parentCtx context.Context, interval time.Duration, defer cancel() Loop: for { + if ret = checker(); ret { + return + } select { case <-ctx.Done(): break Loop case <-time.After(interval): } - if ret = checker(); ret { - return - } } return } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go index bab1d32d2..1ce877dda 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go @@ -28,15 +28,15 @@ import ( typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" ) -// ErrUnmatchedBlockHeightWithGov is for invalid parameters for NewRoundEvent. -type ErrUnmatchedBlockHeightWithGov struct { +// ErrUnmatchedBlockHeightWithConfig is for invalid parameters for NewRoundEvent. +type ErrUnmatchedBlockHeightWithConfig struct { round uint64 reset uint64 blockHeight uint64 } -func (e ErrUnmatchedBlockHeightWithGov) Error() string { - return fmt.Sprintf("unsynced block height and gov: round:%d reset:%d h:%d", +func (e ErrUnmatchedBlockHeightWithConfig) Error() string { + return fmt.Sprintf("unsynced block height and cfg: round:%d reset:%d h:%d", e.round, e.reset, e.blockHeight) } @@ -56,11 +56,43 @@ type RoundEventParam struct { CRS common.Hash } -// NextRoundCheckpoint returns the height to check if the next round is ready. -func (e RoundEventParam) NextRoundCheckpoint() uint64 { +// NextRoundValidationHeight returns the height to check if the next round is +// ready. +func (e RoundEventParam) NextRoundValidationHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength*9/10 +} + +// NextCRSProposingHeight returns the height to propose CRS for next round. +func (e RoundEventParam) NextCRSProposingHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength/2 +} + +// NextDKGPreparationHeight returns the height to prepare DKG set for next +// round. +func (e RoundEventParam) NextDKGPreparationHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength*2/3 +} + +// NextRoundHeight returns the height of the beginning of next round. +func (e RoundEventParam) NextRoundHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength +} + +// NextTouchNodeSetCacheHeight returns the height to touch the node set cache. +func (e RoundEventParam) NextTouchNodeSetCacheHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength*9/10 +} + +// NextDKGResetHeight returns the height to reset DKG for next period. +func (e RoundEventParam) NextDKGResetHeight() uint64 { return e.BeginHeight + e.Config.RoundLength*8/10 } +// NextDKGRegisterHeight returns the height to register DKG. +func (e RoundEventParam) NextDKGRegisterHeight() uint64 { + return e.BeginHeight + e.Config.RoundLength/2 +} + // roundEventFn defines the fingerprint of handlers of round events. type roundEventFn func([]RoundEventParam) @@ -131,7 +163,7 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor, e.config.ExtendLength() } if !e.config.Contains(initBlockHeight) { - return nil, ErrUnmatchedBlockHeightWithGov{ + return nil, ErrUnmatchedBlockHeightWithConfig{ round: initRound, reset: resetCount, blockHeight: initBlockHeight, @@ -149,6 +181,22 @@ func (e *RoundEvent) Register(h roundEventFn) { e.handlers = append(e.handlers, h) } +// TriggerInitEvent triggers event from the initial setting. +func (e *RoundEvent) TriggerInitEvent() { + e.lock.Lock() + defer e.lock.Unlock() + events := []RoundEventParam{RoundEventParam{ + Round: e.lastTriggeredRound, + Reset: e.lastTriggeredResetCount, + BeginHeight: e.config.LastPeriodBeginHeight(), + CRS: GetCRSWithPanic(e.gov, e.lastTriggeredRound, e.logger), + Config: GetConfigWithPanic(e.gov, e.lastTriggeredRound, e.logger), + }} + for _, h := range e.handlers { + h(events) + } +} + // ValidateNextRound validate if the DKG set for next round is ready to go or // failed to setup, all registered handlers would be called once some decision // is made on chain. @@ -225,14 +273,6 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) ( "crs", param.CRS.String()[:6], ) }() - // Make sure current last config covers the blockHeight. - if !e.config.Contains(blockHeight) { - panic(ErrUnmatchedBlockHeightWithGov{ - round: e.lastTriggeredRound, - reset: e.lastTriggeredResetCount, - blockHeight: blockHeight, - }) - } nextRound := e.lastTriggeredRound + 1 if nextRound >= startRound+e.roundShift { // Avoid access configuration newer than last confirmed one over diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go index b8bd95ec4..14687d6ac 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go @@ -138,3 +138,9 @@ func LaunchDummyReceiver( func GetDKGThreshold(config *types.Config) int { return int(config.DKGSetSize/3) + 1 } + +// GetNextRoundValidationHeight returns the block height to check if the next +// round is ready. +func GetNextRoundValidationHeight(begin, length uint64) uint64 { + return begin + length*9/10 +} |