diff options
Diffstat (limited to 'core/agreement-mgr.go')
-rw-r--r-- | core/agreement-mgr.go | 260 |
1 files changed, 93 insertions, 167 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index c3692d8..5f5b9ae 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -33,6 +33,8 @@ import ( // Errors returned from BA modules var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") + ErrRoundOutOfRange = errors.New("round out of range") + ErrInvalidBlock = errors.New("invalid block") ) const maxResultCache = 100 @@ -44,7 +46,7 @@ func genValidLeader( if block.Timestamp.After(time.Now()) { return false, nil } - if err := mgr.lattice.SanityCheck(block, true); err != nil { + if err := mgr.bcModule.sanityCheck(block); err != nil { if err == ErrRetrySanityCheckLater { return false, nil } @@ -64,7 +66,6 @@ func genValidLeader( type agreementMgrConfig struct { beginTime time.Time - numChains uint32 roundInterval time.Duration notarySetSize uint32 lambdaBA time.Duration @@ -72,7 +73,6 @@ type agreementMgrConfig struct { } type baRoundSetting struct { - chainID uint32 notarySet map[types.NodeID]struct{} agr *agreement recv *consensusBAReceiver @@ -90,28 +90,17 @@ type agreementMgr struct { logger common.Logger cache *utils.NodeSetCache signer *utils.Signer - lattice *Lattice + bcModule *blockChain ctx context.Context lastEndTime time.Time initRound uint64 configs []*agreementMgrConfig - baModules []*agreement + baModule *agreement processedBAResult map[types.Position]struct{} - voteFilters []*utils.VoteFilter + voteFilter *utils.VoteFilter waitGroup sync.WaitGroup - pendingVotes map[uint64][]*types.Vote - pendingBlocks map[uint64][]*types.Block isRunning bool - - // This lock should be used when attempting to: - // - add a new baModule. - // - remove all baModules when stopping. In this case, the cleaner need - // to wait for all routines runnning baModules finished. - // - access a method of baModule. - // - append a config from new round. - // The routine running corresponding baModule, however, doesn't have to - // acquire this lock. - lock sync.RWMutex + lock sync.RWMutex } func newAgreementMgr(con *Consensus, initRound uint64, @@ -125,7 +114,7 @@ func newAgreementMgr(con *Consensus, initRound uint64, logger: con.logger, cache: con.nodeSetCache, signer: con.signer, - lattice: con.lattice, + bcModule: con.bcModule, ctx: con.ctx, initRound: initRound, lastEndTime: initRoundBeginTime, @@ -133,6 +122,20 @@ func newAgreementMgr(con *Consensus, initRound uint64, } } +func (mgr *agreementMgr) run() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + if mgr.isRunning { + return + } + mgr.isRunning = true + mgr.waitGroup.Add(1) + go func() { + defer mgr.waitGroup.Done() + mgr.runBA(mgr.initRound) + }() +} + func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { mgr.lock.RLock() defer mgr.lock.RUnlock() @@ -146,22 +149,6 @@ func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { return mgr.configs[roundIndex] } -func (mgr *agreementMgr) run() { - mgr.lock.Lock() - defer mgr.lock.Unlock() - if mgr.isRunning { - return - } - mgr.isRunning = true - for i := uint32(0); i < uint32(len(mgr.baModules)); i++ { - mgr.waitGroup.Add(1) - go func(idx uint32) { - defer mgr.waitGroup.Done() - mgr.runBA(mgr.initRound, idx) - }(i) - } -} - func (mgr *agreementMgr) appendConfig( round uint64, config *types.Config, crs common.Hash) (err error) { mgr.lock.Lock() @@ -171,7 +158,6 @@ func (mgr *agreementMgr) appendConfig( } newConfig := &agreementMgrConfig{ beginTime: mgr.lastEndTime, - numChains: config.NumChains, roundInterval: config.RoundInterval, notarySetSize: config.NotarySetSize, lambdaBA: config.LambdaBA, @@ -179,80 +165,61 @@ func (mgr *agreementMgr) appendConfig( } mgr.configs = append(mgr.configs, newConfig) mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval) - // Create baModule for newly added chain. - for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ { - // Prepare modules. - recv := &consensusBAReceiver{ - consensus: mgr.con, - chainID: i, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, - } - recv.roundValue.Store(uint64(0)) - agrModule := newAgreement( - mgr.con.ID, - recv, - newLeaderSelector(genValidLeader(mgr), mgr.logger), - mgr.signer, - mgr.logger) - // Hacky way to initialize first notarySet. - nodes, err := mgr.cache.GetNodeSet(round) - if err != nil { - return err - } - agrModule.notarySet = nodes.GetSubSet( - int(config.NotarySetSize), - types.NewNotarySetTarget(crs, i)) - // Hacky way to make agreement module self contained. - recv.agreementModule = agrModule - mgr.baModules = append(mgr.baModules, agrModule) - mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter()) - if mgr.isRunning { - mgr.waitGroup.Add(1) - go func(idx uint32) { - defer mgr.waitGroup.Done() - mgr.runBA(round, idx) - }(i) - } + // Prepare modules. + if mgr.baModule != nil { + return nil + } + recv := &consensusBAReceiver{ + consensus: mgr.con, + restartNotary: make(chan types.Position, 1), + roundValue: &atomic.Value{}, } + recv.roundValue.Store(uint64(0)) + agrModule := newAgreement( + mgr.con.ID, + recv, + newLeaderSelector(genValidLeader(mgr), mgr.logger), + mgr.signer, + mgr.logger) + // Hacky way to initialize first notarySet. + nodes, err := mgr.cache.GetNodeSet(round) + if err != nil { + return err + } + agrModule.notarySet = nodes.GetSubSet( + int(config.NotarySetSize), types.NewNotarySetTarget(crs)) + // Hacky way to make agreement module self contained. + recv.agreementModule = agrModule + mgr.baModule = agrModule + mgr.voteFilter = utils.NewVoteFilter() return nil } -func (mgr *agreementMgr) processVote(v *types.Vote) error { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if v.Position.ChainID >= uint32(len(mgr.baModules)) { +func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { + if v.Position.ChainID > 0 { mgr.logger.Error("Process vote for unknown chain to BA", - "position", &v.Position, - "baChain", len(mgr.baModules), - "baRound", len(mgr.configs), + "position", v.Position, "initRound", mgr.initRound) return utils.ErrInvalidChainID } - filter := mgr.voteFilters[v.Position.ChainID] - if filter.Filter(v) { + if mgr.voteFilter.Filter(v) { return nil } - v = v.Clone() - err := mgr.baModules[v.Position.ChainID].processVote(v) - if err == nil { - mgr.baModules[v.Position.ChainID].updateFilter(filter) + if err = mgr.baModule.processVote(v); err == nil { + mgr.baModule.updateFilter(mgr.voteFilter) } - return err + return } func (mgr *agreementMgr) processBlock(b *types.Block) error { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if b.Position.ChainID >= uint32(len(mgr.baModules)) { + if b.Position.ChainID > 0 { mgr.logger.Error("Process block for unknown chain to BA", - "position", &b.Position, - "baChain", len(mgr.baModules), + "position", b.Position, "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } - return mgr.baModules[b.Position.ChainID].processBlock(b) + return mgr.baModule.processBlock(b) } func (mgr *agreementMgr) touchAgreementResult( @@ -280,30 +247,26 @@ func (mgr *agreementMgr) untouchAgreementResult( func (mgr *agreementMgr) processAgreementResult( result *types.AgreementResult) error { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if result.Position.ChainID >= uint32(len(mgr.baModules)) { + if result.Position.ChainID > 0 { mgr.logger.Error("Process unknown result for unknown chain to BA", - "position", &result.Position, - "baChain", len(mgr.baModules), + "position", result.Position, "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } - agreement := mgr.baModules[result.Position.ChainID] - aID := agreement.agreementID() + aID := mgr.baModule.agreementID() if isStop(aID) { return nil } - if result.Position == aID && !agreement.confirmed() { - mgr.logger.Info("Syncing BA", "position", &result.Position) + if result.Position == aID && !mgr.baModule.confirmed() { + mgr.logger.Info("Syncing BA", "position", result.Position) for key := range result.Votes { - if err := agreement.processVote(&result.Votes[key]); err != nil { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err } } - } else if result.Position.Newer(&aID) { - mgr.logger.Info("Fast syncing BA", "position", &result.Position) + } else if result.Position.Newer(aID) { + mgr.logger.Info("Fast syncing BA", "position", result.Position) nodes, err := mgr.cache.GetNodeSet(result.Position.Round) if err != nil { return err @@ -316,9 +279,9 @@ func (mgr *agreementMgr) processAgreementResult( nIDs := nodes.GetSubSet( int(utils.GetConfigWithPanic( mgr.gov, result.Position.Round, mgr.logger).NotarySetSize), - types.NewNotarySetTarget(crs, result.Position.ChainID)) + types.NewNotarySetTarget(crs)) for key := range result.Votes { - if err := agreement.processVote(&result.Votes[key]); err != nil { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err } } @@ -326,7 +289,7 @@ func (mgr *agreementMgr) processAgreementResult( if err != nil { return err } - agreement.restart(nIDs, result.Position, leader, crs) + mgr.baModule.restart(nIDs, result.Position, leader, crs) } return nil } @@ -336,30 +299,20 @@ func (mgr *agreementMgr) stop() { func() { mgr.lock.Lock() defer mgr.lock.Unlock() - for _, agr := range mgr.baModules { - agr.stop() - } + mgr.baModule.stop() }() // Block until all routines are done. mgr.waitGroup.Wait() } -func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { - // Acquire agreement module. - agr, recv := func() (*agreement, *consensusBAReceiver) { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - agr := mgr.baModules[chainID] - return agr, agr.data.recv.(*consensusBAReceiver) - }() +func (mgr *agreementMgr) runBA(initRound uint64) { // These are round based variables. var ( currentRound uint64 nextRound = initRound setting = baRoundSetting{ - chainID: chainID, - agr: agr, - recv: recv, + agr: mgr.baModule, + recv: mgr.baModule.data.recv.(*consensusBAReceiver), } roundBeginTime time.Time roundEndTime time.Time @@ -368,7 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Check if this routine needs to awake in this round and prepare essential // variables when yes. - checkRound := func() (isNotary, isDisabled bool) { + checkRound := func() (isNotary bool) { defer func() { currentRound = nextRound nextRound++ @@ -386,13 +339,8 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Set next checkpoint. roundBeginTime = config.beginTime roundEndTime = config.beginTime.Add(config.roundInterval) - // Check if this chain handled by this routine included in this round. - if chainID >= config.numChains { - isDisabled = true - return - } // Check if this node in notary set of this chain in this round. - notarySet, err := mgr.cache.GetNotarySet(nextRound, chainID) + notarySet, err := mgr.cache.GetNotarySet(nextRound, 0) if err != nil { panic(err) } @@ -402,13 +350,11 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { if isNotary { mgr.logger.Info("selected as notary set", "ID", mgr.ID, - "round", nextRound, - "chainID", chainID) + "round", nextRound) } else { mgr.logger.Info("not selected as notary set", "ID", mgr.ID, - "round", nextRound, - "chainID", chainID) + "round", nextRound) } // Setup ticker if tickDuration != config.lambdaBA { @@ -428,16 +374,7 @@ Loop: default: } now := time.Now().UTC() - var isDisabled bool - setting.recv.isNotary, isDisabled = checkRound() - if isDisabled { - select { - case <-mgr.ctx.Done(): - break Loop - case <-time.After(roundEndTime.Sub(now)): - continue Loop - } - } + setting.recv.isNotary = checkRound() // Sleep until round begin. Here a biased round begin time would be // used instead of the one in config. The reason it to disperse the load // of fullnodes to verify confirmed blocks from each chain. @@ -454,18 +391,17 @@ Loop: <-setting.ticker.Tick() } // Run BA for this round. - recv.roundValue.Store(currentRound) - recv.changeNotaryTime = roundEndTime - recv.restartNotary <- types.Position{ + setting.recv.roundValue.Store(currentRound) + setting.recv.changeNotaryTime = roundEndTime + setting.recv.restartNotary <- types.Position{ Round: setting.recv.round(), ChainID: math.MaxUint32, } - mgr.voteFilters[chainID] = utils.NewVoteFilter() + mgr.voteFilter = utils.NewVoteFilter() if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, - "nodeID", mgr.ID, - "chain", chainID) + "nodeID", mgr.ID) break Loop } } @@ -485,13 +421,13 @@ func (mgr *agreementMgr) baRoutineForOneRound( break default: } - tipRound := mgr.lattice.TipRound(setting.chainID) + tipRound := mgr.bcModule.tipRound() if tipRound > restartPos.Round { // It's a vary rare that this go routine sleeps for entire round. break } else if tipRound != restartPos.Round { - mgr.logger.Debug("Waiting lattice to change round...", - "pos", &restartPos) + mgr.logger.Debug("Waiting blockChain to change round...", + "pos", restartPos) } else { break } @@ -501,7 +437,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( breakLoop = true return } - if restartPos.Older(&oldPos) { + if restartPos.Older(oldPos) { // The restartNotary event is triggered by 'BlockConfirmed' // of some older block. return @@ -510,16 +446,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( var nextHeight uint64 var nextTime time.Time for { - nextHeight, nextTime, err = - mgr.lattice.NextBlock(recv.round(), setting.chainID) - if err != nil { - mgr.logger.Debug("Error getting next height", - "error", err, - "round", recv.round(), - "chainID", setting.chainID) - err = nil - nextHeight = restartPos.Height - } + nextHeight, nextTime = mgr.bcModule.nextBlock() if isStop(oldPos) && nextHeight == 0 { break } @@ -529,14 +456,13 @@ func (mgr *agreementMgr) baRoutineForOneRound( if nextHeight > restartPos.Height { break } - mgr.logger.Debug("Lattice not ready!!!", - "old", &oldPos, "restart", &restartPos, "next", nextHeight) + mgr.logger.Debug("BlockChain not ready!!!", + "old", oldPos, "restart", restartPos, "next", nextHeight) time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round(), - ChainID: setting.chainID, - Height: nextHeight, + Round: recv.round(), + Height: nextHeight, } oldPos = nextPos var leader types.NodeID @@ -591,7 +517,7 @@ Loop: if agr.pullVotes() { pos := agr.agreementID() mgr.logger.Debug("Calling Network.PullVotes for syncing votes", - "position", &pos) + "position", pos) mgr.network.PullVotes(pos) } for i := 0; i < agr.clocks(); i++ { |