author    Jimmy Hu <jimmy.hu@dexon.org>  2019-03-29 15:17:13 +0800
committer GitHub <noreply@github.com>    2019-03-29 15:17:13 +0800
commit    92d1b675e7acff789a819426521efc99bdbd9aff (patch)
tree      e3c886a6678e7338d114ca9baec0ca2632b748ac
parent    04a4df9479e31f1418760a389060706a72259381 (diff)
core: run dkg by height (#527)
* core: run dkg by height
* core: check cc.dkgCtx before running each dkg phases
* fix
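Editor's note: this patch replaces the wall-clock DKG ticker with block-height scheduling. Each DKG phase now spans LambdaDKG worth of blocks, and a node that enters the DKG window late skips the phases that have already elapsed. A minimal sketch of that arithmetic, mirroring the computation added to configuration-chain.go below (the config values here are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values; the real ones come from the round's config.
	lambdaDKG := 10 * time.Second       // cfg.LambdaDKG
	minBlockInterval := 1 * time.Second // cfg.MinBlockInterval
	dkgHeight := uint64(25)             // blocks elapsed since the DKG window began

	// One DKG phase spans LambdaDKG, expressed in blocks.
	phaseHeight := uint64(lambdaDKG.Nanoseconds() / minBlockInterval.Nanoseconds())
	// A node entering late skips the phases that have already passed.
	skipPhase := int(dkgHeight / phaseHeight)
	fmt.Println(phaseHeight, skipPhase) // 10 2
}
```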
-rw-r--r--  core/configuration-chain.go       | 115
-rw-r--r--  core/configuration-chain_test.go  | 100
-rw-r--r--  core/consensus.go                 |  16
-rw-r--r--  core/consensus_test.go            |  22
4 files changed, 180 insertions(+), 73 deletions(-)
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index 92b2830..c8aac38 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -406,17 +406,6 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error
return nil
}
-func (cc *configurationChain) runTick(ticker Ticker) (aborted bool) {
- cc.dkgLock.Unlock()
- defer cc.dkgLock.Lock()
- select {
- case <-cc.dkgCtx.Done():
- aborted = true
- case <-ticker.Tick():
- }
- return
-}
-
func (cc *configurationChain) initDKGPhasesFunc() {
cc.dkgRunPhases = []dkgStepFn{
func(round uint64, reset uint64) error {
@@ -448,7 +437,18 @@ func (cc *configurationChain) initDKGPhasesFunc() {
}
}
-func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) {
+func (cc *configurationChain) runDKG(
+ round uint64, reset uint64, event *common.Event,
+ dkgBeginHeight, dkgHeight uint64) (err error) {
+ // Check if corresponding DKG signer is ready.
+ if _, _, err = cc.getDKGInfo(round, false); err == nil {
+ return ErrSkipButNoError
+ }
+ cfg := utils.GetConfigWithPanic(cc.gov, round, cc.logger)
+ phaseHeight := uint64(
+ cfg.LambdaDKG.Nanoseconds() / cfg.MinBlockInterval.Nanoseconds())
+ skipPhase := int(dkgHeight / phaseHeight)
+ cc.logger.Info("Skipping DKG phase", "phase", skipPhase)
cc.dkgLock.Lock()
defer cc.dkgLock.Unlock()
if cc.dkg == nil {
@@ -467,51 +467,70 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) {
panic(fmt.Errorf("duplicated call to runDKG: %d %d", round, reset))
}
cc.dkgRunning = true
- var ticker Ticker
defer func() {
- if ticker != nil {
- ticker.Stop()
- }
// Here we should hold the cc.dkgLock, reset cc.dkg to nil when done.
if cc.dkg != nil {
cc.dkg = nil
}
cc.dkgRunning = false
}()
- // Check if corresponding DKG signer is ready.
- if _, _, err = cc.getDKGInfo(round, true); err == nil {
- return ErrSkipButNoError
+ wg := sync.WaitGroup{}
+ var dkgError error
+ // Make a copy of cc.dkgCtx so each phase function can refer to the correct
+ // context.
+ ctx := cc.dkgCtx
+ cc.dkg.step = skipPhase
+ for i := skipPhase; i < len(cc.dkgRunPhases); i++ {
+ wg.Add(1)
+ event.RegisterHeight(dkgBeginHeight+phaseHeight*uint64(i), func(uint64) {
+ go func() {
+ defer wg.Done()
+ cc.dkgLock.Lock()
+ defer cc.dkgLock.Unlock()
+ if dkgError != nil {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ dkgError = ErrDKGAborted
+ return
+ default:
+ }
+
+ err := cc.dkgRunPhases[cc.dkg.step](round, reset)
+ if err == nil || err == ErrSkipButNoError {
+ err = nil
+ cc.dkg.step++
+ err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
+ if err != nil {
+ cc.logger.Error("Failed to save DKG Protocol",
+ "step", cc.dkg.step,
+ "error", err)
+ }
+ }
+ if err != nil && dkgError == nil {
+ dkgError = err
+ }
+ }()
+ })
}
- tickStartAt := 1
-
- for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ {
- if i >= tickStartAt && ticker == nil {
- ticker = newTicker(cc.gov, round, TickerDKG)
- }
-
- if ticker != nil && cc.runTick(ticker) {
- return
- }
-
- switch err = cc.dkgRunPhases[i](round, reset); err {
- case ErrSkipButNoError, nil:
- cc.dkg.step = i + 1
- err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo())
- if err != nil {
- return fmt.Errorf("put or update DKG protocol error: %v", err)
- }
-
- if err == nil {
- continue
- } else {
- return
- }
- default:
- return
- }
+ cc.dkgLock.Unlock()
+ wgChan := make(chan struct{}, 1)
+ go func() {
+ wg.Wait()
+ wgChan <- struct{}{}
+ }()
+ select {
+ case <-cc.dkgCtx.Done():
+ case <-wgChan:
}
-
- return nil
+ cc.dkgLock.Lock()
+ select {
+ case <-cc.dkgCtx.Done():
+ return ErrDKGAborted
+ default:
+ }
+ return dkgError
}
func (cc *configurationChain) isDKGFinal(round uint64) bool {
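Editor's note: the new runDKG no longer sleeps on a ticker between phases. It registers one height-triggered callback per remaining phase on a common.Event, collects them with a sync.WaitGroup, and lets the shared context abort the whole run. A self-contained sketch of that register-then-wait pattern; the heightEvent type is a simplified stand-in invented here to approximate common.Event:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// heightEvent is a toy height-indexed callback registry.
type heightEvent struct {
	mu        sync.Mutex
	callbacks map[uint64][]func(uint64)
}

func (e *heightEvent) RegisterHeight(h uint64, fn func(uint64)) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.callbacks == nil {
		e.callbacks = make(map[uint64][]func(uint64))
	}
	e.callbacks[h] = append(e.callbacks[h], fn)
}

func (e *heightEvent) NotifyHeight(h uint64) {
	e.mu.Lock()
	fns := e.callbacks[h]
	delete(e.callbacks, h)
	e.mu.Unlock()
	for _, fn := range fns {
		fn(h)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	evt := &heightEvent{}
	phases := []string{"phase1", "phase2", "phase3"}

	var wg sync.WaitGroup
	begin, phaseHeight := uint64(10), uint64(5)
	for i, name := range phases {
		name := name
		wg.Add(1)
		evt.RegisterHeight(begin+phaseHeight*uint64(i), func(h uint64) {
			defer wg.Done()
			select {
			case <-ctx.Done(): // the DKG was aborted; skip the phase.
				return
			default:
			}
			fmt.Printf("running %s at height %d\n", name, h)
		})
	}
	// Drive heights as the chain would.
	for h := uint64(0); h <= 25; h++ {
		evt.NotifyHeight(h)
	}
	wg.Wait()
}
```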
diff --git a/core/configuration-chain_test.go b/core/configuration-chain_test.go
index 0adfdcf..c3a8023 100644
--- a/core/configuration-chain_test.go
+++ b/core/configuration-chain_test.go
@@ -195,14 +195,50 @@ func (s *ConfigurationChainTestSuite) setupNodes(n int) {
}
}
+type testEvent struct {
+ event *common.Event
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+func newTestEvent() *testEvent {
+ e := &testEvent{
+ event: common.NewEvent(),
+ }
+ return e
+}
+
+func (evt *testEvent) run(interval time.Duration) {
+ evt.ctx, evt.cancel = context.WithCancel(context.Background())
+ go func() {
+ height := uint64(0)
+ Loop:
+ for {
+ select {
+ case <-evt.ctx.Done():
+ break Loop
+ case <-time.After(interval):
+ }
+ evt.event.NotifyHeight(height)
+ height++
+ }
+ }()
+}
+
+func (evt *testEvent) stop() {
+ evt.cancel()
+}
+
func (s *ConfigurationChainTestSuite) runDKG(
k, n int, round, reset uint64) map[types.NodeID]*configurationChain {
s.setupNodes(n)
+ evts := make(map[types.NodeID]*testEvent)
cfgChains := make(map[types.NodeID]*configurationChain)
recv := newTestCCGlobalReceiver(s)
for _, nID := range s.nIDs {
+ evts[nID] = newTestEvent()
gov, err := test.NewGovernance(test.NewState(DKGDelayRound,
s.pubKeys, 100*time.Millisecond, &common.NullLogger{}, true,
), ConfigRoundShift)
@@ -228,11 +264,13 @@ func (s *ConfigurationChainTestSuite) runDKG(
errs := make(chan error, n)
wg := sync.WaitGroup{}
wg.Add(n)
- for _, cc := range cfgChains {
- go func(cc *configurationChain) {
+ for nID, cc := range cfgChains {
+ go func(cc *configurationChain, nID types.NodeID) {
defer wg.Done()
- errs <- cc.runDKG(round, reset)
- }(cc)
+ errs <- cc.runDKG(round, reset, evts[nID].event, 10, 0)
+ }(cc, nID)
+ evts[nID].run(100 * time.Millisecond)
+ defer evts[nID].stop()
}
wg.Wait()
for range cfgChains {
@@ -324,6 +362,7 @@ func (s *ConfigurationChainTestSuite) TestDKGMasterPublicKeyDelayAdd() {
round := DKGDelayRound
reset := uint64(0)
lambdaDKG := 1000 * time.Millisecond
+ minBlockInterval := 100 * time.Millisecond
s.setupNodes(n)
cfgChains := make(map[types.NodeID]*configurationChain)
@@ -337,6 +376,8 @@ func (s *ConfigurationChainTestSuite) TestDKGMasterPublicKeyDelayAdd() {
s.Require().NoError(err)
s.Require().NoError(state.RequestChange(
test.StateChangeLambdaDKG, lambdaDKG))
+ s.Require().NoError(state.RequestChange(
+ test.StateChangeMinBlockInterval, minBlockInterval))
cache := utils.NewNodeSetCache(gov)
dbInst, err := db.NewMemBackedDB()
s.Require().NoError(err)
@@ -364,10 +405,13 @@ func (s *ConfigurationChainTestSuite) TestDKGMasterPublicKeyDelayAdd() {
wg := sync.WaitGroup{}
wg.Add(n)
for _, cc := range cfgChains {
+ evt := newTestEvent()
go func(cc *configurationChain) {
defer wg.Done()
- errs <- cc.runDKG(round, reset)
+ errs <- cc.runDKG(round, reset, evt.event, 0, 0)
}(cc)
+ evt.run(100 * time.Millisecond)
+ defer evt.stop()
}
wg.Wait()
for range cfgChains {
@@ -391,6 +435,7 @@ func (s *ConfigurationChainTestSuite) TestDKGComplaintDelayAdd() {
round := DKGDelayRound
reset := uint64(0)
lambdaDKG := 1000 * time.Millisecond
+ minBlockInterval := 100 * time.Millisecond
s.setupNodes(n)
cfgChains := make(map[types.NodeID]*configurationChain)
@@ -403,6 +448,8 @@ func (s *ConfigurationChainTestSuite) TestDKGComplaintDelayAdd() {
s.Require().NoError(err)
s.Require().NoError(state.RequestChange(
test.StateChangeLambdaDKG, lambdaDKG))
+ s.Require().NoError(state.RequestChange(
+ test.StateChangeMinBlockInterval, minBlockInterval))
cache := utils.NewNodeSetCache(gov)
dbInst, err := db.NewMemBackedDB()
s.Require().NoError(err)
@@ -425,10 +472,13 @@ func (s *ConfigurationChainTestSuite) TestDKGComplaintDelayAdd() {
wg := sync.WaitGroup{}
wg.Add(n)
for _, cc := range cfgChains {
+ evt := newTestEvent()
go func(cc *configurationChain) {
defer wg.Done()
- errs <- cc.runDKG(round, reset)
+ errs <- cc.runDKG(round, reset, evt.event, 0, 0)
}(cc)
+ evt.run(minBlockInterval)
+ defer evt.stop()
}
complaints := -1
go func() {
@@ -636,36 +686,54 @@ func (s *ConfigurationChainTestSuite) TestDKGAbort() {
cc.registerDKG(context.Background(), round, reset, k)
// We should be blocked because DKGReady is not enough.
errs := make(chan error, 1)
- called := make(chan struct{}, 1)
+ evt := newTestEvent()
go func() {
- called <- struct{}{}
- errs <- cc.runDKG(round, reset)
+ errs <- cc.runDKG(round, reset, evt.event, 0, 0)
}()
+ evt.run(100 * time.Millisecond)
+ defer evt.stop()
+
// The second register shouldn't be blocked, too.
randHash := common.NewRandomHash()
gov.ResetDKG(randHash[:])
- <-called
+ for func() bool {
+ cc.dkgLock.RLock()
+ defer cc.dkgLock.RUnlock()
+ return !cc.dkgRunning
+ }() {
+ time.Sleep(100 * time.Millisecond)
+ }
cc.registerDKG(context.Background(), round, reset+1, k)
err = <-errs
s.Require().EqualError(ErrDKGAborted, err.Error())
go func() {
- called <- struct{}{}
- errs <- cc.runDKG(round, reset+1)
+ errs <- cc.runDKG(round, reset+1, evt.event, 0, 0)
}()
// The third register shouldn't be blocked, too
randHash = common.NewRandomHash()
gov.ProposeCRS(round+1, randHash[:])
randHash = common.NewRandomHash()
gov.ResetDKG(randHash[:])
- <-called
+ for func() bool {
+ cc.dkgLock.RLock()
+ defer cc.dkgLock.RUnlock()
+ return !cc.dkgRunning
+ }() {
+ time.Sleep(100 * time.Millisecond)
+ }
cc.registerDKG(context.Background(), round+1, reset+1, k)
err = <-errs
s.Require().EqualError(ErrDKGAborted, err.Error())
go func() {
- called <- struct{}{}
- errs <- cc.runDKG(round+1, reset+1)
+ errs <- cc.runDKG(round+1, reset+1, evt.event, 0, 0)
}()
- <-called
+ for func() bool {
+ cc.dkgLock.RLock()
+ defer cc.dkgLock.RUnlock()
+ return !cc.dkgRunning
+ }() {
+ time.Sleep(100 * time.Millisecond)
+ }
// Abort with older round, shouldn't be aborted.
aborted := cc.abortDKG(context.Background(), round, reset+1)
s.Require().False(aborted)
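Editor's note: TestDKGAbort above swaps the one-shot `called` channel for polling cc.dkgRunning under cc.dkgLock, since runDKG now blocks on height events instead of announcing when it starts. The repeated loop could be factored into a helper; a hypothetical sketch (waitUntil is not part of the patch):

```go
package main

import (
	"sync"
	"time"
)

// waitUntil polls cond under the read lock until it reports true.
func waitUntil(mu *sync.RWMutex, interval time.Duration, cond func() bool) {
	for {
		mu.RLock()
		ok := cond()
		mu.RUnlock()
		if ok {
			return
		}
		time.Sleep(interval)
	}
}

func main() {
	var mu sync.RWMutex
	running := false
	go func() {
		time.Sleep(300 * time.Millisecond)
		mu.Lock()
		running = true
		mu.Unlock()
	}()
	// Equivalent to the loops in TestDKGAbort:
	waitUntil(&mu, 100*time.Millisecond, func() bool { return running })
}
```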
diff --git a/core/consensus.go b/core/consensus.go
index 2246bf1..6e30723 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -980,13 +980,17 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) {
con.cfgModule.registerDKG(con.ctx, nextRound, e.Reset,
utils.GetDKGThreshold(nextConfig))
con.event.RegisterHeight(e.NextDKGPreparationHeight(),
- func(uint64) {
+ func(h uint64) {
func() {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
con.dkgRunning = 0
}()
- con.runDKG(nextRound, e.Reset, nextConfig)
+ // We want to skip some of the DKG phases when started.
+ dkgCurrentHeight := h - e.NextDKGPreparationHeight()
+ con.runDKG(
+ nextRound, e.Reset,
+ e.NextDKGPreparationHeight(), dkgCurrentHeight)
})
}()
})
@@ -1118,7 +1122,8 @@ func (con *Consensus) generateBlockRandomness(blocks []*types.Block) {
}
// runDKG starts running DKG protocol.
-func (con *Consensus) runDKG(round, reset uint64, config *types.Config) {
+func (con *Consensus) runDKG(
+ round, reset, dkgBeginHeight, dkgHeight uint64) {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
if con.dkgRunning != 0 {
@@ -1132,7 +1137,10 @@ func (con *Consensus) runDKG(round, reset uint64, config *types.Config) {
con.dkgReady.Broadcast()
con.dkgRunning = 2
}()
- if err := con.cfgModule.runDKG(round, reset); err != nil {
+ if err :=
+ con.cfgModule.runDKG(
+ round, reset,
+ con.event, dkgBeginHeight, dkgHeight); err != nil {
con.logger.Error("Failed to runDKG", "error", err)
}
}()
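Editor's note: on the consensus side, the height callback now receives the height h it fired at, so a node that comes up mid-window can tell runDKG how far the DKG window has already advanced. A sketch of that bookkeeping with illustrative numbers:

```go
package main

import "fmt"

func main() {
	// Illustrative values; e.NextDKGPreparationHeight() supplies the real one.
	dkgBeginHeight := uint64(1000) // height where DKG preparation starts
	h := uint64(1023)              // height at which the callback actually fired

	// If the node started late, part of the DKG window is already gone;
	// runDKG uses the difference to decide how many phases to skip.
	dkgCurrentHeight := h - dkgBeginHeight
	fmt.Println(dkgCurrentHeight) // 23
}
```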
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 9565e95..19c43ad 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -267,7 +267,23 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
}
time.Sleep(gov.Configuration(0).MinBlockInterval * 4)
for _, con := range cons {
- go con.runDKG(0, 0, gov.Configuration(0))
+ go con.runDKG(0, 0, 0, 0)
+ }
+ crsFinish := make(chan struct{}, len(cons))
+ for _, con := range cons {
+ go func(con *Consensus) {
+ height := uint64(0)
+ Loop:
+ for {
+ select {
+ case <-crsFinish:
+ break Loop
+ case <-time.After(lambda):
+ }
+ con.event.NotifyHeight(height)
+ height++
+ }
+ }(con)
}
for _, con := range cons {
func() {
@@ -278,16 +294,12 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
}
}()
}
- crsFinish := make(chan struct{})
for _, con := range cons {
go func(con *Consensus) {
con.runCRS(0, gov.CRS(0), false)
crsFinish <- struct{}{}
}(con)
}
- for range cons {
- <-crsFinish
- }
s.NotNil(gov.CRS(1))
}
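Editor's note: in TestDKGCRS the crsFinish channel now plays two roles: each runCRS goroutine signals completion on it, and the per-node height-notifier goroutines use that same signal to stop ticking, which replaces the old drain loop. A minimal sketch of the shape (n and the sleep durations are illustrative):

```go
package main

import (
	"sync"
	"time"
)

func main() {
	n := 3
	// One slot per node: each runCRS stand-in sends once, and each
	// height-notifier goroutine exits when it receives.
	crsFinish := make(chan struct{}, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			height := uint64(0)
			for {
				select {
				case <-crsFinish: // a CRS run finished; stop notifying
					return
				case <-time.After(10 * time.Millisecond):
				}
				height++ // would call con.event.NotifyHeight(height)
			}
		}()
	}
	for i := 0; i < n; i++ {
		go func() {
			time.Sleep(50 * time.Millisecond) // stands in for con.runCRS(...)
			crsFinish <- struct{}{}
		}()
	}
	wg.Wait()
}
```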