aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/byzantine-lab/dexon-consensus/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/byzantine-lab/dexon-consensus/core/consensus.go')
-rw-r--r--vendor/github.com/byzantine-lab/dexon-consensus/core/consensus.go1567
1 files changed, 1567 insertions, 0 deletions
diff --git a/vendor/github.com/byzantine-lab/dexon-consensus/core/consensus.go b/vendor/github.com/byzantine-lab/dexon-consensus/core/consensus.go
new file mode 100644
index 000000000..8b2b9a048
--- /dev/null
+++ b/vendor/github.com/byzantine-lab/dexon-consensus/core/consensus.go
@@ -0,0 +1,1567 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "context"
+ "encoding/hex"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/byzantine-lab/dexon-consensus/common"
+ "github.com/byzantine-lab/dexon-consensus/core/crypto"
+ cryptoDKG "github.com/byzantine-lab/dexon-consensus/core/crypto/dkg"
+ "github.com/byzantine-lab/dexon-consensus/core/db"
+ "github.com/byzantine-lab/dexon-consensus/core/types"
+ typesDKG "github.com/byzantine-lab/dexon-consensus/core/types/dkg"
+ "github.com/byzantine-lab/dexon-consensus/core/utils"
+)
+
+// Errors for consensus core.
+var (
+ ErrProposerNotInNodeSet = fmt.Errorf(
+ "proposer is not in node set")
+ ErrIncorrectHash = fmt.Errorf(
+ "hash of block is incorrect")
+ ErrIncorrectSignature = fmt.Errorf(
+ "signature of block is incorrect")
+ ErrUnknownBlockProposed = fmt.Errorf(
+ "unknown block is proposed")
+ ErrIncorrectAgreementResultPosition = fmt.Errorf(
+ "incorrect agreement result position")
+ ErrNotEnoughVotes = fmt.Errorf(
+ "not enought votes")
+ ErrCRSNotReady = fmt.Errorf(
+ "CRS not ready")
+ ErrConfigurationNotReady = fmt.Errorf(
+ "Configuration not ready")
+ ErrIncorrectBlockRandomness = fmt.Errorf(
+ "randomness of block is incorrect")
+ ErrCannotVerifyBlockRandomness = fmt.Errorf(
+ "cannot verify block randomness")
+)
+
+type selfAgreementResult types.AgreementResult
+
+// consensusBAReceiver implements agreementReceiver.
+type consensusBAReceiver struct {
+ consensus *Consensus
+ agreementModule *agreement
+ emptyBlockHashMap *sync.Map
+ isNotary bool
+ restartNotary chan types.Position
+ npks *typesDKG.NodePublicKeys
+ psigSigner *dkgShareSecret
+}
+
+func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) (
+ common.Hash, error) {
+ hashVal, ok := recv.emptyBlockHashMap.Load(pos)
+ if ok {
+ return hashVal.(common.Hash), nil
+ }
+ emptyBlock, err := recv.consensus.bcModule.prepareBlock(
+ pos, time.Time{}, true)
+ if err != nil {
+ return common.Hash{}, err
+ }
+ hash, err := utils.HashBlock(emptyBlock)
+ if err != nil {
+ return common.Hash{}, err
+ }
+ recv.emptyBlockHashMap.Store(pos, hash)
+ return hash, nil
+}
+
+func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) (
+ bool, bool) {
+ if vote.Position.Round >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash {
+ if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
+ if recv.npks == nil {
+ recv.consensus.logger.Debug(
+ "Unable to verify psig, npks is nil",
+ "vote", vote)
+ return false, false
+ }
+ if vote.Position.Round != recv.npks.Round {
+ recv.consensus.logger.Debug(
+ "Unable to verify psig, round of npks mismatch",
+ "vote", vote,
+ "npksRound", recv.npks.Round)
+ return false, false
+ }
+ pubKey, exist := recv.npks.PublicKeys[vote.ProposerID]
+ if !exist {
+ recv.consensus.logger.Debug(
+ "Unable to verify psig, proposer is not qualified",
+ "vote", vote)
+ return false, true
+ }
+ blockHash := vote.BlockHash
+ if blockHash == types.NullBlockHash {
+ var err error
+ blockHash, err = recv.emptyBlockHash(vote.Position)
+ if err != nil {
+ recv.consensus.logger.Error(
+ "Failed to verify vote for empty block",
+ "position", vote.Position,
+ "error", err)
+ return false, true
+ }
+ }
+ return pubKey.VerifySignature(
+ blockHash, crypto.Signature(vote.PartialSignature)), true
+ }
+ }
+ return len(vote.PartialSignature.Signature) == 0, true
+}
+
+func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
+ if !recv.isNotary {
+ return
+ }
+ if recv.psigSigner != nil &&
+ vote.BlockHash != types.SkipBlockHash {
+ if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom {
+ if vote.BlockHash == types.NullBlockHash {
+ hash, err := recv.emptyBlockHash(vote.Position)
+ if err != nil {
+ recv.consensus.logger.Error(
+ "Failed to propose vote for empty block",
+ "position", vote.Position,
+ "error", err)
+ return
+ }
+ vote.PartialSignature = recv.psigSigner.sign(hash)
+ } else {
+ vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash)
+ }
+ }
+ }
+ if err := recv.agreementModule.prepareVote(vote); err != nil {
+ recv.consensus.logger.Error("Failed to prepare vote", "error", err)
+ return
+ }
+ go func() {
+ if err := recv.agreementModule.processVote(vote); err != nil {
+ recv.consensus.logger.Error("Failed to process self vote",
+ "error", err,
+ "vote", vote)
+ return
+ }
+ recv.consensus.logger.Debug("Calling Network.BroadcastVote",
+ "vote", vote)
+ recv.consensus.network.BroadcastVote(vote)
+ }()
+}
+
+func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
+ if !recv.isNotary {
+ return common.Hash{}
+ }
+ block, err := recv.consensus.proposeBlock(recv.agreementModule.agreementID())
+ if err != nil || block == nil {
+ recv.consensus.logger.Error("Unable to propose block", "error", err)
+ return types.NullBlockHash
+ }
+ go func() {
+ if err := recv.consensus.preProcessBlock(block); err != nil {
+ recv.consensus.logger.Error("Failed to pre-process block", "error", err)
+ return
+ }
+ recv.consensus.logger.Debug("Calling Network.BroadcastBlock",
+ "block", block)
+ recv.consensus.network.BroadcastBlock(block)
+ }()
+ return block.Hash
+}
+
+func (recv *consensusBAReceiver) ConfirmBlock(
+ hash common.Hash, votes map[types.NodeID]*types.Vote) {
+ var (
+ block *types.Block
+ aID = recv.agreementModule.agreementID()
+ )
+
+ isEmptyBlockConfirmed := hash == common.Hash{}
+ if isEmptyBlockConfirmed {
+ recv.consensus.logger.Info("Empty block is confirmed", "position", aID)
+ var err error
+ block, err = recv.consensus.bcModule.addEmptyBlock(aID)
+ if err != nil {
+ recv.consensus.logger.Error("Add position for empty failed",
+ "error", err)
+ return
+ }
+ if block == nil {
+ // The empty block's parent is not found locally, thus we can't
+ // propose it at this moment.
+ //
+ // We can only rely on block pulling upon receiving
+ // types.AgreementResult from the next position.
+ recv.consensus.logger.Warn(
+ "An empty block is confirmed without its parent",
+ "position", aID)
+ return
+ }
+ } else {
+ var exist bool
+ block, exist = recv.agreementModule.findBlockNoLock(hash)
+ if !exist {
+ recv.consensus.logger.Debug("Unknown block confirmed",
+ "hash", hash.String()[:6])
+ ch := make(chan *types.Block)
+ func() {
+ recv.consensus.lock.Lock()
+ defer recv.consensus.lock.Unlock()
+ recv.consensus.baConfirmedBlock[hash] = ch
+ }()
+ go func() {
+ hashes := common.Hashes{hash}
+ PullBlockLoop:
+ for {
+ recv.consensus.logger.Debug("Calling Network.PullBlock for BA block",
+ "hash", hash)
+ recv.consensus.network.PullBlocks(hashes)
+ select {
+ case block = <-ch:
+ break PullBlockLoop
+ case <-time.After(1 * time.Second):
+ }
+ }
+ recv.consensus.logger.Debug("Receive unknown block",
+ "hash", hash.String()[:6],
+ "position", block.Position)
+ recv.agreementModule.addCandidateBlock(block)
+ recv.agreementModule.lock.Lock()
+ defer recv.agreementModule.lock.Unlock()
+ recv.ConfirmBlock(block.Hash, votes)
+ }()
+ return
+ }
+ }
+
+ if len(votes) == 0 && len(block.Randomness) == 0 {
+ recv.consensus.logger.Error("No votes to recover randomness",
+ "block", block)
+ } else if votes != nil {
+ voteList := make([]types.Vote, 0, len(votes))
+ IDs := make(cryptoDKG.IDs, 0, len(votes))
+ psigs := make([]cryptoDKG.PartialSignature, 0, len(votes))
+ for _, vote := range votes {
+ if vote.BlockHash != hash {
+ continue
+ }
+ if block.Position.Round >= DKGDelayRound {
+ ID, exist := recv.npks.IDMap[vote.ProposerID]
+ if !exist {
+ continue
+ }
+ IDs = append(IDs, ID)
+ psigs = append(psigs, vote.PartialSignature)
+ } else {
+ voteList = append(voteList, *vote)
+ }
+ }
+ if block.Position.Round >= DKGDelayRound {
+ rand, err := cryptoDKG.RecoverSignature(psigs, IDs)
+ if err != nil {
+ recv.consensus.logger.Warn("Unable to recover randomness",
+ "block", block,
+ "error", err)
+ } else {
+ block.Randomness = rand.Signature[:]
+ }
+ } else {
+ block.Randomness = NoRand
+ }
+
+ if recv.isNotary {
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ IsEmptyBlock: isEmptyBlockConfirmed,
+ Randomness: block.Randomness,
+ }
+ // touchAgreementResult does not support concurrent access.
+ go func() {
+ recv.consensus.priorityMsgChan <- (*selfAgreementResult)(result)
+ }()
+ recv.consensus.logger.Debug("Broadcast AgreementResult",
+ "result", result)
+ recv.consensus.network.BroadcastAgreementResult(result)
+ if block.IsEmpty() {
+ recv.consensus.bcModule.addBlockRandomness(
+ block.Position, block.Randomness)
+ }
+ if block.Position.Round >= DKGDelayRound {
+ recv.consensus.logger.Debug(
+ "Broadcast finalized block",
+ "block", block)
+ recv.consensus.network.BroadcastBlock(block)
+ }
+ }
+ }
+
+ if !block.IsGenesis() &&
+ !recv.consensus.bcModule.confirmed(block.Position.Height-1) {
+ go func(hash common.Hash) {
+ parentHash := hash
+ for {
+ recv.consensus.logger.Warn("Parent block not confirmed",
+ "parent-hash", parentHash.String()[:6],
+ "cur-position", block.Position)
+ ch := make(chan *types.Block)
+ if !func() bool {
+ recv.consensus.lock.Lock()
+ defer recv.consensus.lock.Unlock()
+ if _, exist := recv.consensus.baConfirmedBlock[parentHash]; exist {
+ return false
+ }
+ recv.consensus.baConfirmedBlock[parentHash] = ch
+ return true
+ }() {
+ return
+ }
+ var block *types.Block
+ PullBlockLoop:
+ for {
+ recv.consensus.logger.Debug("Calling Network.PullBlock for parent",
+ "hash", parentHash)
+ recv.consensus.network.PullBlocks(common.Hashes{parentHash})
+ select {
+ case block = <-ch:
+ break PullBlockLoop
+ case <-time.After(1 * time.Second):
+ }
+ }
+ recv.consensus.logger.Info("Receive parent block",
+ "parent-hash", block.ParentHash.String()[:6],
+ "cur-position", block.Position)
+ if !block.IsFinalized() {
+ // TODO(jimmy): use a seperate message to pull finalized
+ // block. Here, we pull it again as workaround.
+ continue
+ }
+ recv.consensus.processBlockChan <- block
+ parentHash = block.ParentHash
+ if block.IsGenesis() || recv.consensus.bcModule.confirmed(
+ block.Position.Height-1) {
+ return
+ }
+ }
+ }(block.ParentHash)
+ }
+ if !block.IsEmpty() {
+ recv.consensus.processBlockChan <- block
+ }
+ // Clean the restartNotary channel so BA will not stuck by deadlock.
+CleanChannelLoop:
+ for {
+ select {
+ case <-recv.restartNotary:
+ default:
+ break CleanChannelLoop
+ }
+ }
+ recv.restartNotary <- block.Position
+}
+
+func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) {
+ if !recv.isNotary {
+ return
+ }
+ recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes)
+ recv.consensus.network.PullBlocks(hashes)
+}
+
+func (recv *consensusBAReceiver) ReportForkVote(v1, v2 *types.Vote) {
+ recv.consensus.gov.ReportForkVote(v1, v2)
+}
+
+func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) {
+ b1Clone := b1.Clone()
+ b2Clone := b2.Clone()
+ b1Clone.Payload = []byte{}
+ b2Clone.Payload = []byte{}
+ recv.consensus.gov.ReportForkBlock(b1Clone, b2Clone)
+}
+
+// consensusDKGReceiver implements dkgReceiver.
+type consensusDKGReceiver struct {
+ ID types.NodeID
+ gov Governance
+ signer *utils.Signer
+ nodeSetCache *utils.NodeSetCache
+ cfgModule *configurationChain
+ network Network
+ logger common.Logger
+}
+
+// ProposeDKGComplaint proposes a DKGComplaint.
+func (recv *consensusDKGReceiver) ProposeDKGComplaint(
+ complaint *typesDKG.Complaint) {
+ if err := recv.signer.SignDKGComplaint(complaint); err != nil {
+ recv.logger.Error("Failed to sign DKG complaint", "error", err)
+ return
+ }
+ recv.logger.Debug("Calling Governace.AddDKGComplaint",
+ "complaint", complaint)
+ recv.gov.AddDKGComplaint(complaint)
+}
+
+// ProposeDKGMasterPublicKey propose a DKGMasterPublicKey.
+func (recv *consensusDKGReceiver) ProposeDKGMasterPublicKey(
+ mpk *typesDKG.MasterPublicKey) {
+ if err := recv.signer.SignDKGMasterPublicKey(mpk); err != nil {
+ recv.logger.Error("Failed to sign DKG master public key", "error", err)
+ return
+ }
+ recv.logger.Debug("Calling Governance.AddDKGMasterPublicKey", "key", mpk)
+ recv.gov.AddDKGMasterPublicKey(mpk)
+}
+
+// ProposeDKGPrivateShare propose a DKGPrivateShare.
+func (recv *consensusDKGReceiver) ProposeDKGPrivateShare(
+ prv *typesDKG.PrivateShare) {
+ if err := recv.signer.SignDKGPrivateShare(prv); err != nil {
+ recv.logger.Error("Failed to sign DKG private share", "error", err)
+ return
+ }
+ receiverPubKey, exists := recv.nodeSetCache.GetPublicKey(prv.ReceiverID)
+ if !exists {
+ recv.logger.Error("Public key for receiver not found",
+ "receiver", prv.ReceiverID.String()[:6])
+ return
+ }
+ if prv.ReceiverID == recv.ID {
+ go func() {
+ if err := recv.cfgModule.processPrivateShare(prv); err != nil {
+ recv.logger.Error("Failed to process self private share", "prvShare", prv)
+ }
+ }()
+ } else {
+ recv.logger.Debug("Calling Network.SendDKGPrivateShare",
+ "receiver", hex.EncodeToString(receiverPubKey.Bytes()))
+ recv.network.SendDKGPrivateShare(receiverPubKey, prv)
+ }
+}
+
+// ProposeDKGAntiNackComplaint propose a DKGPrivateShare as an anti complaint.
+func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint(
+ prv *typesDKG.PrivateShare) {
+ if prv.ProposerID == recv.ID {
+ if err := recv.signer.SignDKGPrivateShare(prv); err != nil {
+ recv.logger.Error("Failed sign DKG private share", "error", err)
+ return
+ }
+ }
+ recv.logger.Debug("Calling Network.BroadcastDKGPrivateShare", "share", prv)
+ recv.network.BroadcastDKGPrivateShare(prv)
+}
+
+// ProposeDKGMPKReady propose a DKGMPKReady message.
+func (recv *consensusDKGReceiver) ProposeDKGMPKReady(ready *typesDKG.MPKReady) {
+ if err := recv.signer.SignDKGMPKReady(ready); err != nil {
+ recv.logger.Error("Failed to sign DKG ready", "error", err)
+ return
+ }
+ recv.logger.Debug("Calling Governance.AddDKGMPKReady", "ready", ready)
+ recv.gov.AddDKGMPKReady(ready)
+}
+
+// ProposeDKGFinalize propose a DKGFinalize message.
+func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) {
+ if err := recv.signer.SignDKGFinalize(final); err != nil {
+ recv.logger.Error("Failed to sign DKG finalize", "error", err)
+ return
+ }
+ recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final)
+ recv.gov.AddDKGFinalize(final)
+}
+
+// ProposeDKGSuccess propose a DKGSuccess message.
+func (recv *consensusDKGReceiver) ProposeDKGSuccess(success *typesDKG.Success) {
+ if err := recv.signer.SignDKGSuccess(success); err != nil {
+ recv.logger.Error("Failed to sign DKG successize", "error", err)
+ return
+ }
+ recv.logger.Debug("Calling Governance.AddDKGSuccess", "success", success)
+ recv.gov.AddDKGSuccess(success)
+}
+
+// Consensus implements DEXON Consensus algorithm.
+type Consensus struct {
+ // Node Info.
+ ID types.NodeID
+ signer *utils.Signer
+
+ // BA.
+ baMgr *agreementMgr
+ baConfirmedBlock map[common.Hash]chan<- *types.Block
+
+ // DKG.
+ dkgRunning int32
+ dkgReady *sync.Cond
+ cfgModule *configurationChain
+
+ // Interfaces.
+ db db.Database
+ app Application
+ debugApp Debug
+ gov Governance
+ network Network
+
+ // Misc.
+ bcModule *blockChain
+ dMoment time.Time
+ nodeSetCache *utils.NodeSetCache
+ tsigVerifierCache *TSigVerifierCache
+ lock sync.RWMutex
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ event *common.Event
+ roundEvent *utils.RoundEvent
+ logger common.Logger
+ resetDeliveryGuardTicker chan struct{}
+ msgChan chan types.Msg
+ priorityMsgChan chan interface{}
+ waitGroup sync.WaitGroup
+ processBlockChan chan *types.Block
+
+ // Context of Dummy receiver during switching from syncer.
+ dummyCancel context.CancelFunc
+ dummyFinished <-chan struct{}
+ dummyMsgBuffer []types.Msg
+}
+
+// NewConsensus construct an Consensus instance.
+func NewConsensus(
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+ return newConsensusForRound(
+ nil, dMoment, app, gov, db, network, prv, logger, true)
+}
+
+// NewConsensusForSimulation creates an instance of Consensus for simulation,
+// the only difference with NewConsensus is nonblocking of app.
+func NewConsensusForSimulation(
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+ return newConsensusForRound(
+ nil, dMoment, app, gov, db, network, prv, logger, false)
+}
+
+// NewConsensusFromSyncer constructs an Consensus instance from information
+// provided from syncer.
+//
+// You need to provide the initial block for this newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block you
+// delivered to syncer.
+//
+// NOTE: those confirmed blocks should be organized by chainID and sorted by
+// their positions, in ascending order.
+func NewConsensusFromSyncer(
+ initBlock *types.Block,
+ startWithEmpty bool,
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ networkModule Network,
+ prv crypto.PrivateKey,
+ confirmedBlocks []*types.Block,
+ cachedMessages []types.Msg,
+ logger common.Logger) (*Consensus, error) {
+ // Setup Consensus instance.
+ con := newConsensusForRound(initBlock, dMoment, app, gov, db,
+ networkModule, prv, logger, true)
+ // Launch a dummy receiver before we start receiving from network module.
+ con.dummyMsgBuffer = cachedMessages
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ con.ctx, networkModule.ReceiveChan(), func(msg types.Msg) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ // Dump all BA-confirmed blocks to the consensus instance, make sure these
+ // added blocks forming a DAG.
+ refBlock := initBlock
+ for _, b := range confirmedBlocks {
+ // Only when its parent block is already added to lattice, we can
+ // then add this block. If not, our pulling mechanism would stop at
+ // the block we added, and lost its parent block forever.
+ if b.Position.Height != refBlock.Position.Height+1 {
+ break
+ }
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ refBlock = b
+ }
+ if startWithEmpty {
+ emptyPos := types.Position{
+ Round: con.bcModule.tipRound(),
+ Height: initBlock.Position.Height + 1,
+ }
+ _, err := con.bcModule.addEmptyBlock(emptyPos)
+ if err != nil {
+ panic(err)
+ }
+ }
+ return con, nil
+}
+
+// newConsensusForRound creates a Consensus instance.
+func newConsensusForRound(
+ initBlock *types.Block,
+ dMoment time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ network Network,
+ prv crypto.PrivateKey,
+ logger common.Logger,
+ usingNonBlocking bool) *Consensus {
+ // TODO(w): load latest blockHeight from DB, and use config at that height.
+ nodeSetCache := utils.NewNodeSetCache(gov)
+ // Setup signer module.
+ signer := utils.NewSigner(prv)
+ // Check if the application implement Debug interface.
+ var debugApp Debug
+ if a, ok := app.(Debug); ok {
+ debugApp = a
+ }
+ // Get configuration for bootstrap round.
+ initPos := types.Position{
+ Round: 0,
+ Height: types.GenesisHeight,
+ }
+ if initBlock != nil {
+ initPos = initBlock.Position
+ }
+ // Init configuration chain.
+ ID := types.NewNodeID(prv.PublicKey())
+ recv := &consensusDKGReceiver{
+ ID: ID,
+ gov: gov,
+ signer: signer,
+ nodeSetCache: nodeSetCache,
+ network: network,
+ logger: logger,
+ }
+ cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger)
+ recv.cfgModule = cfgModule
+ signer.SetBLSSigner(
+ func(round uint64, hash common.Hash) (crypto.Signature, error) {
+ _, signer, err := cfgModule.getDKGInfo(round, false)
+ if err != nil {
+ return crypto.Signature{}, err
+ }
+ return crypto.Signature(signer.sign(hash)), nil
+ })
+ appModule := app
+ if usingNonBlocking {
+ appModule = newNonBlocking(app, debugApp)
+ }
+ tsigVerifierCache := NewTSigVerifierCache(gov, 7)
+ bcModule := newBlockChain(ID, dMoment, initBlock, appModule,
+ tsigVerifierCache, signer, logger)
+ // Construct Consensus instance.
+ con := &Consensus{
+ ID: ID,
+ app: appModule,
+ debugApp: debugApp,
+ gov: gov,
+ db: db,
+ network: network,
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ bcModule: bcModule,
+ dMoment: dMoment,
+ nodeSetCache: nodeSetCache,
+ tsigVerifierCache: tsigVerifierCache,
+ signer: signer,
+ event: common.NewEvent(),
+ logger: logger,
+ resetDeliveryGuardTicker: make(chan struct{}),
+ msgChan: make(chan types.Msg, 1024),
+ priorityMsgChan: make(chan interface{}, 1024),
+ processBlockChan: make(chan *types.Block, 1024),
+ }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ var err error
+ con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initPos,
+ ConfigRoundShift)
+ if err != nil {
+ panic(err)
+ }
+ if con.baMgr, err = newAgreementMgr(con); err != nil {
+ panic(err)
+ }
+ if err = con.prepare(initBlock); err != nil {
+ panic(err)
+ }
+ return con
+}
+
+// prepare the Consensus instance to be ready for blocks after 'initBlock'.
+// 'initBlock' could be either:
+// - nil
+// - the last finalized block
+func (con *Consensus) prepare(initBlock *types.Block) (err error) {
+ // Trigger the round validation method for the next round of the first
+ // round.
+ // The block past from full node should be delivered already or known by
+ // full node. We don't have to notify it.
+ initRound := uint64(0)
+ if initBlock != nil {
+ initRound = initBlock.Position.Round
+ }
+ if initRound == 0 {
+ if DKGDelayRound == 0 {
+ panic("not implemented yet")
+ }
+ }
+ // Measure time elapse for each handler of round events.
+ elapse := func(what string, lastE utils.RoundEventParam) func() {
+ start := time.Now()
+ con.logger.Info("Handle round event",
+ "what", what,
+ "event", lastE)
+ return func() {
+ con.logger.Info("Finish round event",
+ "what", what,
+ "event", lastE,
+ "elapse", time.Since(start))
+ }
+ }
+ // Register round event handler to purge cached node set. To make sure each
+ // modules see the up-to-date node set, we need to make sure this action
+ // should be taken as the first one.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ defer elapse("purge-cache", evts[len(evts)-1])()
+ for _, e := range evts {
+ if e.Reset == 0 {
+ continue
+ }
+ con.nodeSetCache.Purge(e.Round + 1)
+ con.tsigVerifierCache.Purge(e.Round + 1)
+ }
+ })
+ // Register round event handler to abort previous running DKG if any.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ go func() {
+ defer elapse("abort-DKG", e)()
+ if e.Reset > 0 {
+ aborted := con.cfgModule.abortDKG(con.ctx, e.Round+1, e.Reset-1)
+ con.logger.Info("DKG aborting result",
+ "round", e.Round+1,
+ "reset", e.Reset-1,
+ "aborted", aborted)
+ }
+ }()
+ })
+ // Register round event handler to update BA and BC modules.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ defer elapse("append-config", evts[len(evts)-1])()
+ // Always updates newer configs to the later modules first in the data
+ // flow.
+ if err := con.bcModule.notifyRoundEvents(evts); err != nil {
+ panic(err)
+ }
+ if err := con.baMgr.notifyRoundEvents(evts); err != nil {
+ panic(err)
+ }
+ })
+ // Register round event handler to reset DKG if the DKG set for next round
+ // failed to setup.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ defer elapse("reset-DKG", e)()
+ nextRound := e.Round + 1
+ if nextRound < DKGDelayRound {
+ return
+ }
+ curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round)
+ if err != nil {
+ con.logger.Error("Error getting notary set when proposing CRS",
+ "round", e.Round,
+ "error", err)
+ return
+ }
+ if _, exist := curNotarySet[con.ID]; !exist {
+ return
+ }
+ con.event.RegisterHeight(e.NextDKGResetHeight(), func(uint64) {
+ if ok, _ := utils.IsDKGValid(
+ con.gov, con.logger, nextRound, e.Reset); ok {
+ return
+ }
+ // Aborting all previous running DKG protocol instance if any.
+ go con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+ })
+ })
+ // Register round event handler to propose new CRS.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ // We don't have to propose new CRS during DKG reset, the reset of DKG
+ // would be done by the notary set in previous round.
+ e := evts[len(evts)-1]
+ defer elapse("propose-CRS", e)()
+ if e.Reset != 0 || e.Round < DKGDelayRound {
+ return
+ }
+ if curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round); err != nil {
+ con.logger.Error("Error getting notary set when proposing CRS",
+ "round", e.Round,
+ "error", err)
+ } else {
+ if _, exist := curNotarySet[con.ID]; !exist {
+ return
+ }
+ con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) {
+ con.logger.Debug(
+ "Calling Governance.CRS to check if already proposed",
+ "round", e.Round+1)
+ if (con.gov.CRS(e.Round+1) != common.Hash{}) {
+ con.logger.Debug("CRS already proposed", "round", e.Round+1)
+ return
+ }
+ go con.runCRS(e.Round, e.CRS, false)
+ })
+ }
+ })
+ // Touch nodeSetCache for next round.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ defer elapse("touch-NodeSetCache", e)()
+ con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) {
+ if e.Reset == 0 {
+ return
+ }
+ go func() {
+ nextRound := e.Round + 1
+ if err := con.nodeSetCache.Touch(nextRound); err != nil {
+ con.logger.Warn("Failed to update nodeSetCache",
+ "round", nextRound,
+ "error", err)
+ }
+ }()
+ })
+ })
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ if e.Reset != 0 {
+ return
+ }
+ defer elapse("touch-DKGCache", e)()
+ go func() {
+ if _, err :=
+ con.tsigVerifierCache.Update(e.Round); err != nil {
+ con.logger.Warn("Failed to update tsig cache",
+ "round", e.Round,
+ "error", err)
+ }
+ }()
+ go func() {
+ threshold := utils.GetDKGThreshold(
+ utils.GetConfigWithPanic(con.gov, e.Round, con.logger))
+ // Restore group public key.
+ con.logger.Debug(
+ "Calling Governance.DKGMasterPublicKeys for recoverDKGInfo",
+ "round", e.Round)
+ con.logger.Debug(
+ "Calling Governance.DKGComplaints for recoverDKGInfo",
+ "round", e.Round)
+ _, qualifies, err := typesDKG.CalcQualifyNodes(
+ con.gov.DKGMasterPublicKeys(e.Round),
+ con.gov.DKGComplaints(e.Round),
+ threshold)
+ if err != nil {
+ con.logger.Warn("Failed to calculate dkg set",
+ "round", e.Round,
+ "error", err)
+ return
+ }
+ if _, exist := qualifies[con.ID]; !exist {
+ return
+ }
+ if _, _, err :=
+ con.cfgModule.getDKGInfo(e.Round, true); err != nil {
+ con.logger.Warn("Failed to recover DKG info",
+ "round", e.Round,
+ "error", err)
+ }
+ }()
+ })
+ // checkCRS is a generator of checker to check if CRS for that round is
+ // ready or not.
+ checkCRS := func(round uint64) func() bool {
+ return func() bool {
+ nextCRS := con.gov.CRS(round)
+ if (nextCRS != common.Hash{}) {
+ return true
+ }
+ con.logger.Debug("CRS is not ready yet. Try again later...",
+ "nodeID", con.ID,
+ "round", round)
+ return false
+ }
+ }
+ // Trigger round validation method for next period.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ defer elapse("next-round", e)()
+ // Register a routine to trigger round events.
+ con.event.RegisterHeight(e.NextRoundValidationHeight(),
+ utils.RoundEventRetryHandlerGenerator(con.roundEvent, con.event))
+ // Register a routine to register next DKG.
+ con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
+ nextRound := e.Round + 1
+ if nextRound < DKGDelayRound {
+ con.logger.Info("Skip runDKG for round",
+ "round", nextRound,
+ "reset", e.Reset)
+ return
+ }
+ go func() {
+ // Normally, gov.CRS would return non-nil. Use this for in case
+ // of unexpected network fluctuation and ensure the robustness.
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Debug("unable to prepare CRS for notary set",
+ "round", nextRound,
+ "reset", e.Reset)
+ return
+ }
+ nextNotarySet, err := con.nodeSetCache.GetNotarySet(nextRound)
+ if err != nil {
+ con.logger.Error("Error getting notary set for next round",
+ "round", nextRound,
+ "reset", e.Reset,
+ "error", err)
+ return
+ }
+ if _, exist := nextNotarySet[con.ID]; !exist {
+ con.logger.Info("Not selected as notary set",
+ "round", nextRound,
+ "reset", e.Reset)
+ return
+ }
+ con.logger.Info("Selected as notary set",
+ "round", nextRound,
+ "reset", e.Reset)
+ nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+ con.logger)
+ con.cfgModule.registerDKG(con.ctx, nextRound, e.Reset,
+ utils.GetDKGThreshold(nextConfig))
+ con.event.RegisterHeight(e.NextDKGPreparationHeight(),
+ func(h uint64) {
+ func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgRunning = 0
+ }()
+ // We want to skip some of the DKG phases when started.
+ dkgCurrentHeight := h - e.NextDKGPreparationHeight()
+ con.runDKG(
+ nextRound, e.Reset,
+ e.NextDKGPreparationHeight(), dkgCurrentHeight)
+ })
+ }()
+ })
+ })
+ con.roundEvent.TriggerInitEvent()
+ if initBlock != nil {
+ con.event.NotifyHeight(initBlock.Position.Height)
+ }
+ con.baMgr.prepare()
+ return
+}
+
+// Run starts running DEXON Consensus.
+func (con *Consensus) Run() {
+ // There may have emptys block in blockchain added by force sync.
+ blocksWithoutRandomness := con.bcModule.pendingBlocksWithoutRandomness()
+ // Launch BA routines.
+ con.baMgr.run()
+ // Launch network handler.
+ con.logger.Debug("Calling Network.ReceiveChan")
+ con.waitGroup.Add(1)
+ go con.deliverNetworkMsg()
+ con.waitGroup.Add(1)
+ go con.processMsg()
+ go con.processBlockLoop()
+ // Stop dummy receiver if launched.
+ if con.dummyCancel != nil {
+ con.logger.Trace("Stop dummy receiver")
+ con.dummyCancel()
+ <-con.dummyFinished
+ // Replay those cached messages.
+ con.logger.Trace("Dummy receiver stoped, start dumping cached messages",
+ "count", len(con.dummyMsgBuffer))
+ for _, msg := range con.dummyMsgBuffer {
+ loop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break loop
+ case <-time.After(50 * time.Millisecond):
+ con.logger.Debug(
+ "internal message channel is full when syncing")
+ }
+ }
+ }
+ con.logger.Trace("Finish dumping cached messages")
+ }
+ con.generateBlockRandomness(blocksWithoutRandomness)
+ // Sleep until dMoment come.
+ time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Take some time to bootstrap.
+ time.Sleep(3 * time.Second)
+ con.waitGroup.Add(1)
+ go con.deliveryGuard()
+ // Block until done.
+ select {
+ case <-con.ctx.Done():
+ }
+}
+
+func (con *Consensus) generateBlockRandomness(blocks []*types.Block) {
+ con.logger.Debug("Start generating block randomness", "blocks", blocks)
+ isNotarySet := make(map[uint64]bool)
+ for _, block := range blocks {
+ if block.Position.Round < DKGDelayRound {
+ continue
+ }
+ doRun, exist := isNotarySet[block.Position.Round]
+ if !exist {
+ curNotarySet, err := con.nodeSetCache.GetNotarySet(block.Position.Round)
+ if err != nil {
+ con.logger.Error("Error getting notary set when generate block tsig",
+ "round", block.Position.Round,
+ "error", err)
+ continue
+ }
+ _, exist := curNotarySet[con.ID]
+ isNotarySet[block.Position.Round] = exist
+ doRun = exist
+ }
+ if !doRun {
+ continue
+ }
+ go func(block *types.Block) {
+ psig, err := con.cfgModule.preparePartialSignature(
+ block.Position.Round, block.Hash)
+ if err != nil {
+ con.logger.Error("Failed to prepare partial signature",
+ "block", block,
+ "error", err)
+ } else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to sign DKG partial signature",
+ "block", block,
+ "error", err)
+ } else if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to process partial signature",
+ "block", block,
+ "error", err)
+ } else {
+ con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+ "proposer", psig.ProposerID,
+ "block", block)
+ con.network.BroadcastDKGPartialSignature(psig)
+ sig, err := con.cfgModule.runTSig(
+ block.Position.Round,
+ block.Hash,
+ 60*time.Minute,
+ )
+ if err != nil {
+ con.logger.Error("Failed to run Block Tsig",
+ "block", block,
+ "error", err)
+ return
+ }
+ result := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Randomness: sig.Signature[:],
+ }
+ con.bcModule.addBlockRandomness(block.Position, sig.Signature[:])
+ con.logger.Debug("Broadcast BlockRandomness",
+ "block", block,
+ "result", result)
+ con.network.BroadcastAgreementResult(result)
+ if err := con.deliverFinalizedBlocks(); err != nil {
+ con.logger.Error("Failed to deliver finalized block",
+ "error", err)
+ }
+ }
+ }(block)
+ }
+}
+
+// runDKG starts running DKG protocol.
+func (con *Consensus) runDKG(
+ round, reset, dkgBeginHeight, dkgHeight uint64) {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ if con.dkgRunning != 0 {
+ return
+ }
+ con.dkgRunning = 1
+ go func() {
+ defer func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgReady.Broadcast()
+ con.dkgRunning = 2
+ }()
+ if err :=
+ con.cfgModule.runDKG(
+ round, reset,
+ con.event, dkgBeginHeight, dkgHeight); err != nil {
+ con.logger.Error("Failed to runDKG", "error", err)
+ }
+ }()
+}
+
+func (con *Consensus) runCRS(round uint64, hash common.Hash, reset bool) {
+ // Start running next round CRS.
+ psig, err := con.cfgModule.preparePartialSignature(round, hash)
+ if err != nil {
+ con.logger.Error("Failed to prepare partial signature", "error", err)
+ } else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to sign DKG partial signature", "error", err)
+ } else if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to process partial signature", "error", err)
+ } else {
+ con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+ "proposer", psig.ProposerID,
+ "round", psig.Round,
+ "hash", psig.Hash)
+ con.network.BroadcastDKGPartialSignature(psig)
+ con.logger.Debug("Calling Governance.CRS", "round", round)
+ crs, err := con.cfgModule.runCRSTSig(round, hash)
+ if err != nil {
+ con.logger.Error("Failed to run CRS Tsig", "error", err)
+ } else {
+ if reset {
+ con.logger.Debug("Calling Governance.ResetDKG",
+ "round", round+1,
+ "crs", hex.EncodeToString(crs))
+ con.gov.ResetDKG(crs)
+ } else {
+ con.logger.Debug("Calling Governance.ProposeCRS",
+ "round", round+1,
+ "crs", hex.EncodeToString(crs))
+ con.gov.ProposeCRS(round+1, crs)
+ }
+ }
+ }
+}
+
+// Stop the Consensus core.
+func (con *Consensus) Stop() {
+ con.ctxCancel()
+ con.baMgr.stop()
+ con.event.Reset()
+ con.waitGroup.Wait()
+ if nbApp, ok := con.app.(*nonBlocking); ok {
+ nbApp.wait()
+ }
+}
+
+func (con *Consensus) deliverNetworkMsg() {
+ defer con.waitGroup.Done()
+ recv := con.network.ReceiveChan()
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case msg := <-recv:
+ innerLoop:
+ for {
+ select {
+ case con.msgChan <- msg:
+ break innerLoop
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Debug("internal message channel is full",
+ "pending", msg)
+ }
+ }
+ case <-con.ctx.Done():
+ return
+ }
+ }
+}
+
+func (con *Consensus) processMsg() {
+ defer con.waitGroup.Done()
+MessageLoop:
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ var msg, peer interface{}
+ select {
+ case msg = <-con.priorityMsgChan:
+ default:
+ }
+ if msg == nil {
+ select {
+ case message := <-con.msgChan:
+ msg, peer = message.Payload, message.PeerID
+ case msg = <-con.priorityMsgChan:
+ case <-con.ctx.Done():
+ return
+ }
+ }
+ switch val := msg.(type) {
+ case *selfAgreementResult:
+ con.baMgr.touchAgreementResult((*types.AgreementResult)(val))
+ case *types.Block:
+ if ch, exist := func() (chan<- *types.Block, bool) {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ ch, e := con.baConfirmedBlock[val.Hash]
+ return ch, e
+ }(); exist {
+ if val.IsEmpty() {
+ hash, err := utils.HashBlock(val)
+ if err != nil {
+ con.logger.Error("Error verifying empty block hash",
+ "block", val,
+ "error, err")
+ con.network.ReportBadPeerChan() <- peer
+ continue MessageLoop
+ }
+ if hash != val.Hash {
+ con.logger.Error("Incorrect confirmed empty block hash",
+ "block", val,
+ "hash", hash)
+ con.network.ReportBadPeerChan() <- peer
+ continue MessageLoop
+ }
+ if _, err := con.bcModule.proposeBlock(
+ val.Position, time.Time{}, true); err != nil {
+ con.logger.Error("Error adding empty block",
+ "block", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ continue MessageLoop
+ }
+ } else {
+ if !val.IsFinalized() {
+ con.logger.Warn("Ignore not finalized block",
+ "block", val)
+ continue MessageLoop
+ }
+ ok, err := con.bcModule.verifyRandomness(
+ val.Hash, val.Position.Round, val.Randomness)
+ if err != nil {
+ con.logger.Error("Error verifying confirmed block randomness",
+ "block", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ continue MessageLoop
+ }
+ if !ok {
+ con.logger.Error("Incorrect confirmed block randomness",
+ "block", val)
+ con.network.ReportBadPeerChan() <- peer
+ continue MessageLoop
+ }
+ if err := utils.VerifyBlockSignature(val); err != nil {
+ con.logger.Error("VerifyBlockSignature failed",
+ "block", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ continue MessageLoop
+ }
+ }
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // In case of multiple delivered block.
+ if _, exist := con.baConfirmedBlock[val.Hash]; !exist {
+ return
+ }
+ delete(con.baConfirmedBlock, val.Hash)
+ ch <- val
+ }()
+ } else if val.IsFinalized() {
+ if err := con.processFinalizedBlock(val); err != nil {
+ con.logger.Error("Failed to process finalized block",
+ "block", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ }
+ } else {
+ if err := con.preProcessBlock(val); err != nil {
+ con.logger.Error("Failed to pre process block",
+ "block", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ }
+ }
+ case *types.Vote:
+ if err := con.ProcessVote(val); err != nil {
+ con.logger.Error("Failed to process vote",
+ "vote", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ }
+ case *types.AgreementResult:
+ if err := con.ProcessAgreementResult(val); err != nil {
+ con.logger.Error("Failed to process agreement result",
+ "result", val,
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ }
+ case *typesDKG.PrivateShare:
+ if err := con.cfgModule.processPrivateShare(val); err != nil {
+ con.logger.Error("Failed to process private share",
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ }
+
+ case *typesDKG.PartialSignature:
+ if err := con.cfgModule.processPartialSignature(val); err != nil {
+ con.logger.Error("Failed to process partial signature",
+ "error", err)
+ con.network.ReportBadPeerChan() <- peer
+ }
+ }
+ }
+}
+
+// ProcessVote is the entry point to submit ont vote to a Consensus instance.
+func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
+ err = con.baMgr.processVote(vote)
+ return
+}
+
+// ProcessAgreementResult processes the randomness request.
+func (con *Consensus) ProcessAgreementResult(
+ rand *types.AgreementResult) error {
+ if !con.baMgr.touchAgreementResult(rand) {
+ return nil
+ }
+ // Sanity Check.
+ if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
+ con.baMgr.untouchAgreementResult(rand)
+ return err
+ }
+ if err := con.bcModule.processAgreementResult(rand); err != nil {
+ con.baMgr.untouchAgreementResult(rand)
+ if err == ErrSkipButNoError {
+ return nil
+ }
+ return err
+ }
+ // Syncing BA Module.
+ if err := con.baMgr.processAgreementResult(rand); err != nil {
+ con.baMgr.untouchAgreementResult(rand)
+ return err
+ }
+
+ con.logger.Debug("Rebroadcast AgreementResult",
+ "result", rand)
+ con.network.BroadcastAgreementResult(rand)
+
+ return con.deliverFinalizedBlocks()
+}
+
+// preProcessBlock performs Byzantine Agreement on the block.
+func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
+ err = con.baMgr.processBlock(b)
+ if err == nil && con.debugApp != nil {
+ con.debugApp.BlockReceived(b.Hash)
+ }
+ return
+}
+
+func (con *Consensus) processFinalizedBlock(b *types.Block) (err error) {
+ if b.Position.Round < DKGDelayRound {
+ return
+ }
+ if err = utils.VerifyBlockSignature(b); err != nil {
+ return
+ }
+ verifier, ok, err := con.tsigVerifierCache.UpdateAndGet(b.Position.Round)
+ if err != nil {
+ return
+ }
+ if !ok {
+ err = ErrCannotVerifyBlockRandomness
+ return
+ }
+ if !verifier.VerifySignature(b.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: b.Randomness,
+ }) {
+ err = ErrIncorrectBlockRandomness
+ return
+ }
+ err = con.baMgr.processFinalizedBlock(b)
+ if err == nil && con.debugApp != nil {
+ con.debugApp.BlockReceived(b.Hash)
+ }
+ return
+}
+
+func (con *Consensus) deliveryGuard() {
+ defer con.waitGroup.Done()
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(con.dMoment.Sub(time.Now())):
+ }
+ // Node takes time to start.
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(60 * time.Second):
+ }
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-con.resetDeliveryGuardTicker:
+ case <-time.After(60 * time.Second):
+ con.logger.Error("No blocks delivered for too long", "ID", con.ID)
+ panic(fmt.Errorf("No blocks delivered for too long"))
+ }
+ }
+}
+
+// deliverBlock deliver a block to application layer.
+func (con *Consensus) deliverBlock(b *types.Block) {
+ select {
+ case con.resetDeliveryGuardTicker <- struct{}{}:
+ default:
+ }
+ if err := con.db.PutBlock(*b); err != nil {
+ panic(err)
+ }
+ if err := con.db.PutCompactionChainTipInfo(b.Hash,
+ b.Position.Height); err != nil {
+ panic(err)
+ }
+ con.logger.Debug("Calling Application.BlockDelivered", "block", b)
+ con.app.BlockDelivered(b.Hash, b.Position, common.CopyBytes(b.Randomness))
+ if con.debugApp != nil {
+ con.debugApp.BlockReady(b.Hash)
+ }
+}
+
+// deliverFinalizedBlocks extracts and delivers finalized blocks to application
+// layer.
+func (con *Consensus) deliverFinalizedBlocks() error {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ return con.deliverFinalizedBlocksWithoutLock()
+}
+
+func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
+ deliveredBlocks := con.bcModule.extractBlocks()
+ con.logger.Debug("Last blocks in compaction chain",
+ "delivered", con.bcModule.lastDeliveredBlock(),
+ "pending", con.bcModule.lastPendingBlock())
+ for _, b := range deliveredBlocks {
+ con.deliverBlock(b)
+ con.event.NotifyHeight(b.Position.Height)
+ }
+ return
+}
+
+func (con *Consensus) processBlockLoop() {
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
+ case block := <-con.processBlockChan:
+ if err := con.processBlock(block); err != nil {
+ con.logger.Error("Error processing block",
+ "block", block,
+ "error", err)
+ }
+ }
+ }
+}
+
+// processBlock is the entry point to submit one block to a Consensus instance.
+func (con *Consensus) processBlock(block *types.Block) (err error) {
+ // Block processed by blockChain can be out-of-order. But the output from
+ // blockChain (deliveredBlocks) cannot, thus we need to protect the part
+ // below with writer lock.
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if err = con.bcModule.addBlock(block); err != nil {
+ return
+ }
+ if err = con.deliverFinalizedBlocksWithoutLock(); err != nil {
+ return
+ }
+ return
+}
+
+// PrepareBlock would setup header fields of block based on its ProposerID.
+func (con *Consensus) proposeBlock(position types.Position) (
+ *types.Block, error) {
+ b, err := con.bcModule.proposeBlock(position, time.Now().UTC(), false)
+ if err != nil {
+ return nil, err
+ }
+ con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round)
+ crs := con.gov.CRS(b.Position.Round)
+ if crs.Equal(common.Hash{}) {
+ con.logger.Error("CRS for round is not ready, unable to prepare block",
+ "position", &b.Position)
+ return nil, ErrCRSNotReady
+ }
+ if err = con.signer.SignCRS(b, crs); err != nil {
+ return nil, err
+ }
+ return b, nil
+}