aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-03-16 10:07:08 +0800
committerGitHub <noreply@github.com>2019-03-16 10:07:08 +0800
commit50c9419ba04ed44854fdb1afc1ef2e865be9876f (patch)
tree2acafd84bdb61d54da5621c29c3914986df45b65
parentb02fa5ee430cff9dafc9d9c399099a88d554a083 (diff)
downloaddexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar
dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.gz
dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.bz2
dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.lz
dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.xz
dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.tar.zst
dexon-consensus-50c9419ba04ed44854fdb1afc1ef2e865be9876f.zip
core, syncer: integrate utils.RoundEvent (#490)
-rw-r--r--core/agreement-mgr.go84
-rw-r--r--core/blockchain.go69
-rw-r--r--core/blockchain_test.go74
-rw-r--r--core/consensus.go331
-rw-r--r--core/consensus_test.go2
-rw-r--r--core/interfaces.go8
-rw-r--r--core/syncer/consensus.go137
-rw-r--r--core/test/app.go6
-rw-r--r--core/utils.go6
-rw-r--r--core/utils/round-event.go70
-rw-r--r--core/utils/utils.go6
-rw-r--r--integration_test/consensus_test.go10
-rw-r--r--integration_test/round-event_test.go29
13 files changed, 478 insertions, 354 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 7b5effb..0e39fa5 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -90,8 +90,6 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config,
type baRoundSetting struct {
notarySet map[types.NodeID]struct{}
- agr *agreement
- recv *consensusBAReceiver
ticker Ticker
crs common.Hash
}
@@ -111,6 +109,7 @@ type agreementMgr struct {
initRound uint64
configs []agreementMgrConfig
baModule *agreement
+ recv *consensusBAReceiver
processedBAResult map[types.Position]struct{}
voteFilter *utils.VoteFilter
waitGroup sync.WaitGroup
@@ -136,15 +135,17 @@ func newAgreementMgr(con *Consensus, initRound uint64,
configs: []agreementMgrConfig{initConfig},
voteFilter: utils.NewVoteFilter(),
}
- recv := &consensusBAReceiver{
- consensus: con,
- restartNotary: make(chan types.Position, 1),
- roundValue: &atomic.Value{},
+ mgr.recv = &consensusBAReceiver{
+ consensus: con,
+ restartNotary: make(chan types.Position, 1),
+ roundValue: &atomic.Value{},
+ changeNotaryHeightValue: &atomic.Value{},
}
- recv.roundValue.Store(uint64(0))
+ mgr.recv.roundValue.Store(uint64(0))
+ mgr.recv.changeNotaryHeightValue.Store(uint64(0))
agr := newAgreement(
mgr.ID,
- recv,
+ mgr.recv,
newLeaderSelector(genValidLeader(mgr), mgr.logger),
mgr.signer,
mgr.logger)
@@ -156,7 +157,7 @@ func newAgreementMgr(con *Consensus, initRound uint64,
agr.notarySet = nodes.GetSubSet(
int(initConfig.notarySetSize), types.NewNotarySetTarget(initConfig.crs))
// Hacky way to make agreement module self contained.
- recv.agreementModule = agr
+ mgr.recv.agreementModule = agr
mgr.baModule = agr
return
}
@@ -188,15 +189,43 @@ func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig {
return &mgr.configs[roundIndex]
}
-func (mgr *agreementMgr) appendConfig(
- round uint64, config *types.Config, crs common.Hash) (err error) {
+func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error {
mgr.lock.Lock()
defer mgr.lock.Unlock()
- if round != uint64(len(mgr.configs))+mgr.initRound {
- return ErrRoundNotIncreasing
+ apply := func(e utils.RoundEventParam) error {
+ if len(mgr.configs) > 0 {
+ lastCfg := mgr.configs[len(mgr.configs)-1]
+ if e.BeginHeight != lastCfg.RoundEndHeight() {
+ return ErrInvalidBlockHeight
+ }
+ if lastCfg.RoundID() == e.Round {
+ mgr.configs[len(mgr.configs)-1].ExtendLength()
+ // It's not an atomic operation to update an atomic value based
+ // on another. However, it's the best way so far to extend
+ // length of round without refactoring.
+ if mgr.recv.round() == e.Round {
+ mgr.recv.changeNotaryHeightValue.Store(
+ mgr.configs[len(mgr.configs)-1].RoundEndHeight())
+ }
+ } else if lastCfg.RoundID()+1 == e.Round {
+ mgr.configs = append(mgr.configs, newAgreementMgrConfig(
+ lastCfg, e.Config, e.CRS))
+ } else {
+ return ErrInvalidRoundID
+ }
+ } else {
+ c := agreementMgrConfig{}
+ c.from(e.Round, e.Config, e.CRS)
+ c.SetRoundBeginHeight(e.BeginHeight)
+ mgr.configs = append(mgr.configs, c)
+ }
+ return nil
+ }
+ for _, e := range evts {
+ if err := apply(e); err != nil {
+ return err
+ }
}
- mgr.configs = append(mgr.configs, newAgreementMgrConfig(
- mgr.configs[len(mgr.configs)-1], config, crs))
return nil
}
@@ -252,7 +281,7 @@ func (mgr *agreementMgr) processAgreementResult(
}
} else if result.Position.Newer(aID) {
mgr.logger.Info("Fast syncing BA", "position", result.Position)
- nodes, err := mgr.cache.GetNodeSet(result.Position.Round)
+ nIDs, err := mgr.cache.GetNotarySet(result.Position.Round)
if err != nil {
return err
}
@@ -261,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult(
mgr.network.PullBlocks(common.Hashes{result.BlockHash})
mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round)
crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger)
- nIDs := nodes.GetSubSet(
- int(utils.GetConfigWithPanic(
- mgr.gov, result.Position.Round, mgr.logger).NotarySetSize),
- types.NewNotarySetTarget(crs))
for key := range result.Votes {
if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
return err
@@ -296,10 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) {
currentRound uint64
nextRound = initRound
curConfig = mgr.config(initRound)
- setting = baRoundSetting{
- agr: mgr.baModule,
- recv: mgr.baModule.data.recv.(*consensusBAReceiver),
- }
+ setting = baRoundSetting{}
tickDuration time.Duration
)
@@ -353,12 +375,12 @@ Loop:
break Loop
default:
}
- setting.recv.isNotary = checkRound()
+ mgr.recv.isNotary = checkRound()
// Run BA for this round.
- setting.recv.roundValue.Store(currentRound)
- setting.recv.changeNotaryHeight = curConfig.RoundEndHeight()
- setting.recv.restartNotary <- types.Position{
- Round: setting.recv.round(),
+ mgr.recv.roundValue.Store(currentRound)
+ mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight())
+ mgr.recv.restartNotary <- types.Position{
+ Round: mgr.recv.round(),
Height: math.MaxUint64,
}
mgr.voteFilter = utils.NewVoteFilter()
@@ -373,8 +395,8 @@ Loop:
func (mgr *agreementMgr) baRoutineForOneRound(
setting *baRoundSetting) (err error) {
- agr := setting.agr
- recv := setting.recv
+ agr := mgr.baModule
+ recv := mgr.recv
oldPos := agr.agreementID()
restart := func(restartPos types.Position) (breakLoop bool, err error) {
if !isStop(restartPos) {
diff --git a/core/blockchain.go b/core/blockchain.go
index 9263f67..c5a22b6 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -41,7 +41,6 @@ var (
ErrNotFollowTipPosition = errors.New("not follow tip position")
ErrDuplicatedPendingBlock = errors.New("duplicated pending block")
ErrRetrySanityCheckLater = errors.New("retry sanity check later")
- ErrRoundNotIncreasing = errors.New("round not increasing")
ErrRoundNotSwitch = errors.New("round not switch")
ErrIncorrectBlockRandomnessResult = errors.New(
"incorrect block randomness result")
@@ -142,19 +141,8 @@ type blockChain struct {
}
func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
- initConfig blockChainConfig, app Application, vGetter tsigVerifierGetter,
- signer *utils.Signer, logger common.Logger) *blockChain {
- if initBlock != nil {
- if initConfig.RoundID() != initBlock.Position.Round {
- panic(fmt.Errorf("incompatible config/block %s %d",
- initBlock, initConfig.RoundID()))
- }
- } else {
- if initConfig.RoundID() != 0 {
- panic(fmt.Errorf("genesis config should from round 0 %d",
- initConfig.RoundID()))
- }
- }
+ app Application, vGetter tsigVerifierGetter, signer *utils.Signer,
+ logger common.Logger) *blockChain {
return &blockChain{
ID: nID,
lastConfirmed: initBlock,
@@ -163,23 +151,58 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
vGetter: vGetter,
app: app,
logger: logger,
- configs: []blockChainConfig{initConfig},
dMoment: dMoment,
pendingRandomnesses: make(
map[types.Position]*types.BlockRandomnessResult),
}
}
-func (bc *blockChain) appendConfig(round uint64, config *types.Config) error {
- expectedRound := uint64(len(bc.configs))
- if bc.lastConfirmed != nil {
- expectedRound += bc.lastConfirmed.Position.Round
+func (bc *blockChain) notifyRoundEvents(evts []utils.RoundEventParam) error {
+ bc.lock.Lock()
+ defer bc.lock.Unlock()
+ apply := func(e utils.RoundEventParam) error {
+ if len(bc.configs) > 0 {
+ lastCfg := bc.configs[len(bc.configs)-1]
+ if e.BeginHeight != lastCfg.RoundEndHeight() {
+ return ErrInvalidBlockHeight
+ }
+ if lastCfg.RoundID() == e.Round {
+ bc.configs[len(bc.configs)-1].ExtendLength()
+ } else if lastCfg.RoundID()+1 == e.Round {
+ bc.configs = append(bc.configs, newBlockChainConfig(
+ lastCfg, e.Config))
+ } else {
+ return ErrInvalidRoundID
+ }
+ } else {
+ c := blockChainConfig{}
+ c.fromConfig(e.Round, e.Config)
+ c.SetRoundBeginHeight(e.BeginHeight)
+ if bc.lastConfirmed == nil {
+ if c.RoundID() != 0 {
+ panic(fmt.Errorf("genesis config should from round 0 %d",
+ c.RoundID()))
+ }
+ } else {
+ if c.RoundID() != bc.lastConfirmed.Position.Round {
+ panic(fmt.Errorf("incompatible config/block %s %d",
+ bc.lastConfirmed, c.RoundID()))
+ }
+ if !c.Contains(bc.lastConfirmed.Position.Height) {
+ panic(fmt.Errorf(
+ "unmatched round-event with block %s %d %d %d",
+ bc.lastConfirmed, e.Round, e.Reset, e.BeginHeight))
+ }
+ }
+ bc.configs = append(bc.configs, c)
+ }
+ return nil
}
- if round != expectedRound {
- return ErrRoundNotIncreasing
+ for _, e := range evts {
+ if err := apply(e); err != nil {
+ return err
+ }
}
- bc.configs = append(bc.configs, newBlockChainConfig(
- bc.configs[len(bc.configs)-1], config))
return nil
}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 1996fbb..3fe7697 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -128,24 +128,27 @@ func (s *BlockChainTestSuite) newRandomnessFromBlock(
}
func (s *BlockChainTestSuite) newBlockChain(initB *types.Block,
- roundLength uint64) *blockChain {
+ roundLength uint64) (bc *blockChain) {
initRound := uint64(0)
if initB != nil {
initRound = initB.Position.Round
}
- initConfig := blockChainConfig{}
- initConfig.fromConfig(initRound, &types.Config{
- MinBlockInterval: s.blockInterval,
- RoundLength: roundLength,
- })
+ initHeight := uint64(0)
if initB != nil {
- initConfig.SetRoundBeginHeight(initB.Position.Height)
- } else {
- initConfig.SetRoundBeginHeight(0)
+ initHeight = initB.Position.Height
}
- return newBlockChain(s.nID, s.dMoment, initB, initConfig,
- test.NewApp(0, nil, nil), &testTSigVerifierGetter{}, s.signer,
- &common.NullLogger{})
+ bc = newBlockChain(s.nID, s.dMoment, initB, test.NewApp(0, nil, nil),
+ &testTSigVerifierGetter{}, s.signer, &common.NullLogger{})
+ s.Require().NoError(bc.notifyRoundEvents([]utils.RoundEventParam{
+ utils.RoundEventParam{
+ Round: initRound,
+ Reset: 0,
+ BeginHeight: initHeight,
+ Config: &types.Config{
+ MinBlockInterval: s.blockInterval,
+ RoundLength: roundLength,
+ }}}))
+ return
}
func (s *BlockChainTestSuite) newRoundOneInitBlock() *types.Block {
@@ -330,13 +333,32 @@ func (s *BlockChainTestSuite) TestSanityCheck() {
s.Require().NoError(bc.sanityCheck(b4))
}
-func (s *BlockChainTestSuite) TestAppendConfig() {
- bc := s.newBlockChain(nil, 10)
- s.Require().Equal(ErrRoundNotIncreasing.Error(),
- bc.appendConfig(0, &types.Config{}).Error())
- s.Require().Equal(ErrRoundNotIncreasing.Error(),
- bc.appendConfig(2, &types.Config{}).Error())
- s.Require().NoError(bc.appendConfig(1, &types.Config{}))
+func (s *BlockChainTestSuite) TestNotifyRoundEvents() {
+ roundLength := uint64(10)
+ bc := s.newBlockChain(nil, roundLength)
+ newEvent := func(round, reset, height uint64) []utils.RoundEventParam {
+ return []utils.RoundEventParam{
+ utils.RoundEventParam{
+ Round: round,
+ Reset: reset,
+ BeginHeight: height,
+ CRS: common.Hash{},
+ Config: &types.Config{RoundLength: roundLength},
+ }}
+ }
+ s.Require().Equal(ErrInvalidRoundID.Error(),
+ bc.notifyRoundEvents(newEvent(2, 0, roundLength)).Error())
+ s.Require().NoError(bc.notifyRoundEvents(newEvent(1, 0, roundLength)))
+ // Make sure new config is appended when new round is ready.
+ s.Require().Len(bc.configs, 2)
+ s.Require().Equal(ErrInvalidRoundID.Error(),
+ bc.notifyRoundEvents(newEvent(3, 1, roundLength*2)).Error())
+ s.Require().Equal(ErrInvalidBlockHeight.Error(),
+ bc.notifyRoundEvents(newEvent(1, 1, roundLength)).Error())
+ s.Require().NoError(bc.notifyRoundEvents(newEvent(1, 1, roundLength*2)))
+ // Make sure roundEndHeight is extended when DKG reset.
+ s.Require().Equal(bc.configs[len(bc.configs)-1].RoundEndHeight(),
+ roundLength*3)
}
func (s *BlockChainTestSuite) TestConfirmed() {
@@ -354,10 +376,16 @@ func (s *BlockChainTestSuite) TestConfirmed() {
func (s *BlockChainTestSuite) TestNextBlockAndTipRound() {
var roundLength uint64 = 3
bc := s.newBlockChain(nil, roundLength)
- s.Require().NoError(bc.appendConfig(1, &types.Config{
- MinBlockInterval: s.blockInterval,
- RoundLength: roundLength,
- }))
+ s.Require().NoError(bc.notifyRoundEvents([]utils.RoundEventParam{
+ utils.RoundEventParam{
+ Round: 1,
+ Reset: 0,
+ BeginHeight: roundLength,
+ CRS: common.Hash{},
+ Config: &types.Config{
+ MinBlockInterval: s.blockInterval,
+ RoundLength: roundLength,
+ }}}))
blocks := s.newBlocks(3, nil)
nextH, nextT := bc.nextBlock()
s.Require().Equal(nextH, uint64(0))
diff --git a/core/consensus.go b/core/consensus.go
index 5ee64c2..8529e40 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -56,18 +56,22 @@ var (
// consensusBAReceiver implements agreementReceiver.
type consensusBAReceiver struct {
// TODO(mission): consensus would be replaced by blockChain and network.
- consensus *Consensus
- agreementModule *agreement
- changeNotaryHeight uint64
- roundValue *atomic.Value
- isNotary bool
- restartNotary chan types.Position
+ consensus *Consensus
+ agreementModule *agreement
+ changeNotaryHeightValue *atomic.Value
+ roundValue *atomic.Value
+ isNotary bool
+ restartNotary chan types.Position
}
func (recv *consensusBAReceiver) round() uint64 {
return recv.roundValue.Load().(uint64)
}
+func (recv *consensusBAReceiver) changeNotaryHeight() uint64 {
+ return recv.changeNotaryHeightValue.Load().(uint64)
+}
+
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
@@ -247,16 +251,17 @@ CleanChannelLoop:
}
}
newPos := block.Position
- if block.Position.Height+1 == recv.changeNotaryHeight {
+ if block.Position.Height+1 == recv.changeNotaryHeight() {
newPos.Round++
recv.roundValue.Store(newPos.Round)
}
currentRound := recv.round()
- if block.Position.Height > recv.changeNotaryHeight &&
+ changeNotaryHeight := recv.changeNotaryHeight()
+ if block.Position.Height > changeNotaryHeight &&
block.Position.Round <= currentRound {
panic(fmt.Errorf(
"round not switch when confirmig: %s, %d, should switch at %d",
- block, currentRound, recv.changeNotaryHeight))
+ block, currentRound, changeNotaryHeight))
}
recv.restartNotary <- newPos
}
@@ -396,11 +401,11 @@ type Consensus struct {
bcModule *blockChain
dMoment time.Time
nodeSetCache *utils.NodeSetCache
- roundForNewConfig uint64
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
event *common.Event
+ roundEvent *utils.RoundEvent
logger common.Logger
resetRandomnessTicker chan struct{}
resetDeliveryGuardTicker chan struct{}
@@ -540,8 +545,10 @@ func newConsensusForRound(
}
// Get configuration for bootstrap round.
initRound := uint64(0)
+ initBlockHeight := uint64(0)
if initBlock != nil {
initRound = initBlock.Position.Round
+ initBlockHeight = initBlock.Position.Height
}
initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
initCRS := utils.GetCRSWithPanic(gov, initRound, logger)
@@ -566,10 +573,7 @@ func newConsensusForRound(
if usingNonBlocking {
appModule = newNonBlocking(app, debugApp)
}
- bcConfig := blockChainConfig{}
- bcConfig.fromConfig(initRound, initConfig)
- bcConfig.SetRoundBeginHeight(initRoundBeginHeight)
- bcModule := newBlockChain(ID, dMoment, initBlock, bcConfig, appModule,
+ bcModule := newBlockChain(ID, dMoment, initBlock, appModule,
NewTSigVerifierCache(gov, 7), signer, logger)
// Construct Consensus instance.
con := &Consensus{
@@ -594,6 +598,10 @@ func newConsensusForRound(
processBlockChan: make(chan *types.Block, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ if con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initRound,
+ initRoundBeginHeight, initBlockHeight, ConfigRoundShift); err != nil {
+ panic(err)
+ }
baConfig := agreementMgrConfig{}
baConfig.from(initRound, initConfig, initCRS)
baConfig.SetRoundBeginHeight(initRoundBeginHeight)
@@ -613,26 +621,139 @@ func newConsensusForRound(
// - the last finalized block
func (con *Consensus) prepare(
initRoundBeginHeight uint64, initBlock *types.Block) (err error) {
+ // Trigger the round validation method for the next round of the first
+ // round.
// The block past from full node should be delivered already or known by
// full node. We don't have to notify it.
initRound := uint64(0)
if initBlock != nil {
initRound = initBlock.Position.Round
}
- // Setup blockChain module.
- con.roundForNewConfig = initRound + 1
- initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger)
- initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger)
- if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil {
- return
- }
if initRound == 0 {
if DKGDelayRound == 0 {
panic("not implemented yet")
}
}
- // Register events.
- con.initialRound(initRoundBeginHeight, initRound, initConfig)
+ // Register round event handler to update BA and BC modules.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ // Always updates newer configs to the later modules first in the flow.
+ if err := con.bcModule.notifyRoundEvents(evts); err != nil {
+ panic(err)
+ }
+ // The init config is provided to baModule when construction.
+ if evts[len(evts)-1].BeginHeight != initRoundBeginHeight {
+ if err := con.baMgr.notifyRoundEvents(evts); err != nil {
+ panic(err)
+ }
+ }
+ })
+ // Register round event handler to propose new CRS.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ // We don't have to propose new CRS during DKG reset, the reset of DKG
+ // would be done by the DKG set in previous round.
+ e := evts[len(evts)-1]
+ if e.Reset != 0 || e.Round < DKGDelayRound {
+ return
+ }
+ if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil {
+ con.logger.Error("Error getting DKG set when proposing CRS",
+ "round", e.Round,
+ "error", err)
+ } else {
+ if _, exist := curDkgSet[con.ID]; !exist {
+ return
+ }
+ con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) {
+ con.logger.Debug(
+ "Calling Governance.CRS to check if already proposed",
+ "round", e.Round+1)
+ if (con.gov.CRS(e.Round+1) != common.Hash{}) {
+ con.logger.Debug("CRS already proposed", "round", e.Round+1)
+ return
+ }
+ con.runCRS(e.Round, e.CRS)
+ })
+ }
+ })
+ // Touch nodeSetCache for next round.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ if e.Reset != 0 {
+ return
+ }
+ con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) {
+ if err := con.nodeSetCache.Touch(e.Round + 1); err != nil {
+ con.logger.Warn("Failed to update nodeSetCache",
+ "round", e.Round+1,
+ "error", err)
+ }
+ })
+ })
+ // checkCRS is a generator of checker to check if CRS for that round is
+ // ready or not.
+ checkCRS := func(round uint64) func() bool {
+ return func() bool {
+ nextCRS := con.gov.CRS(round)
+ if (nextCRS != common.Hash{}) {
+ return true
+ }
+ con.logger.Debug("CRS is not ready yet. Try again later...",
+ "nodeID", con.ID,
+ "round", round)
+ return false
+ }
+ }
+ // Trigger round validation method for next period.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ // Register a routine to trigger round events.
+ con.event.RegisterHeight(e.NextRoundValidationHeight(), func(
+ blockHeight uint64) {
+ con.roundEvent.ValidateNextRound(blockHeight)
+ })
+ // Register a routine to register next DKG.
+ con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
+ nextRound := e.Round + 1
+ if nextRound < DKGDelayRound {
+ con.logger.Info("Skip runDKG for round", "round", nextRound)
+ return
+ }
+ // Normally, gov.CRS would return non-nil. Use this for in case of
+ // unexpected network fluctuation and ensure the robustness.
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Debug("unable to prepare CRS for DKG set",
+ "round", nextRound)
+ return
+ }
+ nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+ if err != nil {
+ con.logger.Error("Error getting DKG set for next round",
+ "round", nextRound,
+ "error", err)
+ return
+ }
+ if _, exist := nextDkgSet[con.ID]; !exist {
+ con.logger.Info("Not selected as DKG set", "round", nextRound)
+ return
+ }
+ con.logger.Info("Selected as DKG set", "round", nextRound)
+ nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+ con.logger)
+ con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold(
+ nextConfig))
+ con.event.RegisterHeight(e.NextDKGPreparationHeight(),
+ func(uint64) {
+ func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgRunning = 0
+ }()
+ con.runDKG(nextRound, nextConfig)
+ })
+ })
+ })
+ con.roundEvent.TriggerInitEvent()
return
}
@@ -704,27 +825,9 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) {
}()
}
-func (con *Consensus) runCRS(round uint64) {
- for {
- con.logger.Debug("Calling Governance.CRS to check if already proposed",
- "round", round+1)
- if (con.gov.CRS(round+1) != common.Hash{}) {
- con.logger.Debug("CRS already proposed", "round", round+1)
- return
- }
- con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
- "round", round)
- if con.cfgModule.isDKGFinal(round) {
- break
- }
- con.logger.Debug("DKG is not ready for running CRS. Retry later...",
- "round", round)
- time.Sleep(500 * time.Millisecond)
- }
+func (con *Consensus) runCRS(round uint64, hash common.Hash) {
// Start running next round CRS.
- con.logger.Debug("Calling Governance.CRS", "round", round)
- psig, err := con.cfgModule.preparePartialSignature(
- round, utils.GetCRSWithPanic(con.gov, round, con.logger))
+ psig, err := con.cfgModule.preparePartialSignature(round, hash)
if err != nil {
con.logger.Error("Failed to prepare partial signature", "error", err)
} else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
@@ -751,130 +854,6 @@ func (con *Consensus) runCRS(round uint64) {
}
}
-func (con *Consensus) initialRound(
- startHeight uint64, round uint64, config *types.Config) {
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- if round >= DKGDelayRound {
- curDkgSet, err := con.nodeSetCache.GetDKGSet(round)
- if err != nil {
- con.logger.Error("Error getting DKG set", "round", round, "error", err)
- curDkgSet = make(map[types.NodeID]struct{})
- }
- // Initiate CRS routine.
- if _, exist := curDkgSet[con.ID]; exist {
- con.event.RegisterHeight(
- startHeight+config.RoundLength/2,
- func(uint64) {
- go func() {
- con.runCRS(round)
- }()
- })
- }
- }
- // checkCRS is a generator of checker to check if CRS for that round is
- // ready or not.
- checkCRS := func(round uint64) func() bool {
- return func() bool {
- nextCRS := con.gov.CRS(round)
- if (nextCRS != common.Hash{}) {
- return true
- }
- con.logger.Debug("CRS is not ready yet. Try again later...",
- "nodeID", con.ID,
- "round", round)
- return false
- }
- }
- // Initiate BA modules.
- con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) {
- go func(nextRound uint64) {
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for baMgr",
- "round", nextRound)
- return
- }
- // Notify BA for new round.
- nextConfig := utils.GetConfigWithPanic(
- con.gov, nextRound, con.logger)
- nextCRS := utils.GetCRSWithPanic(
- con.gov, nextRound, con.logger)
- con.logger.Info("appendConfig for baMgr", "round", nextRound)
- if err := con.baMgr.appendConfig(
- nextRound, nextConfig, nextCRS); err != nil {
- panic(err)
- }
- }(round + 1)
- })
- // Initiate DKG for this round.
- con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) {
- go func(nextRound uint64) {
- if nextRound < DKGDelayRound {
- con.logger.Info("Skip runDKG for round", "round", nextRound)
- return
- }
- // Normally, gov.CRS would return non-nil. Use this for in case of
- // unexpected network fluctuation and ensure the robustness.
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for DKG set",
- "round", nextRound)
- return
- }
- nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
- if err != nil {
- con.logger.Error("Error getting DKG set",
- "round", nextRound,
- "error", err)
- return
- }
- if _, exist := nextDkgSet[con.ID]; !exist {
- return
- }
- con.logger.Info("Selected as DKG set", "round", nextRound)
- con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold(config))
- con.event.RegisterHeight(startHeight+config.RoundLength*2/3,
- func(uint64) {
- func() {
- con.dkgReady.L.Lock()
- defer con.dkgReady.L.Unlock()
- con.dkgRunning = 0
- }()
- nextConfig := utils.GetConfigWithPanic(
- con.gov, nextRound, con.logger)
- con.runDKG(nextRound, nextConfig)
- })
- }(round + 1)
- })
- // Prepare blockChain module for next round and next "initialRound" routine.
- con.event.RegisterHeight(startHeight+config.RoundLength, func(uint64) {
- // Change round.
- // Get configuration for next round.
- nextRound := round + 1
- nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger)
- con.initialRound(
- startHeight+config.RoundLength, nextRound, nextConfig)
- })
- // Touch nodeSetCache for next round.
- con.event.RegisterHeight(startHeight+config.RoundLength*9/10, func(uint64) {
- go func() {
- // TODO(jimmy): check DKGResetCount and do not touch if nextRound is reset.
- if err := con.nodeSetCache.Touch(round + 1); err != nil {
- con.logger.Warn("Failed to update nodeSetCache",
- "round", round+1, "error", err)
- }
- if _, _, err := con.bcModule.vGetter.UpdateAndGet(round + 1); err != nil {
- con.logger.Warn("Failed to update tsigVerifierCache",
- "round", round+1, "error", err)
- }
- }()
- })
-}
-
// Stop the Consensus core.
func (con *Consensus) Stop() {
con.ctxCancel()
@@ -1195,24 +1174,6 @@ func (con *Consensus) deliverBlock(b *types.Block) {
con.cfgModule.untouchTSigHash(b.Hash)
con.logger.Debug("Calling Application.BlockDelivered", "block", b)
con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone())
- if b.Position.Round == con.roundForNewConfig {
- // Get configuration for the round next to next round. Configuration
- // for that round should be ready at this moment and is required for
- // blockChain module. This logic is related to:
- // - roundShift
- // - notifyGenesisRound
- futureRound := con.roundForNewConfig + 1
- futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger)
- con.logger.Debug("Append Config", "round", futureRound)
- if err := con.bcModule.appendConfig(
- futureRound, futureConfig); err != nil {
- con.logger.Debug("Unable to append config",
- "round", futureRound,
- "error", err)
- panic(err)
- }
- con.roundForNewConfig++
- }
if con.debugApp != nil {
con.debugApp.BlockReady(b.Hash)
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 8ebfb02..75f5422 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -283,7 +283,7 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
crsFinish := make(chan struct{})
for _, con := range cons {
go func(con *Consensus) {
- con.runCRS(0)
+ con.runCRS(0, gov.CRS(0))
crsFinish <- struct{}{}
}(con)
}
diff --git a/core/interfaces.go b/core/interfaces.go
index 7accac2..ddd6c3b 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -101,8 +101,12 @@ type Governance interface {
// Return the genesis configuration if round == 0.
Configuration(round uint64) *types.Config
- // CRS returns the CRS for a given round.
- // Return the genesis CRS if round == 0.
+ // CRS returns the CRS for a given round. Return the genesis CRS if
+ // round == 0.
+ //
+ // The CRS returned is the proposed or latest reseted one, it would be
+ // changed later if corresponding DKG set failed to generate group public
+ // key.
CRS(round uint64) common.Hash
// Propose a CRS of round.
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index fd48793..f2f8f9e 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -69,12 +69,14 @@ type Consensus struct {
configs []*types.Config
roundBeginHeights []uint64
agreementRoundCut uint64
+ heightEvt *common.Event
+ roundEvt *utils.RoundEvent
// lock for accessing all fields.
lock sync.RWMutex
duringBuffering bool
latestCRSRound uint64
- moduleWaitGroup sync.WaitGroup
+ waitGroup sync.WaitGroup
agreementWaitGroup sync.WaitGroup
pullChan chan common.Hash
receiveChan chan *types.Block
@@ -116,6 +118,7 @@ func NewConsensus(
receiveChan: make(chan *types.Block, 1000),
pullChan: make(chan common.Hash, 1000),
randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult),
+ heightEvt: common.NewEvent(),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
_, con.initChainTipHeight = db.GetCompactionChainTipInfo()
@@ -143,9 +146,73 @@ func (con *Consensus) assureBuffering() {
return
}
con.duringBuffering = true
+ // Get latest block to prepare utils.RoundEvent.
+ var (
+ err error
+ blockHash, height = con.db.GetCompactionChainTipInfo()
+ )
+ if height == 0 {
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger,
+ uint64(0), uint64(0), uint64(0), core.ConfigRoundShift)
+ } else {
+ var b types.Block
+ if b, err = con.db.GetBlock(blockHash); err == nil {
+ beginHeight := con.roundBeginHeights[b.Position.Round]
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov,
+ con.logger, b.Position.Round, beginHeight, beginHeight,
+ core.ConfigRoundShift)
+ }
+ }
+ if err != nil {
+ panic(err)
+ }
+ // Make sure con.roundEvt stopped before stopping con.agreementModule.
+ con.waitGroup.Add(1)
+ // Register a round event handler to notify CRS to agreementModule.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ con.waitGroup.Add(1)
+ go func() {
+ defer con.waitGroup.Done()
+ for _, e := range evts {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ for func() bool {
+ select {
+ case <-con.ctx.Done():
+ return false
+ case con.agreementModule.inputChan <- e.Round:
+ return false
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Warn(
+ "agreement input channel is full when putting CRS",
+ "round", e.Round,
+ )
+ return true
+ }
+ }() {
+ }
+ }
+ }()
+ })
+ // Register a round event handler to validate next round.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func(
+ blockHeight uint64) {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ con.roundEvt.ValidateNextRound(blockHeight)
+ })
+ })
+ con.roundEvt.TriggerInitEvent()
con.startAgreement()
con.startNetwork()
- con.startCRSMonitor()
}
func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
@@ -265,6 +332,7 @@ func (con *Consensus) SyncBlocks(
b.Hash, b.Finalization.Height); err != nil {
return
}
+ go con.heightEvt.NotifyHeight(b.Finalization.Height)
}
if latest {
con.assureBuffering()
@@ -346,7 +414,10 @@ func (con *Consensus) stopBuffering() {
return
}
con.logger.Trace("stop syncer modules")
- con.moduleWaitGroup.Wait()
+ con.roundEvt.Stop()
+ con.waitGroup.Done()
+ // Wait for all routines depends on con.agreementModule stopped.
+ con.waitGroup.Wait()
// Since there is no one waiting for the receive channel of fullnode, we
// need to launch a dummy receiver right away.
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
@@ -492,9 +563,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
- con.moduleWaitGroup.Add(1)
+ con.waitGroup.Add(1)
go func() {
- defer con.moduleWaitGroup.Done()
+ defer con.waitGroup.Done()
loop:
for {
select {
@@ -522,62 +593,6 @@ func (con *Consensus) startNetwork() {
}()
}
-// startCRSMonitor is the dummiest way to verify if the CRS for one round
-// is ready or not.
-func (con *Consensus) startCRSMonitor() {
- var lastNotifiedRound uint64
- // Notify all agreements for new CRS.
- notifyNewCRS := func(round uint64) {
- con.setupConfigsUntilRound(round)
- if round == lastNotifiedRound {
- return
- }
- con.logger.Debug("CRS is ready", "round", round)
- lastNotifiedRound = round
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- con.latestCRSRound = round
- }()
- for func() bool {
- select {
- case <-con.ctx.Done():
- return false
- case con.agreementModule.inputChan <- round:
- return false
- case <-time.After(500 * time.Millisecond):
- con.logger.Debug(
- "agreement input channel is full when putting CRS",
- "round", round,
- )
- return true
- }
- }() {
- }
- }
- con.moduleWaitGroup.Add(1)
- go func() {
- defer con.moduleWaitGroup.Done()
- for {
- select {
- case <-con.ctx.Done():
- return
- case <-time.After(500 * time.Millisecond):
- }
- // Notify agreement modules for the latest round that CRS is
- // available if the round is not notified yet.
- checked := lastNotifiedRound + 1
- for (con.gov.CRS(checked) != common.Hash{}) {
- checked++
- }
- checked--
- if checked > lastNotifiedRound {
- notifyNewCRS(checked)
- }
- }
- }()
-}
-
func (con *Consensus) stopAgreement() {
if con.agreementModule.inputChan != nil {
close(con.agreementModule.inputChan)
diff --git a/core/test/app.go b/core/test/app.go
index 4cfd580..df58135 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -109,11 +109,13 @@ func NewApp(initRound uint64, gov *Governance, rEvt *utils.RoundEvent) (
}
if rEvt != nil {
app.hEvt.RegisterHeight(
- utils.GetNextRoundCheckpoint(rEvt.LastPeriod()), func(h uint64) {
+ utils.GetNextRoundValidationHeight(rEvt.LastPeriod()),
+ func(h uint64) {
rEvt.ValidateNextRound(h)
})
rEvt.Register(func(evts []utils.RoundEventParam) {
- app.hEvt.RegisterHeight(evts[len(evts)-1].NextRoundCheckpoint(),
+ app.hEvt.RegisterHeight(
+ evts[len(evts)-1].NextRoundValidationHeight(),
func(h uint64) {
rEvt.ValidateNextRound(h)
})
diff --git a/core/utils.go b/core/utils.go
index f7dee75..5742d11 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -236,14 +236,14 @@ func checkWithCancel(parentCtx context.Context, interval time.Duration,
defer cancel()
Loop:
for {
+ if ret = checker(); ret {
+ return
+ }
select {
case <-ctx.Done():
break Loop
case <-time.After(interval):
}
- if ret = checker(); ret {
- return
- }
}
return
}
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index bab1d32..1ce877d 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -28,15 +28,15 @@ import (
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)
-// ErrUnmatchedBlockHeightWithGov is for invalid parameters for NewRoundEvent.
-type ErrUnmatchedBlockHeightWithGov struct {
+// ErrUnmatchedBlockHeightWithConfig is for invalid parameters for NewRoundEvent.
+type ErrUnmatchedBlockHeightWithConfig struct {
round uint64
reset uint64
blockHeight uint64
}
-func (e ErrUnmatchedBlockHeightWithGov) Error() string {
- return fmt.Sprintf("unsynced block height and gov: round:%d reset:%d h:%d",
+func (e ErrUnmatchedBlockHeightWithConfig) Error() string {
+ return fmt.Sprintf("unsynced block height and cfg: round:%d reset:%d h:%d",
e.round, e.reset, e.blockHeight)
}
@@ -56,11 +56,43 @@ type RoundEventParam struct {
CRS common.Hash
}
-// NextRoundCheckpoint returns the height to check if the next round is ready.
-func (e RoundEventParam) NextRoundCheckpoint() uint64 {
+// NextRoundValidationHeight returns the height to check if the next round is
+// ready.
+func (e RoundEventParam) NextRoundValidationHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength*9/10
+}
+
+// NextCRSProposingHeight returns the height to propose CRS for next round.
+func (e RoundEventParam) NextCRSProposingHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength/2
+}
+
+// NextDKGPreparationHeight returns the height to prepare DKG set for next
+// round.
+func (e RoundEventParam) NextDKGPreparationHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength*2/3
+}
+
+// NextRoundHeight returns the height of the beginning of next round.
+func (e RoundEventParam) NextRoundHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength
+}
+
+// NextTouchNodeSetCacheHeight returns the height to touch the node set cache.
+func (e RoundEventParam) NextTouchNodeSetCacheHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength*9/10
+}
+
+// NextDKGResetHeight returns the height to reset DKG for next period.
+func (e RoundEventParam) NextDKGResetHeight() uint64 {
return e.BeginHeight + e.Config.RoundLength*8/10
}
+// NextDKGRegisterHeight returns the height to register DKG.
+func (e RoundEventParam) NextDKGRegisterHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength/2
+}
+
// roundEventFn defines the fingerprint of handlers of round events.
type roundEventFn func([]RoundEventParam)
@@ -131,7 +163,7 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor,
e.config.ExtendLength()
}
if !e.config.Contains(initBlockHeight) {
- return nil, ErrUnmatchedBlockHeightWithGov{
+ return nil, ErrUnmatchedBlockHeightWithConfig{
round: initRound,
reset: resetCount,
blockHeight: initBlockHeight,
@@ -149,6 +181,22 @@ func (e *RoundEvent) Register(h roundEventFn) {
e.handlers = append(e.handlers, h)
}
+// TriggerInitEvent triggers event from the initial setting.
+func (e *RoundEvent) TriggerInitEvent() {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+ events := []RoundEventParam{RoundEventParam{
+ Round: e.lastTriggeredRound,
+ Reset: e.lastTriggeredResetCount,
+ BeginHeight: e.config.LastPeriodBeginHeight(),
+ CRS: GetCRSWithPanic(e.gov, e.lastTriggeredRound, e.logger),
+ Config: GetConfigWithPanic(e.gov, e.lastTriggeredRound, e.logger),
+ }}
+ for _, h := range e.handlers {
+ h(events)
+ }
+}
+
// ValidateNextRound validate if the DKG set for next round is ready to go or
// failed to setup, all registered handlers would be called once some decision
// is made on chain.
@@ -225,14 +273,6 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
"crs", param.CRS.String()[:6],
)
}()
- // Make sure current last config covers the blockHeight.
- if !e.config.Contains(blockHeight) {
- panic(ErrUnmatchedBlockHeightWithGov{
- round: e.lastTriggeredRound,
- reset: e.lastTriggeredResetCount,
- blockHeight: blockHeight,
- })
- }
nextRound := e.lastTriggeredRound + 1
if nextRound >= startRound+e.roundShift {
// Avoid access configuration newer than last confirmed one over
diff --git a/core/utils/utils.go b/core/utils/utils.go
index d714068..14687d6 100644
--- a/core/utils/utils.go
+++ b/core/utils/utils.go
@@ -139,8 +139,8 @@ func GetDKGThreshold(config *types.Config) int {
return int(config.DKGSetSize/3) + 1
}
-// GetNextRoundCheckpoint returns the block height to check if the next round
-// is ready.
-func GetNextRoundCheckpoint(begin, length uint64) uint64 {
+// GetNextRoundValidationHeight returns the block height to check if the next
+// round is ready.
+func GetNextRoundValidationHeight(begin, length uint64) uint64 {
return begin + length*9/10
}
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 5131465..70e6c1f 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -180,7 +180,7 @@ func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
}
syncerHeight = b.Finalization.Height + 1
compactionChainBlocks = append(compactionChainBlocks, &b)
- if len(compactionChainBlocks) >= 100 {
+ if len(compactionChainBlocks) >= 20 {
if syncBlocks() {
break
}
@@ -340,9 +340,9 @@ func (s *ConsensusTestSuite) TestSync() {
req = s.Require()
peerCount = 4
dMoment = time.Now().UTC()
- untilRound = uint64(5)
- stopRound = uint64(3)
- aliveRound = uint64(1)
+ untilRound = uint64(7)
+ stopRound = uint64(5)
+ aliveRound = uint64(4)
errChan = make(chan error, 100)
)
prvKeys, pubKeys, err := test.NewKeys(peerCount)
@@ -442,7 +442,7 @@ ReachAlive:
select {
case <-runnerCtx.Done():
break SyncLoop
- case <-time.After(2 * time.Second):
+ case <-time.After(4 * time.Second):
}
}
}()
diff --git a/integration_test/round-event_test.go b/integration_test/round-event_test.go
index f8cac26..8201bfe 100644
--- a/integration_test/round-event_test.go
+++ b/integration_test/round-event_test.go
@@ -221,6 +221,35 @@ func (s *RoundEventTestSuite) TestLastPeriod() {
s.Require().Equal(length, uint64(200))
}
+func (s *RoundEventTestSuite) TestTriggerInitEvent() {
+ gov := s.prepareGov()
+ s.Require().NoError(gov.State().RequestChange(test.StateChangeRoundLength,
+ uint64(100)))
+ gov.CatchUpWithRound(0)
+ s.Require().NoError(gov.State().RequestChange(test.StateChangeRoundLength,
+ uint64(200)))
+ gov.CatchUpWithRound(1)
+ // Prepare utils.RoundEvent, starts from genesis.
+ rEvt, err := utils.NewRoundEvent(
+ context.Background(), gov, s.logger, 0, 0, 0, core.ConfigRoundShift)
+ s.Require().NoError(err)
+ // Register a handler to collects triggered events.
+ var evts []evtParamToCheck
+ rEvt.Register(func(params []utils.RoundEventParam) {
+ for _, p := range params {
+ evts = append(evts, evtParamToCheck{
+ round: p.Round,
+ reset: p.Reset,
+ height: p.BeginHeight,
+ crs: p.CRS,
+ })
+ }
+ })
+ rEvt.TriggerInitEvent()
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0], evtParamToCheck{0, 0, 0, gov.CRS(0)})
+}
+
func TestRoundEvent(t *testing.T) {
suite.Run(t, new(RoundEventTestSuite))
}