From 8ef4fc213703620fbfa13890dee042d40eea8545 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Wed, 20 Feb 2019 12:53:18 +0800 Subject: core: switch round by block height (#450) --- core/agreement-mgr.go | 158 +++++++++++++++++++------------------------------- 1 file changed, 61 insertions(+), 97 deletions(-) (limited to 'core/agreement-mgr.go') diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 5f5b9ae..88cc432 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -65,13 +65,29 @@ func genValidLeader( } type agreementMgrConfig struct { - beginTime time.Time - roundInterval time.Duration + roundBasedConfig + notarySetSize uint32 lambdaBA time.Duration crs common.Hash } +func (c *agreementMgrConfig) from( + round uint64, config *types.Config, crs common.Hash) { + c.notarySetSize = config.NotarySetSize + c.lambdaBA = config.LambdaBA + c.crs = crs + c.setupRoundBasedFields(round, config) +} + +func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, + crs common.Hash) (c agreementMgrConfig) { + c = agreementMgrConfig{} + c.from(prev.roundID+1, config, crs) + c.setRoundBeginHeight(prev.roundEndHeight) + return +} + type baRoundSetting struct { notarySet map[types.NodeID]struct{} agr *agreement @@ -92,9 +108,8 @@ type agreementMgr struct { signer *utils.Signer bcModule *blockChain ctx context.Context - lastEndTime time.Time initRound uint64 - configs []*agreementMgrConfig + configs []agreementMgrConfig baModule *agreement processedBAResult map[types.Position]struct{} voteFilter *utils.VoteFilter @@ -104,8 +119,8 @@ type agreementMgr struct { } func newAgreementMgr(con *Consensus, initRound uint64, - initRoundBeginTime time.Time) *agreementMgr { - return &agreementMgr{ + initConfig agreementMgrConfig) (mgr *agreementMgr, err error) { + mgr = &agreementMgr{ con: con, ID: con.ID, app: con.app, @@ -117,9 +132,33 @@ func newAgreementMgr(con *Consensus, initRound uint64, bcModule: con.bcModule, ctx: con.ctx, initRound: initRound, - lastEndTime: initRoundBeginTime, processedBAResult: make(map[types.Position]struct{}, maxResultCache), + configs: []agreementMgrConfig{initConfig}, + voteFilter: utils.NewVoteFilter(), } + recv := &consensusBAReceiver{ + consensus: con, + restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, + } + recv.roundValue.Store(uint64(0)) + agr := newAgreement( + mgr.ID, + recv, + newLeaderSelector(genValidLeader(mgr), mgr.logger), + mgr.signer, + mgr.logger) + // Hacky way to initialize first notarySet. + nodes, err := mgr.cache.GetNodeSet(initRound) + if err != nil { + return + } + agr.notarySet = nodes.GetSubSet( + int(initConfig.notarySetSize), types.NewNotarySetTarget(initConfig.crs)) + // Hacky way to make agreement module self contained. + recv.agreementModule = agr + mgr.baModule = agr + return } func (mgr *agreementMgr) run() { @@ -136,7 +175,7 @@ func (mgr *agreementMgr) run() { }() } -func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { +func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig { mgr.lock.RLock() defer mgr.lock.RUnlock() if round < mgr.initRound { @@ -146,7 +185,7 @@ func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { if roundIndex >= uint64(len(mgr.configs)) { return nil } - return mgr.configs[roundIndex] + return &mgr.configs[roundIndex] } func (mgr *agreementMgr) appendConfig( @@ -156,52 +195,12 @@ func (mgr *agreementMgr) appendConfig( if round != uint64(len(mgr.configs))+mgr.initRound { return ErrRoundNotIncreasing } - newConfig := &agreementMgrConfig{ - beginTime: mgr.lastEndTime, - roundInterval: config.RoundInterval, - notarySetSize: config.NotarySetSize, - lambdaBA: config.LambdaBA, - crs: crs, - } - mgr.configs = append(mgr.configs, newConfig) - mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval) - // Prepare modules. - if mgr.baModule != nil { - return nil - } - recv := &consensusBAReceiver{ - consensus: mgr.con, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, - } - recv.roundValue.Store(uint64(0)) - agrModule := newAgreement( - mgr.con.ID, - recv, - newLeaderSelector(genValidLeader(mgr), mgr.logger), - mgr.signer, - mgr.logger) - // Hacky way to initialize first notarySet. - nodes, err := mgr.cache.GetNodeSet(round) - if err != nil { - return err - } - agrModule.notarySet = nodes.GetSubSet( - int(config.NotarySetSize), types.NewNotarySetTarget(crs)) - // Hacky way to make agreement module self contained. - recv.agreementModule = agrModule - mgr.baModule = agrModule - mgr.voteFilter = utils.NewVoteFilter() + mgr.configs = append(mgr.configs, newAgreementMgrConfig( + mgr.configs[len(mgr.configs)-1], config, crs)) return nil } func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { - if v.Position.ChainID > 0 { - mgr.logger.Error("Process vote for unknown chain to BA", - "position", v.Position, - "initRound", mgr.initRound) - return utils.ErrInvalidChainID - } if mgr.voteFilter.Filter(v) { return nil } @@ -212,13 +211,6 @@ func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { } func (mgr *agreementMgr) processBlock(b *types.Block) error { - if b.Position.ChainID > 0 { - mgr.logger.Error("Process block for unknown chain to BA", - "position", b.Position, - "baRound", len(mgr.configs), - "initRound", mgr.initRound) - return utils.ErrInvalidChainID - } return mgr.baModule.processBlock(b) } @@ -247,13 +239,6 @@ func (mgr *agreementMgr) untouchAgreementResult( func (mgr *agreementMgr) processAgreementResult( result *types.AgreementResult) error { - if result.Position.ChainID > 0 { - mgr.logger.Error("Process unknown result for unknown chain to BA", - "position", result.Position, - "baRound", len(mgr.configs), - "initRound", mgr.initRound) - return utils.ErrInvalidChainID - } aID := mgr.baModule.agreementID() if isStop(aID) { return nil @@ -310,13 +295,12 @@ func (mgr *agreementMgr) runBA(initRound uint64) { var ( currentRound uint64 nextRound = initRound + curConfig = mgr.config(initRound) setting = baRoundSetting{ agr: mgr.baModule, recv: mgr.baModule.data.recv.(*consensusBAReceiver), } - roundBeginTime time.Time - roundEndTime time.Time - tickDuration time.Duration + tickDuration time.Duration ) // Check if this routine needs to awake in this round and prepare essential @@ -327,24 +311,20 @@ func (mgr *agreementMgr) runBA(initRound uint64) { nextRound++ }() // Wait until the configuartion for next round is ready. - var config *agreementMgrConfig for { - if config = mgr.getConfig(nextRound); config != nil { + if curConfig = mgr.config(nextRound); curConfig != nil { break } else { mgr.logger.Debug("round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } - // Set next checkpoint. - roundBeginTime = config.beginTime - roundEndTime = config.beginTime.Add(config.roundInterval) // Check if this node in notary set of this chain in this round. - notarySet, err := mgr.cache.GetNotarySet(nextRound, 0) + notarySet, err := mgr.cache.GetNotarySet(nextRound) if err != nil { panic(err) } - setting.crs = config.crs + setting.crs = curConfig.crs setting.notarySet = notarySet _, isNotary = setting.notarySet[mgr.ID] if isNotary { @@ -357,12 +337,12 @@ func (mgr *agreementMgr) runBA(initRound uint64) { "round", nextRound) } // Setup ticker - if tickDuration != config.lambdaBA { + if tickDuration != curConfig.lambdaBA { if setting.ticker != nil { setting.ticker.Stop() } setting.ticker = newTicker(mgr.gov, nextRound, TickerBA) - tickDuration = config.lambdaBA + tickDuration = curConfig.lambdaBA } return } @@ -373,29 +353,13 @@ Loop: break Loop default: } - now := time.Now().UTC() setting.recv.isNotary = checkRound() - // Sleep until round begin. Here a biased round begin time would be - // used instead of the one in config. The reason it to disperse the load - // of fullnodes to verify confirmed blocks from each chain. - if now.Before(pickBiasedTime(roundBeginTime, 4*tickDuration)) { - select { - case <-mgr.ctx.Done(): - break Loop - case <-time.After(roundBeginTime.Sub(now)): - } - // Clean the tick channel after awake: the tick would be queued in - // channel, thus the first few ticks would not tick on expected - // interval. - <-setting.ticker.Tick() - <-setting.ticker.Tick() - } // Run BA for this round. setting.recv.roundValue.Store(currentRound) - setting.recv.changeNotaryTime = roundEndTime + setting.recv.changeNotaryHeight = curConfig.roundEndHeight setting.recv.restartNotary <- types.Position{ - Round: setting.recv.round(), - ChainID: math.MaxUint32, + Round: setting.recv.round(), + Height: math.MaxUint64, } mgr.voteFilter = utils.NewVoteFilter() if err := mgr.baRoutineForOneRound(&setting); err != nil { @@ -450,7 +414,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( if isStop(oldPos) && nextHeight == 0 { break } - if isStop(restartPos) && nextHeight == 0 { + if isStop(restartPos) { break } if nextHeight > restartPos.Height { -- cgit v1.2.3