-rw-r--r--  core/consensus.go                     | 101
-rw-r--r--  core/consensus_test.go                |  50
-rw-r--r--  core/syncer/consensus.go              |  14
-rw-r--r--  core/test/app.go                      |  13
-rw-r--r--  core/test/app_test.go                 |   3
-rw-r--r--  core/utils/round-event.go             |  68
-rw-r--r--  integration_test/round-event_test.go  |   8
7 files changed, 154 insertions, 103 deletions
diff --git a/core/consensus.go b/core/consensus.go
index 8f8002b..83727ec 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -658,8 +658,10 @@ func (con *Consensus) prepare(
     // Register round event handler to abort previous running DKG if any.
     con.roundEvent.Register(func(evts []utils.RoundEventParam) {
         e := evts[len(evts)-1]
-        defer elapse("abort DKG", e)()
-        con.cfgModule.abortDKG(e.Round+1, e.Reset)
+        go func() {
+            defer elapse("abort DKG", e)()
+            con.cfgModule.abortDKG(e.Round+1, e.Reset)
+        }()
     })
     // Register round event handler to update BA and BC modules.
     con.roundEvent.Register(func(evts []utils.RoundEventParam) {
@@ -721,8 +723,10 @@ func (con *Consensus) prepare(
                 return
             }
             // Aborting all previous running DKG protocol instance if any.
-            con.cfgModule.abortDKG(nextRound, e.Reset)
-            con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+            go func() {
+                con.cfgModule.abortDKG(nextRound, e.Reset)
+                con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+            }()
         })
     })
     // Register round event handler to propose new CRS.
@@ -750,7 +754,7 @@ func (con *Consensus) prepare(
                 con.logger.Debug("CRS already proposed", "round", e.Round+1)
                 return
             }
-            con.runCRS(e.Round, e.CRS, false)
+            go con.runCRS(e.Round, e.CRS, false)
         })
     }
 })
@@ -788,10 +792,8 @@ func (con *Consensus) prepare(
         e := evts[len(evts)-1]
         defer elapse("next round", e)()
         // Register a routine to trigger round events.
-        con.event.RegisterHeight(e.NextRoundValidationHeight(), func(
-            blockHeight uint64) {
-            con.roundEvent.ValidateNextRound(blockHeight)
-        })
+        con.event.RegisterHeight(e.NextRoundValidationHeight(),
+            utils.RoundEventRetryHandlerGenerator(con.roundEvent, con.event))
         // Register a routine to register next DKG.
         con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
             nextRound := e.Round + 1
@@ -801,48 +803,53 @@ func (con *Consensus) prepare(
                 "reset", e.Reset)
                 return
             }
-            // Normally, gov.CRS would return non-nil. Use this for in case of
-            // unexpected network fluctuation and ensure the robustness.
-            if !checkWithCancel(
-                con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
-                con.logger.Debug("unable to prepare CRS for DKG set",
-                    "round", nextRound,
-                    "reset", e.Reset)
-                return
-            }
-            nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
-            if err != nil {
-                con.logger.Error("Error getting DKG set for next round",
-                    "round", nextRound,
-                    "reset", e.Reset,
-                    "error", err)
-                return
-            }
-            if _, exist := nextDkgSet[con.ID]; !exist {
-                con.logger.Info("Not selected as DKG set",
+            go func() {
+                // Normally, gov.CRS would return non-nil. Use this in case
+                // of unexpected network fluctuation to ensure robustness.
+                if !checkWithCancel(
+                    con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+                    con.logger.Debug("unable to prepare CRS for DKG set",
+                        "round", nextRound,
+                        "reset", e.Reset)
+                    return
+                }
+                nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+                if err != nil {
+                    con.logger.Error("Error getting DKG set for next round",
+                        "round", nextRound,
+                        "reset", e.Reset,
+                        "error", err)
+                    return
+                }
+                if _, exist := nextDkgSet[con.ID]; !exist {
+                    con.logger.Info("Not selected as DKG set",
+                        "round", nextRound,
+                        "reset", e.Reset)
+                    return
+                }
+                con.logger.Info("Selected as DKG set",
                     "round", nextRound,
                     "reset", e.Reset)
-                return
-            }
-            con.logger.Info("Selected as DKG set",
-                "round", nextRound,
-                "reset", e.Reset)
-            nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
-                con.logger)
-            con.cfgModule.registerDKG(nextRound, e.Reset, utils.GetDKGThreshold(
-                nextConfig))
-            con.event.RegisterHeight(e.NextDKGPreparationHeight(),
-                func(uint64) {
-                    func() {
-                        con.dkgReady.L.Lock()
-                        defer con.dkgReady.L.Unlock()
-                        con.dkgRunning = 0
-                    }()
-                    con.runDKG(nextRound, e.Reset, nextConfig)
-                })
+                nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+                    con.logger)
+                con.cfgModule.registerDKG(nextRound, e.Reset,
+                    utils.GetDKGThreshold(nextConfig))
+                con.event.RegisterHeight(e.NextDKGPreparationHeight(),
+                    func(uint64) {
+                        func() {
+                            con.dkgReady.L.Lock()
+                            defer con.dkgReady.L.Unlock()
+                            con.dkgRunning = 0
+                        }()
+                        con.runDKG(nextRound, e.Reset, nextConfig)
+                    })
+            }()
         })
     })
     con.roundEvent.TriggerInitEvent()
+    if initBlock != nil {
+        con.event.NotifyHeight(initBlock.Finalization.Height)
+    }
     return
 }
@@ -1289,7 +1296,7 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
         "pending", con.bcModule.lastPendingBlock())
     for _, b := range deliveredBlocks {
         con.deliverBlock(b)
-        go con.event.NotifyHeight(b.Finalization.Height)
+        con.event.NotifyHeight(b.Finalization.Height)
     }
     return
 }
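The new NotifyHeight call after TriggerInitEvent is the heart of this change: when the initial block is already the last block of its round, height handlers registered during prepare() carry trigger heights at or below the initial finalization height and would otherwise never fire. A minimal sketch of that behavior, assuming common.Event semantics where every handler registered at a height at or below the notified height fires immediately (the heightEvent type below is illustrative, not the project's implementation):

package main

import (
	"fmt"
	"sort"
	"sync"
)

type heightEvent struct {
	mu      sync.Mutex
	pending map[uint64][]func(uint64)
}

func newHeightEvent() *heightEvent {
	return &heightEvent{pending: map[uint64][]func(uint64){}}
}

func (e *heightEvent) RegisterHeight(h uint64, fn func(uint64)) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pending[h] = append(e.pending[h], fn)
}

// NotifyHeight fires every handler registered at a height <= h, in
// ascending height order, then drops them.
func (e *heightEvent) NotifyHeight(h uint64) {
	e.mu.Lock()
	var heights []uint64
	for rh := range e.pending {
		if rh <= h {
			heights = append(heights, rh)
		}
	}
	sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] })
	var ready []func(uint64)
	for _, rh := range heights {
		ready = append(ready, e.pending[rh]...)
		delete(e.pending, rh)
	}
	e.mu.Unlock()
	for _, fn := range ready {
		fn(h)
	}
}

func main() {
	evt := newHeightEvent()
	evt.RegisterHeight(200, func(h uint64) { fmt.Println("fired at", h) })
	// The initial block already finalized height 200, so notify up front,
	// mirroring the NotifyHeight(initBlock.Finalization.Height) call above.
	evt.NotifyHeight(200) // prints "fired at 200"
}

TestInitialHeightEventTriggered in core/consensus_test.go below exercises exactly this path, with an initial block whose Finalization.Height is 200 at the round boundary.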
diff --git a/core/consensus_test.go b/core/consensus_test.go
index e60a173..2a5cc54 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -338,6 +338,56 @@ func (s *ConsensusTestSuite) TestSyncBA() {
     // Negative cases are moved to TestVerifyAgreementResult in utils_test.go.
 }
 
+func (s *ConsensusTestSuite) TestInitialHeightEventTriggered() {
+    // The initial block is the last block of its round; in this case we
+    // should make sure all height event handlers can be triggered after
+    // returning from Consensus.prepare().
+    prvKeys, pubKeys, err := test.NewKeys(4)
+    s.Require().NoError(err)
+    // Prepare a governance instance whose DKG-reset-count for round 2 is 1.
+    gov, err := test.NewGovernance(test.NewState(DKGDelayRound,
+        pubKeys, time.Second, &common.NullLogger{}, true), ConfigRoundShift)
+    gov.State().RequestChange(test.StateChangeRoundLength, uint64(100))
+    s.Require().NoError(err)
+    gov.NotifyRound(3)
+    hash := common.NewRandomHash()
+    gov.ProposeCRS(2, hash[:])
+    hash = common.NewRandomHash()
+    gov.ResetDKG(hash[:])
+    s.Require().Equal(gov.DKGResetCount(2), uint64(1))
+    prvKey := prvKeys[0]
+    initBlock := &types.Block{
+        Hash:         common.NewRandomHash(),
+        Position:     types.Position{Round: 1, Height: 199},
+        Finalization: types.FinalizationResult{Height: 200},
+    }
+    dbInst, err := db.NewMemBackedDB()
+    s.Require().NoError(err)
+    nID := types.NewNodeID(prvKey.PublicKey())
+    conn := s.newNetworkConnection()
+    network := conn.newNetwork(nID)
+    con, err := NewConsensusFromSyncer(
+        initBlock,
+        100,
+        false,
+        time.Now().UTC(),
+        test.NewApp(0, nil, nil),
+        gov,
+        dbInst,
+        network,
+        prvKey,
+        []*types.Block(nil),
+        []*types.BlockRandomnessResult(nil),
+        []interface{}(nil),
+        &common.NullLogger{},
+    )
+    s.Require().NoError(err)
+    // Here is the tricky part: check whether the blockchain module can
+    // handle the block with height == 200.
+    s.Require().Equal(con.bcModule.configs[0].RoundID(), uint64(1))
+    s.Require().Equal(con.bcModule.configs[0].RoundEndHeight(), uint64(300))
+}
+
 func TestConsensus(t *testing.T) {
     suite.Run(t, new(ConsensusTestSuite))
 }
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 2eeee9d..4fc24b4 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -209,16 +209,10 @@ func (con *Consensus) assureBuffering() {
     })
     // Register a round event handler to validate next round.
     con.roundEvt.Register(func(evts []utils.RoundEventParam) {
-        e := evts[len(evts)-1]
-        con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func(
-            blockHeight uint64) {
-            select {
-            case <-con.ctx.Done():
-                return
-            default:
-            }
-            con.roundEvt.ValidateNextRound(blockHeight)
-        })
+        con.heightEvt.RegisterHeight(
+            evts[len(evts)-1].NextRoundValidationHeight(),
+            utils.RoundEventRetryHandlerGenerator(con.roundEvt, con.heightEvt),
+        )
     })
     con.roundEvt.TriggerInitEvent()
     con.startAgreement()
diff --git a/core/test/app.go b/core/test/app.go
index 12b2047..d704698 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -108,18 +108,13 @@ func NewApp(initRound uint64, gov *Governance, rEvt *utils.RoundEvent) (
         app.state = gov.State()
     }
     if rEvt != nil {
-        app.hEvt.RegisterHeight(
-            utils.GetNextRoundValidationHeight(rEvt.LastPeriod()),
-            func(h uint64) {
-                rEvt.ValidateNextRound(h)
-            })
         rEvt.Register(func(evts []utils.RoundEventParam) {
             app.hEvt.RegisterHeight(
                 evts[len(evts)-1].NextRoundValidationHeight(),
-                func(h uint64) {
-                    rEvt.ValidateNextRound(h)
-                })
+                utils.RoundEventRetryHandlerGenerator(rEvt, app.hEvt),
+            )
         })
+        rEvt.TriggerInitEvent()
     }
     return app
 }
@@ -265,7 +260,7 @@ func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position,
             }
         }
     }()
-    go app.hEvt.NotifyHeight(result.Height)
+    app.hEvt.NotifyHeight(result.Height)
 }
 
 // GetLatestDeliveredPosition would return the latest position of delivered
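A single pattern runs through the consensus.go, syncer, and app.go changes: NotifyHeight calls lose their `go` prefix so height notifications are delivered synchronously and in delivery order, while the slow operations inside round-event handlers (abortDKG, runCRS, DKG registration) move onto goroutines so that no single handler stalls the dispatcher. A self-contained sketch of that division of labor, using illustrative types only (not the project's event implementation):

package main

import (
	"fmt"
	"sync"
	"time"
)

// dispatcher calls its handlers synchronously, so handlers must return
// quickly; anything slow gets wrapped in `go` by the handler itself.
type dispatcher struct {
	mu       sync.Mutex
	handlers []func(uint64)
}

func (d *dispatcher) register(fn func(uint64)) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.handlers = append(d.handlers, fn)
}

// notify is synchronous: each handler runs to completion before the next
// notification, so heights 200, 201, ... are observed strictly in order.
func (d *dispatcher) notify(h uint64) {
	d.mu.Lock()
	hs := append([]func(uint64){}, d.handlers...)
	d.mu.Unlock()
	for _, fn := range hs {
		fn(h)
	}
}

func main() {
	var wg sync.WaitGroup
	d := &dispatcher{}
	d.register(func(h uint64) {
		wg.Add(1)
		go func() { // offload, mirroring the abortDKG/runCRS wrapping above
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // stand-in for abortDKG
			fmt.Println("slow work done for height", h)
		}()
	})
	d.register(func(h uint64) {
		fmt.Println("fast handler saw height", h) // never blocked
	})
	d.notify(200)
	d.notify(201)
	wg.Wait()
}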
diff --git a/core/test/app_test.go b/core/test/app_test.go
index 138f803..e06b758 100644
--- a/core/test/app_test.go
+++ b/core/test/app_test.go
@@ -309,7 +309,7 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
         1900, 2019, core.ConfigRoundShift)
     s.Require().NoError(err)
     // Register a handler to collect triggered events.
-    evts := make(chan evtParamToCheck, 2)
+    evts := make(chan evtParamToCheck, 3)
     rEvt.Register(func(params []utils.RoundEventParam) {
         for _, p := range params {
             evts <- evtParamToCheck{
@@ -336,6 +336,7 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
     // Deliver blocks from height=2020 to height=2081.
     deliver(0, 0, 2019)
     deliver(19, 2020, 2091)
+    s.Require().Equal(<-evts, evtParamToCheck{19, 1, 2000, gov.CRS(19)})
     s.Require().Equal(<-evts, evtParamToCheck{19, 2, 2100, gov.CRS(19)})
     s.Require().Equal(<-evts, evtParamToCheck{20, 0, 2200, gov.CRS(20)})
     // Deliver blocks from height=2082 to height=2281.
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index 3536a27..ff1d91e 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -21,7 +21,6 @@ import (
     "context"
     "fmt"
     "sync"
-    "time"
 
     "github.com/dexon-foundation/dexon-consensus/common"
     "github.com/dexon-foundation/dexon-consensus/core/types"
@@ -127,6 +126,21 @@ type governanceAccessor interface {
     DKGResetCount(round uint64) uint64
 }
 
+// RoundEventRetryHandlerGenerator generates a handler for common.Event that
+// re-registers itself to retry next-round validation until at least one
+// round event is triggered.
+func RoundEventRetryHandlerGenerator(
+    rEvt *RoundEvent, hEvt *common.Event) func(uint64) {
+    var hEvtHandler func(uint64)
+    hEvtHandler = func(h uint64) {
+        if rEvt.ValidateNextRound(h) == 0 {
+            // Retry until at least one round event is triggered.
+            hEvt.RegisterHeight(h+1, hEvtHandler)
+        }
+    }
+    return hEvtHandler
+}
+
 // RoundEvent would be triggered when either:
 // - the next DKG set setup is ready.
 // - the next DKG set setup is failed, and previous DKG set already reset the
@@ -140,9 +154,9 @@ type RoundEvent struct {
     lastTriggeredRound      uint64
     lastTriggeredResetCount uint64
     roundShift              uint64
+    dkgFailed               bool
     ctx                     context.Context
     ctxCancel               context.CancelFunc
-    retryInterval           time.Duration
 }
 
 // NewRoundEvent creates a RoundEvent instance.
@@ -158,7 +172,6 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor,
         logger:             logger,
         lastTriggeredRound: initRound,
         roundShift:         roundShift,
-        retryInterval:      initConfig.LambdaBA,
     }
     e.ctx, e.ctxCancel = context.WithCancel(parentCtx)
     e.config = RoundBasedConfig{}
@@ -212,20 +225,20 @@ func (e *RoundEvent) TriggerInitEvent() {
 // failed to setup, all registered handlers would be called once some decision
 // is made on chain.
 //
-// This method would block until at least one event is triggered. Multiple
-// trigger in one call is possible.
-func (e *RoundEvent) ValidateNextRound(blockHeight uint64) {
+// The count of triggered events would be returned.
+func (e *RoundEvent) ValidateNextRound(blockHeight uint64) (count uint) {
     // To make triggers continuous and sequential, the next validation should
     // wait for the previous one to finish. That's why I use a mutex here
     // directly.
     var events []RoundEventParam
     e.lock.Lock()
     defer e.lock.Unlock()
-    e.logger.Info("ValidateNextRound",
+    e.logger.Trace("ValidateNextRound",
         "height", blockHeight,
         "round", e.lastTriggeredRound,
         "count", e.lastTriggeredResetCount)
     defer func() {
-        if len(events) == 0 {
+        count = uint(len(events))
+        if count == 0 {
             return
         }
         for _, h := range e.handlers {
@@ -235,34 +248,24 @@
         }
     }()
     var (
-        dkgFailed, triggered bool
-        param                RoundEventParam
-        beginHeight          = blockHeight
-        startRound           = e.lastTriggeredRound
+        triggered   bool
+        param       RoundEventParam
+        beginHeight = blockHeight
+        startRound  = e.lastTriggeredRound
     )
     for {
-        for {
-            param, dkgFailed, triggered = e.check(beginHeight, startRound,
-                dkgFailed)
-            if !triggered {
-                break
-            }
-            events = append(events, param)
-            beginHeight = param.BeginHeight
-        }
-        if len(events) > 0 {
+        param, triggered = e.check(beginHeight, startRound)
+        if !triggered {
             break
         }
-        select {
-        case <-e.ctx.Done():
-            return
-        case <-time.After(e.retryInterval):
-        }
+        events = append(events, param)
+        beginHeight = param.BeginHeight
     }
+    return
 }
 
-func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
-    param RoundEventParam, dkgFailed bool, triggered bool) {
+func (e *RoundEvent) check(blockHeight, startRound uint64) (
+    param RoundEventParam, triggered bool) {
     defer func() {
         if !triggered {
             return
@@ -296,14 +299,14 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
     if resetCount > e.lastTriggeredResetCount {
         e.lastTriggeredResetCount++
         e.config.ExtendLength()
+        e.dkgFailed = false
         triggered = true
         return
     }
-    if lastDKGCheck {
+    if e.dkgFailed {
         // We know that DKG already failed, now wait for the DKG set from
         // previous round to reset DKG and don't have to reconstruct the
         // group public key again.
-        dkgFailed = true
         return
     }
     if nextRound >= dkgDelayRound {
@@ -322,13 +325,14 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
             "group public key setup failed, waiting for DKG reset",
             "round", nextRound,
             "reset", e.lastTriggeredResetCount)
-        dkgFailed = true
+        e.dkgFailed = true
         return
         }
     }
     // The DKG set for next round is well prepared.
     e.lastTriggeredRound = nextRound
     e.lastTriggeredResetCount = 0
+    e.dkgFailed = false
     rCfg := RoundBasedConfig{}
     rCfg.SetupRoundBasedFields(nextRound, nextCfg)
     rCfg.AppendTo(e.config)
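RoundEventRetryHandlerGenerator replaces the block-and-poll loop (retryInterval plus time.After) removed above: rather than blocking inside ValidateNextRound until something fires, the generated handler re-arms itself at height h+1 whenever ValidateNextRound(h) reports zero triggered events. All three call sites wire it the same way; the sketch below condenses the registration pattern from the consensus.go, syncer/consensus.go, and test/app.go diffs above (the wireRetry wrapper name is illustrative):

package example

import (
	"github.com/dexon-foundation/dexon-consensus/common"
	"github.com/dexon-foundation/dexon-consensus/core/utils"
)

// wireRetry registers the self-rescheduling retry handler at the next
// round-validation height on every round event, matching the call sites
// in this commit.
func wireRetry(rEvt *utils.RoundEvent, hEvt *common.Event) {
	rEvt.Register(func(evts []utils.RoundEventParam) {
		hEvt.RegisterHeight(
			evts[len(evts)-1].NextRoundValidationHeight(),
			utils.RoundEventRetryHandlerGenerator(rEvt, hEvt),
		)
	})
}

Because ValidateNextRound now returns the number of triggered events instead of blocking, the integration tests below can assert on that count directly.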
diff --git a/integration_test/round-event_test.go b/integration_test/round-event_test.go
index d5b432b..b0206ee 100644
--- a/integration_test/round-event_test.go
+++ b/integration_test/round-event_test.go
@@ -136,7 +136,7 @@ func (s *RoundEventTestSuite) TestFromRound0() {
     gov.ResetDKG([]byte("DKG round 1 reset 2"))
     s.proposeMPK(gov, 1, 2, 3)
     s.proposeFinalize(gov, 1, 2, 3)
-    rEvt.ValidateNextRound(80)
+    s.Require().Equal(rEvt.ValidateNextRound(80), uint(3))
     // Check collected events.
     s.Require().Len(evts, 3)
     s.Require().Equal(evts[0], evtParamToCheck{0, 1, 100, gov.CRS(0)})
@@ -184,16 +184,16 @@ func (s *RoundEventTestSuite) TestFromRoundN() {
         }
     })
     // Check for round#19, reset(for round#20)#2 at height=2080.
-    rEvt.ValidateNextRound(2080)
+    s.Require().Equal(rEvt.ValidateNextRound(2080), uint(2))
     // Check collected events.
     s.Require().Len(evts, 2)
     s.Require().Equal(evts[0], evtParamToCheck{19, 2, 2100, gov.CRS(19)})
     s.Require().Equal(evts[1], evtParamToCheck{20, 0, 2200, gov.CRS(20)})
     // Rounds that would exceed the round-shift limitation are not triggered.
-    rEvt.ValidateNextRound(2280)
+    s.Require().Equal(rEvt.ValidateNextRound(2280), uint(1))
     s.Require().Len(evts, 3)
     s.Require().Equal(evts[2], evtParamToCheck{21, 0, 2300, gov.CRS(21)})
-    rEvt.ValidateNextRound(2380)
+    s.Require().Equal(rEvt.ValidateNextRound(2380), uint(1))
     s.Require().Equal(evts[3], evtParamToCheck{22, 0, 2400, gov.CRS(22)})