// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus library. // // The dexon-consensus library is free software: you can redistribute it // and/or modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, // or (at your option) any later version. // // The dexon-consensus library is distributed in the hope that it will be // useful, but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser // General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the dexon-consensus library. If not, see // . package core import ( "context" "errors" "math" "sync" "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/utils" ) // Errors returned from BA modules var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") ) // genValidLeader generate a validLeader function for agreement modules. func genValidLeader( mgr *agreementMgr) func(*types.Block) (bool, error) { return func(block *types.Block) (bool, error) { if block.Timestamp.After(time.Now()) { return false, nil } if err := mgr.lattice.SanityCheck(block); err != nil { if err == ErrRetrySanityCheckLater { return false, nil } return false, err } mgr.logger.Debug("Calling Application.VerifyBlock", "block", block) switch mgr.app.VerifyBlock(block) { case types.VerifyInvalidBlock: return false, ErrInvalidBlock case types.VerifyRetryLater: return false, nil default: } return true, nil } } type agreementMgrConfig struct { beginTime time.Time numChains uint32 roundInterval time.Duration notarySetSize uint32 lambdaBA time.Duration crs common.Hash } type baRoundSetting struct { chainID uint32 notarySet map[types.NodeID]struct{} agr *agreement recv *consensusBAReceiver ticker Ticker crs common.Hash } type agreementMgr struct { // TODO(mission): unbound Consensus instance from this module. con *Consensus ID types.NodeID app Application gov Governance network Network logger common.Logger cache *utils.NodeSetCache signer *utils.Signer lattice *Lattice ctx context.Context lastEndTime time.Time initRound uint64 configs []*agreementMgrConfig baModules []*agreement waitGroup sync.WaitGroup pendingVotes map[uint64][]*types.Vote pendingBlocks map[uint64][]*types.Block isRunning bool // This lock should be used when attempting to: // - add a new baModule. // - remove all baModules when stopping. In this case, the cleaner need // to wait for all routines runnning baModules finished. // - access a method of baModule. // - append a config from new round. // The routine running corresponding baModule, however, doesn't have to // acquire this lock. lock sync.RWMutex } func newAgreementMgr(con *Consensus, initRound uint64, initRoundBeginTime time.Time) *agreementMgr { return &agreementMgr{ con: con, ID: con.ID, app: con.app, gov: con.gov, network: con.network, logger: con.logger, cache: con.nodeSetCache, signer: con.signer, lattice: con.lattice, ctx: con.ctx, initRound: initRound, lastEndTime: initRoundBeginTime, } } func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { mgr.lock.RLock() defer mgr.lock.RUnlock() if round < mgr.initRound { panic(ErrRoundOutOfRange) } roundIndex := round - mgr.initRound if roundIndex >= uint64(len(mgr.configs)) { return nil } return mgr.configs[roundIndex] } func (mgr *agreementMgr) run() { mgr.lock.Lock() defer mgr.lock.Unlock() if mgr.isRunning { return } mgr.isRunning = true for i := uint32(0); i < uint32(len(mgr.baModules)); i++ { go mgr.runBA(mgr.initRound, i) } } func (mgr *agreementMgr) appendConfig( round uint64, config *types.Config, crs common.Hash) (err error) { mgr.lock.Lock() defer mgr.lock.Unlock() if round != uint64(len(mgr.configs))+mgr.initRound { return ErrRoundNotIncreasing } newConfig := &agreementMgrConfig{ beginTime: mgr.lastEndTime, numChains: config.NumChains, roundInterval: config.RoundInterval, notarySetSize: config.NotarySetSize, lambdaBA: config.LambdaBA, crs: crs, } mgr.configs = append(mgr.configs, newConfig) mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval) // Create baModule for newly added chain. for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ { // Prepare modules. recv := &consensusBAReceiver{ consensus: mgr.con, chainID: i, restartNotary: make(chan types.Position, 1), } agrModule := newAgreement( mgr.con.ID, recv, newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer) // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) if mgr.isRunning { go mgr.runBA(round, i) } } return nil } func (mgr *agreementMgr) processVote(v *types.Vote) error { v = v.Clone() mgr.lock.RLock() defer mgr.lock.RUnlock() if v.Position.ChainID >= uint32(len(mgr.baModules)) { mgr.logger.Error("Process vote for unknown chain to BA", "position", &v.Position, "baChain", len(mgr.baModules), "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } return mgr.baModules[v.Position.ChainID].processVote(v) } func (mgr *agreementMgr) processBlock(b *types.Block) error { mgr.lock.RLock() defer mgr.lock.RUnlock() if b.Position.ChainID >= uint32(len(mgr.baModules)) { mgr.logger.Error("Process block for unknown chain to BA", "position", &b.Position, "baChain", len(mgr.baModules), "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } return mgr.baModules[b.Position.ChainID].processBlock(b) } func (mgr *agreementMgr) processAgreementResult( result *types.AgreementResult) error { mgr.lock.RLock() defer mgr.lock.RUnlock() if result.Position.ChainID >= uint32(len(mgr.baModules)) { mgr.logger.Error("Process unknown result for unknown chain to BA", "position", &result.Position, "baChain", len(mgr.baModules), "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } agreement := mgr.baModules[result.Position.ChainID] aID := agreement.agreementID() if isStop(aID) { return nil } if result.Position.Newer(&aID) { mgr.logger.Info("Syncing BA", "position", &result.Position) nodes, err := mgr.cache.GetNodeSet(result.Position.Round) if err != nil { return err } mgr.logger.Debug("Calling Network.PullBlocks for syncing BA", "hash", result.BlockHash) mgr.network.PullBlocks(common.Hashes{result.BlockHash}) mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) nIDs := nodes.GetSubSet( int(utils.GetConfigWithPanic( mgr.gov, result.Position.Round, mgr.logger).NotarySetSize), types.NewNotarySetTarget(crs, result.Position.ChainID)) for key := range result.Votes { if err := agreement.processVote(&result.Votes[key]); err != nil { return err } } agreement.restart(nIDs, result.Position, crs) } return nil } func (mgr *agreementMgr) stop() { // Stop all running agreement modules. func() { mgr.lock.Lock() defer mgr.lock.Unlock() for _, agr := range mgr.baModules { agr.stop() } }() // Block until all routines are done. mgr.waitGroup.Wait() } func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { mgr.waitGroup.Add(1) defer mgr.waitGroup.Done() // Acquire agreement module. agr, recv := func() (*agreement, *consensusBAReceiver) { mgr.lock.RLock() defer mgr.lock.RUnlock() agr := mgr.baModules[chainID] return agr, agr.data.recv.(*consensusBAReceiver) }() // These are round based variables. var ( currentRound uint64 nextRound = initRound setting = baRoundSetting{ chainID: chainID, agr: agr, recv: recv, } roundBeginTime time.Time roundEndTime time.Time tickDuration time.Duration ) // Check if this routine needs to awake in this round and prepare essential // variables when yes. checkRound := func() (isNotary, isDisabled bool) { defer func() { currentRound = nextRound nextRound++ }() // Wait until the configuartion for next round is ready. var config *agreementMgrConfig for { if config = mgr.getConfig(nextRound); config != nil { break } else { mgr.logger.Info("round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } // Set next checkpoint. roundBeginTime = config.beginTime roundEndTime = config.beginTime.Add(config.roundInterval) // Check if this chain handled by this routine included in this round. if chainID >= config.numChains { isDisabled = true return } // Check if this node in notary set of this chain in this round. nodeSet, err := mgr.cache.GetNodeSet(nextRound) if err != nil { panic(err) } setting.crs = config.crs setting.notarySet = nodeSet.GetSubSet( int(config.notarySetSize), types.NewNotarySetTarget(config.crs, chainID)) _, isNotary = setting.notarySet[mgr.ID] if isNotary { mgr.logger.Info("selected as notary set", "ID", mgr.ID, "round", nextRound, "chainID", chainID) } else { mgr.logger.Info("not selected as notary set", "ID", mgr.ID, "round", nextRound, "chainID", chainID) } // Setup ticker if tickDuration != config.lambdaBA { if setting.ticker != nil { setting.ticker.Stop() } setting.ticker = newTicker(mgr.gov, nextRound, TickerBA) tickDuration = config.lambdaBA } return } Loop: for { select { case <-mgr.ctx.Done(): break Loop default: } now := time.Now().UTC() var isDisabled bool setting.recv.isNotary, isDisabled = checkRound() if isDisabled { select { case <-mgr.ctx.Done(): break Loop case <-time.After(roundEndTime.Sub(now)): continue Loop } } // Sleep until round begin. Here a biased round begin time would be // used instead of the one in config. The reason it to disperse the load // of fullnodes to verify confirmed blocks from each chain. if now.Before(pickBiasedTime(roundBeginTime, 4*tickDuration)) { select { case <-mgr.ctx.Done(): break Loop case <-time.After(roundBeginTime.Sub(now)): } // Clean the tick channel after awake: the tick would be queued in // channel, thus the first few ticks would not tick on expected // interval. <-setting.ticker.Tick() <-setting.ticker.Tick() } // Run BA for this round. recv.round = currentRound recv.changeNotaryTime = roundEndTime recv.restartNotary <- types.Position{ChainID: math.MaxUint32} if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, "nodeID", mgr.ID, "chain", chainID) break Loop } } } func (mgr *agreementMgr) baRoutineForOneRound( setting *baRoundSetting) (err error) { agr := setting.agr recv := setting.recv oldPos := agr.agreementID() Loop: for { select { case <-mgr.ctx.Done(): break Loop default: } select { case restartPos := <-recv.restartNotary: if !isStop(restartPos) { if restartPos.Round > oldPos.Round { // This round is finished. break Loop } if restartPos.Older(&oldPos) { // The restartNotary event is triggered by 'BlockConfirmed' // of some older block. break } } var nextHeight uint64 for { nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) if err != nil { mgr.logger.Debug("Error getting next height", "error", err, "round", recv.round, "chainID", setting.chainID) err = nil nextHeight = oldPos.Height } if isStop(oldPos) || nextHeight == 0 { break } if nextHeight > oldPos.Height { break } mgr.logger.Debug("Lattice not ready!!!", "old", &oldPos, "next", nextHeight) time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ Round: recv.round, ChainID: setting.chainID, Height: nextHeight, } oldPos = nextPos agr.restart(setting.notarySet, nextPos, setting.crs) default: } if agr.pullVotes() { pos := agr.agreementID() mgr.logger.Debug("Calling Network.PullVotes for syncing votes", "position", &pos) mgr.network.PullVotes(pos) } if err = agr.nextState(); err != nil { mgr.logger.Error("Failed to proceed to next state", "nodeID", mgr.ID.String(), "error", err) break Loop } for i := 0; i < agr.clocks(); i++ { // Priority select for agreement.done(). select { case <-agr.done(): continue Loop default: } select { case <-agr.done(): continue Loop case <-setting.ticker.Tick(): } } } return nil }