// Copyright 2018 The dexon-consensus Authors // This file is part of the dexon-consensus library. // // The dexon-consensus library is free software: you can redistribute it // and/or modify it under the terms of the GNU Lesser General Public License as // published by the Free Software Foundation, either version 3 of the License, // or (at your option) any later version. // // The dexon-consensus library is distributed in the hope that it will be // useful, but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser // General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the dexon-consensus library. If not, see // . package core import ( "context" "errors" "math" "sync" "time" lru "github.com/hashicorp/golang-lru" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" "github.com/dexon-foundation/dexon-consensus/core/utils" ) // Errors returned from BA modules var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") ErrRoundOutOfRange = errors.New("round out of range") ErrInvalidBlock = errors.New("invalid block") ErrNoValidLeader = errors.New("no valid leader") ErrIncorrectCRSSignature = errors.New("incorrect CRS signature") ErrBlockTooOld = errors.New("block too old") ) const maxResultCache = 100 const settingLimit = 3 // genValidLeader generate a validLeader function for agreement modules. func genValidLeader( mgr *agreementMgr) validLeaderFn { return func(block *types.Block, crs common.Hash) (bool, error) { if block.Timestamp.After(time.Now()) { return false, nil } if block.Position.Round >= DKGDelayRound { if mgr.recv.npks == nil { return false, nil } if block.Position.Round > mgr.recv.npks.Round { return false, nil } if block.Position.Round < mgr.recv.npks.Round { return false, ErrBlockTooOld } } if !utils.VerifyCRSSignature(block, crs, mgr.recv.npks) { return false, ErrIncorrectCRSSignature } if err := mgr.bcModule.sanityCheck(block); err != nil { if err == ErrRetrySanityCheckLater { return false, nil } return false, err } mgr.logger.Debug("Calling Application.VerifyBlock", "block", block) switch mgr.app.VerifyBlock(block) { case types.VerifyInvalidBlock: return false, ErrInvalidBlock case types.VerifyRetryLater: return false, nil default: } return true, nil } } type agreementMgrConfig struct { utils.RoundBasedConfig notarySetSize uint32 lambdaBA time.Duration crs common.Hash } func (c *agreementMgrConfig) from( round uint64, config *types.Config, crs common.Hash) { c.notarySetSize = config.NotarySetSize c.lambdaBA = config.LambdaBA c.crs = crs c.SetupRoundBasedFields(round, config) } func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, crs common.Hash) (c agreementMgrConfig) { c = agreementMgrConfig{} c.from(prev.RoundID()+1, config, crs) c.AppendTo(prev.RoundBasedConfig) return } type baRoundSetting struct { round uint64 dkgSet map[types.NodeID]struct{} threshold int ticker Ticker crs common.Hash } type agreementMgr struct { // TODO(mission): unbound Consensus instance from this module. con *Consensus ID types.NodeID app Application gov Governance network Network logger common.Logger cache *utils.NodeSetCache signer *utils.Signer bcModule *blockChain ctx context.Context configs []agreementMgrConfig baModule *agreement recv *consensusBAReceiver processedBAResult map[types.Position]struct{} voteFilter *utils.VoteFilter settingCache *lru.Cache curRoundSetting *baRoundSetting waitGroup sync.WaitGroup isRunning bool lock sync.RWMutex } func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) { settingCache, _ := lru.New(settingLimit) mgr = &agreementMgr{ con: con, ID: con.ID, app: con.app, gov: con.gov, network: con.network, logger: con.logger, cache: con.nodeSetCache, signer: con.signer, bcModule: con.bcModule, ctx: con.ctx, processedBAResult: make(map[types.Position]struct{}, maxResultCache), voteFilter: utils.NewVoteFilter(), settingCache: settingCache, } mgr.recv = &consensusBAReceiver{ consensus: con, restartNotary: make(chan types.Position, 1), } return mgr, nil } func (mgr *agreementMgr) prepare() { round := mgr.bcModule.tipRound() agr := newAgreement( mgr.ID, mgr.recv, newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer, mgr.logger) setting := mgr.generateSetting(round) if setting == nil { mgr.logger.Warn("Unable to prepare init setting", "round", round) return } mgr.curRoundSetting = setting agr.notarySet = mgr.curRoundSetting.dkgSet // Hacky way to make agreement module self contained. mgr.recv.agreementModule = agr mgr.baModule = agr if round >= DKGDelayRound { if _, exist := setting.dkgSet[mgr.ID]; exist { mgr.logger.Debug("Preparing signer and npks.", "round", round) npk, signer, err := mgr.con.cfgModule.getDKGInfo(round, false) if err != nil { mgr.logger.Error("Failed to prepare signer and npks.", "round", round, "error", err) } mgr.logger.Debug("Prepared signer and npks.", "round", round, "signer", signer != nil, "npks", npk != nil) } } return } func (mgr *agreementMgr) run() { mgr.lock.Lock() defer mgr.lock.Unlock() if mgr.isRunning { return } mgr.isRunning = true mgr.waitGroup.Add(1) go func() { defer mgr.waitGroup.Done() mgr.runBA(mgr.bcModule.tipRound()) }() } func (mgr *agreementMgr) calcLeader( dkgSet map[types.NodeID]struct{}, crs common.Hash, pos types.Position) ( types.NodeID, error) { nodeSet := types.NewNodeSetFromMap(dkgSet) leader := nodeSet.GetSubSet(1, types.NewNodeLeaderTarget( crs, pos.Height)) for nID := range leader { return nID, nil } return types.NodeID{}, ErrNoValidLeader } func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig { mgr.lock.RLock() defer mgr.lock.RUnlock() if round < mgr.configs[0].RoundID() { panic(ErrRoundOutOfRange) } roundIndex := round - mgr.configs[0].RoundID() if roundIndex >= uint64(len(mgr.configs)) { return nil } return &mgr.configs[roundIndex] } func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { mgr.lock.Lock() defer mgr.lock.Unlock() apply := func(e utils.RoundEventParam) error { if len(mgr.configs) > 0 { lastCfg := mgr.configs[len(mgr.configs)-1] if e.BeginHeight != lastCfg.RoundEndHeight() { return ErrInvalidBlockHeight } if lastCfg.RoundID() == e.Round { mgr.configs[len(mgr.configs)-1].ExtendLength() } else if lastCfg.RoundID()+1 == e.Round { mgr.configs = append(mgr.configs, newAgreementMgrConfig( lastCfg, e.Config, e.CRS)) } else { return ErrInvalidRoundID } } else { c := agreementMgrConfig{} c.from(e.Round, e.Config, e.CRS) c.SetRoundBeginHeight(e.BeginHeight) mgr.configs = append(mgr.configs, c) } return nil } for _, e := range evts { if err := apply(e); err != nil { return err } } return nil } func (mgr *agreementMgr) checkProposer( round uint64, proposerID types.NodeID) error { if round == mgr.curRoundSetting.round { if _, exist := mgr.curRoundSetting.dkgSet[proposerID]; !exist { return ErrNotInNotarySet } } else if round == mgr.curRoundSetting.round+1 { setting := mgr.generateSetting(round) if setting == nil { return ErrConfigurationNotReady } if _, exist := setting.dkgSet[proposerID]; !exist { return ErrNotInNotarySet } } return nil } func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { if !mgr.recv.isNotary { return nil } if mgr.voteFilter.Filter(v) { return nil } if err := mgr.checkProposer(v.Position.Round, v.ProposerID); err != nil { return err } if err = mgr.baModule.processVote(v); err == nil { mgr.baModule.updateFilter(mgr.voteFilter) mgr.voteFilter.AddVote(v) } if err == ErrSkipButNoError { err = nil } return } func (mgr *agreementMgr) processBlock(b *types.Block) error { if err := mgr.checkProposer(b.Position.Round, b.ProposerID); err != nil { return err } return mgr.baModule.processBlock(b) } func (mgr *agreementMgr) touchAgreementResult( result *types.AgreementResult) (first bool) { // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! if _, exist := mgr.processedBAResult[result.Position]; !exist { first = true if len(mgr.processedBAResult) > maxResultCache { for k := range mgr.processedBAResult { // Randomly drop one element. delete(mgr.processedBAResult, k) break } } mgr.processedBAResult[result.Position] = struct{}{} } return } func (mgr *agreementMgr) untouchAgreementResult( result *types.AgreementResult) { // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! delete(mgr.processedBAResult, result.Position) } func (mgr *agreementMgr) processAgreementResult( result *types.AgreementResult) error { aID := mgr.baModule.agreementID() if isStop(aID) { return nil } if result.Position == aID && !mgr.baModule.confirmed() { mgr.logger.Info("Syncing BA", "position", result.Position) if result.Position.Round >= DKGDelayRound { return mgr.baModule.processAgreementResult(result) } for key := range result.Votes { if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err } } } else if result.Position.Newer(aID) { mgr.logger.Info("Fast syncing BA", "position", result.Position) if result.Position.Round < DKGDelayRound { mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA", "hash", result.BlockHash) mgr.network.PullBlocks(common.Hashes{result.BlockHash}) for key := range result.Votes { if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { return err } } } setting := mgr.generateSetting(result.Position.Round) if setting == nil { mgr.logger.Warn("unable to get setting", "round", result.Position.Round) return ErrConfigurationNotReady } mgr.curRoundSetting = setting leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position) if err != nil { return err } mgr.baModule.restart( setting.dkgSet, setting.threshold, result.Position, leader, setting.crs) if result.Position.Round >= DKGDelayRound { return mgr.baModule.processAgreementResult(result) } } return nil } func (mgr *agreementMgr) processFinalizedBlock(block *types.Block) error { aID := mgr.baModule.agreementID() if block.Position.Older(aID) { return nil } mgr.baModule.processFinalizedBlock(block) return nil } func (mgr *agreementMgr) stop() { // Stop all running agreement modules. func() { mgr.lock.Lock() defer mgr.lock.Unlock() mgr.baModule.stop() }() // Block until all routines are done. mgr.waitGroup.Wait() } func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting { if setting, exist := mgr.settingCache.Get(round); exist { return setting.(*baRoundSetting) } curConfig := mgr.config(round) if curConfig == nil { return nil } var dkgSet map[types.NodeID]struct{} if round >= DKGDelayRound { _, qualidifed, err := typesDKG.CalcQualifyNodes( mgr.gov.DKGMasterPublicKeys(round), mgr.gov.DKGComplaints(round), utils.GetDKGThreshold(mgr.gov.Configuration(round)), ) if err != nil { mgr.logger.Error("Failed to get gpk", "round", round, "error", err) return nil } dkgSet = qualidifed } if len(dkgSet) == 0 { var err error dkgSet, err = mgr.cache.GetNotarySet(round) if err != nil { mgr.logger.Error("Failed to get notarySet", "round", round, "error", err) return nil } } setting := &baRoundSetting{ crs: curConfig.crs, dkgSet: dkgSet, round: round, threshold: utils.GetBAThreshold(&types.Config{ NotarySetSize: curConfig.notarySetSize}), } mgr.settingCache.Add(round, setting) return setting } func (mgr *agreementMgr) runBA(initRound uint64) { // These are round based variables. var ( currentRound uint64 nextRound = initRound curConfig = mgr.config(initRound) setting = &baRoundSetting{} tickDuration time.Duration ticker Ticker ) // Check if this routine needs to awake in this round and prepare essential // variables when yes. checkRound := func() (isDKG bool) { defer func() { currentRound = nextRound nextRound++ }() // Wait until the configuartion for next round is ready. for { if setting = mgr.generateSetting(nextRound); setting != nil { break } else { mgr.logger.Debug("Round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } _, isDKG = setting.dkgSet[mgr.ID] if isDKG { mgr.logger.Info("Selected as dkg set", "ID", mgr.ID, "round", nextRound) } else { mgr.logger.Info("Not selected as dkg set", "ID", mgr.ID, "round", nextRound) } // Setup ticker if tickDuration != curConfig.lambdaBA { if ticker != nil { ticker.Stop() } ticker = newTicker(mgr.gov, nextRound, TickerBA) tickDuration = curConfig.lambdaBA } setting.ticker = ticker return } Loop: for { select { case <-mgr.ctx.Done(): break Loop default: } mgr.recv.isNotary = checkRound() mgr.voteFilter = utils.NewVoteFilter() mgr.voteFilter.Position.Round = currentRound mgr.recv.emptyBlockHashMap = &sync.Map{} if currentRound >= DKGDelayRound && mgr.recv.isNotary { var err error mgr.recv.npks, mgr.recv.psigSigner, err = mgr.con.cfgModule.getDKGInfo(currentRound, false) if err != nil { mgr.logger.Warn("cannot get dkg info", "round", currentRound, "error", err) } } else { mgr.recv.npks = nil mgr.recv.psigSigner = nil } // Run BA for this round. mgr.recv.restartNotary <- types.Position{ Round: currentRound, Height: math.MaxUint64, } if err := mgr.baRoutineForOneRound(setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, "nodeID", mgr.ID) break Loop } } } func (mgr *agreementMgr) baRoutineForOneRound( setting *baRoundSetting) (err error) { agr := mgr.baModule recv := mgr.recv oldPos := agr.agreementID() restart := func(restartPos types.Position) (breakLoop bool, err error) { if !isStop(restartPos) { if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() { for { select { case <-mgr.ctx.Done(): break default: } tipRound := mgr.bcModule.tipRound() if tipRound > setting.round { break } else { mgr.logger.Debug("Waiting blockChain to change round...", "curRound", setting.round, "tipRound", tipRound) } time.Sleep(100 * time.Millisecond) } // This round is finished. breakLoop = true return } if restartPos.Older(oldPos) { // The restartNotary event is triggered by 'BlockConfirmed' // of some older block. return } } var nextHeight uint64 var nextTime time.Time for { // Make sure we are stoppable. select { case <-mgr.ctx.Done(): breakLoop = true return default: } nextHeight, nextTime = mgr.bcModule.nextBlock() if nextHeight != notReadyHeight { if isStop(restartPos) { break } if nextHeight > restartPos.Height { break } } mgr.logger.Debug("BlockChain not ready!!!", "old", oldPos, "restart", restartPos, "next", nextHeight) time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ Round: setting.round, Height: nextHeight, } oldPos = nextPos var leader types.NodeID leader, err = mgr.calcLeader(setting.dkgSet, setting.crs, nextPos) if err != nil { return } time.Sleep(nextTime.Sub(time.Now())) setting.ticker.Restart() agr.restart(setting.dkgSet, setting.threshold, nextPos, leader, setting.crs) return } Loop: for { select { case <-mgr.ctx.Done(): break Loop default: } if agr.confirmed() { // Block until receive restartPos select { case restartPos := <-recv.restartNotary: breakLoop, err := restart(restartPos) if err != nil { return err } if breakLoop { break Loop } case <-mgr.ctx.Done(): break Loop } } select { case restartPos := <-recv.restartNotary: breakLoop, err := restart(restartPos) if err != nil { return err } if breakLoop { break Loop } default: } if !mgr.recv.isNotary { select { case <-setting.ticker.Tick(): continue Loop case <-mgr.ctx.Done(): break Loop } } if err = agr.nextState(); err != nil { mgr.logger.Error("Failed to proceed to next state", "nodeID", mgr.ID.String(), "error", err) break Loop } if agr.pullVotes() { pos := agr.agreementID() mgr.logger.Debug("Calling Network.PullVotes for syncing votes", "position", pos) mgr.network.PullVotes(pos) } for i := 0; i < agr.clocks(); i++ { // Priority select for agreement.done(). select { case <-agr.done(): continue Loop default: } select { case <-agr.done(): continue Loop case <-setting.ticker.Tick(): } } } return nil }