aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go84
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go76
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go472
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go17
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go162
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go148
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go6
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go70
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go6
9 files changed, 658 insertions, 383 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
index 7b5effba8..0e39fa52a 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
@@ -90,8 +90,6 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config,
type baRoundSetting struct {
notarySet map[types.NodeID]struct{}
- agr *agreement
- recv *consensusBAReceiver
ticker Ticker
crs common.Hash
}
@@ -111,6 +109,7 @@ type agreementMgr struct {
initRound uint64
configs []agreementMgrConfig
baModule *agreement
+ recv *consensusBAReceiver
processedBAResult map[types.Position]struct{}
voteFilter *utils.VoteFilter
waitGroup sync.WaitGroup
@@ -136,15 +135,17 @@ func newAgreementMgr(con *Consensus, initRound uint64,
configs: []agreementMgrConfig{initConfig},
voteFilter: utils.NewVoteFilter(),
}
- recv := &consensusBAReceiver{
- consensus: con,
- restartNotary: make(chan types.Position, 1),
- roundValue: &atomic.Value{},
+ mgr.recv = &consensusBAReceiver{
+ consensus: con,
+ restartNotary: make(chan types.Position, 1),
+ roundValue: &atomic.Value{},
+ changeNotaryHeightValue: &atomic.Value{},
}
- recv.roundValue.Store(uint64(0))
+ mgr.recv.roundValue.Store(uint64(0))
+ mgr.recv.changeNotaryHeightValue.Store(uint64(0))
agr := newAgreement(
mgr.ID,
- recv,
+ mgr.recv,
newLeaderSelector(genValidLeader(mgr), mgr.logger),
mgr.signer,
mgr.logger)
@@ -156,7 +157,7 @@ func newAgreementMgr(con *Consensus, initRound uint64,
agr.notarySet = nodes.GetSubSet(
int(initConfig.notarySetSize), types.NewNotarySetTarget(initConfig.crs))
// Hacky way to make agreement module self contained.
- recv.agreementModule = agr
+ mgr.recv.agreementModule = agr
mgr.baModule = agr
return
}
@@ -188,15 +189,43 @@ func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig {
return &mgr.configs[roundIndex]
}
-func (mgr *agreementMgr) appendConfig(
- round uint64, config *types.Config, crs common.Hash) (err error) {
+func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error {
mgr.lock.Lock()
defer mgr.lock.Unlock()
- if round != uint64(len(mgr.configs))+mgr.initRound {
- return ErrRoundNotIncreasing
+ apply := func(e utils.RoundEventParam) error {
+ if len(mgr.configs) > 0 {
+ lastCfg := mgr.configs[len(mgr.configs)-1]
+ if e.BeginHeight != lastCfg.RoundEndHeight() {
+ return ErrInvalidBlockHeight
+ }
+ if lastCfg.RoundID() == e.Round {
+ mgr.configs[len(mgr.configs)-1].ExtendLength()
+ // It's not an atomic operation to update an atomic value based
+ // on another. However, it's the best way so far to extend
+ // length of round without refactoring.
+ if mgr.recv.round() == e.Round {
+ mgr.recv.changeNotaryHeightValue.Store(
+ mgr.configs[len(mgr.configs)-1].RoundEndHeight())
+ }
+ } else if lastCfg.RoundID()+1 == e.Round {
+ mgr.configs = append(mgr.configs, newAgreementMgrConfig(
+ lastCfg, e.Config, e.CRS))
+ } else {
+ return ErrInvalidRoundID
+ }
+ } else {
+ c := agreementMgrConfig{}
+ c.from(e.Round, e.Config, e.CRS)
+ c.SetRoundBeginHeight(e.BeginHeight)
+ mgr.configs = append(mgr.configs, c)
+ }
+ return nil
+ }
+ for _, e := range evts {
+ if err := apply(e); err != nil {
+ return err
+ }
}
- mgr.configs = append(mgr.configs, newAgreementMgrConfig(
- mgr.configs[len(mgr.configs)-1], config, crs))
return nil
}
@@ -252,7 +281,7 @@ func (mgr *agreementMgr) processAgreementResult(
}
} else if result.Position.Newer(aID) {
mgr.logger.Info("Fast syncing BA", "position", result.Position)
- nodes, err := mgr.cache.GetNodeSet(result.Position.Round)
+ nIDs, err := mgr.cache.GetNotarySet(result.Position.Round)
if err != nil {
return err
}
@@ -261,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult(
mgr.network.PullBlocks(common.Hashes{result.BlockHash})
mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round)
crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger)
- nIDs := nodes.GetSubSet(
- int(utils.GetConfigWithPanic(
- mgr.gov, result.Position.Round, mgr.logger).NotarySetSize),
- types.NewNotarySetTarget(crs))
for key := range result.Votes {
if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
return err
@@ -296,10 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) {
currentRound uint64
nextRound = initRound
curConfig = mgr.config(initRound)
- setting = baRoundSetting{
- agr: mgr.baModule,
- recv: mgr.baModule.data.recv.(*consensusBAReceiver),
- }
+ setting = baRoundSetting{}
tickDuration time.Duration
)
@@ -353,12 +375,12 @@ Loop:
break Loop
default:
}
- setting.recv.isNotary = checkRound()
+ mgr.recv.isNotary = checkRound()
// Run BA for this round.
- setting.recv.roundValue.Store(currentRound)
- setting.recv.changeNotaryHeight = curConfig.RoundEndHeight()
- setting.recv.restartNotary <- types.Position{
- Round: setting.recv.round(),
+ mgr.recv.roundValue.Store(currentRound)
+ mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight())
+ mgr.recv.restartNotary <- types.Position{
+ Round: mgr.recv.round(),
Height: math.MaxUint64,
}
mgr.voteFilter = utils.NewVoteFilter()
@@ -373,8 +395,8 @@ Loop:
func (mgr *agreementMgr) baRoutineForOneRound(
setting *baRoundSetting) (err error) {
- agr := setting.agr
- recv := setting.recv
+ agr := mgr.baModule
+ recv := mgr.recv
oldPos := agr.agreementID()
restart := func(restartPos types.Position) (breakLoop bool, err error) {
if !isStop(restartPos) {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go
index 19a580b4f..c5a22b628 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go
@@ -41,7 +41,6 @@ var (
ErrNotFollowTipPosition = errors.New("not follow tip position")
ErrDuplicatedPendingBlock = errors.New("duplicated pending block")
ErrRetrySanityCheckLater = errors.New("retry sanity check later")
- ErrRoundNotIncreasing = errors.New("round not increasing")
ErrRoundNotSwitch = errors.New("round not switch")
ErrIncorrectBlockRandomnessResult = errors.New(
"incorrect block randomness result")
@@ -142,19 +141,8 @@ type blockChain struct {
}
func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
- initConfig blockChainConfig, app Application, vGetter tsigVerifierGetter,
- signer *utils.Signer, logger common.Logger) *blockChain {
- if initBlock != nil {
- if initConfig.RoundID() != initBlock.Position.Round {
- panic(fmt.Errorf("incompatible config/block %s %d",
- initBlock, initConfig.RoundID()))
- }
- } else {
- if initConfig.RoundID() != 0 {
- panic(fmt.Errorf("genesis config should from round 0 %d",
- initConfig.RoundID()))
- }
- }
+ app Application, vGetter tsigVerifierGetter, signer *utils.Signer,
+ logger common.Logger) *blockChain {
return &blockChain{
ID: nID,
lastConfirmed: initBlock,
@@ -163,23 +151,58 @@ func newBlockChain(nID types.NodeID, dMoment time.Time, initBlock *types.Block,
vGetter: vGetter,
app: app,
logger: logger,
- configs: []blockChainConfig{initConfig},
dMoment: dMoment,
pendingRandomnesses: make(
map[types.Position]*types.BlockRandomnessResult),
}
}
-func (bc *blockChain) appendConfig(round uint64, config *types.Config) error {
- expectedRound := uint64(len(bc.configs))
- if bc.lastConfirmed != nil {
- expectedRound += bc.lastConfirmed.Position.Round
+func (bc *blockChain) notifyRoundEvents(evts []utils.RoundEventParam) error {
+ bc.lock.Lock()
+ defer bc.lock.Unlock()
+ apply := func(e utils.RoundEventParam) error {
+ if len(bc.configs) > 0 {
+ lastCfg := bc.configs[len(bc.configs)-1]
+ if e.BeginHeight != lastCfg.RoundEndHeight() {
+ return ErrInvalidBlockHeight
+ }
+ if lastCfg.RoundID() == e.Round {
+ bc.configs[len(bc.configs)-1].ExtendLength()
+ } else if lastCfg.RoundID()+1 == e.Round {
+ bc.configs = append(bc.configs, newBlockChainConfig(
+ lastCfg, e.Config))
+ } else {
+ return ErrInvalidRoundID
+ }
+ } else {
+ c := blockChainConfig{}
+ c.fromConfig(e.Round, e.Config)
+ c.SetRoundBeginHeight(e.BeginHeight)
+ if bc.lastConfirmed == nil {
+ if c.RoundID() != 0 {
+ panic(fmt.Errorf("genesis config should from round 0 %d",
+ c.RoundID()))
+ }
+ } else {
+ if c.RoundID() != bc.lastConfirmed.Position.Round {
+ panic(fmt.Errorf("incompatible config/block %s %d",
+ bc.lastConfirmed, c.RoundID()))
+ }
+ if !c.Contains(bc.lastConfirmed.Position.Height) {
+ panic(fmt.Errorf(
+ "unmatched round-event with block %s %d %d %d",
+ bc.lastConfirmed, e.Round, e.Reset, e.BeginHeight))
+ }
+ }
+ bc.configs = append(bc.configs, c)
+ }
+ return nil
}
- if round != expectedRound {
- return ErrRoundNotIncreasing
+ for _, e := range evts {
+ if err := apply(e); err != nil {
+ return err
+ }
}
- bc.configs = append(bc.configs, newBlockChainConfig(
- bc.configs[len(bc.configs)-1], config))
return nil
}
@@ -558,8 +581,11 @@ func (bc *blockChain) prepareBlock(position types.Position,
}
if tipConfig.IsLastBlock(tip) {
if tip.Position.Round+1 != position.Round {
- b, err = nil, ErrRoundNotSwitch
- return
+ if !empty {
+ b, err = nil, ErrRoundNotSwitch
+ return
+ }
+ b.Position.Round = tip.Position.Round + 1
}
} else {
if tip.Position.Round != position.Round {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index 4201cbcc2..8529e4031 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -56,18 +56,22 @@ var (
// consensusBAReceiver implements agreementReceiver.
type consensusBAReceiver struct {
// TODO(mission): consensus would be replaced by blockChain and network.
- consensus *Consensus
- agreementModule *agreement
- changeNotaryHeight uint64
- roundValue *atomic.Value
- isNotary bool
- restartNotary chan types.Position
+ consensus *Consensus
+ agreementModule *agreement
+ changeNotaryHeightValue *atomic.Value
+ roundValue *atomic.Value
+ isNotary bool
+ restartNotary chan types.Position
}
func (recv *consensusBAReceiver) round() uint64 {
return recv.roundValue.Load().(uint64)
}
+func (recv *consensusBAReceiver) changeNotaryHeight() uint64 {
+ return recv.changeNotaryHeightValue.Load().(uint64)
+}
+
func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) {
if !recv.isNotary {
return
@@ -247,16 +251,17 @@ CleanChannelLoop:
}
}
newPos := block.Position
- if block.Position.Height+1 == recv.changeNotaryHeight {
+ if block.Position.Height+1 == recv.changeNotaryHeight() {
newPos.Round++
recv.roundValue.Store(newPos.Round)
}
currentRound := recv.round()
- if block.Position.Height > recv.changeNotaryHeight &&
+ changeNotaryHeight := recv.changeNotaryHeight()
+ if block.Position.Height > changeNotaryHeight &&
block.Position.Round <= currentRound {
panic(fmt.Errorf(
"round not switch when confirmig: %s, %d, should switch at %d",
- block, currentRound, recv.changeNotaryHeight))
+ block, currentRound, changeNotaryHeight))
}
recv.restartNotary <- newPos
}
@@ -396,11 +401,11 @@ type Consensus struct {
bcModule *blockChain
dMoment time.Time
nodeSetCache *utils.NodeSetCache
- roundForNewConfig uint64
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
event *common.Event
+ roundEvent *utils.RoundEvent
logger common.Logger
resetRandomnessTicker chan struct{}
resetDeliveryGuardTicker chan struct{}
@@ -453,6 +458,7 @@ func NewConsensusForSimulation(
func NewConsensusFromSyncer(
initBlock *types.Block,
initRoundBeginHeight uint64,
+ startWithEmpty bool,
dMoment time.Time,
app Application,
gov Governance,
@@ -495,6 +501,23 @@ func NewConsensusFromSyncer(
continue
}
}
+ if startWithEmpty {
+ pos := initBlock.Position
+ pos.Height++
+ block, err := con.bcModule.addEmptyBlock(pos)
+ if err != nil {
+ panic(err)
+ }
+ con.processBlockChan <- block
+ if pos.Round >= DKGDelayRound {
+ rand := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ IsEmptyBlock: true,
+ }
+ go con.prepareRandomnessResult(rand)
+ }
+ }
return con, nil
}
@@ -522,8 +545,10 @@ func newConsensusForRound(
}
// Get configuration for bootstrap round.
initRound := uint64(0)
+ initBlockHeight := uint64(0)
if initBlock != nil {
initRound = initBlock.Position.Round
+ initBlockHeight = initBlock.Position.Height
}
initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
initCRS := utils.GetCRSWithPanic(gov, initRound, logger)
@@ -548,10 +573,7 @@ func newConsensusForRound(
if usingNonBlocking {
appModule = newNonBlocking(app, debugApp)
}
- bcConfig := blockChainConfig{}
- bcConfig.fromConfig(initRound, initConfig)
- bcConfig.SetRoundBeginHeight(initRoundBeginHeight)
- bcModule := newBlockChain(ID, dMoment, initBlock, bcConfig, appModule,
+ bcModule := newBlockChain(ID, dMoment, initBlock, appModule,
NewTSigVerifierCache(gov, 7), signer, logger)
// Construct Consensus instance.
con := &Consensus{
@@ -576,6 +598,10 @@ func newConsensusForRound(
processBlockChan: make(chan *types.Block, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ if con.roundEvent, err = utils.NewRoundEvent(con.ctx, gov, logger, initRound,
+ initRoundBeginHeight, initBlockHeight, ConfigRoundShift); err != nil {
+ panic(err)
+ }
baConfig := agreementMgrConfig{}
baConfig.from(initRound, initConfig, initCRS)
baConfig.SetRoundBeginHeight(initRoundBeginHeight)
@@ -595,26 +621,139 @@ func newConsensusForRound(
// - the last finalized block
func (con *Consensus) prepare(
initRoundBeginHeight uint64, initBlock *types.Block) (err error) {
+ // Trigger the round validation method for the next round of the first
+ // round.
// The block past from full node should be delivered already or known by
// full node. We don't have to notify it.
initRound := uint64(0)
if initBlock != nil {
initRound = initBlock.Position.Round
}
- // Setup blockChain module.
- con.roundForNewConfig = initRound + 1
- initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger)
- initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger)
- if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil {
- return
- }
if initRound == 0 {
if DKGDelayRound == 0 {
panic("not implemented yet")
}
}
- // Register events.
- con.initialRound(initRoundBeginHeight, initRound, initConfig)
+ // Register round event handler to update BA and BC modules.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ // Always updates newer configs to the later modules first in the flow.
+ if err := con.bcModule.notifyRoundEvents(evts); err != nil {
+ panic(err)
+ }
+ // The init config is provided to baModule when construction.
+ if evts[len(evts)-1].BeginHeight != initRoundBeginHeight {
+ if err := con.baMgr.notifyRoundEvents(evts); err != nil {
+ panic(err)
+ }
+ }
+ })
+ // Register round event handler to propose new CRS.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ // We don't have to propose new CRS during DKG reset, the reset of DKG
+ // would be done by the DKG set in previous round.
+ e := evts[len(evts)-1]
+ if e.Reset != 0 || e.Round < DKGDelayRound {
+ return
+ }
+ if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil {
+ con.logger.Error("Error getting DKG set when proposing CRS",
+ "round", e.Round,
+ "error", err)
+ } else {
+ if _, exist := curDkgSet[con.ID]; !exist {
+ return
+ }
+ con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) {
+ con.logger.Debug(
+ "Calling Governance.CRS to check if already proposed",
+ "round", e.Round+1)
+ if (con.gov.CRS(e.Round+1) != common.Hash{}) {
+ con.logger.Debug("CRS already proposed", "round", e.Round+1)
+ return
+ }
+ con.runCRS(e.Round, e.CRS)
+ })
+ }
+ })
+ // Touch nodeSetCache for next round.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ if e.Reset != 0 {
+ return
+ }
+ con.event.RegisterHeight(e.NextTouchNodeSetCacheHeight(), func(uint64) {
+ if err := con.nodeSetCache.Touch(e.Round + 1); err != nil {
+ con.logger.Warn("Failed to update nodeSetCache",
+ "round", e.Round+1,
+ "error", err)
+ }
+ })
+ })
+ // checkCRS is a generator of checker to check if CRS for that round is
+ // ready or not.
+ checkCRS := func(round uint64) func() bool {
+ return func() bool {
+ nextCRS := con.gov.CRS(round)
+ if (nextCRS != common.Hash{}) {
+ return true
+ }
+ con.logger.Debug("CRS is not ready yet. Try again later...",
+ "nodeID", con.ID,
+ "round", round)
+ return false
+ }
+ }
+ // Trigger round validation method for next period.
+ con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ // Register a routine to trigger round events.
+ con.event.RegisterHeight(e.NextRoundValidationHeight(), func(
+ blockHeight uint64) {
+ con.roundEvent.ValidateNextRound(blockHeight)
+ })
+ // Register a routine to register next DKG.
+ con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
+ nextRound := e.Round + 1
+ if nextRound < DKGDelayRound {
+ con.logger.Info("Skip runDKG for round", "round", nextRound)
+ return
+ }
+ // Normally, gov.CRS would return non-nil. Use this for in case of
+ // unexpected network fluctuation and ensure the robustness.
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Debug("unable to prepare CRS for DKG set",
+ "round", nextRound)
+ return
+ }
+ nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+ if err != nil {
+ con.logger.Error("Error getting DKG set for next round",
+ "round", nextRound,
+ "error", err)
+ return
+ }
+ if _, exist := nextDkgSet[con.ID]; !exist {
+ con.logger.Info("Not selected as DKG set", "round", nextRound)
+ return
+ }
+ con.logger.Info("Selected as DKG set", "round", nextRound)
+ nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+ con.logger)
+ con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold(
+ nextConfig))
+ con.event.RegisterHeight(e.NextDKGPreparationHeight(),
+ func(uint64) {
+ func() {
+ con.dkgReady.L.Lock()
+ defer con.dkgReady.L.Unlock()
+ con.dkgRunning = 0
+ }()
+ con.runDKG(nextRound, nextConfig)
+ })
+ })
+ })
+ con.roundEvent.TriggerInitEvent()
return
}
@@ -686,27 +825,9 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) {
}()
}
-func (con *Consensus) runCRS(round uint64) {
- for {
- con.logger.Debug("Calling Governance.CRS to check if already proposed",
- "round", round+1)
- if (con.gov.CRS(round+1) != common.Hash{}) {
- con.logger.Debug("CRS already proposed", "round", round+1)
- return
- }
- con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
- "round", round)
- if con.cfgModule.isDKGFinal(round) {
- break
- }
- con.logger.Debug("DKG is not ready for running CRS. Retry later...",
- "round", round)
- time.Sleep(500 * time.Millisecond)
- }
+func (con *Consensus) runCRS(round uint64, hash common.Hash) {
// Start running next round CRS.
- con.logger.Debug("Calling Governance.CRS", "round", round)
- psig, err := con.cfgModule.preparePartialSignature(
- round, utils.GetCRSWithPanic(con.gov, round, con.logger))
+ psig, err := con.cfgModule.preparePartialSignature(round, hash)
if err != nil {
con.logger.Error("Failed to prepare partial signature", "error", err)
} else if err = con.signer.SignDKGPartialSignature(psig); err != nil {
@@ -733,136 +854,16 @@ func (con *Consensus) runCRS(round uint64) {
}
}
-func (con *Consensus) initialRound(
- startHeight uint64, round uint64, config *types.Config) {
- select {
- case <-con.ctx.Done():
- return
- default:
- }
- if round >= DKGDelayRound {
- curDkgSet, err := con.nodeSetCache.GetDKGSet(round)
- if err != nil {
- con.logger.Error("Error getting DKG set", "round", round, "error", err)
- curDkgSet = make(map[types.NodeID]struct{})
- }
- // Initiate CRS routine.
- if _, exist := curDkgSet[con.ID]; exist {
- con.event.RegisterHeight(
- startHeight+config.RoundLength/2,
- func(uint64) {
- go func() {
- con.runCRS(round)
- }()
- })
- }
- }
- // checkCRS is a generator of checker to check if CRS for that round is
- // ready or not.
- checkCRS := func(round uint64) func() bool {
- return func() bool {
- nextCRS := con.gov.CRS(round)
- if (nextCRS != common.Hash{}) {
- return true
- }
- con.logger.Debug("CRS is not ready yet. Try again later...",
- "nodeID", con.ID,
- "round", round)
- return false
- }
- }
- // Initiate BA modules.
- con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) {
- go func(nextRound uint64) {
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for baMgr",
- "round", nextRound)
- return
- }
- // Notify BA for new round.
- nextConfig := utils.GetConfigWithPanic(
- con.gov, nextRound, con.logger)
- nextCRS := utils.GetCRSWithPanic(
- con.gov, nextRound, con.logger)
- con.logger.Info("appendConfig for baMgr", "round", nextRound)
- if err := con.baMgr.appendConfig(
- nextRound, nextConfig, nextCRS); err != nil {
- panic(err)
- }
- }(round + 1)
- })
- // Initiate DKG for this round.
- con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) {
- go func(nextRound uint64) {
- if nextRound < DKGDelayRound {
- con.logger.Info("Skip runDKG for round", "round", nextRound)
- return
- }
- // Normally, gov.CRS would return non-nil. Use this for in case of
- // unexpected network fluctuation and ensure the robustness.
- if !checkWithCancel(
- con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Debug("unable to prepare CRS for DKG set",
- "round", nextRound)
- return
- }
- nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
- if err != nil {
- con.logger.Error("Error getting DKG set",
- "round", nextRound,
- "error", err)
- return
- }
- if _, exist := nextDkgSet[con.ID]; !exist {
- return
- }
- con.logger.Info("Selected as DKG set", "round", nextRound)
- con.cfgModule.registerDKG(nextRound, utils.GetDKGThreshold(config))
- con.event.RegisterHeight(startHeight+config.RoundLength*2/3,
- func(uint64) {
- func() {
- con.dkgReady.L.Lock()
- defer con.dkgReady.L.Unlock()
- con.dkgRunning = 0
- }()
- nextConfig := utils.GetConfigWithPanic(
- con.gov, nextRound, con.logger)
- con.runDKG(nextRound, nextConfig)
- })
- }(round + 1)
- })
- // Prepare blockChain module for next round and next "initialRound" routine.
- con.event.RegisterHeight(startHeight+config.RoundLength, func(uint64) {
- // Change round.
- // Get configuration for next round.
- nextRound := round + 1
- nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger)
- con.initialRound(
- startHeight+config.RoundLength, nextRound, nextConfig)
- })
- // Touch nodeSetCache for next round.
- con.event.RegisterHeight(startHeight+config.RoundLength*9/10, func(uint64) {
- go func() {
- // TODO(jimmy): check DKGResetCount and do not touch if nextRound is reset.
- if err := con.nodeSetCache.Touch(round + 1); err != nil {
- con.logger.Warn("Failed to update nodeSetCache",
- "round", round+1, "error", err)
- }
- if _, _, err := con.bcModule.vGetter.UpdateAndGet(round + 1); err != nil {
- con.logger.Warn("Failed to update tsigVerifierCache",
- "round", round+1, "error", err)
- }
- }()
- })
-}
-
// Stop the Consensus core.
func (con *Consensus) Stop() {
con.ctxCancel()
con.baMgr.stop()
con.event.Reset()
con.waitGroup.Wait()
+ if nbApp, ok := con.app.(*nonBlocking); ok {
+ fmt.Println("Stopping nonBlocking App")
+ nbApp.wait()
+ }
}
func (con *Consensus) deliverNetworkMsg() {
@@ -1014,62 +1015,64 @@ func (con *Consensus) ProcessAgreementResult(
con.logger.Debug("Rebroadcast AgreementResult",
"result", rand)
con.network.BroadcastAgreementResult(rand)
+ go con.prepareRandomnessResult(rand)
+ return nil
+}
- go func() {
- dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
- if err != nil {
- con.logger.Error("Failed to get dkg set",
- "round", rand.Position.Round, "error", err)
- return
- }
- if _, exist := dkgSet[con.ID]; !exist {
- return
- }
- psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
- if err != nil {
- con.logger.Error("Failed to prepare psig",
- "round", rand.Position.Round,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.signer.SignDKGPartialSignature(psig); err != nil {
- con.logger.Error("Failed to sign psig",
+func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) {
+ dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
+ if err != nil {
+ con.logger.Error("Failed to get dkg set",
+ "round", rand.Position.Round, "error", err)
+ return
+ }
+ if _, exist := dkgSet[con.ID]; !exist {
+ return
+ }
+ con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash)
+ psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
+ if err != nil {
+ con.logger.Error("Failed to prepare psig",
+ "round", rand.Position.Round,
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to sign psig",
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ con.logger.Error("Failed process psig",
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+ "proposer", psig.ProposerID,
+ "round", psig.Round,
+ "hash", psig.Hash.String()[:6])
+ con.network.BroadcastDKGPartialSignature(psig)
+ tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
+ if err != nil {
+ if err != ErrTSigAlreadyRunning {
+ con.logger.Error("Failed to run TSIG",
+ "position", rand.Position,
"hash", rand.BlockHash.String()[:6],
"error", err)
- return
}
- if err = con.cfgModule.processPartialSignature(psig); err != nil {
- con.logger.Error("Failed process psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
- "proposer", psig.ProposerID,
- "round", psig.Round,
- "hash", psig.Hash.String()[:6])
- con.network.BroadcastDKGPartialSignature(psig)
- tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
- if err != nil {
- if err != ErrTSigAlreadyRunning {
- con.logger.Error("Failed to run TSIG",
- "position", rand.Position,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- }
- return
- }
- result := &types.BlockRandomnessResult{
- BlockHash: rand.BlockHash,
- Position: rand.Position,
- Randomness: tsig.Signature,
- }
- // ProcessBlockRandomnessResult is not thread-safe so we put the result in
- // the message channnel to be processed in the main thread.
- con.msgChan <- result
- }()
- return nil
+ return
+ }
+ result := &types.BlockRandomnessResult{
+ BlockHash: rand.BlockHash,
+ Position: rand.Position,
+ Randomness: tsig.Signature,
+ }
+ // ProcessBlockRandomnessResult is not thread-safe so we put the result in
+ // the message channnel to be processed in the main thread.
+ con.msgChan <- result
}
// ProcessBlockRandomnessResult processes the randomness result.
@@ -1094,14 +1097,6 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- var exist bool
- exist, err = con.nodeSetCache.Exists(b.Position.Round, b.ProposerID)
- if err != nil {
- return
- }
- if !exist {
- return ErrProposerNotInNodeSet
- }
err = con.baMgr.processBlock(b)
if err == nil && con.debugApp != nil {
con.debugApp.BlockReceived(b.Hash)
@@ -1137,7 +1132,10 @@ func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
time.Sleep(con.dMoment.Sub(time.Now()))
// Node takes time to start.
- time.Sleep(60 * time.Second)
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(60 * time.Second):
+ }
for {
select {
case <-con.ctx.Done():
@@ -1176,24 +1174,6 @@ func (con *Consensus) deliverBlock(b *types.Block) {
con.cfgModule.untouchTSigHash(b.Hash)
con.logger.Debug("Calling Application.BlockDelivered", "block", b)
con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone())
- if b.Position.Round == con.roundForNewConfig {
- // Get configuration for the round next to next round. Configuration
- // for that round should be ready at this moment and is required for
- // blockChain module. This logic is related to:
- // - roundShift
- // - notifyGenesisRound
- futureRound := con.roundForNewConfig + 1
- futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger)
- con.logger.Debug("Append Config", "round", futureRound)
- if err := con.bcModule.appendConfig(
- futureRound, futureConfig); err != nil {
- con.logger.Debug("Unable to append config",
- "round", futureRound,
- "error", err)
- panic(err)
- }
- con.roundForNewConfig++
- }
if con.debugApp != nil {
con.debugApp.BlockReady(b.Hash)
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
index 45a1fc7d5..ddd6c3bb9 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
@@ -101,8 +101,12 @@ type Governance interface {
// Return the genesis configuration if round == 0.
Configuration(round uint64) *types.Config
- // CRS returns the CRS for a given round.
- // Return the genesis CRS if round == 0.
+ // CRS returns the CRS for a given round. Return the genesis CRS if
+ // round == 0.
+ //
+ // The CRS returned is the proposed or latest reseted one, it would be
+ // changed later if corresponding DKG set failed to generate group public
+ // key.
CRS(round uint64) common.Hash
// Propose a CRS of round.
@@ -162,3 +166,12 @@ type Ticker interface {
// Retart the ticker and clear all internal data.
Restart()
}
+
+// Recovery interface for interacting with recovery information.
+type Recovery interface {
+ // ProposeSkipBlock proposes a skip block.
+ ProposeSkipBlock(height uint64) error
+
+ // Votes gets the number of votes of given height.
+ Votes(height uint64) (uint64, error)
+}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
index 25911ce5f..f2f8f9e66 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
@@ -69,12 +69,14 @@ type Consensus struct {
configs []*types.Config
roundBeginHeights []uint64
agreementRoundCut uint64
+ heightEvt *common.Event
+ roundEvt *utils.RoundEvent
// lock for accessing all fields.
lock sync.RWMutex
duringBuffering bool
latestCRSRound uint64
- moduleWaitGroup sync.WaitGroup
+ waitGroup sync.WaitGroup
agreementWaitGroup sync.WaitGroup
pullChan chan common.Hash
receiveChan chan *types.Block
@@ -82,6 +84,7 @@ type Consensus struct {
ctxCancel context.CancelFunc
syncedLastBlock *types.Block
syncedConsensus *core.Consensus
+ syncedSkipNext bool
dummyCancel context.CancelFunc
dummyFinished <-chan struct{}
dummyMsgBuffer []interface{}
@@ -115,6 +118,7 @@ func NewConsensus(
receiveChan: make(chan *types.Block, 1000),
pullChan: make(chan common.Hash, 1000),
randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult),
+ heightEvt: common.NewEvent(),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
_, con.initChainTipHeight = db.GetCompactionChainTipInfo()
@@ -142,9 +146,73 @@ func (con *Consensus) assureBuffering() {
return
}
con.duringBuffering = true
+ // Get latest block to prepare utils.RoundEvent.
+ var (
+ err error
+ blockHash, height = con.db.GetCompactionChainTipInfo()
+ )
+ if height == 0 {
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger,
+ uint64(0), uint64(0), uint64(0), core.ConfigRoundShift)
+ } else {
+ var b types.Block
+ if b, err = con.db.GetBlock(blockHash); err == nil {
+ beginHeight := con.roundBeginHeights[b.Position.Round]
+ con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov,
+ con.logger, b.Position.Round, beginHeight, beginHeight,
+ core.ConfigRoundShift)
+ }
+ }
+ if err != nil {
+ panic(err)
+ }
+ // Make sure con.roundEvt stopped before stopping con.agreementModule.
+ con.waitGroup.Add(1)
+ // Register a round event handler to notify CRS to agreementModule.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ con.waitGroup.Add(1)
+ go func() {
+ defer con.waitGroup.Done()
+ for _, e := range evts {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ for func() bool {
+ select {
+ case <-con.ctx.Done():
+ return false
+ case con.agreementModule.inputChan <- e.Round:
+ return false
+ case <-time.After(500 * time.Millisecond):
+ con.logger.Warn(
+ "agreement input channel is full when putting CRS",
+ "round", e.Round,
+ )
+ return true
+ }
+ }() {
+ }
+ }
+ }()
+ })
+ // Register a round event handler to validate next round.
+ con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+ e := evts[len(evts)-1]
+ con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func(
+ blockHeight uint64) {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ con.roundEvt.ValidateNextRound(blockHeight)
+ })
+ })
+ con.roundEvt.TriggerInitEvent()
con.startAgreement()
con.startNetwork()
- con.startCRSMonitor()
}
func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) {
@@ -180,6 +248,29 @@ func (con *Consensus) buildAllEmptyBlocks() {
}
}
+// ForceSync forces syncer to become synced.
+func (con *Consensus) ForceSync(skip bool) {
+ if con.syncedLastBlock != nil {
+ return
+ }
+ hash, _ := con.db.GetCompactionChainTipInfo()
+ var block types.Block
+ block, err := con.db.GetBlock(hash)
+ if err != nil {
+ panic(err)
+ }
+ con.logger.Info("Force Sync", "block", &block)
+ con.setupConfigsUntilRound(block.Position.Round + core.ConfigRoundShift - 1)
+ con.syncedLastBlock = &block
+ con.stopBuffering()
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ con.syncedSkipNext = skip
+}
+
// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
@@ -241,6 +332,7 @@ func (con *Consensus) SyncBlocks(
b.Hash, b.Finalization.Height); err != nil {
return
}
+ go con.heightEvt.NotifyHeight(b.Finalization.Height)
}
if latest {
con.assureBuffering()
@@ -279,6 +371,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.syncedConsensus, err = core.NewConsensusFromSyncer(
con.syncedLastBlock,
con.roundBeginHeights[con.syncedLastBlock.Position.Round],
+ con.syncedSkipNext,
con.dMoment,
con.app,
con.gov,
@@ -321,7 +414,10 @@ func (con *Consensus) stopBuffering() {
return
}
con.logger.Trace("stop syncer modules")
- con.moduleWaitGroup.Wait()
+ con.roundEvt.Stop()
+ con.waitGroup.Done()
+ // Wait for all routines depends on con.agreementModule stopped.
+ con.waitGroup.Wait()
// Since there is no one waiting for the receive channel of fullnode, we
// need to launch a dummy receiver right away.
con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
@@ -467,9 +563,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) {
// startNetwork starts network for receiving blocks and agreement results.
func (con *Consensus) startNetwork() {
- con.moduleWaitGroup.Add(1)
+ con.waitGroup.Add(1)
go func() {
- defer con.moduleWaitGroup.Done()
+ defer con.waitGroup.Done()
loop:
for {
select {
@@ -497,62 +593,6 @@ func (con *Consensus) startNetwork() {
}()
}
-// startCRSMonitor is the dummiest way to verify if the CRS for one round
-// is ready or not.
-func (con *Consensus) startCRSMonitor() {
- var lastNotifiedRound uint64
- // Notify all agreements for new CRS.
- notifyNewCRS := func(round uint64) {
- con.setupConfigsUntilRound(round)
- if round == lastNotifiedRound {
- return
- }
- con.logger.Debug("CRS is ready", "round", round)
- lastNotifiedRound = round
- func() {
- con.lock.Lock()
- defer con.lock.Unlock()
- con.latestCRSRound = round
- }()
- for func() bool {
- select {
- case <-con.ctx.Done():
- return false
- case con.agreementModule.inputChan <- round:
- return false
- case <-time.After(500 * time.Millisecond):
- con.logger.Debug(
- "agreement input channel is full when putting CRS",
- "round", round,
- )
- return true
- }
- }() {
- }
- }
- con.moduleWaitGroup.Add(1)
- go func() {
- defer con.moduleWaitGroup.Done()
- for {
- select {
- case <-con.ctx.Done():
- return
- case <-time.After(500 * time.Millisecond):
- }
- // Notify agreement modules for the latest round that CRS is
- // available if the round is not notified yet.
- checked := lastNotifiedRound + 1
- for (con.gov.CRS(checked) != common.Hash{}) {
- checked++
- }
- checked--
- if checked > lastNotifiedRound {
- notifyNewCRS(checked)
- }
- }
- }()
-}
-
func (con *Consensus) stopAgreement() {
if con.agreementModule.inputChan != nil {
close(con.agreementModule.inputChan)
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go
new file mode 100644
index 000000000..d08bff9e9
--- /dev/null
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/watch-cat.go
@@ -0,0 +1,148 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+type configReader interface {
+ Configuration(round uint64) *types.Config
+}
+
+// WatchCat is reponsible for signaling if syncer object should be terminated.
+type WatchCat struct {
+ recovery core.Recovery
+ timeout time.Duration
+ configReader configReader
+ feed chan types.Position
+ polling time.Duration
+ ctx context.Context
+ cancel context.CancelFunc
+ logger common.Logger
+}
+
+// NewWatchCat creats a new WatchCat 🐱 object.
+func NewWatchCat(
+ recovery core.Recovery,
+ configReader configReader,
+ polling time.Duration,
+ timeout time.Duration,
+ logger common.Logger) *WatchCat {
+ wc := &WatchCat{
+ recovery: recovery,
+ timeout: timeout,
+ configReader: configReader,
+ feed: make(chan types.Position),
+ polling: polling,
+ logger: logger,
+ }
+ return wc
+}
+
+// Feed the WatchCat so it won't produce the termination signal.
+func (wc *WatchCat) Feed(position types.Position) {
+ wc.feed <- position
+}
+
+// Start the WatchCat.
+func (wc *WatchCat) Start() {
+ wc.Stop()
+ wc.ctx, wc.cancel = context.WithCancel(context.Background())
+ go func() {
+ var lastPos types.Position
+ MonitorLoop:
+ for {
+ select {
+ case <-wc.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-wc.ctx.Done():
+ return
+ case pos := <-wc.feed:
+ if !pos.Newer(lastPos) {
+ wc.logger.Warn("Feed with older height",
+ "pos", pos, "lastPos", lastPos)
+ continue
+ }
+ lastPos = pos
+ case <-time.After(wc.timeout):
+ break MonitorLoop
+ }
+ }
+ go func() {
+ for {
+ select {
+ case <-wc.ctx.Done():
+ return
+ case <-wc.feed:
+ }
+ }
+ }()
+ defer wc.cancel()
+ proposed := false
+ threshold := uint64(
+ utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger).
+ NotarySetSize / 2)
+ wc.logger.Info("Threshold for recovery", "votes", threshold)
+ ResetLoop:
+ for {
+ if !proposed {
+ wc.logger.Info("Calling Recovery.ProposeSkipBlock",
+ "height", lastPos.Height)
+ if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil {
+ wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err)
+ } else {
+ proposed = true
+ }
+ }
+ votes, err := wc.recovery.Votes(lastPos.Height)
+ if err != nil {
+ wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err)
+ } else if votes > threshold {
+ wc.logger.Info("Threshold for recovery reached!")
+ break ResetLoop
+ }
+ select {
+ case <-wc.ctx.Done():
+ return
+ case <-time.After(wc.polling):
+ }
+ }
+ }()
+}
+
+// Stop the WatchCat.
+func (wc *WatchCat) Stop() {
+ if wc.cancel != nil {
+ wc.cancel()
+ }
+}
+
+// Meow return a closed channel if syncer should be terminated.
+func (wc *WatchCat) Meow() <-chan struct{} {
+ return wc.ctx.Done()
+}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go
index f7dee757f..5742d113a 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go
@@ -236,14 +236,14 @@ func checkWithCancel(parentCtx context.Context, interval time.Duration,
defer cancel()
Loop:
for {
+ if ret = checker(); ret {
+ return
+ }
select {
case <-ctx.Done():
break Loop
case <-time.After(interval):
}
- if ret = checker(); ret {
- return
- }
}
return
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go
index bab1d32d2..1ce877dda 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/round-event.go
@@ -28,15 +28,15 @@ import (
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)
-// ErrUnmatchedBlockHeightWithGov is for invalid parameters for NewRoundEvent.
-type ErrUnmatchedBlockHeightWithGov struct {
+// ErrUnmatchedBlockHeightWithConfig is for invalid parameters for NewRoundEvent.
+type ErrUnmatchedBlockHeightWithConfig struct {
round uint64
reset uint64
blockHeight uint64
}
-func (e ErrUnmatchedBlockHeightWithGov) Error() string {
- return fmt.Sprintf("unsynced block height and gov: round:%d reset:%d h:%d",
+func (e ErrUnmatchedBlockHeightWithConfig) Error() string {
+ return fmt.Sprintf("unsynced block height and cfg: round:%d reset:%d h:%d",
e.round, e.reset, e.blockHeight)
}
@@ -56,11 +56,43 @@ type RoundEventParam struct {
CRS common.Hash
}
-// NextRoundCheckpoint returns the height to check if the next round is ready.
-func (e RoundEventParam) NextRoundCheckpoint() uint64 {
+// NextRoundValidationHeight returns the height to check if the next round is
+// ready.
+func (e RoundEventParam) NextRoundValidationHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength*9/10
+}
+
+// NextCRSProposingHeight returns the height to propose CRS for next round.
+func (e RoundEventParam) NextCRSProposingHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength/2
+}
+
+// NextDKGPreparationHeight returns the height to prepare DKG set for next
+// round.
+func (e RoundEventParam) NextDKGPreparationHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength*2/3
+}
+
+// NextRoundHeight returns the height of the beginning of next round.
+func (e RoundEventParam) NextRoundHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength
+}
+
+// NextTouchNodeSetCacheHeight returns the height to touch the node set cache.
+func (e RoundEventParam) NextTouchNodeSetCacheHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength*9/10
+}
+
+// NextDKGResetHeight returns the height to reset DKG for next period.
+func (e RoundEventParam) NextDKGResetHeight() uint64 {
return e.BeginHeight + e.Config.RoundLength*8/10
}
+// NextDKGRegisterHeight returns the height to register DKG.
+func (e RoundEventParam) NextDKGRegisterHeight() uint64 {
+ return e.BeginHeight + e.Config.RoundLength/2
+}
+
// roundEventFn defines the fingerprint of handlers of round events.
type roundEventFn func([]RoundEventParam)
@@ -131,7 +163,7 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor,
e.config.ExtendLength()
}
if !e.config.Contains(initBlockHeight) {
- return nil, ErrUnmatchedBlockHeightWithGov{
+ return nil, ErrUnmatchedBlockHeightWithConfig{
round: initRound,
reset: resetCount,
blockHeight: initBlockHeight,
@@ -149,6 +181,22 @@ func (e *RoundEvent) Register(h roundEventFn) {
e.handlers = append(e.handlers, h)
}
+// TriggerInitEvent triggers event from the initial setting.
+func (e *RoundEvent) TriggerInitEvent() {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+ events := []RoundEventParam{RoundEventParam{
+ Round: e.lastTriggeredRound,
+ Reset: e.lastTriggeredResetCount,
+ BeginHeight: e.config.LastPeriodBeginHeight(),
+ CRS: GetCRSWithPanic(e.gov, e.lastTriggeredRound, e.logger),
+ Config: GetConfigWithPanic(e.gov, e.lastTriggeredRound, e.logger),
+ }}
+ for _, h := range e.handlers {
+ h(events)
+ }
+}
+
// ValidateNextRound validate if the DKG set for next round is ready to go or
// failed to setup, all registered handlers would be called once some decision
// is made on chain.
@@ -225,14 +273,6 @@ func (e *RoundEvent) check(blockHeight, startRound uint64, lastDKGCheck bool) (
"crs", param.CRS.String()[:6],
)
}()
- // Make sure current last config covers the blockHeight.
- if !e.config.Contains(blockHeight) {
- panic(ErrUnmatchedBlockHeightWithGov{
- round: e.lastTriggeredRound,
- reset: e.lastTriggeredResetCount,
- blockHeight: blockHeight,
- })
- }
nextRound := e.lastTriggeredRound + 1
if nextRound >= startRound+e.roundShift {
// Avoid access configuration newer than last confirmed one over
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go
index b8bd95ec4..14687d6ac 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go
@@ -138,3 +138,9 @@ func LaunchDummyReceiver(
func GetDKGThreshold(config *types.Config) int {
return int(config.DKGSetSize/3) + 1
}
+
+// GetNextRoundValidationHeight returns the block height to check if the next
+// round is ready.
+func GetNextRoundValidationHeight(begin, length uint64) uint64 {
+ return begin + length*9/10
+}