aboutsummaryrefslogblamecommitdiffstats
path: root/core/agreement-mgr.go
blob: 17def674762f78e2bbdc4249d5344e210010ad8a (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
  
                                                                        



                                                                               
                                                                         




                                                                           
                                                      






                                  
              


              

                                             

                                                                
                                                                             





                                                                                    

                                                                        
                                                                     

                                                                             

 
                          
                      
 

                                                                        

                                                                        


                                                      













                                                                         
                                                                       

















                                                                                   
                              
 




                                   




                                                              
                                              




                                                                         

                                             


              
                            
                        

                                           





                                                                      







                                             
                                     
                                         
                                              
                                    
                                              
                                                     
                                           

                                         
                                        
                              
                                      

 
                                                                     
                                                
                            







                                                    
                                                
                                           
                                                                                     
                                                         
                                                
         
                                        

                                                            
         




                                        

                            
                         


                                                                   


                                                                                 

                      

                                                  
                                                             
                                      
                          
                                   
                                                              










                                                                                            
              

 









                                          
                                                  


           












                                                                 
                                                                   

                                
                                             

                                         
                                                      


                                                   
                                       

 
                                                                                

                               







                                                                              

















                                                                                        
         


                  



                                                                               

                                                

                                                        


                                                       
                                                                   


                                                












                                                                                 

                                                          
                                         
         


                                     
              


                                                             


                                                                                 
                                           

 












                                                                       
         






                                                                    

 

                                                
                                         


                          

                                                                          


                                                                          
                                               
                                                                                            


                                          

                                                                               







                                                                                                    
                         
                 





                                                                         
                                             
                                                                                           


                                  


                                                             


                                                                          



                  








                                                                          




                                              
                                   




                                             
                                                                        


                                                                 




















                                                                                           
                                                                                                 


                                  
                                   





                                                                 

                                            

 
                                                  



                                           
                                                    
                                                
                                          
                                   



                                                                                   
                                           




                                                                        
                     
                                                                                     

                                     
                                                                                          


                                                           


                                                              
                                             
                                                   
                        
                                                                  
                                             
                                                   
                 
                               
                                                       

                                             
                         
                                                                        
                                                         
                 
                                       








                                      
                                                
                                                      
                                                            
                                                        











                                                                                 




                                                         
                                                                         

                                                             
                                                 






                                              

                           
                                   

                                                                                
                                                                                              





                                                              
                                                                           
                                                                     
                                                     
                                                


                                                                                                         






                                                                          
                                                     







                                                                                           






                                                      
                                                                       






                                                                   
                         

                                                                                         


                                                          
                                              
                                           


                                       
                                                                                  




                                                    
                                                                                            

                      






                                      




                                                                     
                                               
                                                  
                                 

                                                  
                                 

                                              
                         



                                                             


                                          


                                          

                        







                                                     





                                                                           


                                                                                       
                                                

                                                  















                                                                
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package core

import (
    "context"
    "errors"
    "math"
    "sync"
    "time"

    lru "github.com/hashicorp/golang-lru"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
    "github.com/dexon-foundation/dexon-consensus/core/utils"
)

// Errors returned from BA modules
var (
    ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished")
    ErrRoundOutOfRange            = errors.New("round out of range")
    ErrInvalidBlock               = errors.New("invalid block")
    ErrNoValidLeader              = errors.New("no valid leader")
    ErrIncorrectCRSSignature      = errors.New("incorrect CRS signature")
    ErrBlockTooOld                = errors.New("block too old")
)

const maxResultCache = 100
const settingLimit = 3

// genValidLeader generate a validLeader function for agreement modules.
func genValidLeader(
    mgr *agreementMgr) validLeaderFn {
    return func(block *types.Block, crs common.Hash) (bool, error) {
        if block.Timestamp.After(time.Now()) {
            return false, nil
        }
        if block.Position.Round >= DKGDelayRound {
            if mgr.recv.npks == nil {
                return false, nil
            }
            if block.Position.Round > mgr.recv.npks.Round {
                return false, nil
            }
            if block.Position.Round < mgr.recv.npks.Round {
                return false, ErrBlockTooOld
            }
        }
        if !utils.VerifyCRSSignature(block, crs, mgr.recv.npks) {
            return false, ErrIncorrectCRSSignature
        }
        if err := mgr.bcModule.sanityCheck(block); err != nil {
            if err == ErrRetrySanityCheckLater {
                return false, nil
            }
            return false, err
        }
        mgr.logger.Debug("Calling Application.VerifyBlock", "block", block)
        switch mgr.app.VerifyBlock(block) {
        case types.VerifyInvalidBlock:
            return false, ErrInvalidBlock
        case types.VerifyRetryLater:
            return false, nil
        default:
        }
        return true, nil
    }
}

type agreementMgrConfig struct {
    utils.RoundBasedConfig

    notarySetSize uint32
    lambdaBA      time.Duration
    crs           common.Hash
}

func (c *agreementMgrConfig) from(
    round uint64, config *types.Config, crs common.Hash) {
    c.notarySetSize = config.NotarySetSize
    c.lambdaBA = config.LambdaBA
    c.crs = crs
    c.SetupRoundBasedFields(round, config)
}

func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config,
    crs common.Hash) (c agreementMgrConfig) {
    c = agreementMgrConfig{}
    c.from(prev.RoundID()+1, config, crs)
    c.AppendTo(prev.RoundBasedConfig)
    return
}

type baRoundSetting struct {
    round     uint64
    dkgSet    map[types.NodeID]struct{}
    threshold int
    ticker    Ticker
    crs       common.Hash
}

type agreementMgr struct {
    // TODO(mission): unbound Consensus instance from this module.
    con               *Consensus
    ID                types.NodeID
    app               Application
    gov               Governance
    network           Network
    logger            common.Logger
    cache             *utils.NodeSetCache
    signer            *utils.Signer
    bcModule          *blockChain
    ctx               context.Context
    configs           []agreementMgrConfig
    baModule          *agreement
    recv              *consensusBAReceiver
    processedBAResult map[types.Position]struct{}
    voteFilter        *utils.VoteFilter
    settingCache      *lru.Cache
    curRoundSetting   *baRoundSetting
    waitGroup         sync.WaitGroup
    isRunning         bool
    lock              sync.RWMutex
}

func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) {
    settingCache, _ := lru.New(settingLimit)
    mgr = &agreementMgr{
        con:               con,
        ID:                con.ID,
        app:               con.app,
        gov:               con.gov,
        network:           con.network,
        logger:            con.logger,
        cache:             con.nodeSetCache,
        signer:            con.signer,
        bcModule:          con.bcModule,
        ctx:               con.ctx,
        processedBAResult: make(map[types.Position]struct{}, maxResultCache),
        voteFilter:        utils.NewVoteFilter(),
        settingCache:      settingCache,
    }
    mgr.recv = &consensusBAReceiver{
        consensus:     con,
        restartNotary: make(chan types.Position, 1),
    }
    return mgr, nil
}

func (mgr *agreementMgr) prepare() {
    round := mgr.bcModule.tipRound()
    agr := newAgreement(
        mgr.ID,
        mgr.recv,
        newLeaderSelector(genValidLeader(mgr), mgr.logger),
        mgr.signer,
        mgr.logger)
    setting := mgr.generateSetting(round)
    if setting == nil {
        mgr.logger.Warn("Unable to prepare init setting", "round", round)
        return
    }
    mgr.curRoundSetting = setting
    agr.notarySet = mgr.curRoundSetting.dkgSet
    // Hacky way to make agreement module self contained.
    mgr.recv.agreementModule = agr
    mgr.baModule = agr
    if round >= DKGDelayRound {
        if _, exist := setting.dkgSet[mgr.ID]; exist {
            mgr.logger.Debug("Preparing signer and npks.", "round", round)
            npk, signer, err := mgr.con.cfgModule.getDKGInfo(round, false)
            if err != nil {
                mgr.logger.Error("Failed to prepare signer and npks.",
                    "round", round,
                    "error", err)
            }
            mgr.logger.Debug("Prepared signer and npks.",
                "round", round, "signer", signer != nil, "npks", npk != nil)
        }
    }
    return
}

func (mgr *agreementMgr) run() {
    mgr.lock.Lock()
    defer mgr.lock.Unlock()
    if mgr.isRunning {
        return
    }
    mgr.isRunning = true
    mgr.waitGroup.Add(1)
    go func() {
        defer mgr.waitGroup.Done()
        mgr.runBA(mgr.bcModule.tipRound())
    }()
}

func (mgr *agreementMgr) calcLeader(
    dkgSet map[types.NodeID]struct{},
    crs common.Hash, pos types.Position) (
    types.NodeID, error) {
    nodeSet := types.NewNodeSetFromMap(dkgSet)
    leader := nodeSet.GetSubSet(1, types.NewNodeLeaderTarget(
        crs, pos.Height))
    for nID := range leader {
        return nID, nil
    }
    return types.NodeID{}, ErrNoValidLeader
}

func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig {
    mgr.lock.RLock()
    defer mgr.lock.RUnlock()
    if round < mgr.configs[0].RoundID() {
        panic(ErrRoundOutOfRange)
    }
    roundIndex := round - mgr.configs[0].RoundID()
    if roundIndex >= uint64(len(mgr.configs)) {
        return nil
    }
    return &mgr.configs[roundIndex]
}

func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error {
    mgr.lock.Lock()
    defer mgr.lock.Unlock()
    apply := func(e utils.RoundEventParam) error {
        if len(mgr.configs) > 0 {
            lastCfg := mgr.configs[len(mgr.configs)-1]
            if e.BeginHeight != lastCfg.RoundEndHeight() {
                return ErrInvalidBlockHeight
            }
            if lastCfg.RoundID() == e.Round {
                mgr.configs[len(mgr.configs)-1].ExtendLength()
            } else if lastCfg.RoundID()+1 == e.Round {
                mgr.configs = append(mgr.configs, newAgreementMgrConfig(
                    lastCfg, e.Config, e.CRS))
            } else {
                return ErrInvalidRoundID
            }
        } else {
            c := agreementMgrConfig{}
            c.from(e.Round, e.Config, e.CRS)
            c.SetRoundBeginHeight(e.BeginHeight)
            mgr.configs = append(mgr.configs, c)
        }
        return nil
    }
    for _, e := range evts {
        if err := apply(e); err != nil {
            return err
        }
    }
    return nil
}

func (mgr *agreementMgr) checkProposer(
    round uint64, proposerID types.NodeID) error {
    if round == mgr.curRoundSetting.round {
        if _, exist := mgr.curRoundSetting.dkgSet[proposerID]; !exist {
            return ErrNotInNotarySet
        }
    } else if round == mgr.curRoundSetting.round+1 {
        setting := mgr.generateSetting(round)
        if setting == nil {
            return ErrConfigurationNotReady
        }
        if _, exist := setting.dkgSet[proposerID]; !exist {
            return ErrNotInNotarySet
        }
    }
    return nil
}

func (mgr *agreementMgr) processVote(v *types.Vote) (err error) {
    if !mgr.recv.isNotary {
        return nil
    }
    if mgr.voteFilter.Filter(v) {
        return nil
    }
    if err := mgr.checkProposer(v.Position.Round, v.ProposerID); err != nil {
        return err
    }
    if err = mgr.baModule.processVote(v); err == nil {
        mgr.baModule.updateFilter(mgr.voteFilter)
        mgr.voteFilter.AddVote(v)
    }
    if err == ErrSkipButNoError {
        err = nil
    }
    return
}

func (mgr *agreementMgr) processBlock(b *types.Block) error {
    if err := mgr.checkProposer(b.Position.Round, b.ProposerID); err != nil {
        return err
    }
    return mgr.baModule.processBlock(b)
}

func (mgr *agreementMgr) touchAgreementResult(
    result *types.AgreementResult) (first bool) {
    // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!!
    if _, exist := mgr.processedBAResult[result.Position]; !exist {
        first = true
        if len(mgr.processedBAResult) > maxResultCache {
            for k := range mgr.processedBAResult {
                // Randomly drop one element.
                delete(mgr.processedBAResult, k)
                break
            }
        }
        mgr.processedBAResult[result.Position] = struct{}{}
    }
    return
}

func (mgr *agreementMgr) untouchAgreementResult(
    result *types.AgreementResult) {
    // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!!
    delete(mgr.processedBAResult, result.Position)
}

func (mgr *agreementMgr) processAgreementResult(
    result *types.AgreementResult) error {
    aID := mgr.baModule.agreementID()
    if isStop(aID) {
        return nil
    }
    if result.Position == aID && !mgr.baModule.confirmed() {
        mgr.logger.Info("Syncing BA", "position", result.Position)
        if result.Position.Round >= DKGDelayRound {
            return mgr.baModule.processAgreementResult(result)
        }
        for key := range result.Votes {
            if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
                return err
            }
        }
    } else if result.Position.Newer(aID) {
        mgr.logger.Info("Fast syncing BA", "position", result.Position)
        if result.Position.Round < DKGDelayRound {
            mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA",
                "hash", result.BlockHash)
            mgr.network.PullBlocks(common.Hashes{result.BlockHash})
            for key := range result.Votes {
                if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
                    return err
                }
            }
        }
        setting := mgr.generateSetting(result.Position.Round)
        if setting == nil {
            mgr.logger.Warn("unable to get setting", "round",
                result.Position.Round)
            return ErrConfigurationNotReady
        }
        mgr.curRoundSetting = setting
        leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position)
        if err != nil {
            return err
        }
        mgr.baModule.restart(
            setting.dkgSet, setting.threshold,
            result.Position, leader, setting.crs)
        if result.Position.Round >= DKGDelayRound {
            return mgr.baModule.processAgreementResult(result)
        }
    }
    return nil
}

func (mgr *agreementMgr) processFinalizedBlock(block *types.Block) error {
    aID := mgr.baModule.agreementID()
    if block.Position.Older(aID) {
        return nil
    }
    mgr.baModule.processFinalizedBlock(block)
    return nil
}

func (mgr *agreementMgr) stop() {
    // Stop all running agreement modules.
    func() {
        mgr.lock.Lock()
        defer mgr.lock.Unlock()
        mgr.baModule.stop()
    }()
    // Block until all routines are done.
    mgr.waitGroup.Wait()
}

func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting {
    if setting, exist := mgr.settingCache.Get(round); exist {
        return setting.(*baRoundSetting)
    }
    curConfig := mgr.config(round)
    if curConfig == nil {
        return nil
    }
    var dkgSet map[types.NodeID]struct{}
    if round >= DKGDelayRound {
        _, qualidifed, err := typesDKG.CalcQualifyNodes(
            mgr.gov.DKGMasterPublicKeys(round),
            mgr.gov.DKGComplaints(round),
            utils.GetDKGThreshold(mgr.gov.Configuration(round)),
        )
        if err != nil {
            mgr.logger.Error("Failed to get gpk", "round", round, "error", err)
            return nil
        }
        dkgSet = qualidifed
    }
    if len(dkgSet) == 0 {
        var err error
        dkgSet, err = mgr.cache.GetNotarySet(round)
        if err != nil {
            mgr.logger.Error("Failed to get notarySet", "round", round, "error", err)
            return nil
        }
    }
    setting := &baRoundSetting{
        crs:    curConfig.crs,
        dkgSet: dkgSet,
        round:  round,
        threshold: utils.GetBAThreshold(&types.Config{
            NotarySetSize: curConfig.notarySetSize}),
    }
    mgr.settingCache.Add(round, setting)
    return setting
}

func (mgr *agreementMgr) runBA(initRound uint64) {
    // These are round based variables.
    var (
        currentRound uint64
        nextRound    = initRound
        curConfig    = mgr.config(initRound)
        setting      = &baRoundSetting{}
        tickDuration time.Duration
        ticker       Ticker
    )

    // Check if this routine needs to awake in this round and prepare essential
    // variables when yes.
    checkRound := func() (isDKG bool) {
        defer func() {
            currentRound = nextRound
            nextRound++
        }()
        // Wait until the configuartion for next round is ready.
        for {
            if setting = mgr.generateSetting(nextRound); setting != nil {
                break
            } else {
                mgr.logger.Debug("Round is not ready", "round", nextRound)
                time.Sleep(1 * time.Second)
            }
        }
        _, isDKG = setting.dkgSet[mgr.ID]
        if isDKG {
            mgr.logger.Info("Selected as dkg set",
                "ID", mgr.ID,
                "round", nextRound)
        } else {
            mgr.logger.Info("Not selected as dkg set",
                "ID", mgr.ID,
                "round", nextRound)
        }
        // Setup ticker
        if tickDuration != curConfig.lambdaBA {
            if ticker != nil {
                ticker.Stop()
            }
            ticker = newTicker(mgr.gov, nextRound, TickerBA)
            tickDuration = curConfig.lambdaBA
        }
        setting.ticker = ticker
        return
    }
Loop:
    for {
        select {
        case <-mgr.ctx.Done():
            break Loop
        default:
        }
        mgr.recv.isNotary = checkRound()
        mgr.voteFilter = utils.NewVoteFilter()
        mgr.voteFilter.Position.Round = currentRound
        mgr.recv.emptyBlockHashMap = &sync.Map{}
        if currentRound >= DKGDelayRound && mgr.recv.isNotary {
            var err error
            mgr.recv.npks, mgr.recv.psigSigner, err =
                mgr.con.cfgModule.getDKGInfo(currentRound, false)
            if err != nil {
                mgr.logger.Warn("cannot get dkg info",
                    "round", currentRound, "error", err)
            }
        } else {
            mgr.recv.npks = nil
            mgr.recv.psigSigner = nil
        }
        // Run BA for this round.
        mgr.recv.restartNotary <- types.Position{
            Round:  currentRound,
            Height: math.MaxUint64,
        }
        if err := mgr.baRoutineForOneRound(setting); err != nil {
            mgr.logger.Error("BA routine failed",
                "error", err,
                "nodeID", mgr.ID)
            break Loop
        }
    }
}

func (mgr *agreementMgr) baRoutineForOneRound(
    setting *baRoundSetting) (err error) {
    agr := mgr.baModule
    recv := mgr.recv
    oldPos := agr.agreementID()
    restart := func(restartPos types.Position) (breakLoop bool, err error) {
        if !isStop(restartPos) {
            if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() {
                for {
                    select {
                    case <-mgr.ctx.Done():
                        break
                    default:
                    }
                    tipRound := mgr.bcModule.tipRound()
                    if tipRound > setting.round {
                        break
                    } else {
                        mgr.logger.Debug("Waiting blockChain to change round...",
                            "curRound", setting.round,
                            "tipRound", tipRound)
                    }
                    time.Sleep(100 * time.Millisecond)
                }
                // This round is finished.
                breakLoop = true
                return
            }
            if restartPos.Older(oldPos) {
                // The restartNotary event is triggered by 'BlockConfirmed'
                // of some older block.
                return
            }
        }
        var nextHeight uint64
        var nextTime time.Time
        for {
            // Make sure we are stoppable.
            select {
            case <-mgr.ctx.Done():
                breakLoop = true
                return
            default:
            }
            nextHeight, nextTime = mgr.bcModule.nextBlock()
            if nextHeight != notReadyHeight {
                if isStop(restartPos) {
                    break
                }
                if nextHeight > restartPos.Height {
                    break
                }
            }
            mgr.logger.Debug("BlockChain not ready!!!",
                "old", oldPos, "restart", restartPos, "next", nextHeight)
            time.Sleep(100 * time.Millisecond)
        }
        nextPos := types.Position{
            Round:  setting.round,
            Height: nextHeight,
        }
        oldPos = nextPos
        var leader types.NodeID
        leader, err = mgr.calcLeader(setting.dkgSet, setting.crs, nextPos)
        if err != nil {
            return
        }
        time.Sleep(nextTime.Sub(time.Now()))
        setting.ticker.Restart()
        agr.restart(setting.dkgSet, setting.threshold, nextPos, leader, setting.crs)
        return
    }
Loop:
    for {
        select {
        case <-mgr.ctx.Done():
            break Loop
        default:
        }
        if agr.confirmed() {
            // Block until receive restartPos
            select {
            case restartPos := <-recv.restartNotary:
                breakLoop, err := restart(restartPos)
                if err != nil {
                    return err
                }
                if breakLoop {
                    break Loop
                }
            case <-mgr.ctx.Done():
                break Loop
            }
        }
        select {
        case restartPos := <-recv.restartNotary:
            breakLoop, err := restart(restartPos)
            if err != nil {
                return err
            }
            if breakLoop {
                break Loop
            }
        default:
        }
        if !mgr.recv.isNotary {
            select {
            case <-setting.ticker.Tick():
                continue Loop
            case <-mgr.ctx.Done():
                break Loop
            }
        }
        if err = agr.nextState(); err != nil {
            mgr.logger.Error("Failed to proceed to next state",
                "nodeID", mgr.ID.String(),
                "error", err)
            break Loop
        }
        if agr.pullVotes() {
            pos := agr.agreementID()
            mgr.logger.Debug("Calling Network.PullVotes for syncing votes",
                "position", pos)
            mgr.network.PullVotes(pos)
        }
        for i := 0; i < agr.clocks(); i++ {
            // Priority select for agreement.done().
            select {
            case <-agr.done():
                continue Loop
            default:
            }
            select {
            case <-agr.done():
                continue Loop
            case <-setting.ticker.Tick():
            }
        }
    }
    return nil
}