aboutsummaryrefslogblamecommitdiffstats
path: root/core/agreement.go
blob: d88ddba156fc1d2ab97ecc1dce39a30f464abffa (plain) (tree)






















                                                                               
              

                                                                 
                                                                      
                                                                     



                               
                                                                   
                                                                          



                                       
                             




                                                                  
                                             

 

                                                
                             




                                                                       
                                             

 

                                                                          
                                  
                                                                 



                      

                                                                
                                     
                      
                                 

 









                                 

                                                
                              
 








                                                              



                                                                                


                                     
                                                




                                                   



                                             
                        
                               
                                            
                                            
                                
                                     


                                       
                  
                                                
                                                                   
         
                                                      
                        









                                       


                                                                  






                                                
                                                                              

                                                  

                                                                   
                                       
                                                   

                                                 
                                       








































                                                                                  








                                                       

                                                  

 

                                                


                                                    




                                                         


                                      
                                                        

                            
                                        
         
                                            





                                                

                  
 
                                                           
                                

                                                


                                                                                       
                                                                                           
                                 

                         


                          






                                                                          

                                       






                                                         


                                                   
                                             

                                     





                                                                  



                                                     










                                                                               
                                                                        








                                            





                                                          

                                                            

                                        






                                                                     





                                                                             


                                                                 

                                               


                  












                                                                               























                                                                           
// Copyright 2018 The dexon-consensus-core Authors
// This file is part of the dexon-consensus-core library.
//
// The dexon-consensus-core library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus-core library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus-core library. If not, see
// <http://www.gnu.org/licenses/>.

package core

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/dexon-foundation/dexon-consensus-core/common"
    "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
    "github.com/dexon-foundation/dexon-consensus-core/core/types"
)

// Errors for agreement module.
var (
    ErrNotInNotarySet         = fmt.Errorf("not in notary set")
    ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature")
)

// ErrFork for fork error in agreement.
type ErrFork struct {
    nID      types.NodeID
    old, new common.Hash
}

func (e *ErrFork) Error() string {
    return fmt.Sprintf("fork is found for %s, old %s, new %s",
        e.nID.String(), e.old, e.new)
}

// ErrForkVote for fork vote error in agreement.
type ErrForkVote struct {
    nID      types.NodeID
    old, new *types.Vote
}

func (e *ErrForkVote) Error() string {
    return fmt.Sprintf("fork vote is found for %s, old %s, new %s",
        e.nID.String(), e.old, e.new)
}

func newVoteListMap() []map[types.NodeID]*types.Vote {
    listMap := make([]map[types.NodeID]*types.Vote, types.MaxVoteType)
    for idx := range listMap {
        listMap[idx] = make(map[types.NodeID]*types.Vote)
    }
    return listMap
}

// agreementReceiver is the interface receiving agreement event.
type agreementReceiver interface {
    ProposeVote(vote *types.Vote)
    ProposeBlock()
    ConfirmBlock(common.Hash)
}

type pendingBlock struct {
    block        *types.Block
    receivedTime time.Time
}

type pendingVote struct {
    vote         *types.Vote
    receivedTime time.Time
}

// agreementData is the data for agreementState.
type agreementData struct {
    recv agreementReceiver

    ID           types.NodeID
    leader       *leaderSelector
    defaultBlock common.Hash
    period       uint64
    requiredVote int
    votes        map[uint64][]map[types.NodeID]*types.Vote
    votesLock    sync.RWMutex
    blocks       map[types.NodeID]*types.Block
    blocksLock   sync.Mutex
}

// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm.
type agreement struct {
    state          agreementState
    data           *agreementData
    aID            *atomic.Value
    notarySet      map[types.NodeID]struct{}
    hasOutput      bool
    lock           sync.RWMutex
    pendingBlock   []pendingBlock
    pendingVote    []pendingVote
    candidateBlock map[common.Hash]*types.Block
}

// newAgreement creates a agreement instance.
func newAgreement(
    ID types.NodeID,
    recv agreementReceiver,
    notarySet map[types.NodeID]struct{},
    leader *leaderSelector) *agreement {
    agreement := &agreement{
        data: &agreementData{
            recv:   recv,
            ID:     ID,
            leader: leader,
        },
        aID:            &atomic.Value{},
        candidateBlock: make(map[common.Hash]*types.Block),
    }
    agreement.restart(notarySet, types.Position{})
    return agreement
}

// terminate the current running state.
func (a *agreement) terminate() {
    if a.state != nil {
        a.state.terminate()
    }
}

// restart the agreement
func (a *agreement) restart(
    notarySet map[types.NodeID]struct{}, aID types.Position) {

    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.data.votesLock.Lock()
        defer a.data.votesLock.Unlock()
        a.data.blocksLock.Lock()
        defer a.data.blocksLock.Unlock()
        a.data.votes = make(map[uint64][]map[types.NodeID]*types.Vote)
        a.data.votes[1] = newVoteListMap()
        a.data.period = 1
        a.data.blocks = make(map[types.NodeID]*types.Block)
        a.data.requiredVote = len(notarySet)/3*2 + 1
        a.data.leader.restart()
        a.data.defaultBlock = common.Hash{}
        a.hasOutput = false
        a.state = newPrepareState(a.data)
        a.notarySet = notarySet
        a.candidateBlock = make(map[common.Hash]*types.Block)
        a.aID.Store(aID)
    }()

    expireTime := time.Now().Add(-10 * time.Second)
    replayBlock := make([]*types.Block, 0)
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        newPendingBlock := make([]pendingBlock, 0)
        for _, pending := range a.pendingBlock {
            if pending.block.Position == aID {
                replayBlock = append(replayBlock, pending.block)
            } else if pending.receivedTime.After(expireTime) {
                newPendingBlock = append(newPendingBlock, pending)
            }
        }
        a.pendingBlock = newPendingBlock
    }()

    replayVote := make([]*types.Vote, 0)
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        newPendingVote := make([]pendingVote, 0)
        for _, pending := range a.pendingVote {
            if pending.vote.Position == aID {
                replayVote = append(replayVote, pending.vote)
            } else if pending.receivedTime.After(expireTime) {
                newPendingVote = append(newPendingVote, pending)
            }
        }
        a.pendingVote = newPendingVote
    }()

    for _, block := range replayBlock {
        a.processBlock(block)
    }

    for _, vote := range replayVote {
        a.processVote(vote)
    }
}

// clocks returns how many time this state is required.
func (a *agreement) clocks() int {
    return a.state.clocks()
}

// agreementID returns the current agreementID.
func (a *agreement) agreementID() types.Position {
    return a.aID.Load().(types.Position)
}

// nextState is called at the spcifi clock time.
func (a *agreement) nextState() (err error) {
    if err = a.state.receiveVote(); err != nil {
        return
    }
    a.state, err = a.state.nextState()
    return
}

func (a *agreement) sanityCheck(vote *types.Vote) error {
    if exist := func() bool {
        a.lock.RLock()
        defer a.lock.RUnlock()
        _, exist := a.notarySet[vote.ProposerID]
        return exist
    }(); !exist {
        return ErrNotInNotarySet
    }
    ok, err := verifyVoteSignature(vote)
    if err != nil {
        return err
    }
    if !ok {
        return ErrIncorrectVoteSignature
    }
    return nil
}

func (a *agreement) checkForkVote(vote *types.Vote) error {
    if err := func() error {
        a.data.votesLock.RLock()
        defer a.data.votesLock.RUnlock()
        if votes, exist := a.data.votes[vote.Period]; exist {
            if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist {
                if vote.BlockHash != oldVote.BlockHash {
                    return &ErrForkVote{vote.ProposerID, oldVote, vote}
                }
            }
        }
        return nil
    }(); err != nil {
        return err
    }
    return nil
}

// prepareVote prepares a vote.
func (a *agreement) prepareVote(vote *types.Vote, prv crypto.PrivateKey) (
    err error) {
    vote.ProposerID = a.data.ID
    vote.Position = a.agreementID()
    hash := hashVote(vote)
    vote.Signature, err = prv.Sign(hash)
    return
}

// processVote is the entry point for processing Vote.
func (a *agreement) processVote(vote *types.Vote) error {
    if err := a.sanityCheck(vote); err != nil {
        return err
    }
    if vote.Position != a.agreementID() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.pendingVote = append(a.pendingVote, pendingVote{
            vote:         vote,
            receivedTime: time.Now().UTC(),
        })
        return nil
    }
    if err := a.checkForkVote(vote); err != nil {
        return err
    }

    if func() bool {
        a.data.votesLock.Lock()
        defer a.data.votesLock.Unlock()
        if _, exist := a.data.votes[vote.Period]; !exist {
            a.data.votes[vote.Period] = newVoteListMap()
        }
        a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote
        if !a.hasOutput && vote.Type == types.VoteConfirm {
            if len(a.data.votes[vote.Period][types.VoteConfirm]) >=
                a.data.requiredVote {
                a.hasOutput = true
                a.data.recv.ConfirmBlock(vote.BlockHash)
            }
        }
        return true
    }() {
        return a.state.receiveVote()
    }
    return nil
}

// prepareBlok prepares a block.
func (a *agreement) prepareBlock(
    block *types.Block, prv crypto.PrivateKey) error {
    return a.data.leader.prepareBlock(block, prv)
}

// processBlock is the entry point for processing Block.
func (a *agreement) processBlock(block *types.Block) error {
    a.data.blocksLock.Lock()
    defer a.data.blocksLock.Unlock()
    if block.Position != a.agreementID() {
        a.pendingBlock = append(a.pendingBlock, pendingBlock{
            block:        block,
            receivedTime: time.Now().UTC(),
        })
        return nil
    }
    if b, exist := a.data.blocks[block.ProposerID]; exist {
        if b.Hash != block.Hash {
            return &ErrFork{block.ProposerID, b.Hash, block.Hash}
        }
        return nil
    }
    if err := a.data.leader.processBlock(block); err != nil {
        return err
    }
    a.data.blocks[block.ProposerID] = block
    a.addCandidateBlock(block)
    return nil
}

func (a *agreement) addCandidateBlock(block *types.Block) {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.candidateBlock[block.Hash] = block
}

func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) {
    a.lock.RLock()
    defer a.lock.RUnlock()
    b, e := a.candidateBlock[hash]
    return b, e
}

func (a *agreementData) countVote(period uint64, voteType types.VoteType) (
    blockHash common.Hash, ok bool) {
    a.votesLock.RLock()
    defer a.votesLock.RUnlock()
    votes, exist := a.votes[period]
    if !exist {
        return
    }
    candidate := make(map[common.Hash]int)
    for _, vote := range votes[voteType] {
        if _, exist := candidate[vote.BlockHash]; !exist {
            candidate[vote.BlockHash] = 0
        }
        candidate[vote.BlockHash]++
    }
    for candidateHash, votes := range candidate {
        if votes >= a.requiredVote {
            blockHash = candidateHash
            ok = true
            return
        }
    }
    return
}