aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go156
1 files changed, 99 insertions, 57 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
index 423174c48..8cb4c2e37 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
@@ -22,11 +22,11 @@ import (
"errors"
"math"
"sync"
- "sync/atomic"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/types"
+ typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
@@ -35,6 +35,7 @@ var (
ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished")
ErrRoundOutOfRange = errors.New("round out of range")
ErrInvalidBlock = errors.New("invalid block")
+ ErrNoValidLeader = errors.New("no valid leader")
)
const maxResultCache = 100
@@ -89,7 +90,9 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config,
}
type baRoundSetting struct {
- notarySet map[types.NodeID]struct{}
+ round uint64
+ dkgSet map[types.NodeID]struct{}
+ threshold int
ticker Ticker
crs common.Hash
}
@@ -132,13 +135,9 @@ func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) {
voteFilter: utils.NewVoteFilter(),
}
mgr.recv = &consensusBAReceiver{
- consensus: con,
- restartNotary: make(chan types.Position, 1),
- roundValue: &atomic.Value{},
- changeNotaryHeightValue: &atomic.Value{},
+ consensus: con,
+ restartNotary: make(chan types.Position, 1),
}
- mgr.recv.updateRound(uint64(0))
- mgr.recv.changeNotaryHeightValue.Store(uint64(0))
return mgr, nil
}
@@ -177,6 +176,19 @@ func (mgr *agreementMgr) run() {
}()
}
+func (mgr *agreementMgr) calcLeader(
+ dkgSet map[types.NodeID]struct{},
+ crs common.Hash, pos types.Position) (
+ types.NodeID, error) {
+ nodeSet := types.NewNodeSetFromMap(dkgSet)
+ leader := nodeSet.GetSubSet(1, types.NewNodeLeaderTarget(
+ crs, pos.Height))
+ for nID := range leader {
+ return nID, nil
+ }
+ return types.NodeID{}, ErrNoValidLeader
+}
+
func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig {
mgr.lock.RLock()
defer mgr.lock.RUnlock()
@@ -201,13 +213,6 @@ func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error {
}
if lastCfg.RoundID() == e.Round {
mgr.configs[len(mgr.configs)-1].ExtendLength()
- // It's not an atomic operation to update an atomic value based
- // on another. However, it's the best way so far to extend
- // length of round without refactoring.
- if mgr.recv.round() == e.Round {
- mgr.recv.changeNotaryHeightValue.Store(
- mgr.configs[len(mgr.configs)-1].RoundEndHeight())
- }
} else if lastCfg.RoundID()+1 == e.Round {
mgr.configs = append(mgr.configs, newAgreementMgrConfig(
lastCfg, e.Config, e.CRS))
@@ -285,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult(
}
} else if result.Position.Newer(aID) {
mgr.logger.Info("Fast syncing BA", "position", result.Position)
- nIDs, err := mgr.cache.GetNotarySet(result.Position.Round)
- if err != nil {
- return err
- }
if result.Position.Round < DKGDelayRound {
mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA",
"hash", result.BlockHash)
@@ -299,13 +300,19 @@ func (mgr *agreementMgr) processAgreementResult(
}
}
}
- mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round)
- crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger)
- leader, err := mgr.cache.GetLeaderNode(result.Position)
+ setting := mgr.generateSetting(result.Position.Round)
+ if setting == nil {
+ mgr.logger.Warn("unable to get setting", "round",
+ result.Position.Round)
+ return ErrConfigurationNotReady
+ }
+ leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position)
if err != nil {
return err
}
- mgr.baModule.restart(nIDs, result.Position, leader, crs)
+ mgr.baModule.restart(
+ setting.dkgSet, setting.threshold,
+ result.Position, leader, setting.crs)
if result.Position.Round >= DKGDelayRound {
return mgr.baModule.processAgreementResult(result)
}
@@ -333,57 +340,87 @@ func (mgr *agreementMgr) stop() {
mgr.waitGroup.Wait()
}
+func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting {
+ curConfig := mgr.config(round)
+ if curConfig == nil {
+ return nil
+ }
+ var dkgSet map[types.NodeID]struct{}
+ if round >= DKGDelayRound {
+ _, qualidifed, err := typesDKG.CalcQualifyNodes(
+ mgr.gov.DKGMasterPublicKeys(round),
+ mgr.gov.DKGComplaints(round),
+ utils.GetDKGThreshold(mgr.gov.Configuration(round)),
+ )
+ if err != nil {
+ mgr.logger.Error("Failed to get gpk", "round", round, "error", err)
+ return nil
+ }
+ dkgSet = qualidifed
+ }
+ if len(dkgSet) == 0 {
+ var err error
+ dkgSet, err = mgr.cache.GetNotarySet(round)
+ if err != nil {
+ mgr.logger.Error("Failed to get notarySet", "round", round)
+ return nil
+ }
+ }
+ return &baRoundSetting{
+ crs: curConfig.crs,
+ dkgSet: dkgSet,
+ round: round,
+ threshold: utils.GetBAThreshold(&types.Config{
+ NotarySetSize: curConfig.notarySetSize}),
+ }
+}
+
func (mgr *agreementMgr) runBA(initRound uint64) {
// These are round based variables.
var (
currentRound uint64
nextRound = initRound
curConfig = mgr.config(initRound)
- setting = baRoundSetting{}
+ setting = &baRoundSetting{}
tickDuration time.Duration
+ ticker Ticker
)
// Check if this routine needs to awake in this round and prepare essential
// variables when yes.
- checkRound := func() (isNotary bool) {
+ checkRound := func() (isDKG bool) {
defer func() {
currentRound = nextRound
nextRound++
}()
// Wait until the configuartion for next round is ready.
for {
- if curConfig = mgr.config(nextRound); curConfig != nil {
+ if setting = mgr.generateSetting(nextRound); setting != nil {
break
} else {
mgr.logger.Debug("Round is not ready", "round", nextRound)
time.Sleep(1 * time.Second)
}
}
- // Check if this node in notary set of this chain in this round.
- notarySet, err := mgr.cache.GetNotarySet(nextRound)
- if err != nil {
- panic(err)
- }
- setting.crs = curConfig.crs
- setting.notarySet = notarySet
- _, isNotary = setting.notarySet[mgr.ID]
- if isNotary {
- mgr.logger.Info("Selected as notary set",
+ _, isDKG = setting.dkgSet[mgr.ID]
+ if isDKG {
+ mgr.logger.Info("Selected as dkg set",
"ID", mgr.ID,
"round", nextRound)
} else {
- mgr.logger.Info("Not selected as notary set",
+ mgr.logger.Info("Not selected as dkg set",
"ID", mgr.ID,
"round", nextRound)
}
// Setup ticker
if tickDuration != curConfig.lambdaBA {
- if setting.ticker != nil {
- setting.ticker.Stop()
+ if ticker != nil {
+ ticker.Stop()
}
- setting.ticker = newTicker(mgr.gov, nextRound, TickerBA)
+ ticker = newTicker(mgr.gov, nextRound, TickerBA)
tickDuration = curConfig.lambdaBA
}
+ setting.ticker = ticker
return
}
Loop:
@@ -395,15 +432,25 @@ Loop:
}
mgr.recv.isNotary = checkRound()
// Run BA for this round.
- mgr.recv.updateRound(currentRound)
- mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight())
mgr.recv.restartNotary <- types.Position{
- Round: mgr.recv.round(),
+ Round: currentRound,
Height: math.MaxUint64,
}
mgr.voteFilter = utils.NewVoteFilter()
mgr.recv.emptyBlockHashMap = &sync.Map{}
- if err := mgr.baRoutineForOneRound(&setting); err != nil {
+ if currentRound >= DKGDelayRound && mgr.recv.isNotary {
+ var err error
+ mgr.recv.npks, mgr.recv.psigSigner, err =
+ mgr.con.cfgModule.getDKGInfo(currentRound, false)
+ if err != nil {
+ mgr.logger.Warn("cannot get dkg info",
+ "round", currentRound, "error", err)
+ }
+ } else {
+ mgr.recv.npks = nil
+ mgr.recv.psigSigner = nil
+ }
+ if err := mgr.baRoutineForOneRound(setting); err != nil {
mgr.logger.Error("BA routine failed",
"error", err,
"nodeID", mgr.ID)
@@ -419,7 +466,7 @@ func (mgr *agreementMgr) baRoutineForOneRound(
oldPos := agr.agreementID()
restart := func(restartPos types.Position) (breakLoop bool, err error) {
if !isStop(restartPos) {
- if restartPos.Round > oldPos.Round {
+ if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() {
for {
select {
case <-mgr.ctx.Done():
@@ -427,14 +474,12 @@ func (mgr *agreementMgr) baRoutineForOneRound(
default:
}
tipRound := mgr.bcModule.tipRound()
- if tipRound > restartPos.Round {
- // It's a vary rare that this go routine sleeps for entire round.
+ if tipRound > setting.round {
break
- } else if tipRound != restartPos.Round {
- mgr.logger.Debug("Waiting blockChain to change round...",
- "pos", restartPos)
} else {
- break
+ mgr.logger.Debug("Waiting blockChain to change round...",
+ "curRound", setting.round,
+ "tipRound", tipRound)
}
time.Sleep(100 * time.Millisecond)
}
@@ -459,9 +504,6 @@ func (mgr *agreementMgr) baRoutineForOneRound(
default:
}
nextHeight, nextTime = mgr.bcModule.nextBlock()
- if isStop(oldPos) && nextHeight == 0 {
- break
- }
if isStop(restartPos) {
break
}
@@ -473,18 +515,18 @@ func (mgr *agreementMgr) baRoutineForOneRound(
time.Sleep(100 * time.Millisecond)
}
nextPos := types.Position{
- Round: recv.round(),
+ Round: setting.round,
Height: nextHeight,
}
oldPos = nextPos
var leader types.NodeID
- leader, err = mgr.cache.GetLeaderNode(nextPos)
+ leader, err = mgr.calcLeader(setting.dkgSet, setting.crs, nextPos)
if err != nil {
return
}
time.Sleep(nextTime.Sub(time.Now()))
setting.ticker.Restart()
- agr.restart(setting.notarySet, nextPos, leader, setting.crs)
+ agr.restart(setting.dkgSet, setting.threshold, nextPos, leader, setting.crs)
return
}
Loop: