// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
package core
import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
cryptoDKG "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg"
"github.com/dexon-foundation/dexon-consensus/core/db"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
// Errors for consensus core.
var (
ErrProposerNotInNodeSet = fmt.Errorf(
"proposer is not in node set")
ErrIncorrectHash = fmt.Errorf(
"hash of block is incorrect")
ErrIncorrectSignature = fmt.Errorf(
"signature of block is incorrect")
ErrUnknownBlockProposed = fmt.Errorf(
"unknown block is proposed")
ErrIncorrectAgreementResultPosition = fmt.Errorf(
"incorrect agreement result position")
ErrNotEnoughVotes = fmt.Errorf(
"not enought votes")
ErrCRSNotReady = fmt.Errorf(
"CRS not ready")
ErrConfigurationNotReady = fmt.Errorf(
"Configuration not ready")
ErrIncorrectBlockRandomness = fmt.Errorf(
"randomness of block is incorrect")
ErrCannotVerifyBlockRandomness = fmt.Errorf(
"cannot verify block randomness")
)
// consensusBAReceiver implements agreementReceiver.
type consensusBAReceiver struct {
consensus *Consensus
agreementModule *agreement
emptyBlockHashMap *sync.Map
isNotary bool
restartNotary chan types.Position
npks *typesDKG.NodePublicKeys
psigSigner *dkgShareSecret
}
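// emptyBlockHash returns the hash of the empty block at the given position,
// computing it via the blockchain module and caching the result in
// emptyBlockHashMap.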
func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) (
common.Hash, error) {
hashVal, ok := recv.emptyBlockHashMap.Load(pos)
if ok {
return hashVal.(common.Hash), nil
}
emptyBlock, err := recv.consensus.bcModule.prepareBlock(
pos, time.Time{}, true)
if err != nil {
return common.Hash{}, err
}
hash, err := utils.HashBlock(emptyBlock)
if err != nil {
return common.Hash{}, err
}
recv.emptyBlockHashMap.Store(pos, hash)
return hash, nil
}
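// VerifyPartialSignature verifies the DKG partial signature carried by a
// commit or fast-commit vote. Votes that should not carry a partial
// signature (other vote types, skip-block votes, or votes proposed before
// DKGDelayRound) are accepted only if the partial signature is empty.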
func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool {
if vote.Position.Round >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash {
if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
if recv.npks == nil {
return false
}
if vote.Position.Round != recv.npks.Round {
return false
}
pubKey, exist := recv.npks.PublicKeys[vote.ProposerID]
if !exist {
return false
}
blockHash := vote.BlockHash
if blockHash == types.NullBlockHash {
var err error
blockHash, err = recv.emptyBlockHash(vote.Position)
if err != nil {
recv.consensus.logger.Error(
"Failed to verify vote for empty block",
"position", vote.Position,
"error", err)
return false
}
}
// Verify against the hash that was actually signed; for null-block votes
// this is the empty block hash computed above.
return pubKey.VerifySignature(
blockHash, crypto.Signature(vote.PartialSignature))
}
}
return len(vote.PartialSignature.Signature) == 0
}
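// ProposeVote prepares, processes and broadcasts a vote on behalf of this
// node. Non-notary receivers ignore the request. For commit and fast-commit
// votes, a DKG partial signature over the voted block hash (or the empty
// block hash for null-block votes) is attached first.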
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
}
if recv.psigSigner != nil &&
vote.BlockHash != types.SkipBlockHash {
if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
if vote.BlockHash == types.NullBlockHash {
hash, err := recv.emptyBlockHash(vote.Position)
if err != nil {
recv.consensus.logger.Error(
"Failed to propose vote for empty block",
"position", vote.Position,
"error", err)
return
}
vote.PartialSignature = recv.psigSigner.sign(hash)
} else {
vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash)
}
}
}
if err := recv.agreementModule.prepareVote(vote); err != nil {
recv.consensus.logger.Error("Failed to prepare vote", "error", err)
return
}
go func() {
if err := recv.agreementModule.processVote(vote); err != nil {
recv.consensus.logger.Error("Failed to process self vote",
"error", err,
"vote", vote)
return
}
recv.consensus.logger.Debug("Calling Network.BroadcastVote",
"vote", vote)
recv.consensus.network.BroadcastVote(vote)
}()
}
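// ProposeBlock proposes a block for the current agreement position,
// pre-processes and broadcasts it asynchronously, and returns its hash.
// Non-notary receivers return an empty hash; types.NullBlockHash is
// returned when proposing fails.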
func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
if !recv.isNotary {
return common.Hash{}
}
block, err := recv.consensus.proposeBlock(recv.agreementModule.agreementID())
if err != nil || block == nil {
recv.consensus.logger.Error("Unable to propose block", "error", err)
return types.NullBlockHash
}
go func() {
if err := recv.consensus.preProcessBlock(block); err != nil {
recv.consensus.logger.Error("Failed to pre-process block", "error", err)
return
}
recv.consensus.logger.Debug("Calling Network.BroadcastBlock",
"block", block)
recv.consensus.network.BroadcastBlock(block)
}()
return block.Hash
}
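// ConfirmBlock is called when BA confirms a block hash. It locates (or
// pulls) the confirmed block, recovers its randomness from the confirm
// votes, broadcasts the agreement result if this node is a notary, pulls
// any unconfirmed ancestors, and finally signals restartNotary so the next
// agreement can start.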
func (recv *consensusBAReceiver) ConfirmBlock(
hash common.Hash, votes map[types.NodeID]*types.Vote) {
var (
block *types.Block
aID = recv.agreementModule.agreementID()
)
isEmptyBlockConfirmed := hash == common.Hash{}
if isEmptyBlockConfirmed {
recv.consensus.logger.Info("Empty block is confirmed", "position", aID)
var err error
block, err = recv.consensus.bcModule.addEmptyBlock(aID)
if err != nil {
recv.consensus.logger.Error("Add position for empty failed",
"error", err)
return
}
if block == nil {
// The empty block's parent is not found locally, thus we can't
// propose it at this moment.
//
// We can only rely on block pulling upon receiving
// types.AgreementResult from the next position.
recv.consensus.logger.Warn(
"An empty block is confirmed without its parent",
"position", aID)
return
}
} else {
var exist bool
block, exist = recv.agreementModule.findBlockNoLock(hash)
if !exist {
recv.consensus.logger.Error("Unknown block confirmed",
"hash", hash.String()[:6])
ch := make(chan *types.Block)
func() {
recv.consensus.lock.Lock()
defer recv.consensus.lock.Unlock()
recv.consensus.baConfirmedBlock[hash] = ch
}()
go func() {
hashes := common.Hashes{hash}
PullBlockLoop:
for {
recv.consensus.logger.Debug("Calling Network.PullBlock for BA block",
"hash", hash)
recv.consensus.network.PullBlocks(hashes)
select {
case block = <-ch:
break PullBlockLoop
case <-time.After(1 * time.Second):
}
}
recv.consensus.logger.Info("Receive unknown block",
"hash", hash.String()[:6],
"position", block.Position)
recv.agreementModule.addCandidateBlock(block)
recv.agreementModule.lock.Lock()
defer recv.agreementModule.lock.Unlock()
recv.ConfirmBlock(block.Hash, votes)
}()
return
}
}
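// Recover the block randomness by combining the DKG partial signatures
// carried in the confirm votes; blocks proposed before DKGDelayRound use
// NoRand instead.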
if len(votes) == 0 && len(block.Randomness) == 0 {
recv.consensus.logger.Error("No votes to recover randomness",
"block", block)
} else if votes != nil {
voteList := make([]types.Vote, 0, len(votes))
IDs := make(cryptoDKG.IDs, 0, len(votes))
psigs := make([]cryptoDKG.PartialSignature, 0, len(votes))
for _, vote := range votes {
if vote.BlockHash != hash {
continue
}
if block.Position.Round >= DKGDelayRound {
ID, exist := recv.npks.IDMap[vote.ProposerID]
if !exist {
continue
}
IDs = append(IDs, ID)
psigs = append(psigs, vote.PartialSignature)
}
voteList = append(voteList, *vote)
}
if block.Position.Round >= DKGDelayRound {
rand, err := cryptoDKG.RecoverSignature(psigs, IDs)
if err != nil {
recv.consensus.logger.Warn("Unable to recover randomness",
"block", block,
"error", err)
} else {
block.Randomness = rand.Signature[:]
}
} else {
block.Randomness = NoRand
}
if recv.isNotary {
result := &types.AgreementResult{
BlockHash: block.Hash,
Position: block.Position,
Votes: voteList,
IsEmptyBlock: isEmptyBlockConfirmed,
Randomness: block.Randomness,
}
recv.consensus.baMgr.touchAgreementResult(result)
recv.consensus.logger.Debug("Broadcast AgreementResult",
"result", result)
recv.consensus.network.BroadcastAgreementResult(result)
if block.IsEmpty() {
recv.consensus.bcModule.addBlockRandomness(
block.Position, block.Randomness)
}
if block.Position.Round >= DKGDelayRound {
recv.consensus.logger.Debug(
"Broadcast finalized block",
"block", block)
recv.consensus.network.BroadcastBlock(block)
}
}
}
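// If the parent of the confirmed block is not confirmed locally, pull its
// ancestors from the network in the background until a confirmed or
// genesis block is reached.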
if !block.IsGenesis() &&
!recv.consensus.bcModule.confirmed(block.Position.Height-1) {
go func(hash common.Hash) {
parentHash := hash
for {
recv.consensus.logger.Warn("Parent block not confirmed",
"parent-hash", parentHash.String()[:6],
"cur-position", block.Position)
ch := make(chan *types.Block)
if !func() bool {
recv.consensus.lock.Lock()
defer recv.consensus.lock.Unlock()
if _, exist := recv.consensus.baConfirmedBlock[parentHash]; exist {
return false
}
recv.consensus.baConfirmedBlock[parentHash] = ch
return true
}() {
return
}
var block *types.Block
PullBlockLoop:
for {
recv.consensus.logger.Debug("Calling Network.PullBlock for parent",
"hash", parentHash)
recv.consensus.network.PullBlocks(common.Hashes{parentHash})
select {
case block = <-ch:
break PullBlockLoop
case <-time.After(1 * time.Second):
}
}
recv.consensus.logger.Info("Receive parent block",
"parent-hash", block.ParentHash.String()[:6],
"cur-position", block.Position)
if !block.IsFinalized() {
// TODO(jimmy): use a separate message to pull finalized
// blocks. Here, we pull it again as a workaround.
continue
}
recv.consensus.processBlockChan <- block
parentHash = block.ParentHash
if block.IsGenesis() || recv.consensus.bcModule.confirmed(
block.Position.Height-1) {
return
}
}
}(block.ParentHash)
}
if !block.IsEmpty() {
recv.consensus.processBlockChan <- block
}
// Drain the restartNotary channel so BA will not get stuck in a deadlock.
CleanChannelLoop:
for {
select {
case <-recv.restartNotary:
default:
break CleanChannelLoop
}
}
recv.restartNotary <- block.Position
}
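// PullBlocks requests the given blocks from the network. Non-notary
// receivers ignore the request.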
func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
if !recv.isNotary {
return
}
recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes)
recv.consensus.network.PullBlocks(hashes)
}
func (recv *consensusBAReceiver) ReportForkVote(v1, v2 *types.Vote) {
recv.consensus.gov.ReportForkVote(v1, v2)
}
func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) {
recv.consensus.gov.ReportForkBlock(b1, b2)
}
// consensusDKGReceiver implements dkgReceiver.
type consensusDKGReceiver struct {
ID types.NodeID
gov Governance
signer *utils.Signer
nodeSetCache *utils.NodeSetCache
cfgModule *configurationChain
network Network
logger common.Logger
}
// ProposeDKGComplaint proposes a DKGComplaint.
func (recv *consensusDKGReceiver) ProposeDKGComplaint(
complaint *typesDKG.Complaint) {
if err := recv.signer.SignDKGComplaint(complaint); err != nil {
recv.logger.Error("Failed to sign DKG complaint", "error", err)
return
}
recv.logger.Debug("Calling Governace.AddDKGComplaint",
"complaint", complaint)
recv.gov.AddDKGComplaint(complaint)
}
// ProposeDKGMasterPublicKey proposes a DKGMasterPublicKey.
func (recv *consensusDKGReceiver) ProposeDKGMasterPublicKey(
mpk *typesDKG.MasterPublicKey) {
if err := recv.signer.SignDKGMasterPublicKey(mpk); err != nil {
recv.logger.Error("Failed to sign DKG master public key", "error", err)
return
}
recv.logger.Debug("Calling Governance.AddDKGMasterPublicKey", "key", mpk)
recv.gov.AddDKGMasterPublicKey(mpk)
}
// ProposeDKGPrivateShare proposes a DKGPrivateShare.
func (recv *consensusDKGReceiver) ProposeDKGPrivateShare(
prv *typesDKG.PrivateShare) {
if err := recv.signer.SignDKGPrivateShare(prv); err != nil {
recv.logger.Error("Failed to sign DKG private share", "error", err)
return
}
receiverPubKey, exists := recv.nodeSetCache.GetPublicKey(prv.ReceiverID)
if !exists {
recv.logger.Error("Public key for receiver not found",
"receiver", prv.ReceiverID.String()[:6])
return
}
if prv.ReceiverID == recv.ID {
go func() {
if err := recv.cfgModule.processPrivateShare(prv); err != nil {
recv.logger.Error("Failed to process self private share", "prvShare", prv)
}
}()
} else {
recv.logger.Debug("Calling Network.SendDKGPrivateShare",
"receiver", hex.EncodeToString(receiverPubKey.Bytes()))
recv.network.SendDKGPrivateShare(receiverPubKey, prv)
}
}
// ProposeDKGAntiNackComplaint proposes a DKGPrivateShare as an anti-nack complaint.
func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint(
prv *typesDKG.PrivateShare) {
if prv.ProposerID == recv.ID {
if err := recv.signer.SignDKGPrivateShare(prv); err != nil {
recv.logger.Error("Failed sign DKG private share", "error", err)
return
}
}
recv.logger.Debug("Calling Network.BroadcastDKGPrivateShare", "share", prv)
recv.network.BroadcastDKGPrivateShare(prv)
}
// ProposeDKGMPKReady proposes a DKGMPKReady message.
func (recv *consensusDKGReceiver) ProposeDKGMPKReady(ready *typesDKG.MPKReady) {
if err := recv.signer.SignDKGMPKReady(ready); err != nil {
recv.logger.Error("Failed to sign DKG ready", "error", err)
return
}
recv.logger.Debug("Calling Governance.AddDKGMPKReady", "ready", ready)
recv.gov.AddDKGMPKReady(ready)
}
// ProposeDKGFinalize proposes a DKGFinalize message.
func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) {
if err := recv.signer.SignDKGFinalize(final); err != nil {
recv.logger.Error("Failed to sign DKG finalize", "error", err)
return
}
recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final)
recv.gov.AddDKGFinalize(final)
}
// Consensus implements DEXON Consensus algorithm.
type Consensus struct {
// Node Info.
ID types.NodeID
signer *utils.Signer
// BA.
baMgr *agreementMgr
baConfirmedBlock map[common.Hash]chan<- *types.Block
// DKG.
dkgRunning int32
dkgReady *sync.Cond
cfgModule *configurationChain
// Interfaces.
db db.Database
app Application
debugApp Debug
gov Governance
network Network
// Misc.
bcModule *blockChain
dMoment time.Time
nodeSetCache *utils.NodeSetCache
tsigVerifierCache *TSigVerifierCache
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
event *common.Event
roundEvent *utils.RoundEvent
logger common.Logger
resetDeliveryGuardTicker chan struct{}
msgChan chan interface{}
waitGroup sync.WaitGroup
processBlockChan chan *types.Block
// Context of Dummy receiver during switching from syncer.
dummyCancel context.CancelFunc
dummyFinished <-chan struct{}
dummyMsgBuffer []interface{}
}
// NewConsensus constructs a Consensus instance.
func NewConsensus(
dMoment time.Time,
app Application,
gov Governance,
db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
return newConsensusForRound(
nil, dMoment, app, gov, db, network, prv, logger, true)
}
// NewConsensusForSimulation creates an instance of Consensus for simulation.
// The only difference from NewConsensus is that the application is not
// wrapped as non-blocking.
func NewConsensusForSimulation(
dMoment time.Time,
app Application,
gov Governance,
db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
return newConsensusForRound(
nil, dMoment, app, gov, db, network, prv, logger, false)
}
// NewConsensusFromSyncer constructs a Consensus instance from the information
// provided by the syncer.
//
// You need to provide the initial block for this newly created Consensus
// instance to bootstrap with. A proper choice is the last finalized block you
// delivered to syncer.
//
// NOTE: those confirmed blocks should be organized by chainID and sorted by
// their positions, in ascending order.
func NewConsensusFromSyncer(
initBlock *types.Block,
startWithEmpty bool,
dMoment time.Time,
app Application,
gov Governance,
db db.Database,
networkModule Network,
prv crypto.PrivateKey,
confirmedBlocks []*types.Block,
cachedMessages []interface{},
logger common.Logger) (*Consensus, error) {
// Setup Consensus instance.
con := newConsensusForRound(initBlock, dMoment, app, gov, db,
networkModule, prv, logger, true)
// Launch a dummy receiver before we start receiving from network module.
con.dummyMsgBuffer = cachedMessages
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
con.ctx, networkModule.ReceiveChan(), func(msg interface{}) {
con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
})
// Dump all BA-confirmed blocks into the consensus instance, and make sure
// the added blocks form a DAG.
refBlock := initBlock
for _, b := range confirmedBlocks {
// A block can be added only when its parent block has already been added
// to the lattice. Otherwise, our pulling mechanism would stop at the
// block we added and lose its parent block forever.
if b.Position.Height != refBlock.Position.Height+1 {
break
}
if err := con.processBlock(b); err != nil {
return nil, err
}
refBlock = b
}
if startWithEmpty {
emptyPos := types.Position{
Round: con.bcModule.tipRound(),
Height: initBlock.Position.Height + 1,
}
_, err := con.bcModule.addEmptyBlock(emptyPos)
if err != nil {
panic(err)
}
}
return con, nil
}
// newConsensusForRound creates a Consensus instance.
func newConsensusForRound(
initBlock *types.Block,
dMoment time.Time,
app Application,
gov Governance,
db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger,
usingNonBlocking bool) *Consensus {
// TODO(w): load latest blockHeight from DB, and use config at that height.
nodeSetCache := utils.NewNodeSetCache(gov)
// Setup signer module.
signer := utils.NewSigner(prv)
// Check if the application implement Debug interface.
var debugApp Debug
if a, ok := app.(Debug); ok {
debugApp = a
}
// Get configuration for bootstrap round.
initPos := types.Position{
Round: 0,
Height: types.GenesisHeight,
}
if initBlock != nil {
initPos = initBlock.Position
}
// Init configuration chain.
ID := types.NewNodeID(prv.PublicKey())
recv := &consensusDKGReceiver{
ID: ID,
gov: gov,
signer: signer,
nodeSetCache: nodeSetCache,
network: network,
logger: logger,
}
cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger)
recv.cfgModule = cfgModule
appModule := app
if usingNonBlocking {
appModule = newNonBlocking(app, debugApp)
}
tsigVerifierCache := NewTSigVerifierCache(gov, 7)
bcModule := newBlockChain(ID, dMoment, initBlock, appModule,
tsigVerifierCache, signer, logger)
// Construct Consensus instance.
con := &Consensus{
ID: ID,
app: appModule,
debugApp: debugApp,
gov: gov,
db: db,
network: network,
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
bcModule: bcModule,
dMoment: dMoment,
nodeSetCache: nodeSetCache,
tsigVerifierCache: tsigVerifierCache,
signer: signer,
event: common.NewEvent(),
logger: logger,
resetDeliveryGuardTicker: make(chan struct{}),
msgChan: make(chan interface{}, 1024),
processBlockChan: make(chan *types.Block, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
var err error
con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initPos,
ConfigRoundShift)
if err != nil {
panic(err)
}
if con.baMgr, err = newAgreementMgr(con); err != nil {
panic(err)
}
if err = con.prepare(initBlock); err != nil {
panic(err)
}
return con
}
// prepare the Consensus instance to be ready for blocks after 'initBlock'.
// 'initBlock' could be either:
// - nil
// - the last finalized block
func (con *Consensus) prepare(initBlock *types.Block) (err error) {
// Trigger the round validation method for the next round of the first
// round.
// The block passed from the full node should already be delivered or known
// by the full node, so we don't have to notify it.
initRound := uint64(0)
if initBlock != nil {
initRound = initBlock.Position.Round
}
if initRound == 0 {
if DKGDelayRound == 0 {
panic("not implemented yet")
}
}
// Measure the elapsed time of each round event handler.
elapse := func(what string, lastE utils.RoundEventParam) func() {
start := time.Now()
con.logger.Info("Handle round event",
"what", what,
"event", lastE)
return func() {
con.logger.Info("Finish round event",
"what", what,
"event", lastE,
"elapse", time.Since(start))
}
}
// Register a round event handler to purge the cached node set. To make sure
// every module sees the up-to-date node set, this handler must be
// registered first.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
defer elapse("purge-cache", evts[len(evts)-1])()
for _, e := range evts {
if e.Reset == 0 {
continue
}
con.nodeSetCache.Purge(e.Round + 1)
con.tsigVerifierCache.Purge(e.Round + 1)
}
})
// Register a round event handler to abort any previously running DKG.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
go func() {
defer elapse("abort-DKG", e)()
if e.Reset > 0 {
aborted := con.cfgModule.abortDKG(con.ctx, e.Round+1, e.Reset-1)
con.logger.Info("DKG aborting result",
"round", e.Round+1,
"reset", e.Reset-1,
"aborted", aborted)
}
}()
})
// Register a round event handler to update the BA and BC modules.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
defer elapse("append-config", evts[len(evts)-1])()
// Always update the modules that are later in the data flow with newer
// configs first.
if err := con.bcModule.notifyRoundEvents(evts); err != nil {
panic(err)
}
if err := con.baMgr.notifyRoundEvents(evts); err != nil {
panic(err)
}
})
// Register a round event handler to reset DKG if the DKG set for the next
// round failed to set up.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
defer elapse("reset-DKG", e)()
nextRound := e.Round + 1
if nextRound < DKGDelayRound {
return
}
curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round)
if err != nil {
con.logger.Error("Error getting notary set when proposing CRS",
"round", e.Round,
"error", err)
return
}
if _, exist := curNotarySet[con.ID]; !exist {
return
}
isDKGValid := func() bool {
nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
con.logger)
if !con.gov.IsDKGFinal(nextRound) {
con.logger.Error("Next DKG is not final, reset it",
"round", e.Round,
"reset", e.Reset)
return false
}
gpk, err := typesDKG.NewGroupPublicKey(
nextRound,
con.gov.DKGMasterPublicKeys(nextRound),
con.gov.DKGComplaints(nextRound),
utils.GetDKGThreshold(nextConfig))
if err != nil {
con.logger.Error("Next DKG failed to prepare, reset it",
"round", e.Round,
"reset", e.Reset,
"error", err)
return false
}
if len(gpk.QualifyNodeIDs) < utils.GetDKGValidThreshold(nextConfig) {
return false
}
return true
}
con.event.RegisterHeight(e.NextDKGResetHeight(), func(uint64) {
if isDKGValid() {
return
}
// Abort all previously running DKG protocol instances, if any.
go con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
})
})
// Register a round event handler to propose a new CRS.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
// We don't have to propose a new CRS during a DKG reset; the reset would
// be done by the notary set of the previous round.
e := evts[len(evts)-1]
defer elapse("propose-CRS", e)()
if e.Reset != 0 || e.Round < DKGDelayRound {
return
}
if curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round); err != nil {
con.logger.Error("Error getting notary set when proposing CRS",
"round", e.Round,
"error", err)
} else {
if _, exist := curNotarySet[con.ID]; !exist {
return
}
con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) {
con.logger.Debug(
"Calling Governance.CRS to check if already proposed",
"round", e.Round+1)
if (con.gov.CRS(e.Round+1) != common.Hash{}) {
con.logger.Debug("CRS already proposed", "round", e.Round+1)
return
}
go con.runCRS(e.Round, e.CRS, false)
})
}
})
// Touch nodeSetCache for next round.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
defer elapse("touch-NodeSetCache", e)()
con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) {
if e.Reset == 0 {
return
}
go func() {
nextRound := e.Round + 1
if err := con.nodeSetCache.Touch(nextRound); err != nil {
con.logger.Warn("Failed to update nodeSetCache",
"round", nextRound,
"error", err)
}
}()
})
})
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
if e.Reset != 0 {
return
}
defer elapse("touch-DKGCache", e)()
go func() {
if _, err :=
con.tsigVerifierCache.Update(e.Round); err != nil {
con.logger.Warn("Failed to update tsig cache",
"round", e.Round,
"error", err)
}
}()
go func() {
threshold := utils.GetDKGThreshold(
utils.GetConfigWithPanic(con.gov, e.Round, con.logger))
// Restore group public key.
con.logger.Debug(
"Calling Governance.DKGMasterPublicKeys for recoverDKGInfo",
"round", e.Round)
con.logger.Debug(
"Calling Governance.DKGComplaints for recoverDKGInfo",
"round", e.Round)
_, qualifies, err := typesDKG.CalcQualifyNodes(
con.gov.DKGMasterPublicKeys(e.Round),
con.gov.DKGComplaints(e.Round),
threshold)
if err != nil {
con.logger.Warn("Failed to calculate dkg set",
"round", e.Round,
"error", err)
return
}
if _, exist := qualifies[con.ID]; !exist {
return
}
if _, _, err :=
con.cfgModule.getDKGInfo(e.Round, true); err != nil {
con.logger.Warn("Failed to recover DKG info",
"round", e.Round,
"error", err)
}
}()
})
// checkCRS generates a checker that reports whether the CRS for the given
// round is ready.
checkCRS := func(round uint64) func() bool {
return func() bool {
nextCRS := con.gov.CRS(round)
if (nextCRS != common.Hash{}) {
return true
}
con.logger.Debug("CRS is not ready yet. Try again later...",
"nodeID", con.ID,
"round", round)
return false
}
}
// Trigger the round validation method for the next period.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
defer elapse("next-round", e)()
// Register a routine to trigger round events.
con.event.RegisterHeight(e.NextRoundValidationHeight(),
utils.RoundEventRetryHandlerGenerator(con.roundEvent, con.event))
// Register a routine to register next DKG.
con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
nextRound := e.Round + 1
if nextRound < DKGDelayRound {
con.logger.Info("Skip runDKG for round",
"round", nextRound,
"reset", e.Reset)
return
}
go func() {
// Normally, gov.CRS returns a non-empty hash. This check guards against
// unexpected network fluctuation and ensures robustness.
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
con.logger.Debug("unable to prepare CRS for notary set",
"round", nextRound,
"reset", e.Reset)
return
}
nextNotarySet, err := con.nodeSetCache.GetNotarySet(nextRound)
if err != nil {
con.logger.Error("Error getting notary set for next round",
"round", nextRound,
"reset", e.Reset,
"error", err)
return
}
if _, exist := nextNotarySet[con.ID]; !exist {
con.logger.Info("Not selected as notary set",
"round", nextRound,
"reset", e.Reset)
return
}
con.logger.Info("Selected as notary set",
"round", nextRound,
"reset", e.Reset)
nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
con.logger)
con.cfgModule.registerDKG(con.ctx, nextRound, e.Reset,
utils.GetDKGThreshold(nextConfig))
con.event.RegisterHeight(e.NextDKGPreparationHeight(),
func(h uint64) {
func() {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
con.dkgRunning = 0
}()
// We want to skip some of the DKG phases when started.
dkgCurrentHeight := h - e.NextDKGPreparationHeight()
con.runDKG(
nextRound, e.Reset,
e.NextDKGPreparationHeight(), dkgCurrentHeight)
})
}()
})
})
con.roundEvent.TriggerInitEvent()
if initBlock != nil {
con.event.NotifyHeight(initBlock.Position.Height)
}
con.baMgr.prepare()
return
}
// Run starts running DEXON Consensus.
func (con *Consensus) Run() {
// There may be empty blocks in the blockchain added by force sync.
blocksWithoutRandomness := con.bcModule.pendingBlocksWithoutRandomness()
// Launch BA routines.
con.baMgr.run()
// Launch network handler.
con.logger.Debug("Calling Network.ReceiveChan")
con.waitGroup.Add(1)
go con.deliverNetworkMsg()
con.waitGroup.Add(1)
go con.processMsg()
go con.processBlockLoop()
// Stop dummy receiver if launched.
if con.dummyCancel != nil {
con.logger.Trace("Stop dummy receiver")
con.dummyCancel()
<-con.dummyFinished
// Replay those cached messages.
con.logger.Trace("Dummy receiver stoped, start dumping cached messages",
"count", len(con.dummyMsgBuffer))
for _, msg := range con.dummyMsgBuffer {
loop:
for {
select {
case con.msgChan <- msg:
break loop
case <-time.After(50 * time.Millisecond):
con.logger.Debug(
"internal message channel is full when syncing")
}
}
}
con.logger.Trace("Finish dumping cached messages")
}
con.generateBlockRandomness(blocksWithoutRandomness)
// Sleep until dMoment arrives.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Take some time to bootstrap.
time.Sleep(3 * time.Second)
con.waitGroup.Add(1)
go con.deliveryGuard()
// Block until done.
select {
case <-con.ctx.Done():
}
}
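// generateBlockRandomness runs the block tsig protocol for blocks that were
// added without randomness (e.g. by force sync). For every such block whose
// round has this node in the notary set, the recovered randomness is stored
// and broadcast as an agreement result.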
func (con *Consensus) generateBlockRandomness(blocks []*types.Block) {
con.logger.Debug("Start generating block randomness", "blocks", blocks)
isNotarySet := make(map[uint64]bool)
for _, block := range blocks {
if block.Position.Round < DKGDelayRound {
continue
}
doRun, exist := isNotarySet[block.Position.Round]
if !exist {
curNotarySet, err := con.nodeSetCache.GetNotarySet(block.Position.Round)
if err != nil {
con.logger.Error("Error getting notary set when generate block tsig",
"round", block.Position.Round,
"error", err)
continue
}
_, exist := curNotarySet[con.ID]
isNotarySet[block.Position.Round] = exist
doRun = exist
}
if !doRun {
continue
}
go func(block *types.Block) {
psig, err := con.cfgModule.preparePartialSignature(
block.Position.Round, block.Hash)
if err != nil {
con.logger.Error("Failed to prepare partial signature",
"block", block,
"error", err)
} else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
con.logger.Error("Failed to sign DKG partial signature",
"block", block,
"error", err)
} else if err = con.cfgModule.processPartialSignature(psig); err != nil {
con.logger.Error("Failed to process partial signature",
"block", block,
"error", err)
} else {
con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
"proposer", psig.ProposerID,
"block", block)
con.network.BroadcastDKGPartialSignature(psig)
sig, err := con.cfgModule.runTSig(block.Position.Round, block.Hash)
if err != nil {
con.logger.Error("Failed to run Block Tsig",
"block", block,
"error", err)
return
}
result := &types.AgreementResult{
BlockHash: block.Hash,
Position: block.Position,
Randomness: sig.Signature[:],
}
con.bcModule.addBlockRandomness(block.Position, sig.Signature[:])
con.logger.Debug("Broadcast BlockRandomness",
"block", block,
"result", result)
con.network.BroadcastAgreementResult(result)
}
}(block)
}
}
// runDKG starts running DKG protocol.
func (con *Consensus) runDKG(
round, reset, dkgBeginHeight, dkgHeight uint64) {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
if con.dkgRunning != 0 {
return
}
con.dkgRunning = 1
go func() {
defer func() {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
con.dkgReady.Broadcast()
con.dkgRunning = 2
}()
if err :=
con.cfgModule.runDKG(
round, reset,
con.event, dkgBeginHeight, dkgHeight); err != nil {
con.logger.Error("Failed to runDKG", "error", err)
}
}()
}
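// runCRS signs, processes and broadcasts a DKG partial signature over the
// given hash, then runs the CRS tsig protocol. The resulting signature is
// used to reset the DKG when reset is true, or to propose the CRS of the
// next round otherwise.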
func (con *Consensus) runCRS(round uint64, hash common.Hash, reset bool) {
// Start running next round CRS.
psig, err := con.cfgModule.preparePartialSignature(round, hash)
if err != nil {
con.logger.Error("Failed to prepare partial signature", "error", err)
} else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
con.logger.Error("Failed to sign DKG partial signature", "error", err)
} else if err = con.cfgModule.processPartialSignature(psig); err != nil {
con.logger.Error("Failed to process partial signature", "error", err)
} else {
con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
"proposer", psig.ProposerID,
"round", psig.Round,
"hash", psig.Hash)
con.network.BroadcastDKGPartialSignature(psig)
con.logger.Debug("Calling Governance.CRS", "round", round)
crs, err := con.cfgModule.runCRSTSig(round, hash)
if err != nil {
con.logger.Error("Failed to run CRS Tsig", "error", err)
} else {
if reset {
con.logger.Debug("Calling Governance.ResetDKG",
"round", round+1,
"crs", hex.EncodeToString(crs))
con.gov.ResetDKG(crs)
} else {
con.logger.Debug("Calling Governance.ProposeCRS",
"round", round+1,
"crs", hex.EncodeToString(crs))
con.gov.ProposeCRS(round+1, crs)
}
}
}
}
// Stop the Consensus core.
func (con *Consensus) Stop() {
con.ctxCancel()
con.baMgr.stop()
con.event.Reset()
con.waitGroup.Wait()
if nbApp, ok := con.app.(*nonBlocking); ok {
nbApp.wait()
}
}
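// deliverNetworkMsg forwards messages received from the network module to
// the internal message channel until the consensus context is cancelled.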
func (con *Consensus) deliverNetworkMsg() {
defer con.waitGroup.Done()
recv := con.network.ReceiveChan()
for {
select {
case <-con.ctx.Done():
return
default:
}
select {
case msg := <-recv:
innerLoop:
for {
select {
case con.msgChan <- msg:
break innerLoop
case <-time.After(500 * time.Millisecond):
con.logger.Debug("internal message channel is full",
"pending", msg)
}
}
case <-con.ctx.Done():
return
}
}
}
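// processMsg is the main dispatch loop: it consumes messages from msgChan
// and routes blocks, votes, agreement results, DKG private shares and
// partial signatures to the corresponding modules.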
func (con *Consensus) processMsg() {
defer con.waitGroup.Done()
MessageLoop:
for {
select {
case <-con.ctx.Done():
return
default:
}
var msg interface{}
select {
case msg = <-con.msgChan:
case <-con.ctx.Done():
return
}
switch val := msg.(type) {
case *types.Block:
if ch, exist := func() (chan<- *types.Block, bool) {
con.lock.RLock()
defer con.lock.RUnlock()
ch, e := con.baConfirmedBlock[val.Hash]
return ch, e
}(); exist {
if val.IsEmpty() {
hash, err := utils.HashBlock(val)
if err != nil {
con.logger.Error("error verifying empty block hash",
"block", val,
"error, err")
continue MessageLoop
}
if hash != val.Hash {
con.logger.Error("incorrect confirmed empty block hash",
"block", val,
"hash", hash)
continue MessageLoop
}
if _, err := con.bcModule.proposeBlock(
val.Position, time.Time{}, true); err != nil {
con.logger.Error("error adding empty block",
"block", val,
"error", err)
continue MessageLoop
}
} else {
ok, err := con.bcModule.verifyRandomness(
val.Hash, val.Position.Round, val.Randomness)
if err != nil {
con.logger.Error("error verifying confirmed block randomness",
"block", val,
"error", err)
continue MessageLoop
}
if !ok {
con.logger.Error("incorrect confirmed block randomness",
"block", val)
continue MessageLoop
}
if err := utils.VerifyBlockSignature(val); err != nil {
con.logger.Error("VerifyBlockSignature failed",
"block", val,
"error", err)
continue MessageLoop
}
}
func() {
con.lock.Lock()
defer con.lock.Unlock()
// In case the block has been delivered multiple times.
if _, exist := con.baConfirmedBlock[val.Hash]; !exist {
return
}
delete(con.baConfirmedBlock, val.Hash)
ch <- val
}()
} else if val.IsFinalized() {
if err := con.processFinalizedBlock(val); err != nil {
con.logger.Error("Failed to process finalized block",
"block", val,
"error", err)
}
} else {
if err := con.preProcessBlock(val); err != nil {
con.logger.Error("Failed to pre process block",
"block", val,
"error", err)
}
}
case *types.Vote:
if err := con.ProcessVote(val); err != nil {
con.logger.Error("Failed to process vote",
"vote", val,
"error", err)
}
case *types.AgreementResult:
if err := con.ProcessAgreementResult(val); err != nil {
con.logger.Error("Failed to process agreement result",
"result", val,
"error", err)
}
case *typesDKG.PrivateShare:
if err := con.cfgModule.processPrivateShare(val); err != nil {
con.logger.Error("Failed to process private share",
"error", err)
}
case *typesDKG.PartialSignature:
if err := con.cfgModule.processPartialSignature(val); err != nil {
con.logger.Error("Failed to process partial signature",
"error", err)
}
}
}
}
// ProcessVote is the entry point to submit one vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
err = con.baMgr.processVote(vote)
return
}
// ProcessAgreementResult processes an agreement result received from the
// network.
func (con *Consensus) ProcessAgreementResult(
rand *types.AgreementResult) error {
if !con.baMgr.touchAgreementResult(rand) {
return nil
}
// Sanity Check.
if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
con.baMgr.untouchAgreementResult(rand)
return err
}
if err := con.bcModule.processAgreementResult(rand); err != nil {
con.baMgr.untouchAgreementResult(rand)
if err == ErrSkipButNoError {
return nil
}
return err
}
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
con.baMgr.untouchAgreementResult(rand)
return err
}
con.logger.Debug("Rebroadcast AgreementResult",
"result", rand)
con.network.BroadcastAgreementResult(rand)
return con.deliverFinalizedBlocks()
}
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
err = con.baMgr.processBlock(b)
if err == nil && con.debugApp != nil {
con.debugApp.BlockReceived(b.Hash)
}
return
}
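// processFinalizedBlock verifies the signature and randomness of a
// finalized block before handing it to the BA manager. Blocks proposed
// before DKGDelayRound are skipped.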
func (con *Consensus) processFinalizedBlock(b *types.Block) (err error) {
if b.Position.Round < DKGDelayRound {
return
}
if err = utils.VerifyBlockSignature(b); err != nil {
return
}
verifier, ok, err := con.tsigVerifierCache.UpdateAndGet(b.Position.Round)
if err != nil {
return
}
if !ok {
err = ErrCannotVerifyBlockRandomness
return
}
if !verifier.VerifySignature(b.Hash, crypto.Signature{
Type: "bls",
Signature: b.Randomness,
}) {
err = ErrIncorrectBlockRandomness
return
}
err = con.baMgr.processFinalizedBlock(b)
if err == nil && con.debugApp != nil {
con.debugApp.BlockReceived(b.Hash)
}
return
}
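// deliveryGuard panics if no block has been delivered for 60 seconds after
// the node has fully started, unless the guard is reset via
// resetDeliveryGuardTicker.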
func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
select {
case <-con.ctx.Done():
case <-time.After(con.dMoment.Sub(time.Now())):
}
// Node takes time to start.
select {
case <-con.ctx.Done():
case <-time.After(60 * time.Second):
}
for {
select {
case <-con.ctx.Done():
return
default:
}
select {
case <-con.ctx.Done():
return
case <-con.resetDeliveryGuardTicker:
case <-time.After(60 * time.Second):
con.logger.Error("No blocks delivered for too long", "ID", con.ID)
panic(fmt.Errorf("No blocks delivered for too long"))
}
}
}
// deliverBlock delivers a block to the application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
select {
case con.resetDeliveryGuardTicker <- struct{}{}:
default:
}
if err := con.db.PutBlock(*b); err != nil {
panic(err)
}
if err := con.db.PutCompactionChainTipInfo(b.Hash,
b.Position.Height); err != nil {
panic(err)
}
con.logger.Debug("Calling Application.BlockDelivered", "block", b)
con.app.BlockDelivered(b.Hash, b.Position, common.CopyBytes(b.Randomness))
if con.debugApp != nil {
con.debugApp.BlockReady(b.Hash)
}
}
// deliverFinalizedBlocks extracts and delivers finalized blocks to the
// application layer.
func (con *Consensus) deliverFinalizedBlocks() error {
con.lock.Lock()
defer con.lock.Unlock()
return con.deliverFinalizedBlocksWithoutLock()
}
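// deliverFinalizedBlocksWithoutLock extracts deliverable blocks from the
// blockchain module and delivers them. The caller must hold con.lock.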
func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
deliveredBlocks := con.bcModule.extractBlocks()
con.logger.Debug("Last blocks in compaction chain",
"delivered", con.bcModule.lastDeliveredBlock(),
"pending", con.bcModule.lastPendingBlock())
for _, b := range deliveredBlocks {
con.deliverBlock(b)
con.event.NotifyHeight(b.Position.Height)
}
return
}
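// processBlockLoop consumes blocks from processBlockChan and feeds them to
// processBlock until the consensus context is cancelled.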
func (con *Consensus) processBlockLoop() {
for {
select {
case <-con.ctx.Done():
return
default:
}
select {
case <-con.ctx.Done():
return
case block := <-con.processBlockChan:
if err := con.processBlock(block); err != nil {
con.logger.Error("Error processing block",
"block", block,
"error", err)
}
}
}
}
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
// Blocks processed by blockChain can be out-of-order, but the output from
// blockChain (deliveredBlocks) cannot be, so we need to protect the part
// below with a writer lock.
con.lock.Lock()
defer con.lock.Unlock()
if err = con.bcModule.addBlock(block); err != nil {
return
}
if err = con.deliverFinalizedBlocksWithoutLock(); err != nil {
return
}
return
}
// proposeBlock prepares a block at the given position and signs it with the
// CRS of its round.
func (con *Consensus) proposeBlock(position types.Position) (
*types.Block, error) {
b, err := con.bcModule.proposeBlock(position, time.Now().UTC(), false)
if err != nil {
return nil, err
}
con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round)
crs := con.gov.CRS(b.Position.Round)
if crs.Equal(common.Hash{}) {
con.logger.Error("CRS for round is not ready, unable to prepare block",
"position", &b.Position)
return nil, ErrCRSNotReady
}
if err = con.signer.SignCRS(b, crs); err != nil {
return nil, err
}
return b, nil
}