path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go')
-rw-r--r--  vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go | 204
1 file changed, 109 insertions, 95 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
index 084f9d0c9..48b0f2a89 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
@@ -18,6 +18,7 @@
package core
import (
+ "context"
"fmt"
"sync"
"time"
@@ -57,7 +58,7 @@ func (e ErrMismatchDKG) Error() string {
e.expectRound, e.expectReset, e.actualRound, e.actualReset)
}
-type dkgStepFn func(round uint64, reset uint64) (chan<- struct{}, error)
+type dkgStepFn func(round uint64, reset uint64) error
type configurationChain struct {
ID types.NodeID
@@ -70,7 +71,6 @@ type configurationChain struct {
dkgSigner map[uint64]*dkgShareSecret
npks map[uint64]*typesDKG.NodePublicKeys
dkgResult sync.RWMutex
- abortDKGCh chan chan<- struct{}
tsig map[common.Hash]*tsigProtocol
tsigTouched map[common.Hash]struct{}
tsigReady *sync.Cond
@@ -80,8 +80,11 @@ type configurationChain struct {
mpkReady bool
pendingPrvShare map[types.NodeID]*typesDKG.PrivateShare
// TODO(jimmy-dexon): add timeout to pending psig.
- pendingPsig map[common.Hash][]*typesDKG.PartialSignature
- prevHash common.Hash
+ pendingPsig map[common.Hash][]*typesDKG.PartialSignature
+ prevHash common.Hash
+ dkgCtx context.Context
+ dkgCtxCancel context.CancelFunc
+ dkgRunning bool
}
func newConfigurationChain(
@@ -98,7 +101,6 @@ func newConfigurationChain(
logger: logger,
dkgSigner: make(map[uint64]*dkgShareSecret),
npks: make(map[uint64]*typesDKG.NodePublicKeys),
- abortDKGCh: make(chan chan<- struct{}, 1),
tsig: make(map[common.Hash]*tsigProtocol),
tsigTouched: make(map[common.Hash]struct{}),
tsigReady: sync.NewCond(&sync.Mutex{}),
@@ -110,18 +112,23 @@ func newConfigurationChain(
return configurationChain
}
-func (cc *configurationChain) abortDKG(round, reset uint64) {
+func (cc *configurationChain) abortDKG(
+ parentCtx context.Context,
+ round, reset uint64) bool {
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
if cc.dkg != nil {
- cc.abortDKGNoLock(round, reset)
+ return cc.abortDKGNoLock(parentCtx, round, reset)
}
+ return false
}
-func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool {
+func (cc *configurationChain) abortDKGNoLock(
+ ctx context.Context,
+ round, reset uint64) bool {
if cc.dkg.round > round ||
- (cc.dkg.round == round && cc.dkg.reset >= reset) {
- cc.logger.Error("newer DKG already is registered",
+ (cc.dkg.round == round && cc.dkg.reset > reset) {
+ cc.logger.Error("Newer DKG already is registered",
"round", round,
"reset", reset)
return false
@@ -132,30 +139,50 @@ func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool {
"previous-round", cc.dkg.round,
"previous-reset", cc.dkg.reset)
// Abort DKG routine in previous round.
- aborted := make(chan struct{}, 1)
cc.logger.Error("Aborting DKG in previous round",
"round", round,
"previous-round", cc.dkg.round)
- cc.dkgLock.Unlock()
// Notify the currently running DKG protocol to abort.
- cc.abortDKGCh <- aborted
+ if cc.dkgCtxCancel != nil {
+ cc.dkgCtxCancel()
+ }
+ cc.dkgLock.Unlock()
// Wait for the currently running DKG protocol to abort.
- <-aborted
+ for {
+ cc.dkgLock.Lock()
+ if cc.dkgRunning == false {
+ cc.dkg = nil
+ break
+ }
+ select {
+ case <-ctx.Done():
+ return false
+ case <-time.After(100 * time.Millisecond):
+ }
+ cc.dkgLock.Unlock()
+ }
cc.logger.Error("Previous DKG aborted",
"round", round,
"reset", reset)
- cc.dkgLock.Lock()
- return true
+ return cc.dkg == nil
}
-func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
+func (cc *configurationChain) registerDKG(
+ parentCtx context.Context,
+ round, reset uint64,
+ threshold int) {
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
if cc.dkg != nil {
// Make sure we only proceed when cc.dkg is nil.
- if !cc.abortDKGNoLock(round, reset) {
+ if !cc.abortDKGNoLock(parentCtx, round, reset) {
return
}
+ select {
+ case <-parentCtx.Done():
+ return
+ default:
+ }
if cc.dkg != nil {
// This panic would only be raised when there are multiple attempts to
// register a DKG protocol at the same time.
@@ -176,6 +203,7 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
cc.pendingPrvShare = make(map[types.NodeID]*typesDKG.PrivateShare)
cc.mpkReady = false
cc.dkg, err = recoverDKGProtocol(cc.ID, cc.recv, round, reset, cc.db)
+ cc.dkgCtx, cc.dkgCtxCancel = context.WithCancel(parentCtx)
if err != nil {
panic(err)
}
@@ -207,45 +235,39 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
}()
}
-func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) (
- abortCh chan<- struct{}, err error) {
+func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) error {
if cc.dkg.round < round ||
(cc.dkg.round == round && cc.dkg.reset < reset) {
- err = ErrDKGNotRegistered
- return
+ return ErrDKGNotRegistered
}
if cc.dkg.round != round || cc.dkg.reset != reset {
cc.logger.Warn("DKG canceled", "round", round, "reset", reset)
- err = ErrSkipButNoError
- return
+ return ErrSkipButNoError
}
cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
if cc.gov.IsDKGFinal(round) {
cc.logger.Warn("DKG already final", "round", round)
- err = ErrSkipButNoError
- return
+ return ErrSkipButNoError
}
cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round)
- for abortCh == nil && !cc.gov.IsDKGMPKReady(round) {
+ var err error
+ for err == nil && !cc.gov.IsDKGMPKReady(round) {
cc.dkgLock.Unlock()
cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
"nodeID", cc.ID,
"round", round)
select {
- case abortCh = <-cc.abortDKGCh:
+ case <-cc.dkgCtx.Done():
err = ErrDKGAborted
case <-time.After(500 * time.Millisecond):
}
cc.dkgLock.Lock()
}
- if abortCh != nil {
- return
- }
- return
+ return err
}
func (cc *configurationChain) runDKGPhaseTwoAndThree(
- round uint64, reset uint64) (chan<- struct{}, error) {
+ round uint64, reset uint64) error {
// Check if this node successfully joined the protocol.
cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round)
mpks := cc.gov.DKGMasterPublicKeys(round)
@@ -260,7 +282,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(
cc.logger.Warn("Failed to join DKG protocol",
"round", round,
"reset", reset)
- return nil, ErrSkipButNoError
+ return ErrSkipButNoError
}
// Phase 2(T = 0): Exchange DKG secret key share.
if err := cc.dkg.processMasterPublicKeys(mpks); err != nil {
@@ -273,8 +295,8 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(
// The time to process private share might be long, check aborting before
// get into that loop.
select {
- case abortCh := <-cc.abortDKGCh:
- return abortCh, ErrDKGAborted
+ case <-cc.dkgCtx.Done():
+ return ErrDKGAborted
default:
}
for _, prvShare := range cc.pendingPrvShare {
@@ -288,7 +310,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(
// Phase 3(T = 0~λ): Propose complaint.
// Propose complaint is done in `processMasterPublicKeys`.
- return nil, nil
+ return nil
}
func (cc *configurationChain) runDKGPhaseFour() {
@@ -322,27 +344,27 @@ func (cc *configurationChain) runDKGPhaseEight() {
cc.dkg.proposeFinalize()
}
-func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) (
- abortCh chan<- struct{}, err error) {
+func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error {
// Phase 9(T = 6λ): DKG is ready.
// Normally, IsDKGFinal would return true here. Use this in case of
// unexpected network fluctuation, to ensure the robustness of the DKG protocol.
cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
- for abortCh == nil && !cc.gov.IsDKGFinal(round) {
+ var err error
+ for err == nil && !cc.gov.IsDKGFinal(round) {
cc.dkgLock.Unlock()
cc.logger.Debug("DKG is not ready yet. Try again later...",
"nodeID", cc.ID.String()[:6],
"round", round,
"reset", reset)
select {
- case abortCh = <-cc.abortDKGCh:
+ case <-cc.dkgCtx.Done():
err = ErrDKGAborted
case <-time.After(500 * time.Millisecond):
}
cc.dkgLock.Lock()
}
- if abortCh != nil {
- return
+ if err != nil {
+ return err
}
cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round)
cc.logger.Debug("Calling Governance.DKGComplaints", "round", round)
@@ -351,7 +373,7 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) (
cc.gov.DKGComplaints(round),
cc.dkg.threshold)
if err != nil {
- return
+ return err
}
qualifies := ""
for nID := range npks.QualifyNodeIDs {
@@ -367,28 +389,29 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) (
cc.logger.Warn("Self is not in Qualify Nodes",
"round", round,
"reset", reset)
- return
+ return nil
}
signer, err := cc.dkg.recoverShareSecret(npks.QualifyIDs)
if err != nil {
- return
+ return err
}
// Save private shares to DB.
if err = cc.db.PutDKGPrivateKey(round, *signer.privateKey); err != nil {
- return
+ return err
}
cc.dkgResult.Lock()
defer cc.dkgResult.Unlock()
cc.dkgSigner[round] = signer
cc.npks[round] = npks
- return
+ return nil
}
-func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) {
+func (cc *configurationChain) runTick(ticker Ticker) (aborted bool) {
cc.dkgLock.Unlock()
defer cc.dkgLock.Lock()
select {
- case abortCh = <-cc.abortDKGCh:
+ case <-cc.dkgCtx.Done():
+ aborted = true
case <-ticker.Tick():
}
return
@@ -396,69 +419,42 @@ func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) {
func (cc *configurationChain) initDKGPhasesFunc() {
cc.dkgRunPhases = []dkgStepFn{
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
return cc.runDKGPhaseOne(round, reset)
},
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
return cc.runDKGPhaseTwoAndThree(round, reset)
},
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
cc.runDKGPhaseFour()
- return nil, nil
+ return nil
},
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
cc.runDKGPhaseFiveAndSix(round, reset)
- return nil, nil
+ return nil
},
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
complaints := cc.gov.DKGComplaints(round)
cc.runDKGPhaseSeven(complaints)
- return nil, nil
+ return nil
},
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
cc.runDKGPhaseEight()
- return nil, nil
+ return nil
},
- func(round uint64, reset uint64) (chan<- struct{}, error) {
+ func(round uint64, reset uint64) error {
return cc.runDKGPhaseNine(round, reset)
},
}
}
func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) {
- // Check if corresponding DKG signer is ready.
- if _, _, err = cc.getDKGInfo(round); err == nil {
- return ErrSkipButNoError
- }
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
- var (
- ticker Ticker
- abortCh chan<- struct{}
- )
- defer func() {
- if ticker != nil {
- ticker.Stop()
- }
- // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done.
- cc.dkg = nil
- if abortCh == nil {
- select {
- case abortCh = <-cc.abortDKGCh:
- // The previous DKG finishes its job, don't overwrite its error
- // with "aborted" here.
- default:
- }
- }
- if abortCh != nil {
- abortCh <- struct{}{}
- }
- }()
- tickStartAt := 1
-
if cc.dkg == nil {
return ErrDKGNotRegistered
}
+ // Make sure the existing dkgProtocol is the expected one.
if cc.dkg.round != round || cc.dkg.reset != reset {
return ErrMismatchDKG{
expectRound: round,
@@ -467,19 +463,37 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) {
actualReset: cc.dkg.reset,
}
}
+ if cc.dkgRunning {
+ panic(fmt.Errorf("duplicated call to runDKG: %d %d", round, reset))
+ }
+ cc.dkgRunning = true
+ var ticker Ticker
+ defer func() {
+ if ticker != nil {
+ ticker.Stop()
+ }
+ // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done.
+ if cc.dkg != nil {
+ cc.dkg = nil
+ }
+ cc.dkgRunning = false
+ }()
+ // Check if corresponding DKG signer is ready.
+ if _, _, err = cc.getDKGInfo(round); err == nil {
+ return ErrSkipButNoError
+ }
+ tickStartAt := 1
for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ {
if i >= tickStartAt && ticker == nil {
ticker = newTicker(cc.gov, round, TickerDKG)
}
- if ticker != nil {
- if abortCh = cc.runTick(ticker); abortCh != nil {
- return
- }
+ if ticker != nil && cc.runTick(ticker) {
+ return
}
- switch abortCh, err = cc.dkgRunPhases[i](round, reset); err {
+ switch err = cc.dkgRunPhases[i](round, reset); err {
case ErrSkipButNoError, nil:
cc.dkg.step = i + 1
err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
@@ -633,7 +647,7 @@ func (cc *configurationChain) runTSig(
go func() {
for _, psig := range pendingPsig {
if err := cc.processPartialSignature(psig); err != nil {
- cc.logger.Error("failed to process partial signature",
+ cc.logger.Error("Failed to process partial signature",
"nodeID", cc.ID,
"error", err)
}
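
The change above replaces the buffered abortDKGCh handshake with context-based cancellation: registerDKG now derives dkgCtx/dkgCtxCancel from the caller's context, each polling loop selects on dkgCtx.Done() instead of receiving an abort channel, and abortDKGNoLock cancels the context and then waits for the dkgRunning flag to clear (or for the caller's own context to expire). What follows is a minimal, self-contained sketch of that pattern, not the actual dexon-consensus API; the worker type, its method names, the ready callback, and the 100 ms polling interval are illustrative assumptions.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errAborted plays the role of ErrDKGAborted in the patch: polling loops
// return it once the cancellation context fires.
var errAborted = errors.New("aborted")

// worker is a stripped-down stand-in for configurationChain, keeping only
// the fields relevant to the cancellation pattern.
type worker struct {
	lock      sync.Mutex
	ctx       context.Context
	ctxCancel context.CancelFunc
	running   bool
}

// register mirrors registerDKG: derive a cancelable context from the
// caller's context before the run loop starts.
func (w *worker) register(parent context.Context) {
	w.lock.Lock()
	defer w.lock.Unlock()
	w.ctx, w.ctxCancel = context.WithCancel(parent)
}

// run mirrors runDKG: mark the worker as running for the duration of its
// polling loop, so abort can tell when the loop has actually stopped.
func (w *worker) run(ready func() bool) error {
	w.lock.Lock()
	w.running = true
	w.lock.Unlock()
	defer func() {
		w.lock.Lock()
		w.running = false
		w.lock.Unlock()
	}()
	// One "phase": poll a condition, but give up as soon as the context is
	// canceled, instead of receiving from a dedicated abort channel.
	for !ready() {
		select {
		case <-w.ctx.Done():
			return errAborted
		case <-time.After(100 * time.Millisecond):
		}
	}
	return nil
}

// abort mirrors abortDKGNoLock: cancel the context, then poll the running
// flag until the loop has observed the cancellation and exited, bailing out
// if the caller's own context expires first.
func (w *worker) abort(parent context.Context) bool {
	if w.ctxCancel != nil {
		w.ctxCancel()
	}
	for {
		w.lock.Lock()
		stopped := !w.running
		w.lock.Unlock()
		if stopped {
			return true
		}
		select {
		case <-parent.Done():
			return false
		case <-time.After(100 * time.Millisecond):
		}
	}
}

func main() {
	w := &worker{}
	w.register(context.Background())
	done := make(chan error, 1)
	go func() { done <- w.run(func() bool { return false }) }() // never ready
	time.Sleep(300 * time.Millisecond)
	fmt.Println("aborted:", w.abort(context.Background())) // aborted: true
	fmt.Println("run returned:", <-done)                   // run returned: aborted
}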