aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/consensus.go101
-rw-r--r--core/consensus_test.go50
-rw-r--r--core/syncer/consensus.go14
-rw-r--r--core/test/app.go13
-rw-r--r--core/test/app_test.go3
-rw-r--r--core/utils/round-event.go68
-rw-r--r--integration_test/round-event_test.go8
7 files changed, 154 insertions, 103 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 8f8002b..83727ec 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -658,8 +658,10 @@ func (con *Consensus) prepare(
// Register round event handler to abort previous running DKG if any.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
e := evts[len(evts)-1]
- defer elapse("abort DKG", e)()
- con.cfgModule.abortDKG(e.Round+1, e.Reset)
+ go func() {
+ defer elapse("abort DKG", e)()
+ con.cfgModule.abortDKG(e.Round+1, e.Reset)
+ }()
})
// Register round event handler to update BA and BC modules.
con.roundEvent.Register(func(evts []utils.RoundEventParam) {
@@ -721,8 +723,10 @@ func (con *Consensus) prepare(
return
}
// Aborting all previous running DKG protocol instance if any.
- con.cfgModule.abortDKG(nextRound, e.Reset)
- con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+ go func() {
+ con.cfgModule.abortDKG(nextRound, e.Reset)
+ con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+ }()
})
})
// Register round event handler to propose new CRS.
@@ -750,7 +754,7 @@ func (con *Consensus) prepare(
con.logger.Debug("CRS already proposed", "round", e.Round+1)
return
}
- con.runCRS(e.Round, e.CRS, false)
+ go con.runCRS(e.Round, e.CRS, false)
})
}
})
@@ -788,10 +792,8 @@ func (con *Consensus) prepare(
e := evts[len(evts)-1]
defer elapse("next round", e)()
// Register a routine to trigger round events.
- con.event.RegisterHeight(e.NextRoundValidationHeight(), func(
- blockHeight uint64) {
- con.roundEvent.ValidateNextRound(blockHeight)
- })
+ con.event.RegisterHeight(e.NextRoundValidationHeight(),
+ utils.RoundEventRetryHandlerGenerator(con.roundEvent, con.event))
// Register a routine to register next DKG.
con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
nextRound := e.Round + 1
@@ -801,48 +803,53 @@ func (con *Consensus) prepare(
"reset", e.Reset)
return
}
- // Normally, gov.CRS would return non-nil. Use this for in case of
- // unexpected network fluctuation and ensure the robustness.
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for DKG set",
- "round", nextRound,
- "reset", e.Reset)
- return
- }
- nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
- if err != nil {
- con.logger.Error("Error getting DKG set for next round",
- "round", nextRound,
- "reset", e.Reset,
- "error", err)
- return
- }
- if _, exist := nextDkgSet[con.ID]; !exist {
- con.logger.Info("Not selected as DKG set",
+ go func() {
+ // Normally, gov.CRS would return non-nil. Use this for in case
+ // of unexpected network fluctuation and ensure the robustness.
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Debug("unable to prepare CRS for DKG set",
+ "round", nextRound,
+ "reset", e.Reset)
+ return
+ }
+ nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+ if err != nil {
+ con.logger.Error("Error getting DKG set for next round",
+ "round", nextRound,
+ "reset", e.Reset,
+ "error", err)
+ return
+ }
+ if _, exist := nextDkgSet[con.ID]; !exist {
+ con.logger.Info("Not selected as DKG set",
+ "round", nextRound,
+ "reset", e.Reset)
+ return
+ }
+ con.logger.Info("Selected as DKG set",
"round", nextRound,
"reset", e.Reset)
- return
- }
- con.logger.Info("Selected as DKG set",
- "round", nextRound,
- "reset", e.Reset)
- nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
- con.logger)
- con.cfgModule.registerDKG(nextRound, e.Reset, utils.GetDKGThreshold(
- nextConfig))
- con.event.RegisterHeight(e.NextDKGPreparationHeight(),
- func(uint64) {
- func() {
- con.dkgReady.L.Lock()
- defer con.dkgReady.L.Unlock()
- con.dkgRunning = 0
- }()
- con.runDKG(nextRound, e.Reset, nextConfig)
- })
+ nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+ con.logger)
+ con.cfgModule.registerDKG(nextRound, e.Reset,
+ utils.GetDKGThreshold(nextConfig))
+ con.event.RegisterHeight(e.NextDKGPreparationHeight(),
+ func(uint64) {
+ func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgRunning = 0
+ }()
+ con.runDKG(nextRound, e.Reset, nextConfig)
+ })
+ }()
})
})
con.roundEvent.TriggerInitEvent()
+ if initBlock != nil {
+ con.event.NotifyHeight(initBlock.Finalization.Height)
+ }
return
}
@@ -1289,7 +1296,7 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
"pending", con.bcModule.lastPendingBlock())
for _, b := range deliveredBlocks {
con.deliverBlock(b)
- go con.event.NotifyHeight(b.Finalization.Height)
+ con.event.NotifyHeight(b.Finalization.Height)
}
return
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index e60a173..2a5cc54 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -338,6 +338,56 @@ func (s *ConsensusTestSuite) TestSyncBA() {
// Negative cases are moved to TestVerifyAgreementResult in utils_test.go.
}
+func (s *ConsensusTestSuite) TestInitialHeightEventTriggered() {
+ // Initial block is the last block of corresponding round, in this case,
+ // we should make sure all height event handlers could be triggered after
+ // returned from Consensus.prepare().
+ prvKeys, pubKeys, err := test.NewKeys(4)
+ s.Require().NoError(err)
+ // Prepare a governance instance, whose DKG-reset-count for round 2 is 1.
+ gov, err := test.NewGovernance(test.NewState(DKGDelayRound,
+ pubKeys, time.Second, &common.NullLogger{}, true), ConfigRoundShift)
+ gov.State().RequestChange(test.StateChangeRoundLength, uint64(100))
+ s.Require().NoError(err)
+ gov.NotifyRound(3)
+ hash := common.NewRandomHash()
+ gov.ProposeCRS(2, hash[:])
+ hash = common.NewRandomHash()
+ gov.ResetDKG(hash[:])
+ s.Require().Equal(gov.DKGResetCount(2), uint64(1))
+ prvKey := prvKeys[0]
+ initBlock := &types.Block{
+ Hash: common.NewRandomHash(),
+ Position: types.Position{Round: 1, Height: 199},
+ Finalization: types.FinalizationResult{Height: 200},
+ }
+ dbInst, err := db.NewMemBackedDB()
+ s.Require().NoError(err)
+ nID := types.NewNodeID(prvKey.PublicKey())
+ conn := s.newNetworkConnection()
+ network := conn.newNetwork(nID)
+ con, err := NewConsensusFromSyncer(
+ initBlock,
+ 100,
+ false,
+ time.Now().UTC(),
+ test.NewApp(0, nil, nil),
+ gov,
+ dbInst,
+ network,
+ prvKey,
+ []*types.Block(nil),
+ []*types.BlockRandomnessResult(nil),
+ []interface{}(nil),
+ &common.NullLogger{},
+ )
+ s.Require().NoError(err)
+ // Here is the tricky part, check if block chain module can handle the
+ // block with height == 200.
+ s.Require().Equal(con.bcModule.configs[0].RoundID(), uint64(1))
+ s.Require().Equal(con.bcModule.configs[0].RoundEndHeight(), uint64(300))
+}
+
func TestConsensus(t *testing.T) {
suite.Run(t, new(ConsensusTestSuite))
}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 2eeee9d..4fc24b4 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -209,16 +209,10 @@ func (con *Consensus) assureBuffering() {
})
// Register a round event handler to validate next round.
con.roundEvt.Register(func(evts []utils.RoundEventParam) {
- e := evts[len(evts)-1]
- con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func(
- blockHeight uint64) {
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- con.roundEvt.ValidateNextRound(blockHeight)
- })
+ con.heightEvt.RegisterHeight(
+ evts[len(evts)-1].NextRoundValidationHeight(),
+ utils.RoundEventRetryHandlerGenerator(con.roundEvt, con.heightEvt),
+ )
})
con.roundEvt.TriggerInitEvent()
con.startAgreement()
diff --git a/core/test/app.go b/core/test/app.go
index 12b2047..d704698 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -108,18 +108,13 @@ func NewApp(initRound uint64, gov *Governance, rEvt *utils.RoundEvent) (
app.state = gov.State()
}
if rEvt != nil {
- app.hEvt.RegisterHeight(
- utils.GetNextRoundValidationHeight(rEvt.LastPeriod()),
- func(h uint64) {
- rEvt.ValidateNextRound(h)
- })
rEvt.Register(func(evts []utils.RoundEventParam) {
app.hEvt.RegisterHeight(
evts[len(evts)-1].NextRoundValidationHeight(),
- func(h uint64) {
- rEvt.ValidateNextRound(h)
- })
+ utils.RoundEventRetryHandlerGenerator(rEvt, app.hEvt),
+ )
})
+ rEvt.TriggerInitEvent()
}
return app
}
@@ -265,7 +260,7 @@ func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position,
}
}
}()
- go app.hEvt.NotifyHeight(result.Height)
+ app.hEvt.NotifyHeight(result.Height)
}
// GetLatestDeliveredPosition would return the latest position of delivered
diff --git a/core/test/app_test.go b/core/test/app_test.go
index 138f803..e06b758 100644
--- a/core/test/app_test.go
+++ b/core/test/app_test.go
@@ -309,7 +309,7 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
1900, 2019, core.ConfigRoundShift)
s.Require().NoError(err)
// Register a handler to collects triggered events.
- evts := make(chan evtParamToCheck, 2)
+ evts := make(chan evtParamToCheck, 3)
rEvt.Register(func(params []utils.RoundEventParam) {
for _, p := range params {
evts <- evtParamToCheck{
@@ -336,6 +336,7 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
// Deliver blocks from height=2020 to height=2081.
deliver(0, 0, 2019)
deliver(19, 2020, 2091)
+ s.Require().Equal(<-evts, evtParamToCheck{19, 1, 2000, gov.CRS(19)})
s.Require().Equal(<-evts, evtParamToCheck{19, 2, 2100, gov.CRS(19)})
s.Require().Equal(<-evts, evtParamToCheck{20, 0, 2200, gov.CRS(20)})
// Deliver blocks from height=2082 to height=2281.
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index 3536a27..ff1d91e 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"sync"
- "time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/types"
@@ -127,6 +126,21 @@ type governanceAccessor interface {
DKGResetCount(round uint64) uint64
}
+// RoundEventRetryHandlerGenerator generates a handler to common.Event, which
+// would register itself to retry next round validation if round event is not
+// triggered.
+func RoundEventRetryHandlerGenerator(
+ rEvt *RoundEvent, hEvt *common.Event) func(uint64) {
+ var hEvtHandler func(uint64)
+ hEvtHandler = func(h uint64) {
+ if rEvt.ValidateNextRound(h) == 0 {
+ // Retry until at least one round event is triggered.
+ hEvt.RegisterHeight(h+1, hEvtHandler)
+ }
+ }
+ return hEvtHandler
+}
+
// RoundEvent would be triggered when either:
// - the next DKG set setup is ready.
// - the next DKG set setup is failed, and previous DKG set already reset the
@@ -140,9 +154,9 @@ type RoundEvent struct {
lastTriggeredRound uint64
lastTriggeredResetCount uint64
roundShift uint64
+ dkgFailed bool
ctx context.Context
ctxCancel context.CancelFunc
- retryInterval time.Duration
}
// NewRoundEvent creates an RoundEvent instance.
@@ -158,7 +172,6 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor,
logger: logger,
lastTriggeredRound: initRound,
roundShift: roundShift,
- retryInterval: initConfig.LambdaBA,
}
e.ctx, e.ctxCancel = context.WithCancel(parentCtx)
e.config = RoundBasedConfig{}
@@ -212,20 +225,20 @@ func (e *RoundEvent) TriggerInitEvent() {
// failed to setup, all registered handlers would be called once some decision
// is made on chain.
//
-// This method would block until at least one event is triggered. Multiple
-// trigger in one call is possible.
-func (e *RoundEvent) ValidateNextRound(blockHeight uint64) {
+// The count of triggered events would be returned.
+func (e *RoundEvent) ValidateNextRound(blockHeight uint64) (count uint) {
// To make triggers continuous and sequential, the next validation should
// wait for previous one finishing. That's why I use mutex here directly.
var events []RoundEventParam
e.lock.Lock()
defer e.lock.Unlock()
- e.logger.Info("ValidateNextRound",
+ e.logger.Trace("ValidateNextRound",
"height", blockHeight,
"round", e.lastTriggeredRound,
"count", e.lastTriggeredResetCount)
defer func() {
- if len(events) == 0 {
+ count = uint(len(events))
+ if count == 0 {
return
}
for _, h := range e.handlers {
@@ -235,34 +248,24 @@ func (e *RoundEvent) ValidateNextRound(blockHeight uint64) {
}
}()
var (
- dkgFailed, triggered bool
- param RoundEventParam
- beginHeight = blockHeight
- startRound = e.lastTriggeredRound
+ triggered bool
+ param RoundEventParam
+ beginHeight = blockHeight
+ startRound = e.lastTriggeredRound
)
for {
- for {
- param, dkgFailed, triggered = e.check(beginHeight, startRound,
- dkgFailed)
- if !triggered {
- break
- }
- events = append(events, param)
- beginHeight = param.BeginHeight
- }
- if len(events) > 0 {
+ param, triggered = e.check(beginHeight, startRound)
+ if !triggered {
break
}
- select {
- case <-e.ctx.Done():
- return
- case <-time.After(e.retryInterval):
- }
+ events = append(events, param)
+ beginHeight = param.BeginHeight
}
+ return
}
-func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
- param RoundEventParam, dkgFailed bool, triggered bool) {
+func (e *RoundEvent) check(blockHeight, startRound uint64) (
+ param RoundEventParam, triggered bool) {
defer func() {
if !triggered {
return
@@ -296,14 +299,14 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
if resetCount > e.lastTriggeredResetCount {
e.lastTriggeredResetCount++
e.config.ExtendLength()
+ e.dkgFailed = false
triggered = true
return
}
- if lastDKGCheck {
+ if e.dkgFailed {
// We know that DKG already failed, now wait for the DKG set from
// previous round to reset DKG and don't have to reconstruct the
// group public key again.
- dkgFailed = true
return
}
if nextRound >= dkgDelayRound {
@@ -322,13 +325,14 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
"group public key setup failed, waiting for DKG reset",
"round", nextRound,
"reset", e.lastTriggeredResetCount)
- dkgFailed = true
+ e.dkgFailed = true
return
}
}
// The DKG set for next round is well prepared.
e.lastTriggeredRound = nextRound
e.lastTriggeredResetCount = 0
+ e.dkgFailed = false
rCfg := RoundBasedConfig{}
rCfg.SetupRoundBasedFields(nextRound, nextCfg)
rCfg.AppendTo(e.config)
diff --git a/integration_test/round-event_test.go b/integration_test/round-event_test.go
index d5b432b..b0206ee 100644
--- a/integration_test/round-event_test.go
+++ b/integration_test/round-event_test.go
@@ -136,7 +136,7 @@ func (s *RoundEventTestSuite) TestFromRound0() {
gov.ResetDKG([]byte("DKG round 1 reset 2"))
s.proposeMPK(gov, 1, 2, 3)
s.proposeFinalize(gov, 1, 2, 3)
- rEvt.ValidateNextRound(80)
+ s.Require().Equal(rEvt.ValidateNextRound(80), uint(3))
// Check collected events.
s.Require().Len(evts, 3)
s.Require().Equal(evts[0], evtParamToCheck{0, 1, 100, gov.CRS(0)})
@@ -184,16 +184,16 @@ func (s *RoundEventTestSuite) TestFromRoundN() {
}
})
// Check for round#19, reset(for round#20)#2 at height=2080.
- rEvt.ValidateNextRound(2080)
+ s.Require().Equal(rEvt.ValidateNextRound(2080), uint(2))
// Check collected events.
s.Require().Len(evts, 2)
s.Require().Equal(evts[0], evtParamToCheck{19, 2, 2100, gov.CRS(19)})
s.Require().Equal(evts[1], evtParamToCheck{20, 0, 2200, gov.CRS(20)})
// Round might exceed round-shift limitation would not be triggered.
- rEvt.ValidateNextRound(2280)
+ s.Require().Equal(rEvt.ValidateNextRound(2280), uint(1))
s.Require().Len(evts, 3)
s.Require().Equal(evts[2], evtParamToCheck{21, 0, 2300, gov.CRS(21)})
- rEvt.ValidateNextRound(2380)
+ s.Require().Equal(rEvt.ValidateNextRound(2380), uint(1))
s.Require().Equal(evts[3], evtParamToCheck{22, 0, 2400, gov.CRS(22)})
}