aboutsummaryrefslogtreecommitdiffstats
path: root/core/agreement-mgr.go
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2019-02-15 14:18:59 +0800
committerJimmy Hu <jimmy.hu@dexon.org>2019-02-19 10:48:50 +0800
commit4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c (patch)
tree625b7d34aa700d072ffb8e68dc89ed3936b76d29 /core/agreement-mgr.go
parente4825619fb2499f5f534537c1a4d52d3e0bcacfe (diff)
downloaddexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.gz
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.bz2
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.lz
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.xz
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.tar.zst
dexon-consensus-4dbdc22e355cf1f6f0c39af1b2f3737b7527bc0c.zip
big-bang: single chain (#446)
Diffstat (limited to 'core/agreement-mgr.go')
-rw-r--r--core/agreement-mgr.go260
1 files changed, 93 insertions, 167 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index c3692d8..5f5b9ae 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -33,6 +33,8 @@ import (
// Errors returned from BA modules
var (
ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished")
+ ErrRoundOutOfRange = errors.New("round out of range")
+ ErrInvalidBlock = errors.New("invalid block")
)
const maxResultCache = 100
@@ -44,7 +46,7 @@ func genValidLeader(
if block.Timestamp.After(time.Now()) {
return false, nil
}
- if err := mgr.lattice.SanityCheck(block, true); err != nil {
+ if err := mgr.bcModule.sanityCheck(block); err != nil {
if err == ErrRetrySanityCheckLater {
return false, nil
}
@@ -64,7 +66,6 @@ func genValidLeader(
type agreementMgrConfig struct {
beginTime time.Time
- numChains uint32
roundInterval time.Duration
notarySetSize uint32
lambdaBA time.Duration
@@ -72,7 +73,6 @@ type agreementMgrConfig struct {
}
type baRoundSetting struct {
- chainID uint32
notarySet map[types.NodeID]struct{}
agr *agreement
recv *consensusBAReceiver
@@ -90,28 +90,17 @@ type agreementMgr struct {
logger common.Logger
cache *utils.NodeSetCache
signer *utils.Signer
- lattice *Lattice
+ bcModule *blockChain
ctx context.Context
lastEndTime time.Time
initRound uint64
configs []*agreementMgrConfig
- baModules []*agreement
+ baModule *agreement
processedBAResult map[types.Position]struct{}
- voteFilters []*utils.VoteFilter
+ voteFilter *utils.VoteFilter
waitGroup sync.WaitGroup
- pendingVotes map[uint64][]*types.Vote
- pendingBlocks map[uint64][]*types.Block
isRunning bool
-
- // This lock should be used when attempting to:
- // - add a new baModule.
- // - remove all baModules when stopping. In this case, the cleaner need
- // to wait for all routines runnning baModules finished.
- // - access a method of baModule.
- // - append a config from new round.
- // The routine running corresponding baModule, however, doesn't have to
- // acquire this lock.
- lock sync.RWMutex
+ lock sync.RWMutex
}
func newAgreementMgr(con *Consensus, initRound uint64,
@@ -125,7 +114,7 @@ func newAgreementMgr(con *Consensus, initRound uint64,
logger: con.logger,
cache: con.nodeSetCache,
signer: con.signer,
- lattice: con.lattice,
+ bcModule: con.bcModule,
ctx: con.ctx,
initRound: initRound,
lastEndTime: initRoundBeginTime,
@@ -133,6 +122,20 @@ func newAgreementMgr(con *Consensus, initRound uint64,
}
}
+func (mgr *agreementMgr) run() {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ if mgr.isRunning {
+ return
+ }
+ mgr.isRunning = true
+ mgr.waitGroup.Add(1)
+ go func() {
+ defer mgr.waitGroup.Done()
+ mgr.runBA(mgr.initRound)
+ }()
+}
+
func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig {
mgr.lock.RLock()
defer mgr.lock.RUnlock()
@@ -146,22 +149,6 @@ func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig {
return mgr.configs[roundIndex]
}
-func (mgr *agreementMgr) run() {
- mgr.lock.Lock()
- defer mgr.lock.Unlock()
- if mgr.isRunning {
- return
- }
- mgr.isRunning = true
- for i := uint32(0); i < uint32(len(mgr.baModules)); i++ {
- mgr.waitGroup.Add(1)
- go func(idx uint32) {
- defer mgr.waitGroup.Done()
- mgr.runBA(mgr.initRound, idx)
- }(i)
- }
-}
-
func (mgr *agreementMgr) appendConfig(
round uint64, config *types.Config, crs common.Hash) (err error) {
mgr.lock.Lock()
@@ -171,7 +158,6 @@ func (mgr *agreementMgr) appendConfig(
}
newConfig := &agreementMgrConfig{
beginTime: mgr.lastEndTime,
- numChains: config.NumChains,
roundInterval: config.RoundInterval,
notarySetSize: config.NotarySetSize,
lambdaBA: config.LambdaBA,
@@ -179,80 +165,61 @@ func (mgr *agreementMgr) appendConfig(
}
mgr.configs = append(mgr.configs, newConfig)
mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval)
- // Create baModule for newly added chain.
- for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ {
- // Prepare modules.
- recv := &consensusBAReceiver{
- consensus: mgr.con,
- chainID: i,
- restartNotary: make(chan types.Position, 1),
- roundValue: &atomic.Value{},
- }
- recv.roundValue.Store(uint64(0))
- agrModule := newAgreement(
- mgr.con.ID,
- recv,
- newLeaderSelector(genValidLeader(mgr), mgr.logger),
- mgr.signer,
- mgr.logger)
- // Hacky way to initialize first notarySet.
- nodes, err := mgr.cache.GetNodeSet(round)
- if err != nil {
- return err
- }
- agrModule.notarySet = nodes.GetSubSet(
- int(config.NotarySetSize),
- types.NewNotarySetTarget(crs, i))
- // Hacky way to make agreement module self contained.
- recv.agreementModule = agrModule
- mgr.baModules = append(mgr.baModules, agrModule)
- mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter())
- if mgr.isRunning {
- mgr.waitGroup.Add(1)
- go func(idx uint32) {
- defer mgr.waitGroup.Done()
- mgr.runBA(round, idx)
- }(i)
- }
+ // Prepare modules.
+ if mgr.baModule != nil {
+ return nil
+ }
+ recv := &consensusBAReceiver{
+ consensus: mgr.con,
+ restartNotary: make(chan types.Position, 1),
+ roundValue: &atomic.Value{},
}
+ recv.roundValue.Store(uint64(0))
+ agrModule := newAgreement(
+ mgr.con.ID,
+ recv,
+ newLeaderSelector(genValidLeader(mgr), mgr.logger),
+ mgr.signer,
+ mgr.logger)
+ // Hacky way to initialize first notarySet.
+ nodes, err := mgr.cache.GetNodeSet(round)
+ if err != nil {
+ return err
+ }
+ agrModule.notarySet = nodes.GetSubSet(
+ int(config.NotarySetSize), types.NewNotarySetTarget(crs))
+ // Hacky way to make agreement module self contained.
+ recv.agreementModule = agrModule
+ mgr.baModule = agrModule
+ mgr.voteFilter = utils.NewVoteFilter()
return nil
}
-func (mgr *agreementMgr) processVote(v *types.Vote) error {
- mgr.lock.RLock()
- defer mgr.lock.RUnlock()
- if v.Position.ChainID >= uint32(len(mgr.baModules)) {
+func (mgr *agreementMgr) processVote(v *types.Vote) (err error) {
+ if v.Position.ChainID > 0 {
mgr.logger.Error("Process vote for unknown chain to BA",
- "position", &v.Position,
- "baChain", len(mgr.baModules),
- "baRound", len(mgr.configs),
+ "position", v.Position,
"initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
- filter := mgr.voteFilters[v.Position.ChainID]
- if filter.Filter(v) {
+ if mgr.voteFilter.Filter(v) {
return nil
}
- v = v.Clone()
- err := mgr.baModules[v.Position.ChainID].processVote(v)
- if err == nil {
- mgr.baModules[v.Position.ChainID].updateFilter(filter)
+ if err = mgr.baModule.processVote(v); err == nil {
+ mgr.baModule.updateFilter(mgr.voteFilter)
}
- return err
+ return
}
func (mgr *agreementMgr) processBlock(b *types.Block) error {
- mgr.lock.RLock()
- defer mgr.lock.RUnlock()
- if b.Position.ChainID >= uint32(len(mgr.baModules)) {
+ if b.Position.ChainID > 0 {
mgr.logger.Error("Process block for unknown chain to BA",
- "position", &b.Position,
- "baChain", len(mgr.baModules),
+ "position", b.Position,
"baRound", len(mgr.configs),
"initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
- return mgr.baModules[b.Position.ChainID].processBlock(b)
+ return mgr.baModule.processBlock(b)
}
func (mgr *agreementMgr) touchAgreementResult(
@@ -280,30 +247,26 @@ func (mgr *agreementMgr) untouchAgreementResult(
func (mgr *agreementMgr) processAgreementResult(
result *types.AgreementResult) error {
- mgr.lock.RLock()
- defer mgr.lock.RUnlock()
- if result.Position.ChainID >= uint32(len(mgr.baModules)) {
+ if result.Position.ChainID > 0 {
mgr.logger.Error("Process unknown result for unknown chain to BA",
- "position", &result.Position,
- "baChain", len(mgr.baModules),
+ "position", result.Position,
"baRound", len(mgr.configs),
"initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
- agreement := mgr.baModules[result.Position.ChainID]
- aID := agreement.agreementID()
+ aID := mgr.baModule.agreementID()
if isStop(aID) {
return nil
}
- if result.Position == aID && !agreement.confirmed() {
- mgr.logger.Info("Syncing BA", "position", &result.Position)
+ if result.Position == aID && !mgr.baModule.confirmed() {
+ mgr.logger.Info("Syncing BA", "position", result.Position)
for key := range result.Votes {
- if err := agreement.processVote(&result.Votes[key]); err != nil {
+ if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
return err
}
}
- } else if result.Position.Newer(&aID) {
- mgr.logger.Info("Fast syncing BA", "position", &result.Position)
+ } else if result.Position.Newer(aID) {
+ mgr.logger.Info("Fast syncing BA", "position", result.Position)
nodes, err := mgr.cache.GetNodeSet(result.Position.Round)
if err != nil {
return err
@@ -316,9 +279,9 @@ func (mgr *agreementMgr) processAgreementResult(
nIDs := nodes.GetSubSet(
int(utils.GetConfigWithPanic(
mgr.gov, result.Position.Round, mgr.logger).NotarySetSize),
- types.NewNotarySetTarget(crs, result.Position.ChainID))
+ types.NewNotarySetTarget(crs))
for key := range result.Votes {
- if err := agreement.processVote(&result.Votes[key]); err != nil {
+ if err := mgr.baModule.processVote(&result.Votes[key]); err != nil {
return err
}
}
@@ -326,7 +289,7 @@ func (mgr *agreementMgr) processAgreementResult(
if err != nil {
return err
}
- agreement.restart(nIDs, result.Position, leader, crs)
+ mgr.baModule.restart(nIDs, result.Position, leader, crs)
}
return nil
}
@@ -336,30 +299,20 @@ func (mgr *agreementMgr) stop() {
func() {
mgr.lock.Lock()
defer mgr.lock.Unlock()
- for _, agr := range mgr.baModules {
- agr.stop()
- }
+ mgr.baModule.stop()
}()
// Block until all routines are done.
mgr.waitGroup.Wait()
}
-func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
- // Acquire agreement module.
- agr, recv := func() (*agreement, *consensusBAReceiver) {
- mgr.lock.RLock()
- defer mgr.lock.RUnlock()
- agr := mgr.baModules[chainID]
- return agr, agr.data.recv.(*consensusBAReceiver)
- }()
+func (mgr *agreementMgr) runBA(initRound uint64) {
// These are round based variables.
var (
currentRound uint64
nextRound = initRound
setting = baRoundSetting{
- chainID: chainID,
- agr: agr,
- recv: recv,
+ agr: mgr.baModule,
+ recv: mgr.baModule.data.recv.(*consensusBAReceiver),
}
roundBeginTime time.Time
roundEndTime time.Time
@@ -368,7 +321,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
// Check if this routine needs to awake in this round and prepare essential
// variables when yes.
- checkRound := func() (isNotary, isDisabled bool) {
+ checkRound := func() (isNotary bool) {
defer func() {
currentRound = nextRound
nextRound++
@@ -386,13 +339,8 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
// Set next checkpoint.
roundBeginTime = config.beginTime
roundEndTime = config.beginTime.Add(config.roundInterval)
- // Check if this chain handled by this routine included in this round.
- if chainID >= config.numChains {
- isDisabled = true
- return
- }
// Check if this node in notary set of this chain in this round.
- notarySet, err := mgr.cache.GetNotarySet(nextRound, chainID)
+ notarySet, err := mgr.cache.GetNotarySet(nextRound, 0)
if err != nil {
panic(err)
}
@@ -402,13 +350,11 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
if isNotary {
mgr.logger.Info("selected as notary set",
"ID", mgr.ID,
- "round", nextRound,
- "chainID", chainID)
+ "round", nextRound)
} else {
mgr.logger.Info("not selected as notary set",
"ID", mgr.ID,
- "round", nextRound,
- "chainID", chainID)
+ "round", nextRound)
}
// Setup ticker
if tickDuration != config.lambdaBA {
@@ -428,16 +374,7 @@ Loop:
default:
}
now := time.Now().UTC()
- var isDisabled bool
- setting.recv.isNotary, isDisabled = checkRound()
- if isDisabled {
- select {
- case <-mgr.ctx.Done():
- break Loop
- case <-time.After(roundEndTime.Sub(now)):
- continue Loop
- }
- }
+ setting.recv.isNotary = checkRound()
// Sleep until round begin. Here a biased round begin time would be
// used instead of the one in config. The reason it to disperse the load
// of fullnodes to verify confirmed blocks from each chain.
@@ -454,18 +391,17 @@ Loop:
<-setting.ticker.Tick()
}
// Run BA for this round.
- recv.roundValue.Store(currentRound)
- recv.changeNotaryTime = roundEndTime
- recv.restartNotary <- types.Position{
+ setting.recv.roundValue.Store(currentRound)
+ setting.recv.changeNotaryTime = roundEndTime
+ setting.recv.restartNotary <- types.Position{
Round: setting.recv.round(),
ChainID: math.MaxUint32,
}
- mgr.voteFilters[chainID] = utils.NewVoteFilter()
+ mgr.voteFilter = utils.NewVoteFilter()
if err := mgr.baRoutineForOneRound(&setting); err != nil {
mgr.logger.Error("BA routine failed",
"error", err,
- "nodeID", mgr.ID,
- "chain", chainID)
+ "nodeID", mgr.ID)
break Loop
}
}
@@ -485,13 +421,13 @@ func (mgr *agreementMgr) baRoutineForOneRound(
break
default:
}
- tipRound := mgr.lattice.TipRound(setting.chainID)
+ tipRound := mgr.bcModule.tipRound()
if tipRound > restartPos.Round {
// It's a vary rare that this go routine sleeps for entire round.
break
} else if tipRound != restartPos.Round {
- mgr.logger.Debug("Waiting lattice to change round...",
- "pos", &restartPos)
+ mgr.logger.Debug("Waiting blockChain to change round...",
+ "pos", restartPos)
} else {
break
}
@@ -501,7 +437,7 @@ func (mgr *agreementMgr) baRoutineForOneRound(
breakLoop = true
return
}
- if restartPos.Older(&oldPos) {
+ if restartPos.Older(oldPos) {
// The restartNotary event is triggered by 'BlockConfirmed'
// of some older block.
return
@@ -510,16 +446,7 @@ func (mgr *agreementMgr) baRoutineForOneRound(
var nextHeight uint64
var nextTime time.Time
for {
- nextHeight, nextTime, err =
- mgr.lattice.NextBlock(recv.round(), setting.chainID)
- if err != nil {
- mgr.logger.Debug("Error getting next height",
- "error", err,
- "round", recv.round(),
- "chainID", setting.chainID)
- err = nil
- nextHeight = restartPos.Height
- }
+ nextHeight, nextTime = mgr.bcModule.nextBlock()
if isStop(oldPos) && nextHeight == 0 {
break
}
@@ -529,14 +456,13 @@ func (mgr *agreementMgr) baRoutineForOneRound(
if nextHeight > restartPos.Height {
break
}
- mgr.logger.Debug("Lattice not ready!!!",
- "old", &oldPos, "restart", &restartPos, "next", nextHeight)
+ mgr.logger.Debug("BlockChain not ready!!!",
+ "old", oldPos, "restart", restartPos, "next", nextHeight)
time.Sleep(100 * time.Millisecond)
}
nextPos := types.Position{
- Round: recv.round(),
- ChainID: setting.chainID,
- Height: nextHeight,
+ Round: recv.round(),
+ Height: nextHeight,
}
oldPos = nextPos
var leader types.NodeID
@@ -591,7 +517,7 @@ Loop:
if agr.pullVotes() {
pos := agr.agreementID()
mgr.logger.Debug("Calling Network.PullVotes for syncing votes",
- "position", &pos)
+ "position", pos)
mgr.network.PullVotes(pos)
}
for i := 0; i < agr.clocks(); i++ {