From b02fa5ee430cff9dafc9d9c399099a88d554a083 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 15 Mar 2019 18:17:53 +0800 Subject: core/syncer: add force sync (#468) * core: Add Recovery Interface * core/syncer: modify recovery interface * core: fix Recovery interface * core/syncer: rename terminator to watchcat (#491) * core/syncer: rename terminator to watchcat * Add error log * Rename Pat to Feed * core/syncer: add force sync * run prepareRandomness if round >= DKGDelayRound * Add test for Forcsync --- core/consensus.go | 141 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 80 insertions(+), 61 deletions(-) (limited to 'core/consensus.go') diff --git a/core/consensus.go b/core/consensus.go index 4201cbc..5ee64c2 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -453,6 +453,7 @@ func NewConsensusForSimulation( func NewConsensusFromSyncer( initBlock *types.Block, initRoundBeginHeight uint64, + startWithEmpty bool, dMoment time.Time, app Application, gov Governance, @@ -495,6 +496,23 @@ func NewConsensusFromSyncer( continue } } + if startWithEmpty { + pos := initBlock.Position + pos.Height++ + block, err := con.bcModule.addEmptyBlock(pos) + if err != nil { + panic(err) + } + con.processBlockChan <- block + if pos.Round >= DKGDelayRound { + rand := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + IsEmptyBlock: true, + } + go con.prepareRandomnessResult(rand) + } + } return con, nil } @@ -863,6 +881,10 @@ func (con *Consensus) Stop() { con.baMgr.stop() con.event.Reset() con.waitGroup.Wait() + if nbApp, ok := con.app.(*nonBlocking); ok { + fmt.Println("Stopping nonBlocking App") + nbApp.wait() + } } func (con *Consensus) deliverNetworkMsg() { @@ -1014,62 +1036,64 @@ func (con *Consensus) ProcessAgreementResult( con.logger.Debug("Rebroadcast AgreementResult", "result", rand) con.network.BroadcastAgreementResult(rand) + go con.prepareRandomnessResult(rand) + return nil +} - go func() { - dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) - if err != nil { - con.logger.Error("Failed to get dkg set", - "round", rand.Position.Round, "error", err) - return - } - if _, exist := dkgSet[con.ID]; !exist { - return - } - psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) - if err != nil { - con.logger.Error("Failed to prepare psig", - "round", rand.Position.Round, - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - if err = con.signer.SignDKGPartialSignature(psig); err != nil { - con.logger.Error("Failed to sign psig", - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - con.logger.Error("Failed process psig", +func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) { + dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) + if err != nil { + con.logger.Error("Failed to get dkg set", + "round", rand.Position.Round, "error", err) + return + } + if _, exist := dkgSet[con.ID]; !exist { + return + } + con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash) + psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) + if err != nil { + con.logger.Error("Failed to prepare psig", + "round", rand.Position.Round, + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + if err = con.signer.SignDKGPartialSignature(psig); err != nil { + con.logger.Error("Failed to sign psig", + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + if err = con.cfgModule.processPartialSignature(psig); err != nil { + con.logger.Error("Failed process psig", + "hash", rand.BlockHash.String()[:6], + "error", err) + return + } + con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", + "proposer", psig.ProposerID, + "round", psig.Round, + "hash", psig.Hash.String()[:6]) + con.network.BroadcastDKGPartialSignature(psig) + tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) + if err != nil { + if err != ErrTSigAlreadyRunning { + con.logger.Error("Failed to run TSIG", + "position", rand.Position, "hash", rand.BlockHash.String()[:6], "error", err) - return - } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash.String()[:6]) - con.network.BroadcastDKGPartialSignature(psig) - tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) - if err != nil { - if err != ErrTSigAlreadyRunning { - con.logger.Error("Failed to run TSIG", - "position", rand.Position, - "hash", rand.BlockHash.String()[:6], - "error", err) - } - return } - result := &types.BlockRandomnessResult{ - BlockHash: rand.BlockHash, - Position: rand.Position, - Randomness: tsig.Signature, - } - // ProcessBlockRandomnessResult is not thread-safe so we put the result in - // the message channnel to be processed in the main thread. - con.msgChan <- result - }() - return nil + return + } + result := &types.BlockRandomnessResult{ + BlockHash: rand.BlockHash, + Position: rand.Position, + Randomness: tsig.Signature, + } + // ProcessBlockRandomnessResult is not thread-safe so we put the result in + // the message channnel to be processed in the main thread. + con.msgChan <- result } // ProcessBlockRandomnessResult processes the randomness result. @@ -1094,14 +1118,6 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - var exist bool - exist, err = con.nodeSetCache.Exists(b.Position.Round, b.ProposerID) - if err != nil { - return - } - if !exist { - return ErrProposerNotInNodeSet - } err = con.baMgr.processBlock(b) if err == nil && con.debugApp != nil { con.debugApp.BlockReceived(b.Hash) @@ -1137,7 +1153,10 @@ func (con *Consensus) deliveryGuard() { defer con.waitGroup.Done() time.Sleep(con.dMoment.Sub(time.Now())) // Node takes time to start. - time.Sleep(60 * time.Second) + select { + case <-con.ctx.Done(): + case <-time.After(60 * time.Second): + } for { select { case <-con.ctx.Done(): -- cgit v1.2.3