aboutsummaryrefslogblamecommitdiffstats
path: root/core/agreement.go
blob: d90afc61073c3ccd075d93abe3533ca7c6e9871f (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
  
                                                                        



                                                                               
                                                                         




                                                                           
                                                      





                                  
              
              
                     
              
 

                                                                
                                                                

 






                                           

                               



                                                                                 
                                                                                



                                       
                             




                                                                  
                                             

 

                                                
                             




                                                                       
                                             

 

                                                                          
                                  
                                                                 



                      

                                                                
                                     
                                  

                                                                                     
                                                               
                                 

                                            
                                                             

 









                                 

                                                
                              
 
                                 
                         
                                    
                                
                           


                                                              
                                 

                                                  



                                                                                














                                                                        



                                             
                        
                               
                               

                                          
                                
                                     


                                       
                  





                                                                                        
         
                        
                        

 
                        
                            

                                                               
                          
                         

                                     

                                                 
                                                                  


                                            

                                                                                  

                                          

                                                
                                                                              
                                                  
                                 
                                                                   
                                               
                                          
                                                      
                                   
                                                     



                                                
                                                    
                                     
                                   
                                              
                                       
                                                                     



                                             



                           
 



                        















                                                                    






                                                          
                                                              

                                                                 




                                                                                        












                                                                                  
                                                             

                                                                


                                                                                           







                                                                                
                                                             
                                                                                           

                                               

         






                                                                                           
                                         
                                                           
                                                                                          

                                             


         
                            




                                                                      

 
                                      
                                           

 

                                                       

                                   
                                       


                                            



                                




                                                                      

 

                                                                          


                                                  
                                                  
                                                                             

 
                                               
                                                  











                                           

 
                                                  
                                             


                             


                                               




                                                         


                                           
                                                  





                                                



                                                                                       




                                                                        
         

                  
 

                                                     



                                                                               
                                           
                                                                
                                                                         

                                                                                  


                         
              


                               
                                                               
                                       
                                     


              
                                                            


                                    






                                         
                                                       

 

                                                         

                             


                                                   
                              
 

                                        

                                                                            



                                                                          
                                  
                 
                                        
         
                                 
                                             
                                  
                 





                                                                  

                                           

                          


                          
 




                                                            


                                                                                     
                                                                    
                          


                                                         
                                                                                    
                                                     

                                                        










                                                                                                            




                                                                             



                                                         
                         
                                  
                 




                                                           


                                   
                                          



                                                         
                                                                                    
                                                     
                                       
                                                          
                                                       
                                                             


                                                        
                                                            



                                                         
                                          

                         



                                                                                      


                                                                                  

                                                                               

                                                
                                                                                                   



                                                                               

                                               


                                                      
                                                



                                         
                          



                  











                                                               

                                  






                                                 


























                                                                                 
                                            

                             

                                       

                                          

                                            
                 

                                                   


                                                
         


                                 
                         

 


                                      



                                            


                          

                                                            


















                                                                 

                             

                                        
                              



                                                                                 




                                                                     

                                       
         

                                                               
                                                             



                                                                             


                                                                 
                                               
                                        

                                                                                     


                                                


                                                           




                                                                                                     



                                                                



                                                                         
                                                                                                  
















                                                                                                         


                  


                                                           



                                                                 


                                            

                                                


                                      










                                                                               

                                                                           






                                                                                  



















                                                                  








                                                     
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package core

import (
    "fmt"
    "math"
    "sync"
    "sync/atomic"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    "github.com/dexon-foundation/dexon-consensus/core/utils"
)

// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})

func init() {
    close(closedchan)
}

// Errors for agreement module.
var (
    ErrInvalidVote                   = fmt.Errorf("invalid vote")
    ErrNotInNotarySet                = fmt.Errorf("not in notary set")
    ErrIncorrectVoteSignature        = fmt.Errorf("incorrect vote signature")
    ErrIncorrectVotePartialSignature = fmt.Errorf("incorrect vote psig")
    ErrMismatchBlockPosition         = fmt.Errorf("mismatch block position")
)

// ErrFork for fork error in agreement.
type ErrFork struct {
    nID      types.NodeID
    old, new common.Hash
}

func (e *ErrFork) Error() string {
    return fmt.Sprintf("fork is found for %s, old %s, new %s",
        e.nID.String(), e.old, e.new)
}

// ErrForkVote for fork vote error in agreement.
type ErrForkVote struct {
    nID      types.NodeID
    old, new *types.Vote
}

func (e *ErrForkVote) Error() string {
    return fmt.Sprintf("fork vote is found for %s, old %s, new %s",
        e.nID.String(), e.old, e.new)
}

func newVoteListMap() []map[types.NodeID]*types.Vote {
    listMap := make([]map[types.NodeID]*types.Vote, types.MaxVoteType)
    for idx := range listMap {
        listMap[idx] = make(map[types.NodeID]*types.Vote)
    }
    return listMap
}

// agreementReceiver is the interface receiving agreement event.
type agreementReceiver interface {
    ProposeVote(vote *types.Vote)
    ProposeBlock() common.Hash
    // ConfirmBlock is called with lock hold. User can safely use all data within
    // agreement module.
    ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote)
    PullBlocks(common.Hashes)
    ReportForkVote(v1, v2 *types.Vote)
    ReportForkBlock(b1, b2 *types.Block)
    VerifyPartialSignature(vote *types.Vote) (bool, bool)
}

type pendingBlock struct {
    block        *types.Block
    receivedTime time.Time
}

type pendingVote struct {
    vote         *types.Vote
    receivedTime time.Time
}

// agreementData is the data for agreementState.
type agreementData struct {
    recv agreementReceiver

    ID           types.NodeID
    isLeader     bool
    leader       *leaderSelector
    lockValue    common.Hash
    lockIter     uint64
    period       uint64
    requiredVote int
    votes        map[uint64][]map[types.NodeID]*types.Vote
    lock         sync.RWMutex
    blocks       map[types.NodeID]*types.Block
    blocksLock   sync.Mutex
}

// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
    state                  agreementState
    data                   *agreementData
    aID                    *atomic.Value
    doneChan               chan struct{}
    notarySet              map[types.NodeID]struct{}
    hasVoteFast            bool
    hasOutput              bool
    lock                   sync.RWMutex
    pendingBlock           []pendingBlock
    pendingVote            []pendingVote
    pendingAgreementResult map[types.Position]*types.AgreementResult
    candidateBlock         map[common.Hash]*types.Block
    fastForward            chan uint64
    signer                 *utils.Signer
    logger                 common.Logger
}

// newAgreement creates a agreement instance.
func newAgreement(
    ID types.NodeID,
    recv agreementReceiver,
    leader *leaderSelector,
    signer *utils.Signer,
    logger common.Logger) *agreement {
    agreement := &agreement{
        data: &agreementData{
            recv:   recv,
            ID:     ID,
            leader: leader,
        },
        aID:                    &atomic.Value{},
        pendingAgreementResult: make(map[types.Position]*types.AgreementResult),
        candidateBlock:         make(map[common.Hash]*types.Block),
        fastForward:            make(chan uint64, 1),
        signer:                 signer,
        logger:                 logger,
    }
    agreement.stop()
    return agreement
}

// restart the agreement
func (a *agreement) restart(
    notarySet map[types.NodeID]struct{},
    threshold int, aID types.Position, leader types.NodeID,
    crs common.Hash) {
    if !func() bool {
        a.lock.Lock()
        defer a.lock.Unlock()
        if !isStop(aID) {
            oldAID := a.agreementID()
            if !isStop(oldAID) && !aID.Newer(oldAID) {
                return false
            }
        }
        a.logger.Debug("Restarting BA",
            "notarySet", notarySet, "position", aID, "leader", leader)
        a.data.lock.Lock()
        defer a.data.lock.Unlock()
        a.data.blocksLock.Lock()
        defer a.data.blocksLock.Unlock()
        a.data.votes = make(map[uint64][]map[types.NodeID]*types.Vote)
        a.data.votes[1] = newVoteListMap()
        a.data.period = 2
        a.data.blocks = make(map[types.NodeID]*types.Block)
        a.data.requiredVote = threshold
        a.data.leader.restart(crs)
        a.data.lockValue = types.SkipBlockHash
        a.data.lockIter = 0
        a.data.isLeader = a.data.ID == leader
        if a.doneChan != nil {
            close(a.doneChan)
        }
        a.doneChan = make(chan struct{})
        a.fastForward = make(chan uint64, 1)
        a.hasVoteFast = false
        a.hasOutput = false
        a.state = newFastState(a.data)
        a.notarySet = notarySet
        a.candidateBlock = make(map[common.Hash]*types.Block)
        a.aID.Store(struct {
            pos    types.Position
            leader types.NodeID
        }{aID, leader})
        return true
    }() {
        return
    }

    if isStop(aID) {
        return
    }

    var result *types.AgreementResult
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        newPendingAgreementResult := make(
            map[types.Position]*types.AgreementResult)
        for pos, agr := range a.pendingAgreementResult {
            if pos.Newer(aID) {
                newPendingAgreementResult[pos] = agr
            } else if pos == aID {
                result = agr
            }
        }
        a.pendingAgreementResult = newPendingAgreementResult
    }()

    expireTime := time.Now().Add(-10 * time.Second)
    replayBlock := make([]*types.Block, 0)
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        newPendingBlock := make([]pendingBlock, 0)
        for _, pending := range a.pendingBlock {
            if aID.Newer(pending.block.Position) {
                continue
            } else if pending.block.Position == aID {
                if result == nil ||
                    result.Position.Round < DKGDelayRound ||
                    result.BlockHash == pending.block.Hash {
                    replayBlock = append(replayBlock, pending.block)
                }
            } else if pending.receivedTime.After(expireTime) {
                newPendingBlock = append(newPendingBlock, pending)
            }
        }
        a.pendingBlock = newPendingBlock
    }()

    replayVote := make([]*types.Vote, 0)
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        newPendingVote := make([]pendingVote, 0)
        for _, pending := range a.pendingVote {
            if aID.Newer(pending.vote.Position) {
                continue
            } else if pending.vote.Position == aID {
                if result == nil || result.Position.Round < DKGDelayRound {
                    replayVote = append(replayVote, pending.vote)
                }
            } else if pending.receivedTime.After(expireTime) {
                newPendingVote = append(newPendingVote, pending)
            }
        }
        a.pendingVote = newPendingVote
    }()

    for _, block := range replayBlock {
        if err := a.processBlock(block); err != nil {
            a.logger.Error("Failed to process block when restarting agreement",
                "block", block)
        }
    }

    if result != nil {
        if err := a.processAgreementResult(result); err != nil {
            a.logger.Error("Failed to process agreement result when retarting",
                "result", result)
        }
    }

    for _, vote := range replayVote {
        if err := a.processVote(vote); err != nil {
            a.logger.Error("Failed to process vote when restarting agreement",
                "vote", vote)
        }
    }
}

func (a *agreement) stop() {
    a.restart(make(map[types.NodeID]struct{}), int(math.MaxInt32),
        types.Position{
            Height: math.MaxUint64,
        },
        types.NodeID{}, common.Hash{})
}

func isStop(aID types.Position) bool {
    return aID.Height == math.MaxUint64
}

// clocks returns how many time this state is required.
func (a *agreement) clocks() int {
    a.data.lock.RLock()
    defer a.data.lock.RUnlock()
    scale := int(a.data.period) - 1
    if a.state.state() == stateForward {
        scale = 1
    }
    if scale < 1 {
        // just in case.
        scale = 1
    }
    // 10 is a magic number derived from many years of experience.
    if scale > 10 {
        scale = 10
    }
    return a.state.clocks() * scale
}

// pullVotes returns if current agreement requires more votes to continue.
func (a *agreement) pullVotes() bool {
    a.data.lock.RLock()
    defer a.data.lock.RUnlock()
    return a.state.state() == statePullVote ||
        a.state.state() == stateInitial ||
        (a.state.state() == statePreCommit && (a.data.period%3) == 0)
}

// agreementID returns the current agreementID.
func (a *agreement) agreementID() types.Position {
    return a.aID.Load().(struct {
        pos    types.Position
        leader types.NodeID
    }).pos
}

// leader returns the current leader.
func (a *agreement) leader() types.NodeID {
    return a.aID.Load().(struct {
        pos    types.Position
        leader types.NodeID
    }).leader
}

// nextState is called at the specific clock time.
func (a *agreement) nextState() (err error) {
    a.lock.Lock()
    defer a.lock.Unlock()
    if a.hasOutput {
        a.state = newSleepState(a.data)
        return
    }
    a.state, err = a.state.nextState()
    return
}

func (a *agreement) sanityCheck(vote *types.Vote) error {
    if vote.Type >= types.MaxVoteType {
        return ErrInvalidVote
    }
    ok, err := utils.VerifyVoteSignature(vote)
    if err != nil {
        return err
    }
    if !ok {
        return ErrIncorrectVoteSignature
    }
    if vote.Position.Round != a.agreementID().Round {
        // TODO(jimmy): maybe we can verify partial signature at agreement-mgr.
        return nil
    }
    if ok, report := a.data.recv.VerifyPartialSignature(vote); !ok {
        if report {
            return ErrIncorrectVotePartialSignature
        }
        return ErrSkipButNoError
    }
    return nil
}

func (a *agreement) checkForkVote(vote *types.Vote) (
    alreadyExist bool, err error) {
    a.data.lock.RLock()
    defer a.data.lock.RUnlock()
    if votes, exist := a.data.votes[vote.Period]; exist {
        if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist {
            alreadyExist = true
            if vote.BlockHash != oldVote.BlockHash {
                a.data.recv.ReportForkVote(oldVote, vote)
                err = &ErrForkVote{vote.ProposerID, oldVote, vote}
                return
            }
        }
    }
    return
}

// prepareVote prepares a vote.
func (a *agreement) prepareVote(vote *types.Vote) (err error) {
    vote.Position = a.agreementID()
    err = a.signer.SignVote(vote)
    return
}

func (a *agreement) updateFilter(filter *utils.VoteFilter) {
    if isStop(a.agreementID()) {
        return
    }
    a.lock.RLock()
    defer a.lock.RUnlock()
    a.data.lock.RLock()
    defer a.data.lock.RUnlock()
    filter.Confirm = a.hasOutput
    filter.LockIter = a.data.lockIter
    filter.Period = a.data.period
    filter.Position.Height = a.agreementID().Height
}

// processVote is the entry point for processing Vote.
func (a *agreement) processVote(vote *types.Vote) error {
    a.lock.Lock()
    defer a.lock.Unlock()
    if err := a.sanityCheck(vote); err != nil {
        return err
    }
    aID := a.agreementID()

    // Agreement module has stopped.
    if isStop(aID) {
        // Hacky way to not drop first votes when round just begins.
        if vote.Position.Round == aID.Round {
            a.pendingVote = append(a.pendingVote, pendingVote{
                vote:         vote,
                receivedTime: time.Now().UTC(),
            })
            return nil
        }
        return ErrSkipButNoError
    }
    if vote.Position != aID {
        if aID.Newer(vote.Position) {
            return nil
        }
        a.pendingVote = append(a.pendingVote, pendingVote{
            vote:         vote,
            receivedTime: time.Now().UTC(),
        })
        return nil
    }
    exist, err := a.checkForkVote(vote)
    if err != nil {
        return err
    }
    if exist {
        return nil
    }

    a.data.lock.Lock()
    defer a.data.lock.Unlock()
    if _, exist := a.data.votes[vote.Period]; !exist {
        a.data.votes[vote.Period] = newVoteListMap()
    }
    if _, exist := a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist {
        return nil
    }
    a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote
    if !a.hasOutput &&
        (vote.Type == types.VoteCom ||
            vote.Type == types.VoteFast ||
            vote.Type == types.VoteFastCom) {
        if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok &&
            hash != types.SkipBlockHash {
            if vote.Type == types.VoteFast {
                if !a.hasVoteFast {
                    if a.state.state() == stateFast ||
                        a.state.state() == stateFastVote {
                        a.data.recv.ProposeVote(
                            types.NewVote(types.VoteFastCom, hash, vote.Period))
                        a.hasVoteFast = true

                    }
                    if a.data.lockIter == 0 {
                        a.data.lockValue = hash
                        a.data.lockIter = 1
                    }
                }
            } else {
                a.hasOutput = true
                a.data.recv.ConfirmBlock(hash,
                    a.data.votes[vote.Period][vote.Type])
                if a.doneChan != nil {
                    close(a.doneChan)
                    a.doneChan = nil
                }
            }
            return nil
        }
    } else if a.hasOutput {
        return nil
    }

    // Check if the agreement requires fast-forwarding.
    if len(a.fastForward) > 0 {
        return nil
    }
    if vote.Type == types.VotePreCom {
        if vote.Period < a.data.lockIter {
            // This PreCom is useless for us.
            return nil
        }
        if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok &&
            hash != types.SkipBlockHash {
            // Condition 1.
            if vote.Period > a.data.lockIter {
                a.data.lockValue = hash
                a.data.lockIter = vote.Period
            }
            // Condition 2.
            if vote.Period > a.data.period {
                a.fastForward <- vote.Period
                if a.doneChan != nil {
                    close(a.doneChan)
                    a.doneChan = nil
                }
                return nil
            }
        }
    }
    // Condition 3.
    if vote.Type == types.VoteCom && vote.Period >= a.data.period &&
        len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote {
        hashes := common.Hashes{}
        addPullBlocks := func(voteType types.VoteType) {
            for _, vote := range a.data.votes[vote.Period][voteType] {
                if vote.BlockHash == types.NullBlockHash ||
                    vote.BlockHash == types.SkipBlockHash {
                    continue
                }
                if _, found := a.findCandidateBlockNoLock(vote.BlockHash); !found {
                    hashes = append(hashes, vote.BlockHash)
                }
            }
        }
        addPullBlocks(types.VotePreCom)
        addPullBlocks(types.VoteCom)
        if len(hashes) > 0 {
            a.data.recv.PullBlocks(hashes)
        }
        a.fastForward <- vote.Period + 1
        if a.doneChan != nil {
            close(a.doneChan)
            a.doneChan = nil
        }
        return nil
    }
    return nil
}

func (a *agreement) processFinalizedBlock(block *types.Block) {
    a.lock.Lock()
    defer a.lock.Unlock()
    if a.hasOutput {
        return
    }
    aID := a.agreementID()
    if aID.Older(block.Position) {
        return
    }
    a.addCandidateBlockNoLock(block)
    a.hasOutput = true
    a.data.lock.Lock()
    defer a.data.lock.Unlock()
    a.data.recv.ConfirmBlock(block.Hash, nil)
    if a.doneChan != nil {
        close(a.doneChan)
        a.doneChan = nil
    }
}

func (a *agreement) processAgreementResult(result *types.AgreementResult) error {
    a.lock.Lock()
    defer a.lock.Unlock()
    aID := a.agreementID()
    if result.Position.Older(aID) {
        return nil
    } else if result.Position.Newer(aID) {
        a.pendingAgreementResult[result.Position] = result
        return nil
    }
    if a.hasOutput {
        return nil
    }
    a.data.lock.Lock()
    defer a.data.lock.Unlock()
    if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist {
        a.data.recv.PullBlocks(common.Hashes{result.BlockHash})
    }
    a.hasOutput = true
    a.data.recv.ConfirmBlock(result.BlockHash, nil)
    if a.doneChan != nil {
        close(a.doneChan)
        a.doneChan = nil
    }
    return nil
}

func (a *agreement) done() <-chan struct{} {
    a.lock.Lock()
    defer a.lock.Unlock()
    select {
    case period := <-a.fastForward:
        a.data.lock.Lock()
        defer a.data.lock.Unlock()
        if period <= a.data.period {
            break
        }
        a.data.setPeriod(period)
        a.state = newPreCommitState(a.data)
        a.doneChan = make(chan struct{})
        return closedchan
    default:
    }
    if a.doneChan == nil {
        return closedchan
    }
    return a.doneChan
}

func (a *agreement) confirmed() bool {
    a.lock.RLock()
    defer a.lock.RUnlock()
    return a.confirmedNoLock()
}

func (a *agreement) confirmedNoLock() bool {
    return a.hasOutput
}

// processBlock is the entry point for processing Block.
func (a *agreement) processBlock(block *types.Block) error {
    checkSkip := func() bool {
        aID := a.agreementID()
        if block.Position != aID {
            // Agreement module has stopped.
            if !isStop(aID) {
                if aID.Newer(block.Position) {
                    return true
                }
            }
        }
        return false
    }
    if checkSkip() {
        return nil
    }
    if err := utils.VerifyBlockSignature(block); err != nil {
        return err
    }

    a.lock.Lock()
    defer a.lock.Unlock()
    a.data.blocksLock.Lock()
    defer a.data.blocksLock.Unlock()
    aID := a.agreementID()
    // a.agreementID might change during lock, so we need to checkSkip again.
    if checkSkip() {
        return nil
    } else if aID != block.Position {
        a.pendingBlock = append(a.pendingBlock, pendingBlock{
            block:        block,
            receivedTime: time.Now().UTC(),
        })
        return nil
    } else if a.confirmedNoLock() {
        return nil
    }
    if b, exist := a.data.blocks[block.ProposerID]; exist {
        if b.Hash != block.Hash {
            a.data.recv.ReportForkBlock(b, block)
            return &ErrFork{block.ProposerID, b.Hash, block.Hash}
        }
        return nil
    }
    if err := a.data.leader.processBlock(block); err != nil {
        return err
    }
    a.data.blocks[block.ProposerID] = block
    a.addCandidateBlockNoLock(block)
    if block.ProposerID != a.data.ID &&
        (a.state.state() == stateFast || a.state.state() == stateFastVote) &&
        block.ProposerID == a.leader() {
        go func() {
            for func() bool {
                if aID != a.agreementID() {
                    return false
                }
                a.lock.RLock()
                defer a.lock.RUnlock()
                if a.state.state() != stateFast && a.state.state() != stateFastVote {
                    return false
                }
                a.data.lock.RLock()
                defer a.data.lock.RUnlock()
                a.data.blocksLock.Lock()
                defer a.data.blocksLock.Unlock()
                block, exist := a.data.blocks[a.leader()]
                if !exist {
                    return true
                }
                ok, err := a.data.leader.validLeader(block, a.data.leader.hashCRS)
                if err != nil {
                    fmt.Println("Error checking validLeader for Fast BA",
                        "error", err, "block", block)
                    return false
                }
                if ok {
                    a.data.recv.ProposeVote(
                        types.NewVote(types.VoteFast, block.Hash, a.data.period))
                    return false
                }
                return true
            }() {
                // TODO(jimmy): retry interval should be related to configurations.
                time.Sleep(250 * time.Millisecond)
            }
        }()
    }
    return nil
}

func (a *agreement) addCandidateBlock(block *types.Block) {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.addCandidateBlockNoLock(block)
}

func (a *agreement) addCandidateBlockNoLock(block *types.Block) {
    a.candidateBlock[block.Hash] = block
}

func (a *agreement) findCandidateBlockNoLock(
    hash common.Hash) (*types.Block, bool) {
    b, e := a.candidateBlock[hash]
    return b, e
}

// find a block in both candidate blocks and pending blocks in leader-selector.
// A block might be confirmed by others while we can't verify its validity.
func (a *agreement) findBlockNoLock(hash common.Hash) (*types.Block, bool) {
    b, e := a.findCandidateBlockNoLock(hash)
    if !e {
        b, e = a.data.leader.findPendingBlock(hash)
    }
    return b, e
}

func (a *agreementData) countVote(period uint64, voteType types.VoteType) (
    blockHash common.Hash, ok bool) {
    a.lock.RLock()
    defer a.lock.RUnlock()
    return a.countVoteNoLock(period, voteType)
}

func (a *agreementData) countVoteNoLock(
    period uint64, voteType types.VoteType) (blockHash common.Hash, ok bool) {
    votes, exist := a.votes[period]
    if !exist {
        return
    }
    candidate := make(map[common.Hash]int)
    for _, vote := range votes[voteType] {
        if _, exist := candidate[vote.BlockHash]; !exist {
            candidate[vote.BlockHash] = 0
        }
        candidate[vote.BlockHash]++
    }
    for candidateHash, votes := range candidate {
        if votes >= a.requiredVote {
            blockHash = candidateHash
            ok = true
            return
        }
    }
    return
}

func (a *agreementData) setPeriod(period uint64) {
    for i := a.period + 1; i <= period; i++ {
        if _, exist := a.votes[i]; !exist {
            a.votes[i] = newVoteListMap()
        }
    }
    a.period = period
}