aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go152
1 files changed, 107 insertions, 45 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
index 9e863696a..a8fab7c69 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
@@ -94,6 +94,7 @@ type agreementMgr struct {
initRound uint64
configs []*agreementMgrConfig
baModules []*agreement
+ voteFilters []*utils.VoteFilter
waitGroup sync.WaitGroup
pendingVotes map[uint64][]*types.Vote
pendingBlocks map[uint64][]*types.Block
@@ -201,6 +202,7 @@ func (mgr *agreementMgr) appendConfig(
// Hacky way to make agreement module self contained.
recv.agreementModule = agrModule
mgr.baModules = append(mgr.baModules, agrModule)
+ mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter())
if mgr.isRunning {
mgr.waitGroup.Add(1)
go func(idx uint32) {
@@ -213,7 +215,6 @@ func (mgr *agreementMgr) appendConfig(
}
func (mgr *agreementMgr) processVote(v *types.Vote) error {
- v = v.Clone()
mgr.lock.RLock()
defer mgr.lock.RUnlock()
if v.Position.ChainID >= uint32(len(mgr.baModules)) {
@@ -224,7 +225,16 @@ func (mgr *agreementMgr) processVote(v *types.Vote) error {
"initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
- return mgr.baModules[v.Position.ChainID].processVote(v)
+ filter := mgr.voteFilters[v.Position.ChainID]
+ if filter.Filter(v) {
+ return nil
+ }
+ v = v.Clone()
+ err := mgr.baModules[v.Position.ChainID].processVote(v)
+ if err == nil {
+ mgr.baModules[v.Position.ChainID].updateFilter(filter)
+ }
+ return err
}
func (mgr *agreementMgr) processBlock(b *types.Block) error {
@@ -419,7 +429,11 @@ Loop:
// Run BA for this round.
recv.roundValue.Store(currentRound)
recv.changeNotaryTime = roundEndTime
- recv.restartNotary <- types.Position{ChainID: math.MaxUint32}
+ recv.restartNotary <- types.Position{
+ Round: setting.recv.round(),
+ ChainID: math.MaxUint32,
+ }
+ mgr.voteFilters[chainID] = utils.NewVoteFilter()
if err := mgr.baRoutineForOneRound(&setting); err != nil {
mgr.logger.Error("BA routine failed",
"error", err,
@@ -435,6 +449,79 @@ func (mgr *agreementMgr) baRoutineForOneRound(
agr := setting.agr
recv := setting.recv
oldPos := agr.agreementID()
+ restart := func(restartPos types.Position) (breakLoop bool, err error) {
+ if !isStop(restartPos) {
+ if restartPos.Round > oldPos.Round {
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break
+ default:
+ }
+ tipRound := mgr.lattice.TipRound(setting.chainID)
+ if tipRound > restartPos.Round {
+ // It's a vary rare that this go routine sleeps for entire round.
+ break
+ } else if tipRound != restartPos.Round {
+ mgr.logger.Debug("Waiting lattice to change round...",
+ "pos", &restartPos)
+ } else {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ // This round is finished.
+ breakLoop = true
+ return
+ }
+ if restartPos.Older(&oldPos) {
+ // The restartNotary event is triggered by 'BlockConfirmed'
+ // of some older block.
+ return
+ }
+ }
+ var nextHeight uint64
+ var nextTime time.Time
+ for {
+ nextHeight, nextTime, err =
+ mgr.lattice.NextBlock(recv.round(), setting.chainID)
+ if err != nil {
+ mgr.logger.Debug("Error getting next height",
+ "error", err,
+ "round", recv.round(),
+ "chainID", setting.chainID)
+ err = nil
+ nextHeight = restartPos.Height
+ }
+ if isStop(oldPos) && nextHeight == 0 {
+ break
+ }
+ if isStop(restartPos) && nextHeight == 0 {
+ break
+ }
+ if nextHeight > restartPos.Height {
+ break
+ }
+ mgr.logger.Debug("Lattice not ready!!!",
+ "old", &oldPos, "restart", &restartPos, "next", nextHeight)
+ time.Sleep(100 * time.Millisecond)
+ }
+ nextPos := types.Position{
+ Round: recv.round(),
+ ChainID: setting.chainID,
+ Height: nextHeight,
+ }
+ oldPos = nextPos
+ var leader types.NodeID
+ leader, err = mgr.cache.GetLeaderNode(nextPos)
+ if err != nil {
+ return
+ }
+ time.Sleep(nextTime.Sub(time.Now()))
+ setting.ticker.Restart()
+ agr.restart(setting.notarySet, nextPos, leader, setting.crs)
+ return
+ }
Loop:
for {
select {
@@ -442,55 +529,30 @@ Loop:
break Loop
default:
}
- select {
- case restartPos := <-recv.restartNotary:
- if !isStop(restartPos) {
- if restartPos.Round > oldPos.Round {
- // This round is finished.
- break Loop
- }
- if restartPos.Older(&oldPos) {
- // The restartNotary event is triggered by 'BlockConfirmed'
- // of some older block.
- break
- }
- }
- var nextHeight uint64
- var nextTime time.Time
- for {
- nextHeight, nextTime, err =
- mgr.lattice.NextBlock(recv.round(), setting.chainID)
+ if agr.confirmed() {
+ // Block until receive restartPos
+ select {
+ case restartPos := <-recv.restartNotary:
+ breakLoop, err := restart(restartPos)
if err != nil {
- mgr.logger.Debug("Error getting next height",
- "error", err,
- "round", recv.round(),
- "chainID", setting.chainID)
- err = nil
- nextHeight = restartPos.Height
+ return err
}
- if isStop(restartPos) || nextHeight == 0 {
- break
- }
- if nextHeight > restartPos.Height {
- break
+ if breakLoop {
+ break Loop
}
- mgr.logger.Debug("Lattice not ready!!!",
- "old", &restartPos, "next", nextHeight)
- time.Sleep(100 * time.Millisecond)
- }
- nextPos := types.Position{
- Round: recv.round(),
- ChainID: setting.chainID,
- Height: nextHeight,
+ case <-mgr.ctx.Done():
+ break Loop
}
- oldPos = nextPos
- leader, err := mgr.cache.GetLeaderNode(nextPos)
+ }
+ select {
+ case restartPos := <-recv.restartNotary:
+ breakLoop, err := restart(restartPos)
if err != nil {
return err
}
- time.Sleep(nextTime.Sub(time.Now()))
- setting.ticker.Restart()
- agr.restart(setting.notarySet, nextPos, leader, setting.crs)
+ if breakLoop {
+ break Loop
+ }
default:
}
if agr.pullVotes() {