From 3785190d9154acb0492dd22b6d62e953a0bcf453 Mon Sep 17 00:00:00 2001
From: Mission Liao
Date: Tue, 19 Mar 2019 13:53:26 +0800
Subject: core: abort hang dkg (#500)

---
 core/configuration-chain.go      | 233 ++++++++++++++++++++++++++++++---------
 core/configuration-chain_test.go |  44 ++++++++
 core/utils/round-event.go        |  12 +-
 3 files changed, 231 insertions(+), 58 deletions(-)

diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index 31827f2..084f9d0 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -38,20 +38,39 @@ var (
 		"tsig is already running")
 	ErrDKGNotReady = fmt.Errorf(
 		"DKG is not ready")
-	ErrSkipButNoError = fmt.Errorf("skip but no error")
+	ErrSkipButNoError = fmt.Errorf(
+		"skip but no error")
+	ErrDKGAborted = fmt.Errorf(
+		"DKG is aborted")
 )
 
+// ErrMismatchDKG represents a failed attempt to run the DKG protocol because
+// the registered DKG protocol does not match, in terms of round and resetCount.
+type ErrMismatchDKG struct {
+	expectRound, expectReset uint64
+	actualRound, actualReset uint64
+}
+
+func (e ErrMismatchDKG) Error() string {
+	return fmt.Sprintf(
+		"mismatch DKG, abort running: expect(%d %d) actual(%d %d)",
+		e.expectRound, e.expectReset, e.actualRound, e.actualReset)
+}
+
+type dkgStepFn func(round uint64, reset uint64) (chan<- struct{}, error)
+
 type configurationChain struct {
 	ID           types.NodeID
 	recv         dkgReceiver
 	gov          Governance
 	dkg          *dkgProtocol
-	dkgRunPhases []func(round uint64, reset uint64) error
+	dkgRunPhases []dkgStepFn
 	logger       common.Logger
 	dkgLock      sync.RWMutex
 	dkgSigner    map[uint64]*dkgShareSecret
 	npks         map[uint64]*typesDKG.NodePublicKeys
 	dkgResult    sync.RWMutex
+	abortDKGCh   chan chan<- struct{}
 	tsig         map[common.Hash]*tsigProtocol
 	tsigTouched  map[common.Hash]struct{}
 	tsigReady    *sync.Cond
@@ -79,6 +98,7 @@ func newConfigurationChain(
 		logger:      logger,
 		dkgSigner:   make(map[uint64]*dkgShareSecret),
 		npks:        make(map[uint64]*typesDKG.NodePublicKeys),
+		abortDKGCh:  make(chan chan<- struct{}, 1),
 		tsig:        make(map[common.Hash]*tsigProtocol),
 		tsigTouched: make(map[common.Hash]struct{}),
 		tsigReady:   sync.NewCond(&sync.Mutex{}),
@@ -90,12 +110,62 @@ func newConfigurationChain(
 	return configurationChain
 }
 
+func (cc *configurationChain) abortDKG(round, reset uint64) {
+	cc.dkgLock.Lock()
+	defer cc.dkgLock.Unlock()
+	if cc.dkg != nil {
+		cc.abortDKGNoLock(round, reset)
+	}
+}
+
+func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool {
+	if cc.dkg.round > round ||
+		(cc.dkg.round == round && cc.dkg.reset >= reset) {
+		cc.logger.Error("newer DKG is already registered",
+			"round", round,
+			"reset", reset)
+		return false
+	}
+	cc.logger.Error("Previous DKG is not finished",
+		"round", round,
+		"reset", reset,
+		"previous-round", cc.dkg.round,
+		"previous-reset", cc.dkg.reset)
+	// Abort the DKG routine of the previous round.
+	aborted := make(chan struct{}, 1)
+	cc.logger.Error("Aborting DKG in previous round",
+		"round", round,
+		"previous-round", cc.dkg.round)
+	cc.dkgLock.Unlock()
+	// Notify the currently running DKG protocol to abort.
+	cc.abortDKGCh <- aborted
+	// Wait for the currently running DKG protocol to abort.
+	<-aborted
+	cc.logger.Error("Previous DKG aborted",
+		"round", round,
+		"reset", reset)
+	cc.dkgLock.Lock()
+	return true
+}
+
 func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
 	cc.dkgLock.Lock()
 	defer cc.dkgLock.Unlock()
 	if cc.dkg != nil {
-		cc.logger.Error("Previous DKG is not finished")
-		// TODO(mission): return here and fix CI failure.
+		// Make sure we only proceed when cc.dkg is nil.
+		if !cc.abortDKGNoLock(round, reset) {
+			return
+		}
+		if cc.dkg != nil {
+			// This panic would only be raised when there are multiple
+			// attempts to register a DKG protocol at the same time.
+			panic(ErrMismatchDKG{
+				expectRound: round,
+				expectReset: reset,
+				actualRound: cc.dkg.round,
+				actualReset: cc.dkg.reset,
+			})
+		}
 	}
 	dkgSet, err := cc.cache.GetDKGSet(round)
 	if err != nil {
@@ -109,7 +179,6 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
 	if err != nil {
 		panic(err)
 	}
-
 	if cc.dkg == nil {
 		cc.dkg = newDKGProtocol(
 			cc.ID,
@@ -132,37 +201,51 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
 		<-ticker.Tick()
 		cc.dkgLock.Lock()
 		defer cc.dkgLock.Unlock()
-		cc.dkg.proposeMPKReady()
+		if cc.dkg != nil && cc.dkg.round == round && cc.dkg.reset == reset {
+			cc.dkg.proposeMPKReady()
+		}
 	}()
 }
 
-func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) (
+	abortCh chan<- struct{}, err error) {
 	if cc.dkg.round < round ||
 		(cc.dkg.round == round && cc.dkg.reset < reset) {
-		return ErrDKGNotRegistered
+		err = ErrDKGNotRegistered
+		return
 	}
 	if cc.dkg.round != round || cc.dkg.reset != reset {
 		cc.logger.Warn("DKG canceled", "round", round, "reset", reset)
-		return ErrSkipButNoError
+		err = ErrSkipButNoError
+		return
 	}
 	cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
 	if cc.gov.IsDKGFinal(round) {
 		cc.logger.Warn("DKG already final", "round", round)
-		return ErrSkipButNoError
+		err = ErrSkipButNoError
+		return
 	}
 	cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round)
-	for !cc.gov.IsDKGMPKReady(round) {
-		cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
-			"nodeID", cc.ID)
+	for abortCh == nil && !cc.gov.IsDKGMPKReady(round) {
 		cc.dkgLock.Unlock()
-		time.Sleep(500 * time.Millisecond)
+		cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
+			"nodeID", cc.ID,
+			"round", round)
+		select {
+		case abortCh = <-cc.abortDKGCh:
+			err = ErrDKGAborted
+		case <-time.After(500 * time.Millisecond):
+		}
 		cc.dkgLock.Lock()
 	}
-
-	return nil
+	if abortCh != nil {
+		return
+	}
+	return
 }
 
-func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKGPhaseTwoAndThree(
+	round uint64, reset uint64) (chan<- struct{}, error) {
 	// Check if this node successfully joined the protocol.
 	cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round)
 	mpks := cc.gov.DKGMasterPublicKeys(round)
@@ -177,7 +260,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64)
 		cc.logger.Warn("Failed to join DKG protocol",
 			"round", round,
 			"reset", reset)
-		return ErrSkipButNoError
+		return nil, ErrSkipButNoError
 	}
 	// Phase 2(T = 0): Exchange DKG secret key share.
 	if err := cc.dkg.processMasterPublicKeys(mpks); err != nil {
@@ -187,6 +270,13 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64)
 			"error", err)
 	}
 	cc.mpkReady = true
+	// Processing private shares might take a long time; check for an abort
+	// request before getting into that loop.
+	select {
+	case abortCh := <-cc.abortDKGCh:
+		return abortCh, ErrDKGAborted
+	default:
+	}
 	for _, prvShare := range cc.pendingPrvShare {
 		if err := cc.dkg.processPrivateShare(prvShare); err != nil {
 			cc.logger.Error("Failed to process private share",
@@ -198,7 +288,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64)
 
 	// Phase 3(T = 0~λ): Propose complaint.
 	// Propose complaint is done in `processMasterPublicKeys`.
-	return nil
+	return nil, nil
 }
 
 func (cc *configurationChain) runDKGPhaseFour() {
@@ -232,17 +322,27 @@ func (cc *configurationChain) runDKGPhaseEight() {
 	cc.dkg.proposeFinalize()
 }
 
-func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) (
+	abortCh chan<- struct{}, err error) {
 	// Phase 9(T = 6λ): DKG is ready.
 	// Normally, IsDKGFinal would return true here. Check it anyway in case of
 	// unexpected network fluctuation, to ensure the robustness of DKG protocol.
 	cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
-	for !cc.gov.IsDKGFinal(round) {
+	for abortCh == nil && !cc.gov.IsDKGFinal(round) {
+		cc.dkgLock.Unlock()
 		cc.logger.Debug("DKG is not ready yet. Try again later...",
 			"nodeID", cc.ID.String()[:6],
 			"round", round,
 			"reset", reset)
-		time.Sleep(500 * time.Millisecond)
+		select {
+		case abortCh = <-cc.abortDKGCh:
+			err = ErrDKGAborted
+		case <-time.After(500 * time.Millisecond):
+		}
+		cc.dkgLock.Lock()
+	}
+	if abortCh != nil {
+		return
 	}
 	cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round)
 	cc.logger.Debug("Calling Governance.DKGComplaints", "round", round)
@@ -251,7 +351,7 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error
 		cc.gov.DKGComplaints(round),
 		cc.dkg.threshold)
 	if err != nil {
-		return err
+		return
 	}
 	qualifies := ""
 	for nID := range npks.QualifyNodeIDs {
@@ -267,79 +367,106 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error
 		cc.logger.Warn("Self is not in Qualify Nodes",
 			"round", round,
 			"reset", reset)
-		return nil
+		return
 	}
 	signer, err := cc.dkg.recoverShareSecret(npks.QualifyIDs)
 	if err != nil {
-		return err
+		return
 	}
 	// Save private shares to DB.
 	if err = cc.db.PutDKGPrivateKey(round, *signer.privateKey); err != nil {
-		return err
+		return
 	}
 	cc.dkgResult.Lock()
 	defer cc.dkgResult.Unlock()
 	cc.dkgSigner[round] = signer
 	cc.npks[round] = npks
-	return nil
+	return
 }
 
-func (cc *configurationChain) runTick(ticker Ticker) {
+func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) {
 	cc.dkgLock.Unlock()
-	<-ticker.Tick()
-	cc.dkgLock.Lock()
+	defer cc.dkgLock.Lock()
+	select {
+	case abortCh = <-cc.abortDKGCh:
+	case <-ticker.Tick():
+	}
+	return
 }
 
 func (cc *configurationChain) initDKGPhasesFunc() {
-	cc.dkgRunPhases = []func(round uint64, reset uint64) error{
-		func(round uint64, reset uint64) error {
+	cc.dkgRunPhases = []dkgStepFn{
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			return cc.runDKGPhaseOne(round, reset)
 		},
-		func(round uint64, reset uint64) error {
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			return cc.runDKGPhaseTwoAndThree(round, reset)
 		},
-		func(round uint64, reset uint64) error {
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			cc.runDKGPhaseFour()
-			return nil
+			return nil, nil
 		},
-		func(round uint64, reset uint64) error {
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			cc.runDKGPhaseFiveAndSix(round, reset)
-			return nil
+			return nil, nil
 		},
-		func(round uint64, reset uint64) error {
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			complaints := cc.gov.DKGComplaints(round)
 			cc.runDKGPhaseSeven(complaints)
-			return nil
+			return nil, nil
 		},
-		func(round uint64, reset uint64) error {
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			cc.runDKGPhaseEight()
-			return nil
+			return nil, nil
 		},
-		func(round uint64, reset uint64) error {
+		func(round uint64, reset uint64) (chan<- struct{}, error) {
 			return cc.runDKGPhaseNine(round, reset)
 		},
 	}
 }
 
-func (cc *configurationChain) runDKG(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) {
 	// Check if the corresponding DKG signer is ready.
-	if _, _, err := cc.getDKGInfo(round); err == nil {
+	if _, _, err = cc.getDKGInfo(round); err == nil {
 		return ErrSkipButNoError
 	}
 	cc.dkgLock.Lock()
 	defer cc.dkgLock.Unlock()
-
-	tickStartAt := 1
-	var ticker Ticker
+	var (
+		ticker  Ticker
+		abortCh chan<- struct{}
+	)
 	defer func() {
 		if ticker != nil {
 			ticker.Stop()
 		}
+		// We should be holding cc.dkgLock here; reset cc.dkg to nil when done.
+		cc.dkg = nil
+		if abortCh == nil {
+			select {
+			case abortCh = <-cc.abortDKGCh:
+				// The previous DKG finished its job; don't overwrite its error
+				// with "aborted" here.
+			default:
+			}
+		}
+		if abortCh != nil {
+			abortCh <- struct{}{}
+		}
 	}()
+	tickStartAt := 1
 	if cc.dkg == nil {
 		return ErrDKGNotRegistered
 	}
+	if cc.dkg.round != round || cc.dkg.reset != reset {
+		return ErrMismatchDKG{
+			expectRound: round,
+			expectReset: reset,
+			actualRound: cc.dkg.round,
+			actualReset: cc.dkg.reset,
+		}
+	}
 	for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ {
 		if i >= tickStartAt && ticker == nil {
 			ticker = newTicker(cc.gov, round, TickerDKG)
 		}
 		if ticker != nil {
-			cc.runTick(ticker)
+			if abortCh = cc.runTick(ticker); abortCh != nil {
+				return
+			}
 		}
-		switch err := cc.dkgRunPhases[i](round, reset); err {
+		switch abortCh, err = cc.dkgRunPhases[i](round, reset); err {
 		case ErrSkipButNoError, nil:
 			cc.dkg.step = i + 1
-			err := cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
+			err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
 			if err != nil {
 				return fmt.Errorf("put or update DKG protocol error: %v", err)
 			}
 			if err == nil {
 				continue
 			} else {
-				return nil
+				return
 			}
 		default:
-			return err
+			return
 		}
 	}
diff --git a/core/configuration-chain_test.go b/core/configuration-chain_test.go
index 3d8d1aa..9de06df 100644
--- a/core/configuration-chain_test.go
+++ b/core/configuration-chain_test.go
@@ -596,6 +596,50 @@ func (s *ConfigurationChainTestSuite) TestDKGPhasesSnapShot() {
 	}
 }
 
+func (s *ConfigurationChainTestSuite) TestDKGAbort() {
+	n := 4
+	k := 1
+	round := DKGDelayRound
+	reset := uint64(0)
+	s.setupNodes(n)
+	gov, err := test.NewGovernance(test.NewState(DKGDelayRound,
+		s.pubKeys, 100*time.Millisecond, &common.NullLogger{}, true,
+	), ConfigRoundShift)
+	s.Require().NoError(err)
+	cache := utils.NewNodeSetCache(gov)
+	dbInst, err := db.NewMemBackedDB()
+	s.Require().NoError(err)
+	recv := newTestCCGlobalReceiver(s)
+	nID := s.nIDs[0]
+	cc := newConfigurationChain(nID,
+		newTestCCReceiver(nID, recv), gov, cache, dbInst,
+		&common.NullLogger{})
+	recv.nodes[nID] = cc
+	recv.govs[nID] = gov
+	// The first register should not be blocked.
+	cc.registerDKG(round, reset, k)
+	// This runDKG call should block because DKGMPKReady is not enough.
+	errs := make(chan error, 1)
+	go func() {
+		errs <- cc.runDKG(round, reset)
+	}()
+	// The second register shouldn't be blocked either.
+	randHash := common.NewRandomHash()
+	gov.ResetDKG(randHash[:])
+	cc.registerDKG(round, reset+1, k)
+	err = <-errs
+	s.Require().EqualError(ErrDKGAborted, err.Error())
+	go func() {
+		errs <- cc.runDKG(round, reset+1)
+	}()
+	// The third register shouldn't be blocked either.
+	randHash = common.NewRandomHash()
+	gov.ProposeCRS(round+1, randHash[:])
+	cc.registerDKG(round+1, reset, k)
+	err = <-errs
+	s.Require().EqualError(ErrDKGAborted, err.Error())
+}
+
 func TestConfigurationChain(t *testing.T) {
 	suite.Run(t, new(ConfigurationChainTestSuite))
 }
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index 2689840..fe735e5 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -225,13 +225,13 @@ func (e *RoundEvent) ValidateNextRound(blockHeight uint64) {
 			h(events)
 		}
 	}()
-	startRound := e.lastTriggeredRound
+	var (
+		dkgFailed, triggered bool
+		param                RoundEventParam
+		beginHeight          = blockHeight
+		startRound           = e.lastTriggeredRound
+	)
 	for {
-		var (
-			dkgFailed, triggered bool
-			param                RoundEventParam
-			beginHeight          = blockHeight
-		)
 		for {
 			param, dkgFailed, triggered = e.check(beginHeight, startRound,
 				dkgFailed)
--
cgit v1.2.3
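
A note on the abort handshake above: abortDKGCh is a channel of channels. The
aborting side (abortDKGNoLock) sends a fresh acknowledgement channel into
abortDKGCh and then blocks on it, while the long-running DKG routine polls
abortDKGCh between ticks and, once it sees a request, signals back on the
received channel to confirm it has fully stopped. The sketch below shows this
pattern in isolation, assuming only the standard library; worker and
requestAbort are illustrative names, not part of the patch.

package main

import (
	"fmt"
	"time"
)

// worker emulates the polling loops in runDKGPhaseOne/runDKGPhaseNine: between
// polls it checks whether an abort has been requested. The request carries an
// acknowledgement channel; signalling on it tells the aborter that the worker
// has fully stopped.
func worker(abortCh <-chan chan<- struct{}) {
	for {
		select {
		case ack := <-abortCh:
			ack <- struct{}{} // Acknowledge: the worker is done.
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("polling...")
		}
	}
}

// requestAbort mirrors abortDKGNoLock: send an acknowledgement channel into
// abortCh, then block until the running worker confirms the abort.
func requestAbort(abortCh chan<- chan<- struct{}) {
	ack := make(chan struct{}, 1)
	abortCh <- ack
	<-ack
}

func main() {
	abortCh := make(chan chan<- struct{}, 1)
	go worker(abortCh)
	time.Sleep(300 * time.Millisecond)
	requestAbort(abortCh)
	fmt.Println("worker aborted")
}

Buffering abortDKGCh with capacity 1, as newConfigurationChain does, lets an
abort request be parked while the DKG routine is busy between polls; the drain
in runDKG's defer then acknowledges a request that arrives only after the run
has already finished.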