diff options
Diffstat (limited to 'core/configuration-chain.go')
-rw-r--r-- | core/configuration-chain.go | 195 |
1 files changed, 104 insertions, 91 deletions
diff --git a/core/configuration-chain.go b/core/configuration-chain.go index 084f9d0..2c16ac3 100644 --- a/core/configuration-chain.go +++ b/core/configuration-chain.go @@ -18,6 +18,7 @@ package core import ( + "context" "fmt" "sync" "time" @@ -57,7 +58,7 @@ func (e ErrMismatchDKG) Error() string { e.expectRound, e.expectReset, e.actualRound, e.actualReset) } -type dkgStepFn func(round uint64, reset uint64) (chan<- struct{}, error) +type dkgStepFn func(round uint64, reset uint64) error type configurationChain struct { ID types.NodeID @@ -70,7 +71,6 @@ type configurationChain struct { dkgSigner map[uint64]*dkgShareSecret npks map[uint64]*typesDKG.NodePublicKeys dkgResult sync.RWMutex - abortDKGCh chan chan<- struct{} tsig map[common.Hash]*tsigProtocol tsigTouched map[common.Hash]struct{} tsigReady *sync.Cond @@ -80,8 +80,11 @@ type configurationChain struct { mpkReady bool pendingPrvShare map[types.NodeID]*typesDKG.PrivateShare // TODO(jimmy-dexon): add timeout to pending psig. - pendingPsig map[common.Hash][]*typesDKG.PartialSignature - prevHash common.Hash + pendingPsig map[common.Hash][]*typesDKG.PartialSignature + prevHash common.Hash + dkgCtx context.Context + dkgCtxCancel context.CancelFunc + dkgRunning bool } func newConfigurationChain( @@ -98,7 +101,6 @@ func newConfigurationChain( logger: logger, dkgSigner: make(map[uint64]*dkgShareSecret), npks: make(map[uint64]*typesDKG.NodePublicKeys), - abortDKGCh: make(chan chan<- struct{}, 1), tsig: make(map[common.Hash]*tsigProtocol), tsigTouched: make(map[common.Hash]struct{}), tsigReady: sync.NewCond(&sync.Mutex{}), @@ -110,15 +112,19 @@ func newConfigurationChain( return configurationChain } -func (cc *configurationChain) abortDKG(round, reset uint64) { +func (cc *configurationChain) abortDKG( + parentCtx context.Context, + round, reset uint64) { cc.dkgLock.Lock() defer cc.dkgLock.Unlock() if cc.dkg != nil { - cc.abortDKGNoLock(round, reset) + cc.abortDKGNoLock(parentCtx, round, reset) } } -func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool { +func (cc *configurationChain) abortDKGNoLock( + ctx context.Context, + round, reset uint64) bool { if cc.dkg.round > round || (cc.dkg.round == round && cc.dkg.reset >= reset) { cc.logger.Error("newer DKG already is registered", @@ -132,30 +138,50 @@ func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool { "previous-round", cc.dkg.round, "previous-reset", cc.dkg.reset) // Abort DKG routine in previous round. - aborted := make(chan struct{}, 1) cc.logger.Error("Aborting DKG in previous round", "round", round, "previous-round", cc.dkg.round) - cc.dkgLock.Unlock() // Notify current running DKG protocol to abort. - cc.abortDKGCh <- aborted + if cc.dkgCtxCancel != nil { + cc.dkgCtxCancel() + } + cc.dkgLock.Unlock() // Wait for current running DKG protocol aborting. - <-aborted + for { + cc.dkgLock.Lock() + if cc.dkgRunning == false { + cc.dkg = nil + break + } + select { + case <-ctx.Done(): + return false + case <-time.After(100 * time.Millisecond): + } + cc.dkgLock.Unlock() + } cc.logger.Error("Previous DKG aborted", "round", round, "reset", reset) - cc.dkgLock.Lock() return true } -func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) { +func (cc *configurationChain) registerDKG( + parentCtx context.Context, + round, reset uint64, + threshold int) { cc.dkgLock.Lock() defer cc.dkgLock.Unlock() if cc.dkg != nil { // Make sure we only proceed when cc.dkg is nil. - if !cc.abortDKGNoLock(round, reset) { + if !cc.abortDKGNoLock(parentCtx, round, reset) { return } + select { + case <-parentCtx.Done(): + return + default: + } if cc.dkg != nil { // This panic would only raise when multiple attampts to register // a DKG protocol at the same time. @@ -176,6 +202,7 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) { cc.pendingPrvShare = make(map[types.NodeID]*typesDKG.PrivateShare) cc.mpkReady = false cc.dkg, err = recoverDKGProtocol(cc.ID, cc.recv, round, reset, cc.db) + cc.dkgCtx, cc.dkgCtxCancel = context.WithCancel(parentCtx) if err != nil { panic(err) } @@ -207,45 +234,39 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) { }() } -func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) ( - abortCh chan<- struct{}, err error) { +func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) error { if cc.dkg.round < round || (cc.dkg.round == round && cc.dkg.reset < reset) { - err = ErrDKGNotRegistered - return + return ErrDKGNotRegistered } if cc.dkg.round != round || cc.dkg.reset != reset { cc.logger.Warn("DKG canceled", "round", round, "reset", reset) - err = ErrSkipButNoError - return + return ErrSkipButNoError } cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round) if cc.gov.IsDKGFinal(round) { cc.logger.Warn("DKG already final", "round", round) - err = ErrSkipButNoError - return + return ErrSkipButNoError } cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round) - for abortCh == nil && !cc.gov.IsDKGMPKReady(round) { + var err error + for err == nil && !cc.gov.IsDKGMPKReady(round) { cc.dkgLock.Unlock() cc.logger.Debug("DKG MPKs are not ready yet. Try again later...", "nodeID", cc.ID, "round", round) select { - case abortCh = <-cc.abortDKGCh: + case <-cc.dkgCtx.Done(): err = ErrDKGAborted case <-time.After(500 * time.Millisecond): } cc.dkgLock.Lock() } - if abortCh != nil { - return - } - return + return err } func (cc *configurationChain) runDKGPhaseTwoAndThree( - round uint64, reset uint64) (chan<- struct{}, error) { + round uint64, reset uint64) error { // Check if this node successfully join the protocol. cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round) mpks := cc.gov.DKGMasterPublicKeys(round) @@ -260,7 +281,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree( cc.logger.Warn("Failed to join DKG protocol", "round", round, "reset", reset) - return nil, ErrSkipButNoError + return ErrSkipButNoError } // Phase 2(T = 0): Exchange DKG secret key share. if err := cc.dkg.processMasterPublicKeys(mpks); err != nil { @@ -273,8 +294,8 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree( // The time to process private share might be long, check aborting before // get into that loop. select { - case abortCh := <-cc.abortDKGCh: - return abortCh, ErrDKGAborted + case <-cc.dkgCtx.Done(): + return ErrDKGAborted default: } for _, prvShare := range cc.pendingPrvShare { @@ -288,7 +309,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree( // Phase 3(T = 0~λ): Propose complaint. // Propose complaint is done in `processMasterPublicKeys`. - return nil, nil + return nil } func (cc *configurationChain) runDKGPhaseFour() { @@ -322,27 +343,27 @@ func (cc *configurationChain) runDKGPhaseEight() { cc.dkg.proposeFinalize() } -func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) ( - abortCh chan<- struct{}, err error) { +func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error { // Phase 9(T = 6λ): DKG is ready. // Normally, IsDKGFinal would return true here. Use this for in case of // unexpected network fluctuation and ensure the robustness of DKG protocol. cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round) - for abortCh == nil && !cc.gov.IsDKGFinal(round) { + var err error + for err == nil && !cc.gov.IsDKGFinal(round) { cc.dkgLock.Unlock() cc.logger.Debug("DKG is not ready yet. Try again later...", "nodeID", cc.ID.String()[:6], "round", round, "reset", reset) select { - case abortCh = <-cc.abortDKGCh: + case <-cc.dkgCtx.Done(): err = ErrDKGAborted case <-time.After(500 * time.Millisecond): } cc.dkgLock.Lock() } - if abortCh != nil { - return + if err != nil { + return err } cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round) cc.logger.Debug("Calling Governance.DKGComplaints", "round", round) @@ -351,7 +372,7 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) ( cc.gov.DKGComplaints(round), cc.dkg.threshold) if err != nil { - return + return err } qualifies := "" for nID := range npks.QualifyNodeIDs { @@ -367,28 +388,29 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) ( cc.logger.Warn("Self is not in Qualify Nodes", "round", round, "reset", reset) - return + return nil } signer, err := cc.dkg.recoverShareSecret(npks.QualifyIDs) if err != nil { - return + return err } // Save private shares to DB. if err = cc.db.PutDKGPrivateKey(round, *signer.privateKey); err != nil { - return + return err } cc.dkgResult.Lock() defer cc.dkgResult.Unlock() cc.dkgSigner[round] = signer cc.npks[round] = npks - return + return nil } -func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) { +func (cc *configurationChain) runTick(ticker Ticker) (aborted bool) { cc.dkgLock.Unlock() defer cc.dkgLock.Lock() select { - case abortCh = <-cc.abortDKGCh: + case <-cc.dkgCtx.Done(): + aborted = true case <-ticker.Tick(): } return @@ -396,69 +418,42 @@ func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) { func (cc *configurationChain) initDKGPhasesFunc() { cc.dkgRunPhases = []dkgStepFn{ - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { return cc.runDKGPhaseOne(round, reset) }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { return cc.runDKGPhaseTwoAndThree(round, reset) }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { cc.runDKGPhaseFour() - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { cc.runDKGPhaseFiveAndSix(round, reset) - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { complaints := cc.gov.DKGComplaints(round) cc.runDKGPhaseSeven(complaints) - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { cc.runDKGPhaseEight() - return nil, nil + return nil }, - func(round uint64, reset uint64) (chan<- struct{}, error) { + func(round uint64, reset uint64) error { return cc.runDKGPhaseNine(round, reset) }, } } func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) { - // Check if corresponding DKG signer is ready. - if _, _, err = cc.getDKGInfo(round); err == nil { - return ErrSkipButNoError - } cc.dkgLock.Lock() defer cc.dkgLock.Unlock() - var ( - ticker Ticker - abortCh chan<- struct{} - ) - defer func() { - if ticker != nil { - ticker.Stop() - } - // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done. - cc.dkg = nil - if abortCh == nil { - select { - case abortCh = <-cc.abortDKGCh: - // The previous DKG finishes its job, don't overwrite its error - // with "aborted" here. - default: - } - } - if abortCh != nil { - abortCh <- struct{}{} - } - }() - tickStartAt := 1 - if cc.dkg == nil { return ErrDKGNotRegistered } + // Make sure the existed dkgProtocol is expected one. if cc.dkg.round != round || cc.dkg.reset != reset { return ErrMismatchDKG{ expectRound: round, @@ -467,19 +462,37 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) { actualReset: cc.dkg.reset, } } + if cc.dkgRunning { + panic(fmt.Errorf("duplicated call to runDKG: %d %d", round, reset)) + } + cc.dkgRunning = true + var ticker Ticker + defer func() { + if ticker != nil { + ticker.Stop() + } + // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done. + if cc.dkg != nil { + cc.dkg = nil + } + cc.dkgRunning = false + }() + // Check if corresponding DKG signer is ready. + if _, _, err = cc.getDKGInfo(round); err == nil { + return ErrSkipButNoError + } + tickStartAt := 1 for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ { if i >= tickStartAt && ticker == nil { ticker = newTicker(cc.gov, round, TickerDKG) } - if ticker != nil { - if abortCh = cc.runTick(ticker); abortCh != nil { - return - } + if ticker != nil && cc.runTick(ticker) { + return } - switch abortCh, err = cc.dkgRunPhases[i](round, reset); err { + switch err = cc.dkgRunPhases[i](round, reset); err { case ErrSkipButNoError, nil: cc.dkg.step = i + 1 err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo()) |