author     Mission Liao <mission.liao@dexon.org>   2019-03-19 13:53:26 +0800
committer  GitHub <noreply@github.com>             2019-03-19 13:53:26 +0800
commit     3785190d9154acb0492dd22b6d62e953a0bcf453 (patch)
tree       48db405c51ff5a2ef7022f70f55a1d577bdecff3
parent     c8cc33091b0949ba2d413f4aa12ce54212b86547 (diff)
core: abort hang dkg (#500)
-rw-r--r--  core/configuration-chain.go         233
-rw-r--r--  core/configuration-chain_test.go     44
-rw-r--r--  core/utils/round-event.go            12
-rw-r--r--  integration_test/consensus_test.go    2
4 files changed, 232 insertions, 59 deletions
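
The core of this change is an abort handshake built on a channel of channels: registerDKG hands an acknowledgement channel over abortDKGCh, the goroutine running the DKG picks it up between its blocking waits, returns ErrDKGAborted, and signals back on that acknowledgement channel once it has cleaned up. The following is a minimal, self-contained sketch of that pattern; runPhase, ready, and errAborted are illustrative names, not identifiers from this codebase.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errAborted = errors.New("aborted")

// runPhase polls an external condition until it holds, or until an abort
// request arrives on abortCh. This mirrors the waiting loops in
// runDKGPhaseOne and runDKGPhaseNine in the diff below.
func runPhase(abortCh <-chan chan<- struct{}, ready func() bool) error {
	for !ready() {
		select {
		case ack := <-abortCh:
			// Acknowledge once we have stopped, so the aborting side
			// knows it is safe to install a new protocol instance.
			ack <- struct{}{}
			return errAborted
		case <-time.After(100 * time.Millisecond):
		}
	}
	return nil
}

func main() {
	abortCh := make(chan chan<- struct{}, 1)
	done := make(chan error, 1)
	go func() {
		// The condition never becomes true, so this run can only end
		// by being aborted.
		done <- runPhase(abortCh, func() bool { return false })
	}()
	ack := make(chan struct{}, 1)
	abortCh <- ack // request the abort
	<-ack          // wait for the running phase to acknowledge
	fmt.Println(<-done)
}
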
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index 31827f2..084f9d0 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -38,20 +38,39 @@ var (
"tsig is already running")
ErrDKGNotReady = fmt.Errorf(
"DKG is not ready")
- ErrSkipButNoError = fmt.Errorf("skip but no error")
+ ErrSkipButNoError = fmt.Errorf(
+ "skip but no error")
+ ErrDKGAborted = fmt.Errorf(
+ "DKG is aborted")
)
+// ErrMismatchDKG represents a failed attempt to run the DKG protocol because
+// the registered DKG protocol is mismatched, in terms of round and resetCount.
+type ErrMismatchDKG struct {
+ expectRound, expectReset uint64
+ actualRound, actualReset uint64
+}
+
+func (e ErrMismatchDKG) Error() string {
+ return fmt.Sprintf(
+ "mismatch DKG, abort running: expect(%d %d) actual(%d %d)",
+ e.expectRound, e.expectReset, e.actualRound, e.actualReset)
+}
+
+type dkgStepFn func(round uint64, reset uint64) (chan<- struct{}, error)
+
type configurationChain struct {
ID types.NodeID
recv dkgReceiver
gov Governance
dkg *dkgProtocol
- dkgRunPhases []func(round uint64, reset uint64) error
+ dkgRunPhases []dkgStepFn
logger common.Logger
dkgLock sync.RWMutex
dkgSigner map[uint64]*dkgShareSecret
npks map[uint64]*typesDKG.NodePublicKeys
dkgResult sync.RWMutex
+ abortDKGCh chan chan<- struct{}
tsig map[common.Hash]*tsigProtocol
tsigTouched map[common.Hash]struct{}
tsigReady *sync.Cond
@@ -79,6 +98,7 @@ func newConfigurationChain(
logger: logger,
dkgSigner: make(map[uint64]*dkgShareSecret),
npks: make(map[uint64]*typesDKG.NodePublicKeys),
+ abortDKGCh: make(chan chan<- struct{}, 1),
tsig: make(map[common.Hash]*tsigProtocol),
tsigTouched: make(map[common.Hash]struct{}),
tsigReady: sync.NewCond(&sync.Mutex{}),
@@ -90,12 +110,62 @@ func newConfigurationChain(
return configurationChain
}
+func (cc *configurationChain) abortDKG(round, reset uint64) {
+ cc.dkgLock.Lock()
+ defer cc.dkgLock.Unlock()
+ if cc.dkg != nil {
+ cc.abortDKGNoLock(round, reset)
+ }
+}
+
+func (cc *configurationChain) abortDKGNoLock(round, reset uint64) bool {
+ if cc.dkg.round > round ||
+ (cc.dkg.round == round && cc.dkg.reset >= reset) {
+ cc.logger.Error("newer DKG already is registered",
+ "round", round,
+ "reset", reset)
+ return false
+ }
+ cc.logger.Error("Previous DKG is not finished",
+ "round", round,
+ "reset", reset,
+ "previous-round", cc.dkg.round,
+ "previous-reset", cc.dkg.reset)
+ // Abort DKG routine in previous round.
+ aborted := make(chan struct{}, 1)
+ cc.logger.Error("Aborting DKG in previous round",
+ "round", round,
+ "previous-round", cc.dkg.round)
+ cc.dkgLock.Unlock()
+ // Notify the currently running DKG protocol to abort.
+ cc.abortDKGCh <- aborted
+ // Wait for the currently running DKG protocol to finish aborting.
+ <-aborted
+ cc.logger.Error("Previous DKG aborted",
+ "round", round,
+ "reset", reset)
+ cc.dkgLock.Lock()
+ return true
+}
+
func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
if cc.dkg != nil {
- cc.logger.Error("Previous DKG is not finished")
- // TODO(mission): return here and fix CI failure.
+ // Make sure we only proceed when cc.dkg is nil.
+ if !cc.abortDKGNoLock(round, reset) {
+ return
+ }
+ if cc.dkg != nil {
+ // This panic would only be raised when there are multiple attempts to
+ // register a DKG protocol at the same time.
+ panic(ErrMismatchDKG{
+ expectRound: round,
+ expectReset: reset,
+ actualRound: cc.dkg.round,
+ actualReset: cc.dkg.reset,
+ })
+ }
}
dkgSet, err := cc.cache.GetDKGSet(round)
if err != nil {
@@ -109,7 +179,6 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
if err != nil {
panic(err)
}
-
if cc.dkg == nil {
cc.dkg = newDKGProtocol(
cc.ID,
@@ -132,37 +201,51 @@ func (cc *configurationChain) registerDKG(round, reset uint64, threshold int) {
<-ticker.Tick()
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
- cc.dkg.proposeMPKReady()
+ if cc.dkg != nil && cc.dkg.round == round && cc.dkg.reset == reset {
+ cc.dkg.proposeMPKReady()
+ }
}()
}
-func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKGPhaseOne(round uint64, reset uint64) (
+ abortCh chan<- struct{}, err error) {
if cc.dkg.round < round ||
(cc.dkg.round == round && cc.dkg.reset < reset) {
- return ErrDKGNotRegistered
+ err = ErrDKGNotRegistered
+ return
}
if cc.dkg.round != round || cc.dkg.reset != reset {
cc.logger.Warn("DKG canceled", "round", round, "reset", reset)
- return ErrSkipButNoError
+ err = ErrSkipButNoError
+ return
}
cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
if cc.gov.IsDKGFinal(round) {
cc.logger.Warn("DKG already final", "round", round)
- return ErrSkipButNoError
+ err = ErrSkipButNoError
+ return
}
cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round)
- for !cc.gov.IsDKGMPKReady(round) {
- cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
- "nodeID", cc.ID)
+ for abortCh == nil && !cc.gov.IsDKGMPKReady(round) {
cc.dkgLock.Unlock()
- time.Sleep(500 * time.Millisecond)
+ cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
+ "nodeID", cc.ID,
+ "round", round)
+ select {
+ case abortCh = <-cc.abortDKGCh:
+ err = ErrDKGAborted
+ case <-time.After(500 * time.Millisecond):
+ }
cc.dkgLock.Lock()
}
-
- return nil
+ if abortCh != nil {
+ return
+ }
+ return
}
-func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKGPhaseTwoAndThree(
+ round uint64, reset uint64) (chan<- struct{}, error) {
// Check if this node successfully joined the protocol.
cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round)
mpks := cc.gov.DKGMasterPublicKeys(round)
@@ -177,7 +260,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64)
cc.logger.Warn("Failed to join DKG protocol",
"round", round,
"reset", reset)
- return ErrSkipButNoError
+ return nil, ErrSkipButNoError
}
// Phase 2(T = 0): Exchange DKG secret key share.
if err := cc.dkg.processMasterPublicKeys(mpks); err != nil {
@@ -187,6 +270,13 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64)
"error", err)
}
cc.mpkReady = true
+ // The time to process private shares might be long; check for abort before
+ // getting into that loop.
+ select {
+ case abortCh := <-cc.abortDKGCh:
+ return abortCh, ErrDKGAborted
+ default:
+ }
for _, prvShare := range cc.pendingPrvShare {
if err := cc.dkg.processPrivateShare(prvShare); err != nil {
cc.logger.Error("Failed to process private share",
@@ -198,7 +288,7 @@ func (cc *configurationChain) runDKGPhaseTwoAndThree(round uint64, reset uint64)
// Phase 3(T = 0~λ): Propose complaint.
// Propose complaint is done in `processMasterPublicKeys`.
- return nil
+ return nil, nil
}
func (cc *configurationChain) runDKGPhaseFour() {
@@ -232,17 +322,27 @@ func (cc *configurationChain) runDKGPhaseEight() {
cc.dkg.proposeFinalize()
}
-func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) (
+ abortCh chan<- struct{}, err error) {
// Phase 9(T = 6λ): DKG is ready.
// Normally, IsDKGFinal would return true here. This loop guards against
// unexpected network fluctuation and ensures the robustness of the DKG protocol.
cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
- for !cc.gov.IsDKGFinal(round) {
+ for abortCh == nil && !cc.gov.IsDKGFinal(round) {
+ cc.dkgLock.Unlock()
cc.logger.Debug("DKG is not ready yet. Try again later...",
"nodeID", cc.ID.String()[:6],
"round", round,
"reset", reset)
- time.Sleep(500 * time.Millisecond)
+ select {
+ case abortCh = <-cc.abortDKGCh:
+ err = ErrDKGAborted
+ case <-time.After(500 * time.Millisecond):
+ }
+ cc.dkgLock.Lock()
+ }
+ if abortCh != nil {
+ return
}
cc.logger.Debug("Calling Governance.DKGMasterPublicKeys", "round", round)
cc.logger.Debug("Calling Governance.DKGComplaints", "round", round)
@@ -251,7 +351,7 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error
cc.gov.DKGComplaints(round),
cc.dkg.threshold)
if err != nil {
- return err
+ return
}
qualifies := ""
for nID := range npks.QualifyNodeIDs {
@@ -267,79 +367,106 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error
cc.logger.Warn("Self is not in Qualify Nodes",
"round", round,
"reset", reset)
- return nil
+ return
}
signer, err := cc.dkg.recoverShareSecret(npks.QualifyIDs)
if err != nil {
- return err
+ return
}
// Save private shares to DB.
if err = cc.db.PutDKGPrivateKey(round, *signer.privateKey); err != nil {
- return err
+ return
}
cc.dkgResult.Lock()
defer cc.dkgResult.Unlock()
cc.dkgSigner[round] = signer
cc.npks[round] = npks
- return nil
+ return
}
-func (cc *configurationChain) runTick(ticker Ticker) {
+func (cc *configurationChain) runTick(ticker Ticker) (abortCh chan<- struct{}) {
cc.dkgLock.Unlock()
- <-ticker.Tick()
- cc.dkgLock.Lock()
+ defer cc.dkgLock.Lock()
+ select {
+ case abortCh = <-cc.abortDKGCh:
+ case <-ticker.Tick():
+ }
+ return
}
func (cc *configurationChain) initDKGPhasesFunc() {
- cc.dkgRunPhases = []func(round uint64, reset uint64) error{
- func(round uint64, reset uint64) error {
+ cc.dkgRunPhases = []dkgStepFn{
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
return cc.runDKGPhaseOne(round, reset)
},
- func(round uint64, reset uint64) error {
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
return cc.runDKGPhaseTwoAndThree(round, reset)
},
- func(round uint64, reset uint64) error {
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
cc.runDKGPhaseFour()
- return nil
+ return nil, nil
},
- func(round uint64, reset uint64) error {
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
cc.runDKGPhaseFiveAndSix(round, reset)
- return nil
+ return nil, nil
},
- func(round uint64, reset uint64) error {
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
complaints := cc.gov.DKGComplaints(round)
cc.runDKGPhaseSeven(complaints)
- return nil
+ return nil, nil
},
- func(round uint64, reset uint64) error {
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
cc.runDKGPhaseEight()
- return nil
+ return nil, nil
},
- func(round uint64, reset uint64) error {
+ func(round uint64, reset uint64) (chan<- struct{}, error) {
return cc.runDKGPhaseNine(round, reset)
},
}
}
-func (cc *configurationChain) runDKG(round uint64, reset uint64) error {
+func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) {
// Check if corresponding DKG signer is ready.
- if _, _, err := cc.getDKGInfo(round); err == nil {
+ if _, _, err = cc.getDKGInfo(round); err == nil {
return ErrSkipButNoError
}
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
-
- tickStartAt := 1
- var ticker Ticker
+ var (
+ ticker Ticker
+ abortCh chan<- struct{}
+ )
defer func() {
if ticker != nil {
ticker.Stop()
}
+ // We still hold cc.dkgLock here; reset cc.dkg to nil when done.
+ cc.dkg = nil
+ if abortCh == nil {
+ select {
+ case abortCh = <-cc.abortDKGCh:
+ // The previous DKG run finished its job; don't overwrite its error
+ // with "aborted" here.
+ default:
+ }
+ }
+ if abortCh != nil {
+ abortCh <- struct{}{}
+ }
}()
+ tickStartAt := 1
if cc.dkg == nil {
return ErrDKGNotRegistered
}
+ if cc.dkg.round != round || cc.dkg.reset != reset {
+ return ErrMismatchDKG{
+ expectRound: round,
+ expectReset: reset,
+ actualRound: cc.dkg.round,
+ actualReset: cc.dkg.reset,
+ }
+ }
for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ {
if i >= tickStartAt && ticker == nil {
@@ -347,13 +474,15 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) error {
}
if ticker != nil {
- cc.runTick(ticker)
+ if abortCh = cc.runTick(ticker); abortCh != nil {
+ return
+ }
}
- switch err := cc.dkgRunPhases[i](round, reset); err {
+ switch abortCh, err = cc.dkgRunPhases[i](round, reset); err {
case ErrSkipButNoError, nil:
cc.dkg.step = i + 1
- err := cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
+ err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
if err != nil {
return fmt.Errorf("put or update DKG protocol error: %v", err)
}
@@ -361,10 +490,10 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) error {
if err == nil {
continue
} else {
- return nil
+ return
}
default:
- return err
+ return
}
}
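
The subtle part of the other half of the handshake is the locking in abortDKGNoLock above: the aborting goroutine has to release dkgLock before sending the acknowledgement channel and waiting on it, because the running runDKG goroutine re-acquires that lock between phases and still holds it when it acknowledges from its deferred cleanup. Below is a small sketch of that release-around-handshake shape, under the assumption that both sides share one mutex; runner, run, and abort are illustrative names only.

package main

import (
	"fmt"
	"sync"
	"time"
)

type runner struct {
	mu      sync.Mutex // stands in for configurationChain.dkgLock
	abortCh chan chan<- struct{}
}

// run holds mu while working and releases it only around its blocking wait,
// the way runDKG releases dkgLock between phases.
func (r *runner) run() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for {
		r.mu.Unlock()
		select {
		case ack := <-r.abortCh:
			r.mu.Lock()
			// Acknowledge while holding mu again, as runDKG's deferred
			// cleanup does after resetting its protocol state.
			ack <- struct{}{}
			return
		case <-time.After(50 * time.Millisecond):
		}
		r.mu.Lock()
	}
}

// abort acquires mu (standing in for registerDKG holding dkgLock), then must
// drop it before the handshake; otherwise run could never re-acquire mu to
// acknowledge, and both goroutines would deadlock.
func (r *runner) abort() {
	r.mu.Lock()
	ack := make(chan struct{}, 1)
	r.mu.Unlock()
	r.abortCh <- ack
	<-ack
	r.mu.Lock()
	fmt.Println("previous run aborted")
	r.mu.Unlock()
}

func main() {
	r := &runner{abortCh: make(chan chan<- struct{}, 1)}
	go r.run()
	time.Sleep(100 * time.Millisecond)
	r.abort()
}
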
diff --git a/core/configuration-chain_test.go b/core/configuration-chain_test.go
index 3d8d1aa..9de06df 100644
--- a/core/configuration-chain_test.go
+++ b/core/configuration-chain_test.go
@@ -596,6 +596,50 @@ func (s *ConfigurationChainTestSuite) TestDKGPhasesSnapShot() {
}
}
+func (s *ConfigurationChainTestSuite) TestDKGAbort() {
+ n := 4
+ k := 1
+ round := DKGDelayRound
+ reset := uint64(0)
+ s.setupNodes(n)
+ gov, err := test.NewGovernance(test.NewState(DKGDelayRound,
+ s.pubKeys, 100*time.Millisecond, &common.NullLogger{}, true,
+ ), ConfigRoundShift)
+ s.Require().NoError(err)
+ cache := utils.NewNodeSetCache(gov)
+ dbInst, err := db.NewMemBackedDB()
+ s.Require().NoError(err)
+ recv := newTestCCGlobalReceiver(s)
+ nID := s.nIDs[0]
+ cc := newConfigurationChain(nID,
+ newTestCCReceiver(nID, recv), gov, cache, dbInst,
+ &common.NullLogger{})
+ recv.nodes[nID] = cc
+ recv.govs[nID] = gov
+ // The first register should not be blocked.
+ cc.registerDKG(round, reset, k)
+ // The runDKG call below should block because DKGReady is not enough.
+ errs := make(chan error, 1)
+ go func() {
+ errs <- cc.runDKG(round, reset)
+ }()
+ // The second register shouldn't be blocked either.
+ randHash := common.NewRandomHash()
+ gov.ResetDKG(randHash[:])
+ cc.registerDKG(round, reset+1, k)
+ err = <-errs
+ s.Require().EqualError(ErrDKGAborted, err.Error())
+ go func() {
+ errs <- cc.runDKG(round, reset+1)
+ }()
+ // The third register shouldn't be blocked either.
+ randHash = common.NewRandomHash()
+ gov.ProposeCRS(round+1, randHash[:])
+ cc.registerDKG(round+1, reset, k)
+ err = <-errs
+ s.Require().EqualError(ErrDKGAborted, err.Error())
+}
+
func TestConfigurationChain(t *testing.T) {
suite.Run(t, new(ConfigurationChainTestSuite))
}
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index 2689840..fe735e5 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -225,13 +225,13 @@ func (e *RoundEvent) ValidateNextRound(blockHeight uint64) {
h(events)
}
}()
- startRound := e.lastTriggeredRound
+ var (
+ dkgFailed, triggered bool
+ param RoundEventParam
+ beginHeight = blockHeight
+ startRound = e.lastTriggeredRound
+ )
for {
- var (
- dkgFailed, triggered bool
- param RoundEventParam
- beginHeight = blockHeight
- )
for {
param, dkgFailed, triggered = e.check(beginHeight, startRound,
dkgFailed)
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 4784df1..dd64113 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -509,7 +509,7 @@ func (s *ConsensusTestSuite) TestForceSync() {
core.ConfigRoundShift)
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundLength, uint64(60)))
+ test.StateChangeRoundLength, uint64(100)))
seedGov.CatchUpWithRound(0)
seedGov.CatchUpWithRound(1)
// A short round interval.