aboutsummaryrefslogblamecommitdiffstats
path: root/core/syncer/consensus.go
blob: 75c106793fc87ca1d3624af08a243f5dba37ba1d (plain) (tree)



























                                                                               
                                                                 
                                                             






                                                                

                                                                  

                                                                         

                                                                              
                                                                


                                                                                 



                                                                                



                                             
                                






                                        
                                            
 



                                                
                                                                         




                                               








                                             

                                          


                                             






                                                                     
                       










                                                            
                                                                   

                                        
                                                           


                                                                 



                                                                                      





                                                                         



                                                               



                                                                  
                                                          
                                                 





                                   
                            



                             
                                                           

                                




                                                                     





                                                          





                                                          

 
                                                                          

                                
             

                                                                


                                                                






                                                            










                                                                       
                              
                 
                                                                     







                                                                                 






                                                                                

                                          





                                                                         

                               



                                                                        


                                      














































                                                                                       
                                                             

                                                    

                                                               







                                                      

                                          




                                                               

                                                                                        
                     
                                                                          
                                                                         
                                                            









                                                                                    
                                                                



                                                                                 
                                                                                    





                                                                                      


                                                                                     






                                                                                
                                                                                   














                                                                                
                                                                                   


















                                                                                       
                                                                                   

















                                                                       



                                                                       



                                                                  


                                                                               









                                                                            



                                                                              
                                                                 
                                 






                                                                      
                                       

                                                    

                             
                      



                                                                                       

                                                  

                 






                                                                                  

                                                         
         
                                      







                                                                            
                                                          





                                                                                     
                                      

                         
                                                          
                                                                    
                              
                 

                                                                   


                                         

                                                                                       

                                                                 
                               



                                                                                       
                              

                                     
                                                                                 

                                                                   
                                                

                                                                               

                                                                                         






                                                                            

                                                                                    
                                               
                                              



                                                           

                                                                                   
                                 






                                                                                   
                                                                        

                                                         
                         




                                                                                            
                                                                   
                                     
                 
         
              



                                                                       

                               







                                                                        



                                                                  
                                                             




                                                                










                                                                        
                                  
                                   






                                                                                
                                                           
                                    
                                                   

                                                                         
                                               

                                  
                                                         
                           
                                          
                  

















                                                                               
                                                            



                                       



                                                        
                                                                    
                                                                               







                                                                                             










                                                                                        

                                              
                                                                 









                                                                             

                 






                                                                                   











                                                                               

                                                                                            
                                                                  




                                                                   




                                                                       
                                        











                                                                                

                                                                                                















                                                                                            
                                                                             



                                                   







                                                                       





                                                              

                      





















                                                                              


                                              

                                                                          
                                  
                   










                                                                
                                                                  
                                                                    
                                                     


                                                     
                                                


                                                                                       




                                                                                                    
                                                                         
                                                            
                                         



                                                                                    












                                                                         
                                                 


                                               
                                                                
                                         
















                                                                                                   

                 
                                  
                   




                                                



                                                                                    






                                                                     









                                                  



                                                  





                                     
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package syncer

import (
    "context"
    "fmt"
    "sort"
    "sync"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core"
    "github.com/dexon-foundation/dexon-consensus/core/crypto"
    "github.com/dexon-foundation/dexon-consensus/core/db"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    "github.com/dexon-foundation/dexon-consensus/core/utils"
)

var (
    // ErrAlreadySynced is reported when syncer is synced.
    ErrAlreadySynced = fmt.Errorf("already synced")
    // ErrNotSynced is reported when syncer is not synced yet.
    ErrNotSynced = fmt.Errorf("not synced yet")
    // ErrGenesisBlockReached is reported when genesis block reached.
    ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
    // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered
    // blocks.
    ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
    // ErrMismatchBlockHashSequence means the delivering sequence is not
    // correct, compared to finalized blocks.
    ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence")
    // ErrInvalidSyncingFinalizationHeight raised when the blocks to sync is
    // not following the compaction chain tip in database.
    ErrInvalidSyncingFinalizationHeight = fmt.Errorf(
        "invalid syncing finalization height")
)

// Consensus is for syncing consensus module.
type Consensus struct {
    db           db.Database
    gov          core.Governance
    dMoment      time.Time
    logger       common.Logger
    app          core.Application
    prv          crypto.PrivateKey
    network      core.Network
    nodeSetCache *utils.NodeSetCache
    tsigVerifier *core.TSigVerifierCache

    lattice              *core.Lattice
    validatedChains      map[uint32]struct{}
    finalizedBlockHashes common.Hashes
    latticeLastRound     uint64
    randomnessResults    map[common.Hash]*types.BlockRandomnessResult
    blocks               []types.ByPosition
    agreements           []*agreement
    configs              []*types.Config
    roundBeginTimes      []time.Time
    agreementRoundCut    uint64

    // lock for accessing all fields.
    lock               sync.RWMutex
    moduleWaitGroup    sync.WaitGroup
    agreementWaitGroup sync.WaitGroup
    pullChan           chan common.Hash
    receiveChan        chan *types.Block
    ctx                context.Context
    ctxCancel          context.CancelFunc
    syncedLastBlock    *types.Block
    syncedConsensus    *core.Consensus
    dummyCancel        context.CancelFunc
    dummyFinished      <-chan struct{}
    dummyMsgBuffer     []interface{}
}

// NewConsensus creates an instance for Consensus (syncer consensus).
func NewConsensus(
    dMoment time.Time,
    app core.Application,
    gov core.Governance,
    db db.Database,
    network core.Network,
    prv crypto.PrivateKey,
    logger common.Logger) *Consensus {

    con := &Consensus{
        dMoment:         dMoment,
        app:             app,
        gov:             gov,
        db:              db,
        network:         network,
        nodeSetCache:    utils.NewNodeSetCache(gov),
        tsigVerifier:    core.NewTSigVerifierCache(gov, 7),
        prv:             prv,
        logger:          logger,
        validatedChains: make(map[uint32]struct{}),
        configs: []*types.Config{
            utils.GetConfigWithPanic(gov, 0, logger),
        },
        roundBeginTimes:   []time.Time{dMoment},
        receiveChan:       make(chan *types.Block, 1000),
        pullChan:          make(chan common.Hash, 1000),
        randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult),
    }
    con.ctx, con.ctxCancel = context.WithCancel(context.Background())
    return con
}

func (con *Consensus) initConsensusObj(initBlock *types.Block) {
    func() {
        con.lock.Lock()
        defer con.lock.Unlock()
        con.latticeLastRound = initBlock.Position.Round
        debugApp, _ := con.app.(core.Debug)
        con.lattice = core.NewLattice(
            con.roundBeginTimes[con.latticeLastRound],
            con.latticeLastRound,
            con.configs[con.latticeLastRound],
            utils.NewSigner(con.prv),
            con.app,
            debugApp,
            con.db,
            con.logger,
        )
    }()
    con.startAgreement()
    con.startNetwork()
    con.startCRSMonitor()
}

func (con *Consensus) checkIfValidated() (validated bool) {
    con.lock.RLock()
    defer con.lock.RUnlock()
    var (
        round               = con.blocks[0][0].Position.Round
        numChains           = con.configs[round].NumChains
        validatedChainCount uint32
    )
    // Make sure we validate some block in all chains.
    for chainID := range con.validatedChains {
        if chainID < numChains {
            validatedChainCount++
        }
    }
    validated = validatedChainCount == numChains
    con.logger.Debug("syncer chain-validation status",
        "validated-chain", validatedChainCount,
        "round", round,
        "valid", validated)
    return
}

func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
    con.lock.RLock()
    defer con.lock.RUnlock()
    var (
        round          = con.blocks[0][0].Position.Round
        numChains      = con.configs[round].NumChains
        compactionTips = make([]*types.Block, numChains)
        overlapCount   = uint32(0)
    )
    defer func() {
        con.logger.Debug("syncer synced status",
            "overlap-count", overlapCount,
            "num-chain", numChains,
            "last-block", blocks[len(blocks)-1],
            "synced", synced)
    }()
    // Find tips (newset blocks) of each chain in compaction chain.
    b := blocks[len(blocks)-1]
    for tipCount := uint32(0); tipCount < numChains; {
        if compactionTips[b.Position.ChainID] == nil {
            // Check chainID for config change.
            if b.Position.ChainID < numChains {
                compactionTips[b.Position.ChainID] = b
                tipCount++
            }
        }
        if (b.Finalization.ParentHash == common.Hash{}) {
            return
        }
        b1, err := con.db.GetBlock(b.Finalization.ParentHash)
        if err != nil {
            panic(err)
        }
        b = &b1
    }
    // Check if chain tips of compaction chain and current cached confirmed
    // blocks are overlapped on each chain, numChains is decided by the round
    // of last block we seen on compaction chain.
    for chainID, b := range compactionTips {
        if len(con.blocks[chainID]) > 0 {
            if !b.Position.Older(&con.blocks[chainID][0].Position) {
                overlapCount++
            }
        }
    }
    synced = overlapCount == numChains
    return
}

// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
// con.blocks are all in the same round, for avoiding config change while
// syncing.
func (con *Consensus) ensureAgreementOverlapRound() bool {
    con.lock.Lock()
    defer con.lock.Unlock()
    defer func() {
        con.logger.Debug("ensureAgreementOverlapRound returned",
            "round", con.agreementRoundCut)
    }()
    if con.agreementRoundCut > 0 {
        return true
    }
    // Clean empty blocks on tips of chains.
    for idx, bs := range con.blocks {
        for len(bs) > 0 && con.isEmptyBlock(bs[0]) {
            bs = bs[1:]
        }
        con.blocks[idx] = bs
    }
    // Build empty blocks.
    for _, bs := range con.blocks {
        for i := range bs {
            if con.isEmptyBlock(bs[i]) {
                if bs[i-1].Position.Height == bs[i].Position.Height-1 {
                    con.buildEmptyBlock(bs[i], bs[i-1])
                }
            }
        }
    }
    var tipRoundMap map[uint64]uint32
    for {
        tipRoundMap = make(map[uint64]uint32)
        for _, bs := range con.blocks {
            if len(bs) > 0 {
                tipRoundMap[bs[0].Position.Round]++
            }
        }
        if len(tipRoundMap) <= 1 {
            break
        }
        // Make all tips in same round.
        var maxRound uint64
        for r := range tipRoundMap {
            if r > maxRound {
                maxRound = r
            }
        }
        for idx, bs := range con.blocks {
            for len(bs) > 0 && bs[0].Position.Round < maxRound {
                bs = bs[1:]
            }
            con.blocks[idx] = bs
        }
    }
    if len(tipRoundMap) == 1 {
        var r uint64
        for r = range tipRoundMap {
            break
        }
        con.logger.Debug("check agreement round cut",
            "tip-round", r,
            "configs", len(con.configs))
        if tipRoundMap[r] == con.configs[r].NumChains {
            con.agreementRoundCut = r
            return true
        }
    }
    return false
}

func (con *Consensus) findLatticeSyncBlock(
    blocks []*types.Block) (*types.Block, error) {
    lastBlock := blocks[len(blocks)-1]
    round := lastBlock.Position.Round
    isConfigChanged := func(prev, cur *types.Config) bool {
        return prev.K != cur.K ||
            prev.NumChains != cur.NumChains ||
            prev.PhiRatio != cur.PhiRatio
    }
    for {
        // Find round r which r-1, r, r+1 are all in same total ordering config.
        for {
            sameAsPrevRound := round == 0 || !isConfigChanged(
                con.configs[round-1], con.configs[round])
            sameAsNextRound := !isConfigChanged(
                con.configs[round], con.configs[round+1])
            if sameAsPrevRound && sameAsNextRound {
                break
            }
            if round == 0 {
                // Unable to find a safe round, wait for new rounds.
                return nil, nil
            }
            round--
        }
        // Find the newset block which round is "round".
        for lastBlock.Position.Round != round {
            if (lastBlock.Finalization.ParentHash == common.Hash{}) {
                return nil, ErrGenesisBlockReached
            }
            b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash)
            if err != nil {
                return nil, err
            }
            lastBlock = &b
        }
        // Find the deliver set by hash for two times. Blocks in a deliver set
        // returned by total ordering is sorted by hash. If a block's parent
        // hash is greater than its hash means there is a cut between deliver
        // sets.
        var curBlock, prevBlock *types.Block
        var deliverSetFirstBlock, deliverSetLastBlock *types.Block
        curBlock = lastBlock
        for {
            if (curBlock.Finalization.ParentHash == common.Hash{}) {
                return nil, ErrGenesisBlockReached
            }
            b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
            if err != nil {
                return nil, err
            }
            prevBlock = &b
            if !prevBlock.Hash.Less(curBlock.Hash) {
                break
            }
            curBlock = prevBlock
        }
        deliverSetLastBlock = prevBlock
        curBlock = prevBlock
        for {
            if (curBlock.Finalization.ParentHash == common.Hash{}) {
                break
            }
            b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
            if err != nil {
                return nil, err
            }
            prevBlock = &b
            if !prevBlock.Hash.Less(curBlock.Hash) {
                break
            }
            curBlock = prevBlock
        }
        deliverSetFirstBlock = curBlock
        // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock
        // are in the same round.
        ok := true
        curBlock = deliverSetLastBlock
        for {
            if curBlock.Position.Round != round {
                ok = false
                break
            }
            b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
            if err != nil {
                return nil, err
            }
            curBlock = &b
            if curBlock.Hash == deliverSetFirstBlock.Hash {
                break
            }
        }
        if ok {
            return deliverSetFirstBlock, nil
        }
        if round == 0 {
            return nil, nil
        }
        round--
    }
}

func (con *Consensus) processFinalizedBlock(block *types.Block) error {
    if con.lattice == nil {
        return nil
    }
    delivered, err := con.lattice.ProcessFinalizedBlock(block)
    if err != nil {
        return err
    }
    con.lock.Lock()
    defer con.lock.Unlock()
    con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
    for idx, b := range delivered {
        if con.finalizedBlockHashes[idx] != b.Hash {
            return ErrMismatchBlockHashSequence
        }
        con.validatedChains[b.Position.ChainID] = struct{}{}
    }
    con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):]
    return nil
}

// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
// NOTICE: parameter "blocks" should be consecutive in compaction height.
// NOTICE: this method is not expected to be called concurrently.
func (con *Consensus) SyncBlocks(
    blocks []*types.Block, latest bool) (synced bool, err error) {
    defer func() {
        con.logger.Debug("SyncBlocks returned",
            "synced", synced,
            "error", err,
            "last-block", con.syncedLastBlock)
    }()
    if con.syncedLastBlock != nil {
        synced, err = true, ErrAlreadySynced
        return
    }
    if len(blocks) == 0 {
        return
    }
    // Check if blocks are consecutive.
    for i := 1; i < len(blocks); i++ {
        if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
            err = ErrInvalidBlockOrder
            return
        }
    }
    // Make sure the first block is the next block of current compaction chain
    // tip in DB.
    _, tipHeight := con.db.GetCompactionChainTipInfo()
    if blocks[0].Finalization.Height != tipHeight+1 {
        con.logger.Error("mismatched finalization height",
            "now", blocks[0].Finalization.Height,
            "expected", tipHeight+1)
        err = ErrInvalidSyncingFinalizationHeight
        return
    }
    con.logger.Trace("syncBlocks",
        "position", &blocks[0].Position,
        "final height", blocks[0].Finalization.Height,
        "len", len(blocks),
        "latest", latest,
    )
    con.setupConfigs(blocks)
    for _, b := range blocks {
        // TODO(haoping) remove this if lattice puts blocks into db.
        if err = con.db.PutBlock(*b); err != nil {
            // A block might be put into db when confirmed by BA, but not
            // finalized yet.
            if err == db.ErrBlockExists {
                err = con.db.UpdateBlock(*b)
            }
            if err != nil {
                return
            }
        }
        if err = con.db.PutCompactionChainTipInfo(
            b.Hash, b.Finalization.Height); err != nil {
            return
        }
        if err = con.processFinalizedBlock(b); err != nil {
            return
        }
    }
    if latest && con.lattice == nil {
        // New Lattice and find the deliver set of total ordering when "latest"
        // is true for first time. Deliver set is found by block hashes.
        var syncBlock *types.Block
        syncBlock, err = con.findLatticeSyncBlock(blocks)
        if err != nil {
            if err == ErrGenesisBlockReached {
                con.logger.Debug("SyncBlocks skip error", "error", err)
                err = nil
            }
            return
        }
        if syncBlock != nil {
            con.logger.Debug("deliver set found", "block", syncBlock)
            // New lattice with the round of syncBlock.
            con.initConsensusObj(syncBlock)
            con.setupConfigs(blocks)
            // Process blocks from syncBlock to blocks' last block.
            b := blocks[len(blocks)-1]
            blocksCount :=
                b.Finalization.Height - syncBlock.Finalization.Height + 1
            blocksToProcess := make([]*types.Block, blocksCount)
            for {
                blocksToProcess[blocksCount-1] = b
                blocksCount--
                if b.Hash == syncBlock.Hash {
                    break
                }
                var b1 types.Block
                b1, err = con.db.GetBlock(b.Finalization.ParentHash)
                if err != nil {
                    return
                }
                b = &b1
            }
            for _, b := range blocksToProcess {
                if err = con.processFinalizedBlock(b); err != nil {
                    return
                }
            }
        }
    }
    if latest && con.ensureAgreementOverlapRound() {
        // Check if compaction and agreements' blocks are overlapped. The
        // overlapping of compaction chain and BA's oldest blocks means the
        // syncing is done.
        if con.checkIfValidated() && con.checkIfSynced(blocks) {
            if err = con.Stop(); err != nil {
                return
            }
            con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
                context.Background(), con.network.ReceiveChan(),
                func(msg interface{}) {
                    con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
                })
            con.syncedLastBlock = blocks[len(blocks)-1]
            synced = true
        }
    }
    return
}

// GetSyncedConsensus returns the core.Consensus instance after synced.
func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
    con.lock.Lock()
    defer con.lock.Unlock()
    if con.syncedConsensus != nil {
        return con.syncedConsensus, nil
    }
    if con.syncedLastBlock == nil {
        return nil, ErrNotSynced
    }
    // flush all blocks in con.blocks into core.Consensus, and build
    // core.Consensus from syncer.
    confirmedBlocks := make([][]*types.Block, len(con.blocks))
    for i, bs := range con.blocks {
        confirmedBlocks[i] = []*types.Block(bs)
    }
    randomnessResults := []*types.BlockRandomnessResult{}
    for _, r := range con.randomnessResults {
        randomnessResults = append(randomnessResults, r)
    }
    con.dummyCancel()
    <-con.dummyFinished
    var err error
    con.syncedConsensus, err = core.NewConsensusFromSyncer(
        con.syncedLastBlock,
        con.roundBeginTimes[con.syncedLastBlock.Position.Round],
        con.app,
        con.gov,
        con.db,
        con.network,
        con.prv,
        con.lattice,
        confirmedBlocks,
        randomnessResults,
        con.dummyMsgBuffer,
        con.logger)
    return con.syncedConsensus, err
}

// Stop the syncer.
//
// This method is mainly for caller to stop the syncer before synced, the syncer
// would call this method automatically after being synced.
func (con *Consensus) Stop() error {
    con.logger.Trace("syncer is about to stop")
    // Stop network and CRS routines, wait until they are all stoped.
    con.ctxCancel()
    con.logger.Trace("stop syncer modules")
    con.moduleWaitGroup.Wait()
    // Stop agreements.
    con.logger.Trace("stop syncer agreement modules")
    con.stopAgreement()
    con.logger.Trace("syncer stopped")
    return nil
}

// isEmptyBlock checks if a block is an empty block by both its hash and parent
// hash are empty.
func (con *Consensus) isEmptyBlock(b *types.Block) bool {
    return b.Hash == common.Hash{} && b.ParentHash == common.Hash{}
}

// buildEmptyBlock builds an empty block in agreement.
func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) {
    cfg := con.configs[b.Position.Round]
    b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval)
    b.Witness.Height = parent.Witness.Height
    b.Witness.Data = make([]byte, len(parent.Witness.Data))
    copy(b.Witness.Data, parent.Witness.Data)
    b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash})
}

func (con *Consensus) setupConfigsUntilRound(round uint64) {
    curMaxNumChains := uint32(0)
    func() {
        con.lock.Lock()
        defer con.lock.Unlock()
        con.logger.Debug("syncer setupConfigs",
            "until-round", round,
            "length", len(con.configs),
            "lattice", con.latticeLastRound)
        for r := uint64(len(con.configs)); r <= round; r++ {
            cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
            con.configs = append(con.configs, cfg)
            con.roundBeginTimes = append(
                con.roundBeginTimes,
                con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval))
            if cfg.NumChains >= curMaxNumChains {
                curMaxNumChains = cfg.NumChains
            }
        }
        // Notify core.Lattice for new configs.
        if con.lattice != nil {
            for con.latticeLastRound+1 <= round {
                con.latticeLastRound++
                if err := con.lattice.AppendConfig(
                    con.latticeLastRound,
                    con.configs[con.latticeLastRound]); err != nil {
                    panic(err)
                }
            }
        }
    }()
    con.resizeByNumChains(curMaxNumChains)
    con.logger.Trace("setupConfgis finished", "round", round)
}

// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
// the first time, setupConfigs setups from round 0.
func (con *Consensus) setupConfigs(blocks []*types.Block) {
    // Find max round in blocks.
    var maxRound uint64
    for _, b := range blocks {
        if b.Position.Round > maxRound {
            maxRound = b.Position.Round
        }
    }
    // Get configs from governance.
    //
    // In fullnode, the notification of new round is yet another TX, which
    // needs to be executed after corresponding block delivered. Thus, the
    // configuration for 'maxRound + core.ConfigRoundShift' won't be ready when
    // seeing this block.
    con.setupConfigsUntilRound(maxRound + core.ConfigRoundShift - 1)
}

// resizeByNumChains resizes fake lattice and agreement if numChains increases.
// Notice the decreasing case is neglected.
func (con *Consensus) resizeByNumChains(numChains uint32) {
    con.lock.Lock()
    defer con.lock.Unlock()
    if numChains > uint32(len(con.blocks)) {
        for i := uint32(len(con.blocks)); i < numChains; i++ {
            // Resize the pool of blocks.
            con.blocks = append(con.blocks, types.ByPosition{})
            // Resize agreement modules.
            a := newAgreement(
                con.receiveChan, con.pullChan, con.nodeSetCache, con.logger)
            con.agreements = append(con.agreements, a)
            con.agreementWaitGroup.Add(1)
            go func() {
                defer con.agreementWaitGroup.Done()
                a.run()
            }()
        }
    }
}

// startAgreement starts agreements for receiving votes and agreements.
func (con *Consensus) startAgreement() {
    // Start a routine for listening receive channel and pull block channel.
    go func() {
        for {
            select {
            case b, ok := <-con.receiveChan:
                if !ok {
                    return
                }
                chainID := b.Position.ChainID
                func() {
                    con.lock.Lock()
                    defer con.lock.Unlock()
                    // If round is cut in agreements, do not add blocks with
                    // round less then cut round.
                    if b.Position.Round < con.agreementRoundCut {
                        return
                    }
                    con.blocks[chainID] = append(con.blocks[chainID], b)
                    sort.Sort(con.blocks[chainID])
                }()
            case h, ok := <-con.pullChan:
                if !ok {
                    return
                }
                con.network.PullBlocks(common.Hashes{h})
            }
        }
    }()
}

func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
    // There is no block randomness at round-0.
    if r.Position.Round == 0 {
        return
    }
    // We only have to cache randomness result after cutting round.
    if r.Position.Round < func() uint64 {
        con.lock.RLock()
        defer con.lock.RUnlock()
        return con.agreementRoundCut
    }() {
        return
    }
    if func() (exists bool) {
        con.lock.RLock()
        defer con.lock.RUnlock()
        _, exists = con.randomnessResults[r.BlockHash]
        return
    }() {
        return
    }
    v, ok, err := con.tsigVerifier.UpdateAndGet(r.Position.Round)
    if err != nil {
        con.logger.Error("Unable to get tsig verifier",
            "hash", r.BlockHash.String()[:6],
            "position", &r.Position,
            "error", err)
        return
    }
    if !ok {
        con.logger.Error("Tsig is not ready", "position", &r.Position)
        return
    }
    if !v.VerifySignature(r.BlockHash, crypto.Signature{
        Type:      "bls",
        Signature: r.Randomness}) {
        con.logger.Info("Block randomness is not valid",
            "position", &r.Position,
            "hash", r.BlockHash.String()[:6])
        return
    }
    con.lock.Lock()
    defer con.lock.Unlock()
    con.randomnessResults[r.BlockHash] = r
}

// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
    con.moduleWaitGroup.Add(1)
    go func() {
        defer con.moduleWaitGroup.Done()
    Loop:
        for {
            select {
            case val := <-con.network.ReceiveChan():
                var pos types.Position
                switch v := val.(type) {
                case *types.Block:
                    pos = v.Position
                case *types.AgreementResult:
                    pos = v.Position
                case *types.BlockRandomnessResult:
                    con.cacheRandomnessResult(v)
                    continue Loop
                default:
                    continue Loop
                }
                if func() bool {
                    con.lock.RLock()
                    defer con.lock.RUnlock()
                    if pos.ChainID >= uint32(len(con.agreements)) {
                        // This error might be easily encountered when the
                        // "latest" parameter of SyncBlocks is turned on too
                        // early.
                        con.logger.Error(
                            "Unknown chainID message received (syncer)",
                            "position", &pos)
                        return false
                    }
                    return true
                }() {
                    con.agreements[pos.ChainID].inputChan <- val
                }
            case <-con.ctx.Done():
                return
            }
        }
    }()
}

// startCRSMonitor is the dummiest way to verify if the CRS for one round
// is ready or not.
func (con *Consensus) startCRSMonitor() {
    var lastNotifiedRound uint64
    // Notify all agreements for new CRS.
    notifyNewCRS := func(round uint64) {
        con.setupConfigsUntilRound(round)
        if round == lastNotifiedRound {
            return
        }
        con.logger.Debug("CRS is ready", "round", round)
        lastNotifiedRound = round
        con.lock.Lock()
        defer con.lock.Unlock()
        for idx, a := range con.agreements {
        loop:
            for {
                select {
                case <-con.ctx.Done():
                    break loop
                case a.inputChan <- round:
                    break loop
                case <-time.After(500 * time.Millisecond):
                    con.logger.Debug(
                        "agreement input channel is full when putting CRS",
                        "chainID", idx,
                        "round", round)
                }
            }
        }
    }
    con.moduleWaitGroup.Add(1)
    go func() {
        defer con.moduleWaitGroup.Done()
        for {
            select {
            case <-con.ctx.Done():
                return
            case <-time.After(500 * time.Millisecond):
            }
            // Notify agreement modules for the latest round that CRS is
            // available if the round is not notified yet.
            checked := lastNotifiedRound + 1
            for (con.gov.CRS(checked) != common.Hash{}) {
                checked++
            }
            checked--
            if checked > lastNotifiedRound {
                notifyNewCRS(checked)
            }
        }
    }()
}

func (con *Consensus) stopAgreement() {
    func() {
        con.lock.Lock()
        defer con.lock.Unlock()
        for _, a := range con.agreements {
            if a.inputChan != nil {
                close(a.inputChan)
                a.inputChan = nil
            }
        }
    }()
    con.agreementWaitGroup.Wait()
    close(con.receiveChan)
    close(con.pullChan)
}