diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go | 533 |
1 files changed, 342 insertions, 191 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index d74a4a290..4a95eac6f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -27,6 +27,7 @@ import ( "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" + cryptoDKG "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg" "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" @@ -51,6 +52,10 @@ var ( "CRS not ready") ErrConfigurationNotReady = fmt.Errorf( "Configuration not ready") + ErrIncorrectBlockRandomness = fmt.Errorf( + "randomness of block is incorrect") + ErrCannotVerifyBlockRandomness = fmt.Errorf( + "cannot verify block randomness") ) // consensusBAReceiver implements agreementReceiver. @@ -60,8 +65,11 @@ type consensusBAReceiver struct { agreementModule *agreement changeNotaryHeightValue *atomic.Value roundValue *atomic.Value + emptyBlockHashMap *sync.Map isNotary bool restartNotary chan types.Position + npks *typesDKG.NodePublicKeys + psigSigner *dkgShareSecret } func (recv *consensusBAReceiver) round() uint64 { @@ -72,10 +80,85 @@ func (recv *consensusBAReceiver) changeNotaryHeight() uint64 { return recv.changeNotaryHeightValue.Load().(uint64) } +func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) ( + common.Hash, error) { + hashVal, ok := recv.emptyBlockHashMap.Load(pos) + if ok { + return hashVal.(common.Hash), nil + } + emptyBlock, err := recv.consensus.bcModule.prepareBlock( + pos, time.Time{}, true) + if err != nil { + return common.Hash{}, err + } + hash, err := utils.HashBlock(emptyBlock) + if err != nil { + return common.Hash{}, err + } + recv.emptyBlockHashMap.Store(pos, hash) + return hash, nil +} + +func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool { + if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { + if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { + if recv.npks == nil || recv.npks.Round != vote.Position.Round { + var err error + recv.npks, _, err = + recv.consensus.cfgModule.getDKGInfo(vote.Position.Round, true) + if err != nil || recv.npks == nil { + recv.consensus.logger.Warn("cannot get npks", + "round", vote.Position.Round, "error", err) + return false + } + } + pubKey, exist := recv.npks.PublicKeys[vote.ProposerID] + if !exist { + return false + } + blockHash := vote.BlockHash + if blockHash == types.NullBlockHash { + var err error + blockHash, err = recv.emptyBlockHash(vote.Position) + if err != nil { + recv.consensus.logger.Error( + "Failed to verify vote for empty block", + "position", vote.Position, + "error", err) + return false + } + } + return pubKey.VerifySignature( + vote.BlockHash, crypto.Signature(vote.PartialSignature)) + } + } + return len(vote.PartialSignature.Signature) == 0 +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return } + if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { + if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { + if recv.psigSigner == nil { + return + } + if vote.BlockHash == types.NullBlockHash { + hash, err := recv.emptyBlockHash(vote.Position) + if err != nil { + recv.consensus.logger.Error( + "Failed to propose vote for empty block", + "position", vote.Position, + "error", err) + return + } + vote.PartialSignature = recv.psigSigner.sign(hash) + } else { + vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash) + } + } + } if err := recv.agreementModule.prepareVote(vote); err != nil { recv.consensus.logger.Error("Failed to prepare vote", "error", err) return @@ -120,6 +203,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block *types.Block aID = recv.agreementModule.agreementID() ) + isEmptyBlockConfirmed := hash == common.Hash{} if isEmptyBlockConfirmed { recv.consensus.logger.Info("Empty block is confirmed", "position", aID) @@ -177,6 +261,72 @@ func (recv *consensusBAReceiver) ConfirmBlock( return } } + + // It's a workaround, the height for application is one-based. + block.Finalization.Height = block.Position.Height + 1 + + if len(votes) == 0 && len(block.Finalization.Randomness) == 0 { + recv.consensus.logger.Error("No votes to recover randomness", + "block", block) + } else if votes != nil { + voteList := make([]types.Vote, 0, len(votes)) + IDs := make(cryptoDKG.IDs, 0, len(votes)) + psigs := make([]cryptoDKG.PartialSignature, 0, len(votes)) + for _, vote := range votes { + if vote.BlockHash != hash { + continue + } + if recv.round() >= DKGDelayRound { + ID, exist := recv.npks.IDMap[vote.ProposerID] + if !exist { + continue + } + IDs = append(IDs, ID) + psigs = append(psigs, vote.PartialSignature) + } + voteList = append(voteList, *vote) + } + if recv.round() >= DKGDelayRound { + rand, err := cryptoDKG.RecoverSignature(psigs, IDs) + if err != nil { + recv.consensus.logger.Warn("Unable to recover randomness", + "block", block, + "error", err) + } else { + block.Finalization.Randomness = rand.Signature[:] + } + } + + if recv.isNotary { + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + FinalizationHeight: block.Finalization.Height, + IsEmptyBlock: isEmptyBlockConfirmed, + Randomness: block.Finalization.Randomness, + } + recv.consensus.logger.Debug("Broadcast AgreementResult", + "result", result) + recv.consensus.network.BroadcastAgreementResult(result) + if block.IsEmpty() { + if err := + recv.consensus.bcModule.processAgreementResult( + result); err != nil { + recv.consensus.logger.Warn( + "Failed to process agreement result", + "result", result) + } + } + if block.Position.Round >= DKGDelayRound { + recv.consensus.logger.Debug( + "Broadcast finalized block", + "block", block) + recv.consensus.network.BroadcastBlock(block) + } + } + } + if block.Position.Height != 0 && !recv.consensus.bcModule.confirmed(block.Position.Height-1) { go func(hash common.Hash) { @@ -212,6 +362,11 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.logger.Info("Receive parent block", "parent-hash", block.ParentHash.String()[:6], "cur-position", block.Position) + if block.Finalization.Height == 0 { + // TODO(jimmy): use a seperate message to pull finalized + // block. Here, we pull it again as workaround. + continue + } recv.consensus.processBlockChan <- block parentHash = block.ParentHash if block.Position.Height == 0 || @@ -222,25 +377,9 @@ func (recv *consensusBAReceiver) ConfirmBlock( } }(block.ParentHash) } - if recv.isNotary { - voteList := make([]types.Vote, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue - } - voteList = append(voteList, *vote) - } - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - IsEmptyBlock: isEmptyBlockConfirmed, - } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.network.BroadcastAgreementResult(result) + if !block.IsEmpty() { + recv.consensus.processBlockChan <- block } - recv.consensus.processBlockChan <- block // Clean the restartNotary channel so BA will not stuck by deadlock. CleanChannelLoop: for { @@ -253,14 +392,14 @@ CleanChannelLoop: newPos := block.Position if block.Position.Height+1 == recv.changeNotaryHeight() { newPos.Round++ - recv.roundValue.Store(newPos.Round) + recv.updateRound(newPos.Round) } currentRound := recv.round() changeNotaryHeight := recv.changeNotaryHeight() if block.Position.Height > changeNotaryHeight && block.Position.Round <= currentRound { panic(fmt.Errorf( - "round not switch when confirmig: %s, %d, should switch at %d, %s", + "round not switch when confirming: %s, %d, should switch at %d, %s", block, currentRound, changeNotaryHeight, newPos)) } recv.restartNotary <- newPos @@ -282,6 +421,18 @@ func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) { recv.consensus.gov.ReportForkBlock(b1, b2) } +func (recv *consensusBAReceiver) updateRound(round uint64) { + recv.roundValue.Store(round) + var err error + _, recv.psigSigner, err = + recv.consensus.cfgModule.getDKGInfo(round, false) + if err != nil { + recv.consensus.logger.Warn("cannot get dkg info", + "round", round, "error", err) + recv.psigSigner = nil + } +} + // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { ID types.NodeID @@ -401,13 +552,13 @@ type Consensus struct { bcModule *blockChain dMoment time.Time nodeSetCache *utils.NodeSetCache + tsigVerifierCache *TSigVerifierCache lock sync.RWMutex ctx context.Context ctxCancel context.CancelFunc event *common.Event roundEvent *utils.RoundEvent logger common.Logger - resetRandomnessTicker chan struct{} resetDeliveryGuardTicker chan struct{} msgChan chan interface{} waitGroup sync.WaitGroup @@ -465,7 +616,6 @@ func NewConsensusFromSyncer( networkModule Network, prv crypto.PrivateKey, confirmedBlocks []*types.Block, - randomnessResults []*types.BlockRandomnessResult, cachedMessages []interface{}, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. @@ -492,30 +642,13 @@ func NewConsensusFromSyncer( } refBlock = b } - // Dump all randomness result to the consensus instance. - for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r, false); err != nil { - con.logger.Error("failed to process randomness result when syncing", - "result", r) - continue - } - } if startWithEmpty { pos := initBlock.Position pos.Height++ - block, err := con.bcModule.addEmptyBlock(pos) + _, err := con.bcModule.addEmptyBlock(pos) if err != nil { panic(err) } - con.processBlockChan <- block - if pos.Round >= DKGDelayRound { - rand := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - IsEmptyBlock: true, - } - go con.prepareRandomnessResult(rand) - } } return con, nil } @@ -566,8 +699,9 @@ func newConsensusForRound( if usingNonBlocking { appModule = newNonBlocking(app, debugApp) } + tsigVerifierCache := NewTSigVerifierCache(gov, 7) bcModule := newBlockChain(ID, dMoment, initBlock, appModule, - NewTSigVerifierCache(gov, 7), signer, logger) + tsigVerifierCache, signer, logger) // Construct Consensus instance. con := &Consensus{ ID: ID, @@ -582,10 +716,10 @@ func newConsensusForRound( bcModule: bcModule, dMoment: dMoment, nodeSetCache: nodeSetCache, + tsigVerifierCache: tsigVerifierCache, signer: signer, event: common.NewEvent(), logger: logger, - resetRandomnessTicker: make(chan struct{}), resetDeliveryGuardTicker: make(chan struct{}), msgChan: make(chan interface{}, 1024), processBlockChan: make(chan *types.Block, 1024), @@ -600,7 +734,7 @@ func newConsensusForRound( baConfig := agreementMgrConfig{} baConfig.from(initRound, initConfig, initCRS) baConfig.SetRoundBeginHeight(gov.GetRoundHeight(initRound)) - con.baMgr, err = newAgreementMgr(con, initRound, baConfig) + con.baMgr, err = newAgreementMgr(con, baConfig) if err != nil { panic(err) } @@ -690,14 +824,14 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { if nextRound < DKGDelayRound { return } - curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round) + curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round) if err != nil { - con.logger.Error("Error getting DKG set when proposing CRS", + con.logger.Error("Error getting notary set when proposing CRS", "round", e.Round, "error", err) return } - if _, exist := curDKGSet[con.ID]; !exist { + if _, exist := curNotarySet[con.ID]; !exist { return } isDKGValid := func() bool { @@ -733,18 +867,18 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { // Register round event handler to propose new CRS. con.roundEvent.Register(func(evts []utils.RoundEventParam) { // We don't have to propose new CRS during DKG reset, the reset of DKG - // would be done by the DKG set in previous round. + // would be done by the notary set in previous round. e := evts[len(evts)-1] defer elapse("propose-CRS", e)() if e.Reset != 0 || e.Round < DKGDelayRound { return } - if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil { - con.logger.Error("Error getting DKG set when proposing CRS", + if curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round); err != nil { + con.logger.Error("Error getting notary set when proposing CRS", "round", e.Round, "error", err) } else { - if _, exist := curDkgSet[con.ID]; !exist { + if _, exist := curNotarySet[con.ID]; !exist { return } con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) { @@ -809,26 +943,26 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { // of unexpected network fluctuation and ensure the robustness. if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Debug("unable to prepare CRS for DKG set", + con.logger.Debug("unable to prepare CRS for notary set", "round", nextRound, "reset", e.Reset) return } - nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) + nextNotarySet, err := con.nodeSetCache.GetNotarySet(nextRound) if err != nil { - con.logger.Error("Error getting DKG set for next round", + con.logger.Error("Error getting notary set for next round", "round", nextRound, "reset", e.Reset, "error", err) return } - if _, exist := nextDkgSet[con.ID]; !exist { - con.logger.Info("Not selected as DKG set", + if _, exist := nextNotarySet[con.ID]; !exist { + con.logger.Info("Not selected as notary set", "round", nextRound, "reset", e.Reset) return } - con.logger.Info("Selected as DKG set", + con.logger.Info("Selected as notary set", "round", nextRound, "reset", e.Reset) nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, @@ -851,11 +985,14 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { if initBlock != nil { con.event.NotifyHeight(initBlock.Finalization.Height) } + con.baMgr.prepare() return } // Run starts running DEXON Consensus. func (con *Consensus) Run() { + // There may have emptys block in blockchain added by force sync. + blocksWithoutRandomness := con.bcModule.pendingBlocksWithoutRandomness() // Launch BA routines. con.baMgr.run() // Launch network handler. @@ -865,12 +1002,6 @@ func (con *Consensus) Run() { con.waitGroup.Add(1) go con.processMsg() go con.processBlockLoop() - // Sleep until dMoment come. - time.Sleep(con.dMoment.Sub(time.Now().UTC())) - // Take some time to bootstrap. - time.Sleep(3 * time.Second) - con.waitGroup.Add(1) - go con.pullRandomness() // Stop dummy receiver if launched. if con.dummyCancel != nil { con.logger.Trace("Stop dummy receiver") @@ -893,6 +1024,11 @@ func (con *Consensus) Run() { } con.logger.Trace("Finish dumping cached messages") } + con.generateBlockRandomness(blocksWithoutRandomness) + // Sleep until dMoment come. + time.Sleep(con.dMoment.Sub(time.Now().UTC())) + // Take some time to bootstrap. + time.Sleep(3 * time.Second) con.waitGroup.Add(1) go con.deliveryGuard() // Block until done. @@ -901,6 +1037,76 @@ func (con *Consensus) Run() { } } +func (con *Consensus) generateBlockRandomness(blocks []*types.Block) { + con.logger.Debug("Start generating block randomness", "blocks", blocks) + isNotarySet := make(map[uint64]bool) + for _, block := range blocks { + if block.Position.Round < DKGDelayRound { + continue + } + doRun, exist := isNotarySet[block.Position.Round] + if !exist { + curNotarySet, err := con.nodeSetCache.GetNotarySet(block.Position.Round) + if err != nil { + con.logger.Error("Error getting notary set when generate block tsig", + "round", block.Position.Round, + "error", err) + continue + } + _, exist := curNotarySet[con.ID] + isNotarySet[block.Position.Round] = exist + doRun = exist + } + if !doRun { + continue + } + go func(block *types.Block) { + psig, err := con.cfgModule.preparePartialSignature( + block.Position.Round, block.Hash) + if err != nil { + con.logger.Error("Failed to prepare partial signature", + "block", block, + "error", err) + } else if err = con.signer.SignDKGPartialSignature(psig); err != nil { + con.logger.Error("Failed to sign DKG partial signature", + "block", block, + "error", err) + } else if err = con.cfgModule.processPartialSignature(psig); err != nil { + con.logger.Error("Failed to process partial signature", + "block", block, + "error", err) + } else { + con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", + "proposer", psig.ProposerID, + "block", block) + con.network.BroadcastDKGPartialSignature(psig) + sig, err := con.cfgModule.runTSig(block.Position.Round, block.Hash) + if err != nil { + con.logger.Error("Failed to run Block Tsig", + "block", block, + "error", err) + return + } + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Randomness: sig.Signature[:], + } + if err := con.bcModule.processAgreementResult(result); err != nil { + con.logger.Error("Failed to process BlockRandomness", + "result", result, + "error", err) + return + } + con.logger.Debug("Broadcast BlockRandomness", + "block", block, + "result", result) + con.network.BroadcastAgreementResult(result) + } + }(block) + } +} + // runDKG starts running DKG protocol. func (con *Consensus) runDKG(round, reset uint64, config *types.Config) { con.dkgReady.L.Lock() @@ -964,7 +1170,6 @@ func (con *Consensus) Stop() { con.event.Reset() con.waitGroup.Wait() if nbApp, ok := con.app.(*nonBlocking); ok { - fmt.Println("Stopping nonBlocking App") nbApp.wait() } } @@ -1019,11 +1224,47 @@ MessageLoop: ch, e := con.baConfirmedBlock[val.Hash] return ch, e }(); exist { - if err := utils.VerifyBlockSignature(val); err != nil { - con.logger.Error("VerifyBlockSignature failed", - "block", val, - "error", err) - continue MessageLoop + if val.IsEmpty() { + hash, err := utils.HashBlock(val) + if err != nil { + con.logger.Error("error verifying empty block hash", + "block", val, + "error, err") + continue MessageLoop + } + if hash != val.Hash { + con.logger.Error("incorrect confirmed empty block hash", + "block", val, + "hash", hash) + continue MessageLoop + } + if _, err := con.bcModule.proposeBlock( + val.Position, time.Time{}, true); err != nil { + con.logger.Error("error adding empty block", + "block", val, + "error", err) + continue MessageLoop + } + } else { + ok, err := con.bcModule.verifyRandomness( + val.Hash, val.Position.Round, val.Finalization.Randomness) + if err != nil { + con.logger.Error("error verifying confirmed block randomness", + "block", val, + "error", err) + continue MessageLoop + } + if !ok { + con.logger.Error("incorrect confirmed block randomness", + "block", val) + continue MessageLoop + } + if err := utils.VerifyBlockSignature(val); err != nil { + con.logger.Error("VerifyBlockSignature failed", + "block", val, + "error", err) + continue MessageLoop + } } func() { con.lock.Lock() @@ -1036,7 +1277,6 @@ MessageLoop: ch <- val }() } else if val.IsFinalized() { - // For sync mode. if err := con.processFinalizedBlock(val); err != nil { con.logger.Error("Failed to process finalized block", "block", val, @@ -1061,13 +1301,6 @@ MessageLoop: "result", val, "error", err) } - case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val, true); err != nil { - con.logger.Error("Failed to process block randomness result", - "hash", val.BlockHash.String()[:6], - "position", val.Position, - "error", err) - } case *typesDKG.PrivateShare: if err := con.cfgModule.processPrivateShare(val); err != nil { con.logger.Error("Failed to process private share", @@ -1101,139 +1334,67 @@ func (con *Consensus) ProcessAgreementResult( con.baMgr.untouchAgreementResult(rand) return err } + if err := con.bcModule.processAgreementResult(rand); err != nil { + con.baMgr.untouchAgreementResult(rand) + return err + } // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { + con.baMgr.untouchAgreementResult(rand) return err } - // Calculating randomness. - if rand.Position.Round == 0 { - return nil - } - // TODO(mission): find a way to avoid spamming by older agreement results. - // Sanity check done. - if !con.cfgModule.touchTSigHash(rand.BlockHash) { - return nil - } con.logger.Debug("Rebroadcast AgreementResult", "result", rand) con.network.BroadcastAgreementResult(rand) - go con.prepareRandomnessResult(rand) - return nil + + return con.deliverFinalizedBlocks() } -func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) { - dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) - if err != nil { - con.logger.Error("Failed to get dkg set", - "round", rand.Position.Round, "error", err) - return +// preProcessBlock performs Byzantine Agreement on the block. +func (con *Consensus) preProcessBlock(b *types.Block) (err error) { + err = con.baMgr.processBlock(b) + if err == nil && con.debugApp != nil { + con.debugApp.BlockReceived(b.Hash) } - if _, exist := dkgSet[con.ID]; !exist { + return +} + +func (con *Consensus) processFinalizedBlock(b *types.Block) (err error) { + if b.Position.Round < DKGDelayRound { return } - con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash) - psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) - if err != nil { - con.logger.Error("Failed to prepare psig", - "round", rand.Position.Round, - "hash", rand.BlockHash.String()[:6], - "error", err) + if err = utils.VerifyBlockSignature(b); err != nil { return } - if err = con.signer.SignDKGPartialSignature(psig); err != nil { - con.logger.Error("Failed to sign psig", - "hash", rand.BlockHash.String()[:6], - "error", err) + verifier, ok, err := con.tsigVerifierCache.UpdateAndGet(b.Position.Round) + if err != nil { return } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - con.logger.Error("Failed process psig", - "hash", rand.BlockHash.String()[:6], - "error", err) + if !ok { + err = ErrCannotVerifyBlockRandomness return } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash.String()[:6]) - con.network.BroadcastDKGPartialSignature(psig) - tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) - if err != nil { - if err != ErrTSigAlreadyRunning { - con.logger.Error("Failed to run TSIG", - "position", rand.Position, - "hash", rand.BlockHash.String()[:6], - "error", err) - } + if !verifier.VerifySignature(b.Hash, crypto.Signature{ + Type: "bls", + Signature: b.Finalization.Randomness, + }) { + err = ErrIncorrectBlockRandomness return } - result := &types.BlockRandomnessResult{ - BlockHash: rand.BlockHash, - Position: rand.Position, - Randomness: tsig.Signature, - } - // ProcessBlockRandomnessResult is not thread-safe so we put the result in - // the message channnel to be processed in the main thread. - con.msgChan <- result -} - -// ProcessBlockRandomnessResult processes the randomness result. -func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult, needBroadcast bool) error { - if rand.Position.Round == 0 { - return nil - } - if !con.bcModule.shouldAddRandomness(rand) { - return nil - } - if err := con.bcModule.addRandomness(rand); err != nil { - return err - } - if needBroadcast { - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "randomness", rand) - con.network.BroadcastRandomnessResult(rand) - } - return con.deliverFinalizedBlocks() -} - -// preProcessBlock performs Byzantine Agreement on the block. -func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - err = con.baMgr.processBlock(b) + err = con.baMgr.processFinalizedBlock(b) if err == nil && con.debugApp != nil { con.debugApp.BlockReceived(b.Hash) } return } -func (con *Consensus) pullRandomness() { - defer con.waitGroup.Done() - for { - select { - case <-con.ctx.Done(): - return - default: - } - select { - case <-con.ctx.Done(): - return - case <-con.resetRandomnessTicker: - case <-time.After(1500 * time.Millisecond): - // TODO(jimmy): pulling period should be related to lambdaBA. - hashes := con.bcModule.pendingBlocksWithoutRandomness() - if len(hashes) > 0 { - con.logger.Debug( - "Calling Network.PullRandomness", "blocks", hashes) - con.network.PullRandomness(hashes) - } - } - } -} - func (con *Consensus) deliveryGuard() { defer con.waitGroup.Done() - time.Sleep(con.dMoment.Sub(time.Now())) + select { + case <-con.ctx.Done(): + case <-time.After(con.dMoment.Sub(time.Now())): + } // Node takes time to start. select { case <-con.ctx.Done(): @@ -1259,10 +1420,6 @@ func (con *Consensus) deliveryGuard() { // deliverBlock deliver a block to application layer. func (con *Consensus) deliverBlock(b *types.Block) { select { - case con.resetRandomnessTicker <- struct{}{}: - default: - } - select { case con.resetDeliveryGuardTicker <- struct{}{}: default: } @@ -1274,7 +1431,6 @@ func (con *Consensus) deliverBlock(b *types.Block) { b.Hash, b.Finalization.Height); err != nil { panic(err) } - con.cfgModule.untouchTSigHash(b.Hash) con.logger.Debug("Calling Application.BlockDelivered", "block", b) con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone()) if con.debugApp != nil { @@ -1338,15 +1494,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { return } -// processFinalizedBlock is the entry point for handling finalized blocks. -func (con *Consensus) processFinalizedBlock(block *types.Block) error { - return con.bcModule.processFinalizedBlock(block) -} - // PrepareBlock would setup header fields of block based on its ProposerID. func (con *Consensus) proposeBlock(position types.Position) ( *types.Block, error) { - b, err := con.bcModule.proposeBlock(position, time.Now().UTC()) + b, err := con.bcModule.proposeBlock(position, time.Now().UTC(), false) if err != nil { return nil, err } |