author    | Mission Liao <mission.liao@dexon.org> | 2019-03-23 23:41:04 +0800
committer | Jimmy Hu <jimmy.hu@dexon.org>         | 2019-03-23 23:41:04 +0800
commit    | d077a35470cf4b6e7c82bd4b03a1f2b87b0f9add (patch)
tree      | 8bf24cbb04def6851a474bdc941e19bd8ce9c2e7
parent    | fb9bbdf2a34aa45c0f032b996f72cafd7bccfa80 (diff)
core: refine DKG aborting (#512)
* Avoid aborting the DKG protocol registered later
Although that DKG protocol would be registered after
the 1/2-round point, both are triggered in separate
goroutines and we shouldn't assume their execution order.
* Capitalize logs
* Add test
* Return aborted when not running
* Log DKG aborting result
* Remove duplicated DKG abort
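The guard that decides whether an abort request may cancel the currently registered DKG comes down to a (round, reset) comparison: an abort aimed at an older round, or an older reset within the same round, is rejected, so a registration that happens to run first in its own goroutine is never torn down by a stale abort. The following is a minimal, self-contained sketch of that comparison only; the `dkgState` type and `shouldAbort` helper are hypothetical names for illustration, not the actual configurationChain API.

```go
package main

import "fmt"

// dkgState is a hypothetical stand-in for the identifying pair of the
// currently registered DKG protocol (tracked on cc.dkg in the real code).
type dkgState struct {
	round uint64
	reset uint64
}

// shouldAbort mirrors the guard used when aborting: an abort request for an
// older round, or for an older reset within the same round, must not cancel
// a newer registration made by a concurrently running goroutine.
func shouldAbort(current dkgState, round, reset uint64) bool {
	if current.round > round ||
		(current.round == round && current.reset > reset) {
		// A newer DKG is already registered; refuse to abort it.
		return false
	}
	return true
}

func main() {
	registered := dkgState{round: 5, reset: 1}
	fmt.Println(shouldAbort(registered, 4, 1)) // false: abort targets an older round
	fmt.Println(shouldAbort(registered, 5, 0)) // false: abort targets an older reset
	fmt.Println(shouldAbort(registered, 5, 1)) // true: same round/reset may be aborted
}
```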
-rw-r--r-- | core/agreement-mgr.go            |  6
-rw-r--r-- | core/agreement.go                |  4
-rw-r--r-- | core/configuration-chain.go      | 13
-rw-r--r-- | core/configuration-chain_test.go | 37
-rw-r--r-- | core/consensus.go                | 19
-rw-r--r-- | core/syncer/agreement.go         | 18
-rw-r--r-- | core/syncer/consensus.go         | 16
-rw-r--r-- | simulation/node.go               | 10
8 files changed, 81 insertions, 42 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 14aa385..d29863d 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -337,7 +337,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) {
 		if curConfig = mgr.config(nextRound); curConfig != nil {
 			break
 		} else {
-			mgr.logger.Debug("round is not ready", "round", nextRound)
+			mgr.logger.Debug("Round is not ready", "round", nextRound)
 			time.Sleep(1 * time.Second)
 		}
 	}
@@ -350,11 +350,11 @@ func (mgr *agreementMgr) runBA(initRound uint64) {
 		setting.notarySet = notarySet
 		_, isNotary = setting.notarySet[mgr.ID]
 		if isNotary {
-			mgr.logger.Info("selected as notary set",
+			mgr.logger.Info("Selected as notary set",
 				"ID", mgr.ID,
 				"round", nextRound)
 		} else {
-			mgr.logger.Info("not selected as notary set",
+			mgr.logger.Info("Not selected as notary set",
 				"ID", mgr.ID,
 				"round", nextRound)
 		}
diff --git a/core/agreement.go b/core/agreement.go
index b0c7734..1919830 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -239,14 +239,14 @@ func (a *agreement) restart(
 	for _, block := range replayBlock {
 		if err := a.processBlock(block); err != nil {
-			a.logger.Error("failed to process block when restarting agreement",
+			a.logger.Error("Failed to process block when restarting agreement",
 				"block", block)
 		}
 	}
 	for _, vote := range replayVote {
 		if err := a.processVote(vote); err != nil {
-			a.logger.Error("failed to process vote when restarting agreement",
+			a.logger.Error("Failed to process vote when restarting agreement",
 				"vote", vote)
 		}
 	}
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index 2c16ac3..48b0f2a 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -114,20 +114,21 @@ func newConfigurationChain(
 
 func (cc *configurationChain) abortDKG(
 	parentCtx context.Context,
-	round, reset uint64) {
+	round, reset uint64) bool {
 	cc.dkgLock.Lock()
 	defer cc.dkgLock.Unlock()
 	if cc.dkg != nil {
-		cc.abortDKGNoLock(parentCtx, round, reset)
+		return cc.abortDKGNoLock(parentCtx, round, reset)
 	}
+	return false
 }
 
 func (cc *configurationChain) abortDKGNoLock(
 	ctx context.Context,
 	round, reset uint64) bool {
 	if cc.dkg.round > round ||
-		(cc.dkg.round == round && cc.dkg.reset >= reset) {
-		cc.logger.Error("newer DKG already is registered",
+		(cc.dkg.round == round && cc.dkg.reset > reset) {
+		cc.logger.Error("Newer DKG already is registered",
 			"round", round,
 			"reset", reset)
 		return false
@@ -163,7 +164,7 @@ func (cc *configurationChain) abortDKGNoLock(
 	cc.logger.Error("Previous DKG aborted",
 		"round", round,
 		"reset", reset)
-	return true
+	return cc.dkg == nil
 }
 
 func (cc *configurationChain) registerDKG(
@@ -646,7 +647,7 @@ func (cc *configurationChain) runTSig(
 	go func() {
 		for _, psig := range pendingPsig {
 			if err := cc.processPartialSignature(psig); err != nil {
-				cc.logger.Error("failed to process partial signature",
+				cc.logger.Error("Failed to process partial signature",
 					"nodeID", cc.ID,
 					"error", err)
 			}
diff --git a/core/configuration-chain_test.go b/core/configuration-chain_test.go
index 0efafd2..7c73f41 100644
--- a/core/configuration-chain_test.go
+++ b/core/configuration-chain_test.go
@@ -607,6 +607,7 @@ func (s *ConfigurationChainTestSuite) TestDKGAbort() {
 		s.pubKeys, 100*time.Millisecond, &common.NullLogger{}, true,
 	), ConfigRoundShift)
 	s.Require().NoError(err)
+	gov.CatchUpWithRound(round + 1)
 	cache := utils.NewNodeSetCache(gov)
 	dbInst, err := db.NewMemBackedDB()
 	s.Require().NoError(err)
@@ -640,10 +641,44 @@ func (s *ConfigurationChainTestSuite) TestDKGAbort() {
 	// The third register shouldn't be blocked, too
 	randHash = common.NewRandomHash()
 	gov.ProposeCRS(round+1, randHash[:])
+	randHash = common.NewRandomHash()
+	gov.ResetDKG(randHash[:])
+	<-called
+	cc.registerDKG(context.Background(), round+1, reset+1, k)
+	err = <-errs
+	s.Require().EqualError(ErrDKGAborted, err.Error())
+	go func() {
+		called <- struct{}{}
+		errs <- cc.runDKG(round+1, reset+1)
+	}()
 	<-called
-	cc.registerDKG(context.Background(), round+1, reset, k)
+	// Abort with older round, shouldn't be aborted.
+	aborted := cc.abortDKG(context.Background(), round, reset+1)
+	s.Require().False(aborted)
+	select {
+	case err = <-errs:
+		// Should not aborted yet.
+		s.Require().False(true)
+	default:
+	}
+	// Abort with older reset, shouldn't be aborted.
+	aborted = cc.abortDKG(context.Background(), round+1, reset)
+	s.Require().False(aborted)
+	select {
+	case err = <-errs:
+		// Should not aborted yet.
+		s.Require().False(true)
+	default:
+	}
+	// Abort with same round/reset, should be aborted.
+	aborted = cc.abortDKG(context.Background(), round+1, reset+1)
+	s.Require().True(aborted)
 	err = <-errs
 	s.Require().EqualError(ErrDKGAborted, err.Error())
+	// Abort while not running yet, should return "aborted".
+	cc.registerDKG(context.Background(), round+1, reset+1, k)
+	aborted = cc.abortDKG(context.Background(), round+1, reset+1)
+	s.Require().True(aborted)
 }
 
 func TestConfigurationChain(t *testing.T) {
diff --git a/core/consensus.go b/core/consensus.go
index 28e8379..d74a4a2 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -99,7 +99,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
 	}
 	block, err := recv.consensus.proposeBlock(recv.agreementModule.agreementID())
 	if err != nil || block == nil {
-		recv.consensus.logger.Error("unable to propose block", "error", err)
+		recv.consensus.logger.Error("Unable to propose block", "error", err)
 		return types.NullBlockHash
 	}
 	go func() {
@@ -658,7 +658,13 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
 		e := evts[len(evts)-1]
 		go func() {
 			defer elapse("abort-DKG", e)()
-			con.cfgModule.abortDKG(con.ctx, e.Round+1, e.Reset)
+			if e.Reset > 0 {
+				aborted := con.cfgModule.abortDKG(con.ctx, e.Round+1, e.Reset-1)
+				con.logger.Info("DKG aborting result",
+					"round", e.Round+1,
+					"reset", e.Reset-1,
+					"aborted", aborted)
+			}
 		}()
 	})
 	// Register round event handler to update BA and BC modules.
@@ -721,10 +727,7 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
 				return
 			}
 			// Aborting all previous running DKG protocol instance if any.
-			go func() {
-				con.cfgModule.abortDKG(con.ctx, nextRound, e.Reset)
-				con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
-			}()
+			go con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
 		})
 	})
 	// Register round event handler to propose new CRS.
@@ -1247,8 +1250,8 @@ func (con *Consensus) deliveryGuard() {
 			return
 		case <-con.resetDeliveryGuardTicker:
 		case <-time.After(60 * time.Second):
-			con.logger.Error("no blocks delivered for too long", "ID", con.ID)
-			panic(fmt.Errorf("no blocks delivered for too long"))
+			con.logger.Error("No blocks delivered for too long", "ID", con.ID)
+			panic(fmt.Errorf("No blocks delivered for too long"))
 		}
 	}
 }
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index 9f1abca..f172b3b 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -103,7 +103,7 @@ func (a *agreement) processBlock(b *types.Block) {
 func (a *agreement) processAgreementResult(r *types.AgreementResult) {
 	// Cache those results that CRS is not ready yet.
 	if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
-		a.logger.Trace("agreement result already confirmed", "result", r)
+		a.logger.Trace("Agreement result already confirmed", "result", r)
 		return
 	}
 	if r.Position.Round > a.latestCRSRound {
@@ -113,11 +113,11 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
 			a.pendings[r.Position.Round] = pendingsForRound
 		}
 		pendingsForRound[r.BlockHash] = r
-		a.logger.Trace("agreement result cached", "result", r)
+		a.logger.Trace("Agreement result cached", "result", r)
 		return
 	}
 	if err := core.VerifyAgreementResult(r, a.cache); err != nil {
-		a.logger.Error("agreement result verification failed",
+		a.logger.Error("Agreement result verification failed",
 			"result", r,
 			"error", err)
 		return
@@ -144,12 +144,12 @@ loop:
 		case a.pullChan <- r.BlockHash:
 			break loop
 		case <-a.ctx.Done():
-			a.logger.Error("pull request is not sent",
+			a.logger.Error("Pull request is not sent",
 				"position", &r.Position,
 				"hash", r.BlockHash.String()[:6])
 			return
 		case <-time.After(500 * time.Millisecond):
-			a.logger.Debug("pull request is unable to send",
+			a.logger.Debug("Pull request is unable to send",
 				"position", &r.Position,
 				"hash", r.BlockHash.String()[:6])
 		}
@@ -171,12 +171,12 @@ func (a *agreement) processNewCRS(round uint64) {
 		delete(a.pendings, r)
 		for _, res := range pendingsForRound {
 			if err := core.VerifyAgreementResult(res, a.cache); err != nil {
-				a.logger.Error("invalid agreement result",
+				a.logger.Error("Invalid agreement result",
 					"result", res,
 					"error", err)
 				continue
 			}
-			a.logger.Error("flush agreement result", "result", res)
+			a.logger.Error("Flush agreement result", "result", res)
 			a.processAgreementResult(res)
 			break
 		}
@@ -194,10 +194,10 @@ func (a *agreement) confirm(b *types.Block) {
 		case a.outputChan <- b:
 			break loop
 		case <-a.ctx.Done():
-			a.logger.Error("confirmed block is not sent", "block", b)
+			a.logger.Error("Confirmed block is not sent", "block", b)
 			return
 		case <-time.After(500 * time.Millisecond):
-			a.logger.Debug("agreement output channel is full", "block", b)
+			a.logger.Debug("Agreement output channel is full", "block", b)
 		}
 	}
 	a.confirmedBlocks[b.Hash] = struct{}{}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index d05651e..24c781a 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -191,7 +191,7 @@ func (con *Consensus) assureBuffering() {
 				return false
 			case <-time.After(500 * time.Millisecond):
 				con.logger.Warn(
-					"agreement input channel is full when notifying new round",
+					"Agreement input channel is full when notifying new round",
 					"round", e.Round,
 				)
 				return true
@@ -217,7 +217,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
 	con.lock.RLock()
 	defer con.lock.RUnlock()
 	defer func() {
-		con.logger.Debug("syncer synced status",
+		con.logger.Debug("Syncer synced status",
 			"last-block", blocks[len(blocks)-1],
 			"synced", synced,
 		)
@@ -303,14 +303,14 @@ func (con *Consensus) SyncBlocks(
 		// tip in DB.
 		_, tipHeight := con.db.GetCompactionChainTipInfo()
 		if blocks[0].Finalization.Height != tipHeight+1 {
-			con.logger.Error("mismatched finalization height",
+			con.logger.Error("Mismatched finalization height",
 				"now", blocks[0].Finalization.Height,
 				"expected", tipHeight+1,
 			)
 			err = ErrInvalidSyncingFinalizationHeight
 			return
 		}
-		con.logger.Trace("syncBlocks",
+		con.logger.Trace("SyncBlocks",
 			"position", &blocks[0].Position,
 			"final height", blocks[0].Finalization.Height,
 			"len", len(blocks),
@@ -404,14 +404,14 @@ func (con *Consensus) stopBuffering() {
 			return
 		}
 		con.duringBuffering = false
-		con.logger.Trace("syncer is about to stop")
+		con.logger.Trace("Syncer is about to stop")
 		// Stop network and CRS routines, wait until they are all stoped.
 		con.ctxCancel()
 		return
 	}() {
 		return
 	}
-	con.logger.Trace("stop syncer modules")
+	con.logger.Trace("Stop syncer modules")
 	con.roundEvt.Stop()
 	con.waitGroup.Done()
 	// Wait for all routines depends on con.agreementModule stopped.
@@ -424,9 +424,9 @@ func (con *Consensus) stopBuffering() {
 		con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
 	})
 	// Stop agreements.
-	con.logger.Trace("stop syncer agreement modules")
+	con.logger.Trace("Stop syncer agreement modules")
 	con.stopAgreement()
-	con.logger.Trace("syncer stopped")
+	con.logger.Trace("Syncer stopped")
 	return
 }
diff --git a/simulation/node.go b/simulation/node.go
index b5b261c..5bd8ec1 100644
--- a/simulation/node.go
+++ b/simulation/node.go
@@ -149,12 +149,12 @@ readyLoop:
 			break readyLoop
 		case ntfSelectedAsMaster:
 			n.logger.Info(
-				"receive 'selected-as-master' notification from server")
+				"Receive 'selected-as-master' notification from server")
 			for _, c := range n.cfg.Node.Changes {
 				if c.Round <= core.ConfigRoundShift+1 {
 					continue
 				}
-				n.logger.Info("register config change", "change", c)
+				n.logger.Info("Register config change", "change", c)
 				if err := c.RegisterChange(n.gov); err != nil {
 					panic(err)
 				}
@@ -181,11 +181,11 @@ MainLoop:
 		switch val := msg.(type) {
 		case serverNotification:
 			if val == ntfShutdown {
-				n.logger.Info("receive shutdown notification from server")
+				n.logger.Info("Receive shutdown notification from server")
 				break MainLoop
 			}
 		default:
-			panic(fmt.Errorf("unexpected message from server: %v", val))
+			panic(fmt.Errorf("Unexpected message from server: %v", val))
 		}
 	}
 	// Cleanup.
@@ -218,7 +218,7 @@ func (n *node) prepareConfigs() {
 	// These rounds are not safe to be registered as pending state change
 	// requests.
 	for i := uint64(0); i <= core.ConfigRoundShift+1; i++ {
-		n.logger.Info("prepare config", "round", i)
+		n.logger.Info("Prepare config", "round", i)
 		prepareConfigs(i, n.cfg.Node.Changes, n.gov)
 	}
 	// This notification is implictly called in full node.