diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go')
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go | 1026 |
1 files changed, 0 insertions, 1026 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go deleted file mode 100644 index 7e6934f45..000000000 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go +++ /dev/null @@ -1,1026 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// <http://www.gnu.org/licenses/>. - -package core - -import ( - "context" - "encoding/hex" - "fmt" - "sync" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/blockdb" - "github.com/dexon-foundation/dexon-consensus/core/crypto" - "github.com/dexon-foundation/dexon-consensus/core/types" - typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" -) - -// Errors for consensus core. -var ( - ErrProposerNotInNodeSet = fmt.Errorf( - "proposer is not in node set") - ErrIncorrectHash = fmt.Errorf( - "hash of block is incorrect") - ErrIncorrectSignature = fmt.Errorf( - "signature of block is incorrect") - ErrGenesisBlockNotEmpty = fmt.Errorf( - "genesis block should be empty") - ErrUnknownBlockProposed = fmt.Errorf( - "unknown block is proposed") - ErrIncorrectAgreementResultPosition = fmt.Errorf( - "incorrect agreement result position") - ErrNotEnoughVotes = fmt.Errorf( - "not enought votes") - ErrIncorrectVoteBlockHash = fmt.Errorf( - "incorrect vote block hash") - ErrIncorrectVoteType = fmt.Errorf( - "incorrect vote type") - ErrIncorrectVotePosition = fmt.Errorf( - "incorrect vote position") - ErrIncorrectVoteProposer = fmt.Errorf( - "incorrect vote proposer") - ErrIncorrectBlockRandomnessResult = fmt.Errorf( - "incorrect block randomness result") -) - -// consensusBAReceiver implements agreementReceiver. -type consensusBAReceiver struct { - // TODO(mission): consensus would be replaced by lattice and network. - consensus *Consensus - agreementModule *agreement - chainID uint32 - changeNotaryTime time.Time - round uint64 - restartNotary chan bool -} - -func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { - go func() { - if err := recv.agreementModule.prepareVote(vote); err != nil { - recv.consensus.logger.Error("Failed to prepare vote", "error", err) - return - } - if err := recv.agreementModule.processVote(vote); err != nil { - recv.consensus.logger.Error("Failed to process vote", "error", err) - return - } - recv.consensus.logger.Debug("Calling Network.BroadcastVote", - "vote", vote) - recv.consensus.network.BroadcastVote(vote) - }() -} - -func (recv *consensusBAReceiver) ProposeBlock() common.Hash { - block := recv.consensus.proposeBlock(recv.chainID, recv.round) - if block == nil { - recv.consensus.logger.Error("unable to propose block") - return nullBlockHash - } - if err := recv.consensus.preProcessBlock(block); err != nil { - recv.consensus.logger.Error("Failed to pre-process block", "error", err) - return common.Hash{} - } - recv.consensus.logger.Debug("Calling Network.BroadcastBlock", "block", block) - recv.consensus.network.BroadcastBlock(block) - return block.Hash -} - -func (recv *consensusBAReceiver) ConfirmBlock( - hash common.Hash, votes map[types.NodeID]*types.Vote) { - var block *types.Block - if (hash == common.Hash{}) { - recv.consensus.logger.Info("Empty block is confirmed", - "position", recv.agreementModule.agreementID()) - var err error - block, err = recv.consensus.proposeEmptyBlock(recv.chainID) - if err != nil { - recv.consensus.logger.Error("Propose empty block failed", "error", err) - return - } - } else { - var exist bool - block, exist = recv.consensus.baModules[recv.chainID]. - findCandidateBlockNoLock(hash) - if !exist { - recv.consensus.logger.Error("Unknown block confirmed", - "hash", hash, - "chainID", recv.chainID) - ch := make(chan *types.Block) - func() { - recv.consensus.lock.Lock() - defer recv.consensus.lock.Unlock() - recv.consensus.baConfirmedBlock[hash] = ch - }() - recv.consensus.network.PullBlocks(common.Hashes{hash}) - go func() { - block = <-ch - recv.consensus.logger.Info("Receive unknown block", - "hash", hash, - "chainID", recv.chainID) - recv.agreementModule.addCandidateBlock(block) - recv.ConfirmBlock(block.Hash, votes) - }() - return - } - } - recv.consensus.ccModule.registerBlock(block) - voteList := make([]types.Vote, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue - } - voteList = append(voteList, *vote) - } - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - } - recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult", - "result", result) - recv.consensus.network.BroadcastAgreementResult(result) - if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", "error", err) - return - } - if block.Timestamp.After(recv.changeNotaryTime) { - recv.round++ - recv.restartNotary <- true - } else { - recv.restartNotary <- false - } -} - -func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { - recv.consensus.logger.Debug("Calling Network.PullBlocks", "hashes", hashes) - recv.consensus.network.PullBlocks(hashes) -} - -// consensusDKGReceiver implements dkgReceiver. -type consensusDKGReceiver struct { - ID types.NodeID - gov Governance - authModule *Authenticator - nodeSetCache *NodeSetCache - cfgModule *configurationChain - network Network - logger common.Logger -} - -// ProposeDKGComplaint proposes a DKGComplaint. -func (recv *consensusDKGReceiver) ProposeDKGComplaint( - complaint *typesDKG.Complaint) { - if err := recv.authModule.SignDKGComplaint(complaint); err != nil { - recv.logger.Error("Failed to sign DKG complaint", "error", err) - return - } - recv.logger.Debug("Calling Governace.AddDKGComplaint", - "complaint", complaint) - recv.gov.AddDKGComplaint(complaint.Round, complaint) -} - -// ProposeDKGMasterPublicKey propose a DKGMasterPublicKey. -func (recv *consensusDKGReceiver) ProposeDKGMasterPublicKey( - mpk *typesDKG.MasterPublicKey) { - if err := recv.authModule.SignDKGMasterPublicKey(mpk); err != nil { - recv.logger.Error("Failed to sign DKG master public key", "error", err) - return - } - recv.logger.Debug("Calling Governance.AddDKGMasterPublicKey", "key", mpk) - recv.gov.AddDKGMasterPublicKey(mpk.Round, mpk) -} - -// ProposeDKGPrivateShare propose a DKGPrivateShare. -func (recv *consensusDKGReceiver) ProposeDKGPrivateShare( - prv *typesDKG.PrivateShare) { - if err := recv.authModule.SignDKGPrivateShare(prv); err != nil { - recv.logger.Error("Failed to sign DKG private share", "error", err) - return - } - receiverPubKey, exists := recv.nodeSetCache.GetPublicKey(prv.ReceiverID) - if !exists { - recv.logger.Error("Public key for receiver not found", - "receiver", prv.ReceiverID.String()[:6]) - return - } - if prv.ReceiverID == recv.ID { - go func() { - if err := recv.cfgModule.processPrivateShare(prv); err != nil { - recv.logger.Error("Failed to process self private share", "prvShare", prv) - } - }() - } else { - recv.logger.Debug("Calling Network.SendDKGPrivateShare", - "receiver", hex.EncodeToString(receiverPubKey.Bytes())) - recv.network.SendDKGPrivateShare(receiverPubKey, prv) - } -} - -// ProposeDKGAntiNackComplaint propose a DKGPrivateShare as an anti complaint. -func (recv *consensusDKGReceiver) ProposeDKGAntiNackComplaint( - prv *typesDKG.PrivateShare) { - if prv.ProposerID == recv.ID { - if err := recv.authModule.SignDKGPrivateShare(prv); err != nil { - recv.logger.Error("Failed sign DKG private share", "error", err) - return - } - } - recv.logger.Debug("Calling Network.BroadcastDKGPrivateShare", "share", prv) - recv.network.BroadcastDKGPrivateShare(prv) -} - -// ProposeDKGFinalize propose a DKGFinalize message. -func (recv *consensusDKGReceiver) ProposeDKGFinalize(final *typesDKG.Finalize) { - if err := recv.authModule.SignDKGFinalize(final); err != nil { - recv.logger.Error("Faield to sign DKG finalize", "error", err) - return - } - recv.logger.Debug("Calling Governance.AddDKGFinalize", "final", final) - recv.gov.AddDKGFinalize(final.Round, final) -} - -// Consensus implements DEXON Consensus algorithm. -type Consensus struct { - // Node Info. - ID types.NodeID - authModule *Authenticator - currentConfig *types.Config - - // BA. - baModules []*agreement - receivers []*consensusBAReceiver - baConfirmedBlock map[common.Hash]chan<- *types.Block - - // DKG. - dkgRunning int32 - dkgReady *sync.Cond - cfgModule *configurationChain - - // Dexon consensus v1's modules. - lattice *Lattice - ccModule *compactionChain - - // Interfaces. - db blockdb.BlockDatabase - app Application - gov Governance - network Network - tickerObj Ticker - - // Misc. - dMoment time.Time - nodeSetCache *NodeSetCache - round uint64 - roundToNotify uint64 - lock sync.RWMutex - ctx context.Context - ctxCancel context.CancelFunc - event *common.Event - logger common.Logger -} - -// NewConsensus construct an Consensus instance. -func NewConsensus( - dMoment time.Time, - app Application, - gov Governance, - db blockdb.BlockDatabase, - network Network, - prv crypto.PrivateKey, - logger common.Logger) *Consensus { - - // TODO(w): load latest blockHeight from DB, and use config at that height. - var ( - round uint64 - // round 0 and 1 are decided at beginning. - roundToNotify = round + 2 - ) - logger.Debug("Calling Governance.Configuration", "round", round) - config := gov.Configuration(round) - nodeSetCache := NewNodeSetCache(gov) - logger.Debug("Calling Governance.CRS", "round", round) - crs := gov.CRS(round) - // Setup acking by information returned from Governace. - nodes, err := nodeSetCache.GetNodeSet(round) - if err != nil { - panic(err) - } - // Setup auth module. - authModule := NewAuthenticator(prv) - // Check if the application implement Debug interface. - debugApp, _ := app.(Debug) - // Init lattice. - lattice := NewLattice( - dMoment, config, authModule, app, debugApp, db, logger) - // Init configuration chain. - ID := types.NewNodeID(prv.PublicKey()) - recv := &consensusDKGReceiver{ - ID: ID, - gov: gov, - authModule: authModule, - nodeSetCache: nodeSetCache, - network: network, - logger: logger, - } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - logger) - recv.cfgModule = cfgModule - // Construct Consensus instance. - con := &Consensus{ - ID: ID, - currentConfig: config, - ccModule: newCompactionChain(gov), - lattice: lattice, - app: app, - gov: gov, - db: db, - network: network, - tickerObj: newTicker(gov, round, TickerBA), - baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, - dMoment: dMoment, - nodeSetCache: nodeSetCache, - authModule: authModule, - event: common.NewEvent(), - logger: logger, - roundToNotify: roundToNotify, - } - - validLeader := func(block *types.Block) bool { - return lattice.SanityCheck(block) == nil - } - - con.baModules = make([]*agreement, config.NumChains) - con.receivers = make([]*consensusBAReceiver, config.NumChains) - for i := uint32(0); i < config.NumChains; i++ { - chainID := i - recv := &consensusBAReceiver{ - consensus: con, - chainID: chainID, - restartNotary: make(chan bool, 1), - } - agreementModule := newAgreement( - con.ID, - recv, - nodes.IDs, - newLeaderSelector(crs, validLeader), - con.authModule, - ) - // Hacky way to make agreement module self contained. - recv.agreementModule = agreementModule - recv.changeNotaryTime = dMoment - con.baModules[chainID] = agreementModule - con.receivers[chainID] = recv - } - return con -} - -// Run starts running DEXON Consensus. -func (con *Consensus) Run(initBlock *types.Block) { - // Setup context. - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.ccModule.init(initBlock) - // TODO(jimmy-dexon): change AppendConfig to add config for specific round. - for i := uint64(0); i < initBlock.Position.Round; i++ { - con.logger.Debug("Calling Governance.Configuration", "round", i+1) - cfg := con.gov.Configuration(i + 1) - if err := con.lattice.AppendConfig(i+1, cfg); err != nil { - panic(err) - } - } - con.logger.Debug("Calling Network.ReceiveChan") - go con.processMsg(con.network.ReceiveChan()) - // Sleep until dMoment come. - time.Sleep(con.dMoment.Sub(time.Now().UTC())) - con.cfgModule.registerDKG(con.round, int(con.currentConfig.DKGSetSize)/3+1) - con.event.RegisterTime(con.dMoment.Add(con.currentConfig.RoundInterval/4), - func(time.Time) { - con.runDKGTSIG(con.round) - }) - round1 := uint64(1) - con.logger.Debug("Calling Governance.Configuration", "round", round1) - con.lattice.AppendConfig(round1, con.gov.Configuration(round1)) - con.initialRound(con.dMoment) - ticks := make([]chan struct{}, 0, con.currentConfig.NumChains) - for i := uint32(0); i < con.currentConfig.NumChains; i++ { - tick := make(chan struct{}) - ticks = append(ticks, tick) - go con.runBA(i, tick) - } - - // Reset ticker. - <-con.tickerObj.Tick() - <-con.tickerObj.Tick() - for { - <-con.tickerObj.Tick() - for _, tick := range ticks { - go func(tick chan struct{}) { tick <- struct{}{} }(tick) - } - } -} - -func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) { - // TODO(jimmy-dexon): move this function inside agreement. - agreement := con.baModules[chainID] - recv := con.receivers[chainID] - recv.restartNotary <- true - nIDs := make(map[types.NodeID]struct{}) - // Reset ticker - <-tick -BALoop: - for { - select { - case <-con.ctx.Done(): - break BALoop - default: - } - select { - case newNotary := <-recv.restartNotary: - if newNotary { - recv.changeNotaryTime = - recv.changeNotaryTime.Add(con.currentConfig.RoundInterval) - nodes, err := con.nodeSetCache.GetNodeSet(recv.round) - if err != nil { - panic(err) - } - con.logger.Debug("Calling Governance.Configuration", - "round", recv.round) - con.logger.Debug("Calling Governance.CRS", "round", recv.round) - nIDs = nodes.GetSubSet( - int(con.gov.Configuration(recv.round).NotarySetSize), - types.NewNotarySetTarget(con.gov.CRS(recv.round), chainID)) - } - nextPos := con.lattice.NextPosition(chainID) - nextPos.Round = recv.round - agreement.restart(nIDs, nextPos) - default: - } - if agreement.pullVotes() { - pos := agreement.agreementID() - con.logger.Debug("Calling Network.PullVotes for syncing votes", - "position", pos) - con.network.PullVotes(pos) - } - err := agreement.nextState() - if err != nil { - con.logger.Error("Failed to proceed to next state", - "nodeID", con.ID.String(), - "error", err) - break BALoop - } - for i := 0; i < agreement.clocks(); i++ { - // Priority select for agreement.done(). - select { - case <-agreement.done(): - continue BALoop - default: - } - select { - case <-agreement.done(): - continue BALoop - case <-tick: - } - } - } -} - -// runDKGTSIG starts running DKG+TSIG protocol. -func (con *Consensus) runDKGTSIG(round uint64) { - con.dkgReady.L.Lock() - defer con.dkgReady.L.Unlock() - if con.dkgRunning != 0 { - return - } - con.dkgRunning = 1 - go func() { - startTime := time.Now().UTC() - defer func() { - con.dkgReady.L.Lock() - defer con.dkgReady.L.Unlock() - con.dkgReady.Broadcast() - con.dkgRunning = 2 - DKGTime := time.Now().Sub(startTime) - if DKGTime.Nanoseconds() >= - con.currentConfig.RoundInterval.Nanoseconds()/2 { - con.logger.Warn("Your computer cannot finish DKG on time!", - "nodeID", con.ID.String()) - } - }() - if err := con.cfgModule.runDKG(round); err != nil { - panic(err) - } - nodes, err := con.nodeSetCache.GetNodeSet(round) - if err != nil { - panic(err) - } - con.logger.Debug("Calling Governance.Configuration", "round", round) - hash := HashConfigurationBlock( - nodes.IDs, - con.gov.Configuration(round), - common.Hash{}, - con.cfgModule.prevHash) - psig, err := con.cfgModule.preparePartialSignature( - round, hash) - if err != nil { - panic(err) - } - if err = con.authModule.SignDKGPartialSignature(psig); err != nil { - panic(err) - } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - panic(err) - } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash) - con.network.BroadcastDKGPartialSignature(psig) - if _, err = con.cfgModule.runBlockTSig(round, hash); err != nil { - panic(err) - } - }() -} - -func (con *Consensus) runCRS() { - // Start running next round CRS. - con.logger.Debug("Calling Governance.CRS", "round", con.round) - psig, err := con.cfgModule.preparePartialSignature( - con.round, con.gov.CRS(con.round)) - if err != nil { - con.logger.Error("Failed to prepare partial signature", "error", err) - } else if err = con.authModule.SignDKGPartialSignature(psig); err != nil { - con.logger.Error("Failed to sign DKG partial signature", "error", err) - } else if err = con.cfgModule.processPartialSignature(psig); err != nil { - con.logger.Error("Failed to process partial signature", "error", err) - } else { - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash) - con.network.BroadcastDKGPartialSignature(psig) - con.logger.Debug("Calling Governance.CRS", "round", con.round) - crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round)) - if err != nil { - con.logger.Error("Failed to run CRS Tsig", "error", err) - } else { - con.logger.Debug("Calling Governance.ProposeCRS", - "round", con.round+1, - "crs", hex.EncodeToString(crs)) - con.gov.ProposeCRS(con.round+1, crs) - } - } -} - -func (con *Consensus) initialRound(startTime time.Time) { - select { - case <-con.ctx.Done(): - return - default: - } - con.logger.Debug("Calling Governance.Configuration", "round", con.round) - con.currentConfig = con.gov.Configuration(con.round) - - con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2), - func(time.Time) { - go func() { - con.runCRS() - ticker := newTicker(con.gov, con.round, TickerDKG) - <-ticker.Tick() - // Normally, gov.CRS would return non-nil. Use this for in case of - // unexpected network fluctuation and ensure the robustness. - for (con.gov.CRS(con.round+1) == common.Hash{}) { - con.logger.Info("CRS is not ready yet. Try again later...", - "nodeID", con.ID) - time.Sleep(500 * time.Millisecond) - } - con.cfgModule.registerDKG( - con.round+1, int(con.currentConfig.DKGSetSize/3)+1) - }() - }) - con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval*2/3), - func(time.Time) { - func() { - con.dkgReady.L.Lock() - defer con.dkgReady.L.Unlock() - con.dkgRunning = 0 - }() - con.runDKGTSIG(con.round + 1) - }) - con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval), - func(time.Time) { - // Change round. - con.round++ - con.logger.Debug("Calling Governance.Configuration", - "round", con.round+1) - con.lattice.AppendConfig(con.round+1, con.gov.Configuration(con.round+1)) - con.initialRound(startTime.Add(con.currentConfig.RoundInterval)) - }) -} - -// Stop the Consensus core. -func (con *Consensus) Stop() { - for _, a := range con.baModules { - a.stop() - } - con.event.Reset() - con.ctxCancel() -} - -func (con *Consensus) processMsg(msgChan <-chan interface{}) { -MessageLoop: - for { - var msg interface{} - select { - case msg = <-msgChan: - case <-con.ctx.Done(): - return - } - - switch val := msg.(type) { - case *types.Block: - if ch, exist := func() (chan<- *types.Block, bool) { - con.lock.RLock() - defer con.lock.RUnlock() - ch, e := con.baConfirmedBlock[val.Hash] - return ch, e - }(); exist { - if err := con.lattice.SanityCheck(val); err != nil { - if err == ErrRetrySanityCheckLater { - err = nil - } else { - con.logger.Error("SanityCheck failed", "error", err) - continue MessageLoop - } - } - func() { - con.lock.Lock() - defer con.lock.Unlock() - // In case of multiple delivered block. - if _, exist := con.baConfirmedBlock[val.Hash]; !exist { - return - } - delete(con.baConfirmedBlock, val.Hash) - ch <- val - }() - } else if val.IsFinalized() { - // For sync mode. - if err := con.processFinalizedBlock(val); err != nil { - con.logger.Error("Failed to process finalized block", - "error", err) - } - } else { - if err := con.preProcessBlock(val); err != nil { - con.logger.Error("Failed to pre process block", - "error", err) - } - } - case *types.Vote: - if err := con.ProcessVote(val); err != nil { - con.logger.Error("Failed to process vote", - "error", err) - } - case *types.AgreementResult: - if err := con.ProcessAgreementResult(val); err != nil { - con.logger.Error("Failed to process agreement result", - "error", err) - } - case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val); err != nil { - con.logger.Error("Failed to process block randomness result", - "error", err) - } - case *typesDKG.PrivateShare: - if err := con.cfgModule.processPrivateShare(val); err != nil { - con.logger.Error("Failed to process private share", - "error", err) - } - - case *typesDKG.PartialSignature: - if err := con.cfgModule.processPartialSignature(val); err != nil { - con.logger.Error("Failed to process partial signature", - "error", err) - } - } - } -} - -func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block { - block := &types.Block{ - Position: types.Position{ - ChainID: chainID, - Round: round, - }, - } - if err := con.prepareBlock(block, time.Now().UTC()); err != nil { - con.logger.Error("Failed to prepare block", "error", err) - return nil - } - return block -} - -func (con *Consensus) proposeEmptyBlock( - chainID uint32) (*types.Block, error) { - block := &types.Block{ - Position: types.Position{ - ChainID: chainID, - }, - } - if err := con.lattice.PrepareEmptyBlock(block); err != nil { - return nil, err - } - return block, nil -} - -// ProcessVote is the entry point to submit ont vote to a Consensus instance. -func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - v := vote.Clone() - err = con.baModules[v.Position.ChainID].processVote(v) - return err -} - -// ProcessAgreementResult processes the randomness request. -func (con *Consensus) ProcessAgreementResult( - rand *types.AgreementResult) error { - // Sanity Check. - notarySet, err := con.nodeSetCache.GetNotarySet( - rand.Position.Round, rand.Position.ChainID) - if err != nil { - return err - } - if len(rand.Votes) < len(notarySet)/3*2+1 { - return ErrNotEnoughVotes - } - if len(rand.Votes) > len(notarySet) { - return ErrIncorrectVoteProposer - } - for _, vote := range rand.Votes { - if vote.BlockHash != rand.BlockHash { - return ErrIncorrectVoteBlockHash - } - if vote.Type != types.VoteCom { - return ErrIncorrectVoteType - } - if vote.Position != rand.Position { - return ErrIncorrectVotePosition - } - if _, exist := notarySet[vote.ProposerID]; !exist { - return ErrIncorrectVoteProposer - } - ok, err := verifyVoteSignature(&vote) - if err != nil { - return err - } - if !ok { - return ErrIncorrectVoteSignature - } - } - // Syncing BA Module. - agreement := con.baModules[rand.Position.ChainID] - aID := agreement.agreementID() - if rand.Position.Newer(&aID) { - con.logger.Info("Syncing BA", "position", rand.Position) - nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round) - if err != nil { - return err - } - con.logger.Debug("Calling Network.PullBlocks for syncing BA", - "hash", rand.BlockHash) - con.network.PullBlocks(common.Hashes{rand.BlockHash}) - nIDs := nodes.GetSubSet( - int(con.gov.Configuration(rand.Position.Round).NotarySetSize), - types.NewNotarySetTarget( - con.gov.CRS(rand.Position.Round), rand.Position.ChainID)) - for _, vote := range rand.Votes { - agreement.processVote(&vote) - } - agreement.restart(nIDs, rand.Position) - } - // Calculating randomness. - if rand.Position.Round == 0 { - return nil - } - if !con.ccModule.blockRegistered(rand.BlockHash) { - return nil - } - if DiffUint64(con.round, rand.Position.Round) > 1 { - return nil - } - // Sanity check done. - if !con.cfgModule.touchTSigHash(rand.BlockHash) { - return nil - } - con.logger.Debug("Calling Network.BroadcastAgreementResult", "result", rand) - con.network.BroadcastAgreementResult(rand) - dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) - if err != nil { - return err - } - if _, exist := dkgSet[con.ID]; !exist { - return nil - } - psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) - if err != nil { - return err - } - if err = con.authModule.SignDKGPartialSignature(psig); err != nil { - return err - } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - return err - } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash) - con.network.BroadcastDKGPartialSignature(psig) - go func() { - tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) - if err != nil { - if err != ErrTSigAlreadyRunning { - con.logger.Error("Faield to run TSIG", "error", err) - } - return - } - result := &types.BlockRandomnessResult{ - BlockHash: rand.BlockHash, - Position: rand.Position, - Randomness: tsig.Signature, - } - if err := con.ProcessBlockRandomnessResult(result); err != nil { - con.logger.Error("Failed to process randomness result", - "error", err) - return - } - }() - return nil -} - -// ProcessBlockRandomnessResult processes the randomness result. -func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult) error { - if rand.Position.Round == 0 { - return nil - } - if !con.ccModule.blockRegistered(rand.BlockHash) { - return nil - } - round := rand.Position.Round - v, ok, err := con.ccModule.tsigVerifier.UpdateAndGet(round) - if err != nil { - return err - } - if !ok { - return nil - } - if !v.VerifySignature( - rand.BlockHash, crypto.Signature{Signature: rand.Randomness}) { - return ErrIncorrectBlockRandomnessResult - } - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash, - "position", rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) - con.network.BroadcastRandomnessResult(rand) - if err := con.ccModule.processBlockRandomnessResult(rand); err != nil { - if err != ErrBlockNotRegistered { - return err - } - } - return nil -} - -// preProcessBlock performs Byzantine Agreement on the block. -func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - if err = con.lattice.SanityCheck(b); err != nil { - if err != ErrRetrySanityCheckLater { - return - } - } - if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { - return err - } - return -} - -// deliverBlock deliver a block to application layer. -func (con *Consensus) deliverBlock(b *types.Block) { - // TODO(mission): clone types.FinalizationResult - con.logger.Debug("Calling Application.BlockDelivered", "block", b) - con.app.BlockDelivered(b.Hash, b.Finalization) - if b.Position.Round+2 == con.roundToNotify { - // Only the first block delivered of that round would - // trigger this noitification. - con.gov.NotifyRoundHeight( - con.roundToNotify, b.Finalization.Height) - con.roundToNotify++ - } -} - -// processBlock is the entry point to submit one block to a Consensus instance. -func (con *Consensus) processBlock(block *types.Block) (err error) { - if err = con.db.Put(*block); err != nil && err != blockdb.ErrBlockExists { - return - } - con.lock.Lock() - defer con.lock.Unlock() - // Block processed by lattice can be out-of-order. But the output of lattice - // (deliveredBlocks) cannot. - deliveredBlocks, err := con.lattice.ProcessBlock(block) - if err != nil { - return - } - // Pass delivered blocks to compaction chain. - for _, b := range deliveredBlocks { - if err = con.ccModule.processBlock(b); err != nil { - return - } - go con.event.NotifyTime(b.Finalization.Timestamp) - } - deliveredBlocks = con.ccModule.extractBlocks() - for _, b := range deliveredBlocks { - if err = con.db.Update(*b); err != nil { - panic(err) - } - con.cfgModule.untouchTSigHash(b.Hash) - con.deliverBlock(b) - } - if err = con.lattice.PurgeBlocks(deliveredBlocks); err != nil { - return - } - return -} - -// processFinalizedBlock is the entry point for syncing blocks. -func (con *Consensus) processFinalizedBlock(block *types.Block) (err error) { - if err = con.lattice.SanityCheck(block); err != nil { - return - } - con.ccModule.processFinalizedBlock(block) - for { - confirmed := con.ccModule.extractFinalizedBlocks() - if len(confirmed) == 0 { - break - } - if err = con.lattice.ctModule.processBlocks(confirmed); err != nil { - return - } - for _, b := range confirmed { - if err = con.db.Put(*b); err != nil { - if err != blockdb.ErrBlockExists { - return - } - err = nil - } - con.deliverBlock(b) - } - } - return -} - -// PrepareBlock would setup header fields of block based on its ProposerID. -func (con *Consensus) prepareBlock(b *types.Block, - proposeTime time.Time) (err error) { - if err = con.lattice.PrepareBlock(b, proposeTime); err != nil { - return - } - // TODO(mission): decide CRS by block's round, which could be determined by - // block's info (ex. position, timestamp). - con.logger.Debug("Calling Governance.CRS", "round", 0) - if err = con.authModule.SignCRS(b, con.gov.CRS(0)); err != nil { - return - } - return -} - -// PrepareGenesisBlock would setup header fields for genesis block. -func (con *Consensus) PrepareGenesisBlock(b *types.Block, - proposeTime time.Time) (err error) { - if err = con.prepareBlock(b, proposeTime); err != nil { - return - } - if len(b.Payload) != 0 { - err = ErrGenesisBlockNotEmpty - return - } - return -} |