aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-11-20 13:57:24 +0800
committerGitHub <noreply@github.com>2018-11-20 13:57:24 +0800
commit6d95559bf7eb62e6c114ca4d4040c44ffd553629 (patch)
tree5c62252e4cfe59c318ef3bb67b6c55891a58a4d3
parente5891f7ca08737c3f3bc37fd523537cb243f8b0d (diff)
downloaddexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar
dexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.gz
dexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.bz2
dexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.lz
dexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.xz
dexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.zst
dexon-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.zip
core: support NumChains change for BA modules (#339)
-rw-r--r--core/agreement-mgr.go423
-rw-r--r--core/agreement.go11
-rw-r--r--core/consensus.go286
-rw-r--r--core/consensus_test.go37
-rw-r--r--core/lattice-data.go113
-rw-r--r--core/lattice-data_test.go14
-rw-r--r--core/lattice.go58
-rw-r--r--core/test/app.go3
-rw-r--r--core/test/governance.go12
-rw-r--r--core/test/governance_test.go6
-rw-r--r--core/types/block-randomness.go7
-rw-r--r--core/utils.go6
-rw-r--r--integration_test/consensus_test.go92
-rw-r--r--integration_test/with_scheduler_test.go26
-rw-r--r--simulation/simulation.go2
15 files changed, 763 insertions, 333 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
new file mode 100644
index 0000000..10469de
--- /dev/null
+++ b/core/agreement-mgr.go
@@ -0,0 +1,423 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+// Errors returned from BA modules
+var (
+ ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished")
+)
+
+// genValidLeader generate a validLeader function for agreement modules.
+func genValidLeader(
+ mgr *agreementMgr) func(*types.Block) (bool, error) {
+ return func(block *types.Block) (bool, error) {
+ if block.Timestamp.After(time.Now()) {
+ return false, nil
+ }
+ if err := mgr.lattice.SanityCheck(block); err != nil {
+ if err == ErrRetrySanityCheckLater {
+ return false, nil
+ }
+ return false, err
+ }
+ mgr.logger.Debug("Calling Application.VerifyBlock", "block", block)
+ switch mgr.app.VerifyBlock(block) {
+ case types.VerifyInvalidBlock:
+ return false, ErrInvalidBlock
+ case types.VerifyRetryLater:
+ return false, nil
+ default:
+ }
+ return true, nil
+ }
+}
+
+type agreementMgrConfig struct {
+ beginTime time.Time
+ numChains uint32
+ roundInterval time.Duration
+ notarySetSize uint32
+ lambdaBA time.Duration
+ crs common.Hash
+}
+
+type baRoundSetting struct {
+ chainID uint32
+ notarySet map[types.NodeID]struct{}
+ agr *agreement
+ recv *consensusBAReceiver
+ ticker Ticker
+ crs common.Hash
+}
+
+type agreementMgr struct {
+ // TODO(mission): unbound Consensus instance from this module.
+ con *Consensus
+ ID types.NodeID
+ app Application
+ gov Governance
+ network Network
+ logger common.Logger
+ cache *utils.NodeSetCache
+ auth *Authenticator
+ lattice *Lattice
+ ctx context.Context
+ lastEndTime time.Time
+ configs []*agreementMgrConfig
+ baModules []*agreement
+ waitGroup sync.WaitGroup
+ pendingVotes map[uint64][]*types.Vote
+ pendingBlocks map[uint64][]*types.Block
+
+ // This lock should be used when attempting to:
+ // - add a new baModule.
+ // - remove all baModules when stopping. In this case, the cleaner need
+ // to wait for all routines runnning baModules finished.
+ // - access a method of baModule.
+ // - append a config from new round.
+ // The routine running corresponding baModule, however, doesn't have to
+ // acquire this lock.
+ lock sync.RWMutex
+}
+
+func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr {
+ return &agreementMgr{
+ con: con,
+ ID: con.ID,
+ app: con.app,
+ gov: con.gov,
+ network: con.network,
+ logger: con.logger,
+ cache: con.nodeSetCache,
+ auth: con.authModule,
+ lattice: con.lattice,
+ ctx: con.ctx,
+ lastEndTime: dMoment,
+ }
+}
+
+func (mgr *agreementMgr) appendConfig(
+ round uint64, config *types.Config, crs common.Hash) (err error) {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ // TODO(mission): initiate this module from some round > 0.
+ if round != uint64(len(mgr.configs)) {
+ return ErrRoundNotIncreasing
+ }
+ newConfig := &agreementMgrConfig{
+ beginTime: mgr.lastEndTime,
+ numChains: config.NumChains,
+ roundInterval: config.RoundInterval,
+ notarySetSize: config.NotarySetSize,
+ lambdaBA: config.LambdaBA,
+ crs: crs,
+ }
+ mgr.configs = append(mgr.configs, newConfig)
+ mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval)
+ // Create baModule for newly added chain.
+ for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ {
+ // Prepare modules.
+ recv := &consensusBAReceiver{
+ consensus: mgr.con,
+ chainID: i,
+ restartNotary: make(chan bool, 1),
+ }
+ agrModule := newAgreement(
+ mgr.con.ID,
+ recv,
+ newLeaderSelector(genValidLeader(mgr), mgr.logger),
+ mgr.auth)
+ // Hacky way to make agreement module self contained.
+ recv.agreementModule = agrModule
+ mgr.baModules = append(mgr.baModules, agrModule)
+ go mgr.runBA(round, i)
+ }
+ return nil
+}
+
+func (mgr *agreementMgr) processVote(v *types.Vote) error {
+ v = v.Clone()
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ if v.Position.ChainID >= uint32(len(mgr.baModules)) {
+ mgr.logger.Error("Process vote for unknown chain to BA",
+ "position", &v.Position,
+ "baChain", len(mgr.baModules),
+ "baRound", len(mgr.configs))
+ return utils.ErrInvalidChainID
+ }
+ return mgr.baModules[v.Position.ChainID].processVote(v)
+}
+
+func (mgr *agreementMgr) processBlock(b *types.Block) error {
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ if b.Position.ChainID >= uint32(len(mgr.baModules)) {
+ mgr.logger.Error("Process block for unknown chain to BA",
+ "position", &b.Position,
+ "baChain", len(mgr.baModules),
+ "baRound", len(mgr.configs))
+ return utils.ErrInvalidChainID
+ }
+ return mgr.baModules[b.Position.ChainID].processBlock(b)
+}
+
+func (mgr *agreementMgr) processAgreementResult(
+ result *types.AgreementResult) error {
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ if result.Position.ChainID >= uint32(len(mgr.baModules)) {
+ mgr.logger.Error("Process unknown result for unknown chain to BA",
+ "position", &result.Position,
+ "baChain", len(mgr.baModules),
+ "baRound", len(mgr.configs))
+ return utils.ErrInvalidChainID
+ }
+ agreement := mgr.baModules[result.Position.ChainID]
+ aID := agreement.agreementID()
+ if isStop(aID) {
+ return nil
+ }
+ if result.Position.Newer(&aID) {
+ mgr.logger.Info("Syncing BA", "position", &result.Position)
+ nodes, err := mgr.cache.GetNodeSet(result.Position.Round)
+ if err != nil {
+ return err
+ }
+ mgr.logger.Debug("Calling Network.PullBlocks for syncing BA",
+ "hash", result.BlockHash)
+ mgr.network.PullBlocks(common.Hashes{result.BlockHash})
+ mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round)
+ crs := mgr.gov.CRS(result.Position.Round)
+ nIDs := nodes.GetSubSet(
+ int(mgr.gov.Configuration(result.Position.Round).NotarySetSize),
+ types.NewNotarySetTarget(crs, result.Position.ChainID))
+ for _, vote := range result.Votes {
+ agreement.processVote(&vote)
+ }
+ agreement.restart(nIDs, result.Position, crs)
+ }
+ return nil
+}
+
+func (mgr *agreementMgr) stop() {
+ // Stop all running agreement modules.
+ func() {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ for _, agr := range mgr.baModules {
+ agr.stop()
+ }
+ }()
+ // Block until all routines are done.
+ mgr.waitGroup.Wait()
+}
+
+func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
+ mgr.waitGroup.Add(1)
+ defer mgr.waitGroup.Done()
+ // Acquire agreement module.
+ agr, recv := func() (*agreement, *consensusBAReceiver) {
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ agr := mgr.baModules[chainID]
+ return agr, agr.data.recv.(*consensusBAReceiver)
+ }()
+ // These are round based variables.
+ var (
+ currentRound uint64
+ nextRound = initRound
+ setting = baRoundSetting{
+ chainID: chainID,
+ agr: agr,
+ recv: recv,
+ }
+ roundBeginTime time.Time
+ roundEndTime time.Time
+ tickDuration time.Duration
+ )
+
+ // Check if this routine needs to awake in this round and prepare essential
+ // variables when yes.
+ checkRound := func() (awake bool) {
+ defer func() {
+ currentRound = nextRound
+ nextRound++
+ }()
+ // Wait until the configuartion for next round is ready.
+ var config *agreementMgrConfig
+ for {
+ config = func() *agreementMgrConfig {
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ if nextRound < uint64(len(mgr.configs)) {
+ return mgr.configs[nextRound]
+ }
+ return nil
+ }()
+ if config != nil {
+ break
+ } else {
+ mgr.logger.Info("round is not ready", "round", nextRound)
+ time.Sleep(1 * time.Second)
+ }
+ }
+ // Set next checkpoint.
+ roundBeginTime = config.beginTime
+ roundEndTime = config.beginTime.Add(config.roundInterval)
+ // Check if this chain handled by this routine included in this round.
+ if chainID >= config.numChains {
+ return false
+ }
+ // Check if this node in notary set of this chain in this round.
+ nodeSet, err := mgr.cache.GetNodeSet(nextRound)
+ if err != nil {
+ panic(err)
+ }
+ setting.crs = config.crs
+ setting.notarySet = nodeSet.GetSubSet(
+ int(config.notarySetSize),
+ types.NewNotarySetTarget(config.crs, chainID))
+ _, awake = setting.notarySet[mgr.ID]
+ // Setup ticker
+ if tickDuration != config.lambdaBA {
+ if setting.ticker != nil {
+ setting.ticker.Stop()
+ }
+ setting.ticker = newTicker(mgr.gov, nextRound, TickerBA)
+ tickDuration = config.lambdaBA
+ }
+ return
+ }
+Loop:
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break Loop
+ default:
+ }
+ now := time.Now().UTC()
+ if !checkRound() {
+ if now.After(roundEndTime) {
+ // That round is passed.
+ continue Loop
+ }
+ // Sleep until next checkpoint.
+ select {
+ case <-mgr.ctx.Done():
+ break Loop
+ case <-time.After(roundEndTime.Sub(now)):
+ continue Loop
+ }
+ }
+ // Sleep until round begin. Here a biased round begin time would be
+ // used instead of the one in config. The reason it to disperse the load
+ // of fullnodes to verify confirmed blocks from each chain.
+ if now.Before(pickBiasedTime(roundBeginTime, 4*tickDuration)) {
+ select {
+ case <-mgr.ctx.Done():
+ break Loop
+ case <-time.After(roundBeginTime.Sub(now)):
+ }
+ // Clean the tick channel after awake: the tick would be queued in
+ // channel, thus the first few ticks would not tick on expected
+ // interval.
+ <-setting.ticker.Tick()
+ <-setting.ticker.Tick()
+ }
+ // Run BA for this round.
+ recv.round = currentRound
+ recv.changeNotaryTime = roundEndTime
+ recv.restartNotary <- false
+ if err := mgr.baRoutineForOneRound(&setting); err != nil {
+ mgr.logger.Error("BA routine failed",
+ "error", err,
+ "nodeID", mgr.ID,
+ "chain", chainID)
+ break Loop
+ }
+ }
+}
+
+func (mgr *agreementMgr) baRoutineForOneRound(
+ setting *baRoundSetting) (err error) {
+ agr := setting.agr
+ recv := setting.recv
+Loop:
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break Loop
+ default:
+ }
+ select {
+ case newNotary := <-recv.restartNotary:
+ if newNotary {
+ // This round is finished.
+ break Loop
+ }
+ nextHeight, err := mgr.lattice.NextHeight(recv.round, setting.chainID)
+ if err != nil {
+ panic(err)
+ }
+ agr.restart(setting.notarySet, types.Position{
+ Round: recv.round,
+ ChainID: setting.chainID,
+ Height: nextHeight,
+ }, setting.crs)
+ default:
+ }
+ if agr.pullVotes() {
+ pos := agr.agreementID()
+ mgr.logger.Debug("Calling Network.PullVotes for syncing votes",
+ "position", &pos)
+ mgr.network.PullVotes(pos)
+ }
+ if err = agr.nextState(); err != nil {
+ mgr.logger.Error("Failed to proceed to next state",
+ "nodeID", mgr.ID.String(),
+ "error", err)
+ break Loop
+ }
+ for i := 0; i < agr.clocks(); i++ {
+ // Priority select for agreement.done().
+ select {
+ case <-agr.done():
+ continue Loop
+ default:
+ }
+ select {
+ case <-agr.done():
+ continue Loop
+ case <-setting.ticker.Tick():
+ }
+ }
+ }
+ return nil
+}
diff --git a/core/agreement.go b/core/agreement.go
index ff1c71a..4fb0dea 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -311,12 +311,13 @@ func (a *agreement) processVote(vote *types.Vote) error {
return err
}
aID := a.agreementID()
+ // Agreement module has stopped.
+ if isStop(aID) {
+ return nil
+ }
if vote.Position != aID {
- // Agreement module has stopped.
- if !isStop(aID) {
- if aID.Newer(&vote.Position) {
- return nil
- }
+ if aID.Newer(&vote.Position) {
+ return nil
}
a.pendingVote = append(a.pendingVote, pendingVote{
vote: vote,
diff --git a/core/consensus.go b/core/consensus.go
index e09ee25..49874d3 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -56,6 +56,8 @@ var (
"incorrect vote position")
ErrIncorrectVoteProposer = fmt.Errorf(
"incorrect vote proposer")
+ ErrCRSNotReady = fmt.Errorf(
+ "CRS not ready")
)
// consensusBAReceiver implements agreementReceiver.
@@ -103,20 +105,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
func (recv *consensusBAReceiver) ConfirmBlock(
hash common.Hash, votes map[types.NodeID]*types.Vote) {
var block *types.Block
- if (hash == common.Hash{}) {
+ isEmptyBlockConfirmed := hash == common.Hash{}
+ if isEmptyBlockConfirmed {
aID := recv.agreementModule.agreementID()
recv.consensus.logger.Info("Empty block is confirmed",
"position", &aID)
var err error
- block, err = recv.consensus.proposeEmptyBlock(recv.chainID)
+ block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID)
if err != nil {
recv.consensus.logger.Error("Propose empty block failed", "error", err)
return
}
} else {
var exist bool
- block, exist = recv.consensus.baModules[recv.chainID].
- findCandidateBlockNoLock(hash)
+ block, exist = recv.agreementModule.findCandidateBlockNoLock(hash)
if !exist {
recv.consensus.logger.Error("Unknown block confirmed",
"hash", hash,
@@ -150,9 +152,10 @@ func (recv *consensusBAReceiver) ConfirmBlock(
voteList = append(voteList, *vote)
}
result := &types.AgreementResult{
- BlockHash: block.Hash,
- Position: block.Position,
- Votes: voteList,
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Votes: voteList,
+ IsEmptyBlock: isEmptyBlockConfirmed,
}
recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult",
"result", result)
@@ -273,8 +276,7 @@ type Consensus struct {
authModule *Authenticator
// BA.
- baModules []*agreement
- receivers []*consensusBAReceiver
+ baMgr *agreementMgr
baConfirmedBlock map[common.Hash]chan<- *types.Block
// DKG.
@@ -365,49 +367,8 @@ func NewConsensus(
event: common.NewEvent(),
logger: logger,
}
-
- validLeader := func(block *types.Block) (bool, error) {
- if block.Timestamp.After(time.Now()) {
- return false, nil
- }
- if err := lattice.SanityCheck(block); err != nil {
- if err == ErrRetrySanityCheckLater {
- return false, nil
- }
- return false, err
- }
- logger.Debug("Calling Application.VerifyBlock", "block", block)
- switch app.VerifyBlock(block) {
- case types.VerifyInvalidBlock:
- return false, ErrInvalidBlock
- case types.VerifyRetryLater:
- return false, nil
- default:
- }
- return true, nil
- }
-
- con.baModules = make([]*agreement, config.NumChains)
- con.receivers = make([]*consensusBAReceiver, config.NumChains)
- for i := uint32(0); i < config.NumChains; i++ {
- chainID := i
- recv := &consensusBAReceiver{
- consensus: con,
- chainID: chainID,
- restartNotary: make(chan bool, 1),
- }
- agreementModule := newAgreement(
- con.ID,
- recv,
- newLeaderSelector(validLeader, logger),
- con.authModule,
- )
- // Hacky way to make agreement module self contained.
- recv.agreementModule = agreementModule
- recv.changeNotaryTime = dMoment
- con.baModules[chainID] = agreementModule
- con.receivers[chainID] = recv
- }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ con.baMgr = newAgreementMgr(con, dMoment)
return con
}
@@ -420,14 +381,27 @@ func (con *Consensus) Run(initBlock *types.Block) {
con.logger.Debug("Calling Governance.Configuration", "round", initRound)
initConfig := con.gov.Configuration(initRound)
// Setup context.
- con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.ccModule.init(initBlock)
// TODO(jimmy-dexon): change AppendConfig to add config for specific round.
- for i := uint64(0); i <= initRound; i++ {
- con.logger.Debug("Calling Governance.Configuration", "round", i+1)
- cfg := con.gov.Configuration(i + 1)
- if err := con.lattice.AppendConfig(i+1, cfg); err != nil {
- panic(err)
+ for i := uint64(0); i <= initRound+1; i++ {
+ con.logger.Debug("Calling Governance.Configuration", "round", i)
+ cfg := con.gov.Configuration(i)
+ // 0 round is already given to core.Lattice module when constructing.
+ if i > 0 {
+ if err := con.lattice.AppendConfig(i, cfg); err != nil {
+ panic(err)
+ }
+ }
+ // Corresponding CRS might not be ready for next round to initRound.
+ if i < initRound+1 {
+ con.logger.Debug("Calling Governance.CRS", "round", i)
+ crs := con.gov.CRS(i)
+ if (crs == common.Hash{}) {
+ panic(ErrCRSNotReady)
+ }
+ if err := con.baMgr.appendConfig(i, cfg, crs); err != nil {
+ panic(err)
+ }
}
}
dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
@@ -447,103 +421,9 @@ func (con *Consensus) Run(initBlock *types.Block) {
})
}
con.initialRound(con.dMoment, initRound, initConfig)
- ticks := make([]chan struct{}, 0, initConfig.NumChains)
- for i := uint32(0); i < initConfig.NumChains; i++ {
- tick := make(chan struct{})
- ticks = append(ticks, tick)
- // TODO(jimmy-dexon): this is a temporary solution to offset BA time.
- // The complelete solution should be delivered along with config change.
- offset := time.Duration(i*uint32(4)/initConfig.NumChains) *
- initConfig.LambdaBA
- go func(chainID uint32, offset time.Duration) {
- time.Sleep(offset)
- con.runBA(chainID, tick)
- }(i, offset)
- }
-
- // Reset ticker.
- <-con.tickerObj.Tick()
- <-con.tickerObj.Tick()
- for {
- <-con.tickerObj.Tick()
- for _, tick := range ticks {
- select {
- case tick <- struct{}{}:
- default:
- }
- }
- }
-}
-
-func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) {
- // TODO(jimmy-dexon): move this function inside agreement.
- agreement := con.baModules[chainID]
- recv := con.receivers[chainID]
- recv.restartNotary <- true
- nIDs := make(map[types.NodeID]struct{})
- crs := common.Hash{}
- // Reset ticker
- <-tick
-BALoop:
- for {
- select {
- case <-con.ctx.Done():
- break BALoop
- default:
- }
- select {
- case newNotary := <-recv.restartNotary:
- if newNotary {
- con.logger.Debug("Calling Governance.CRS", "round", recv.round)
- crs = con.gov.CRS(recv.round)
- if (crs == common.Hash{}) {
- // Governance is out-of-sync.
- continue BALoop
- }
- configForNewRound := con.gov.Configuration(recv.round)
- recv.changeNotaryTime =
- recv.changeNotaryTime.Add(configForNewRound.RoundInterval)
- nodes, err := con.nodeSetCache.GetNodeSet(recv.round)
- if err != nil {
- panic(err)
- }
- con.logger.Debug("Calling Governance.Configuration",
- "round", recv.round)
- nIDs = nodes.GetSubSet(
- int(configForNewRound.NotarySetSize),
- types.NewNotarySetTarget(crs, chainID))
- }
- nextPos := con.lattice.NextPosition(chainID)
- nextPos.Round = recv.round
- agreement.restart(nIDs, nextPos, crs)
- default:
- }
- if agreement.pullVotes() {
- pos := agreement.agreementID()
- con.logger.Debug("Calling Network.PullVotes for syncing votes",
- "position", &pos)
- con.network.PullVotes(pos)
- }
- err := agreement.nextState()
- if err != nil {
- con.logger.Error("Failed to proceed to next state",
- "nodeID", con.ID.String(),
- "error", err)
- break BALoop
- }
- for i := 0; i < agreement.clocks(); i++ {
- // Priority select for agreement.done().
- select {
- case <-agreement.done():
- continue BALoop
- default:
- }
- select {
- case <-agreement.done():
- continue BALoop
- case <-tick:
- }
- }
+ // Block until done.
+ select {
+ case <-con.ctx.Done():
}
}
@@ -622,6 +502,7 @@ func (con *Consensus) initialRound(
con.logger.Error("Error getting DKG set", "round", round, "error", err)
curDkgSet = make(map[types.NodeID]struct{})
}
+ // Initiate CRS routine.
if _, exist := curDkgSet[con.ID]; exist {
con.event.RegisterTime(startTime.Add(config.RoundInterval/2),
func(time.Time) {
@@ -630,7 +511,31 @@ func (con *Consensus) initialRound(
}()
})
}
-
+ // Initiate BA modules.
+ con.event.RegisterTime(
+ startTime.Add(config.RoundInterval/2+config.LambdaDKG),
+ func(time.Time) {
+ go func(nextRound uint64) {
+ for (con.gov.CRS(nextRound) == common.Hash{}) {
+ con.logger.Info("CRS is not ready yet. Try again later...",
+ "nodeID", con.ID,
+ "round", nextRound)
+ time.Sleep(500 * time.Millisecond)
+ }
+ // Notify BA for new round.
+ con.logger.Debug("Calling Governance.Configuration",
+ "round", nextRound)
+ nextConfig := con.gov.Configuration(nextRound)
+ con.logger.Debug("Calling Governance.CRS",
+ "round", nextRound)
+ nextCRS := con.gov.CRS(nextRound)
+ if err := con.baMgr.appendConfig(
+ nextRound, nextConfig, nextCRS); err != nil {
+ panic(err)
+ }
+ }(round + 1)
+ })
+ // Initiate DKG for this round.
con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG),
func(time.Time) {
go func(nextRound uint64) {
@@ -670,6 +575,7 @@ func (con *Consensus) initialRound(
})
}(round + 1)
})
+ // Prepare lattice module for next round and next "initialRound" routine.
con.event.RegisterTime(startTime.Add(config.RoundInterval),
func(time.Time) {
// Change round.
@@ -685,9 +591,7 @@ func (con *Consensus) initialRound(
// Stop the Consensus core.
func (con *Consensus) Stop() {
- for _, a := range con.baModules {
- a.stop()
- }
+ con.baMgr.stop()
con.event.Reset()
con.ctxCancel()
}
@@ -785,9 +689,10 @@ func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block {
}
func (con *Consensus) proposeEmptyBlock(
- chainID uint32) (*types.Block, error) {
+ round uint64, chainID uint32) (*types.Block, error) {
block := &types.Block{
Position: types.Position{
+ Round: round,
ChainID: chainID,
},
}
@@ -799,15 +704,9 @@ func (con *Consensus) proposeEmptyBlock(
// ProcessVote is the entry point to submit ont vote to a Consensus instance.
func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
- if vote.Position.ChainID >= uint32(len(con.baModules)) {
- return nil
- }
- if isStop(con.baModules[vote.Position.ChainID].agreementID()) {
- return nil
- }
v := vote.Clone()
- err = con.baModules[v.Position.ChainID].processVote(v)
- return err
+ err = con.baMgr.processVote(v)
+ return
}
// ProcessAgreementResult processes the randomness request.
@@ -826,8 +725,14 @@ func (con *Consensus) ProcessAgreementResult(
return ErrIncorrectVoteProposer
}
for _, vote := range rand.Votes {
- if vote.BlockHash != rand.BlockHash {
- return ErrIncorrectVoteBlockHash
+ if rand.IsEmptyBlock {
+ if (vote.BlockHash != common.Hash{}) {
+ return ErrIncorrectVoteBlockHash
+ }
+ } else {
+ if vote.BlockHash != rand.BlockHash {
+ return ErrIncorrectVoteBlockHash
+ }
}
if vote.Type != types.VoteCom {
return ErrIncorrectVoteType
@@ -847,29 +752,8 @@ func (con *Consensus) ProcessAgreementResult(
}
}
// Syncing BA Module.
- agreement := con.baModules[rand.Position.ChainID]
- aID := agreement.agreementID()
- if isStop(aID) {
- return nil
- }
- if rand.Position.Newer(&aID) {
- con.logger.Info("Syncing BA", "position", &rand.Position)
- nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round)
- if err != nil {
- return err
- }
- con.logger.Debug("Calling Network.PullBlocks for syncing BA",
- "hash", rand.BlockHash)
- con.network.PullBlocks(common.Hashes{rand.BlockHash})
- con.logger.Debug("Calling Governance.CRS", "round", rand.Position.Round)
- crs := con.gov.CRS(rand.Position.Round)
- nIDs := nodes.GetSubSet(
- int(con.gov.Configuration(rand.Position.Round).NotarySetSize),
- types.NewNotarySetTarget(crs, rand.Position.ChainID))
- for _, vote := range rand.Votes {
- agreement.processVote(&vote)
- }
- agreement.restart(nIDs, rand.Position, crs)
+ if err := con.baMgr.processAgreementResult(rand); err != nil {
+ return err
}
// Calculating randomness.
if rand.Position.Round == 0 {
@@ -950,9 +834,7 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil {
- return err
- }
+ err = con.baMgr.processBlock(b)
return
}
@@ -1064,9 +946,15 @@ func (con *Consensus) prepareBlock(b *types.Block,
if err = con.lattice.PrepareBlock(b, proposeTime); err != nil {
return
}
- con.logger.Debug("Calling Governance.CRS", "round", 0)
- if err =
- con.authModule.SignCRS(b, con.gov.CRS(b.Position.Round)); err != nil {
+ con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round)
+ crs := con.gov.CRS(b.Position.Round)
+ if crs.Equal(common.Hash{}) {
+ con.logger.Error("CRS for round is not ready, unable to prepare block",
+ "position", &b.Position)
+ err = ErrCRSNotReady
+ return
+ }
+ if err = con.authModule.SignCRS(b, crs); err != nil {
return
}
return
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 802182a..f5f1182 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -187,24 +187,18 @@ func (s *ConsensusTestSuite) prepareConsensus(
dMoment, app, gov, db, network, prvKey, &common.NullLogger{})
con.ccModule.init(&types.Block{})
conn.setCon(nID, con)
- round := uint64(0)
- nodes, err := con.nodeSetCache.GetNodeSet(round)
- s.Require().NoError(err)
- for i, agreement := range con.baModules {
- chainID := uint32(i)
- nIDs := nodes.GetSubSet(
- int(gov.Configuration(round).NotarySetSize),
- types.NewNotarySetTarget(
- gov.CRS(round), chainID))
- agreement.restart(nIDs, types.Position{
- Round: round,
- ChainID: chainID,
- Height: uint64(0),
- }, gov.CRS(round))
- }
return app, con
}
+func (s *ConsensusTestSuite) prepareAgreementMgrWithoutRunning(
+ con *Consensus, numChains uint32) {
+ // This is a workaround to setup agreementMgr.
+ con.baMgr.appendConfig(0, &types.Config{
+ NumChains: numChains,
+ RoundInterval: time.Hour,
+ }, common.NewRandomHash())
+}
+
func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
// This test scenario:
// o o o o <- this layer makes older blocks strongly acked.
@@ -453,6 +447,7 @@ func (s *ConsensusTestSuite) TestPrepareBlock() {
cons := map[types.NodeID]*Consensus{}
for _, key := range prvKeys {
_, con := s.prepareConsensus(dMoment, gov, key, conn)
+ s.prepareAgreementMgrWithoutRunning(con, 4)
nID := types.NewNodeID(key.PublicKey())
cons[nID] = con
nodes = append(nodes, nID)
@@ -492,6 +487,7 @@ func (s *ConsensusTestSuite) TestPrepareGenesisBlock() {
s.Require().NoError(err)
prvKey := prvKeys[0]
_, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn)
+ s.prepareAgreementMgrWithoutRunning(con, 4)
block := &types.Block{
Position: types.Position{ChainID: 0},
}
@@ -547,13 +543,15 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
}
func (s *ConsensusTestSuite) TestSyncBA() {
+ lambdaBA := time.Second
conn := s.newNetworkConnection()
prvKeys, pubKeys, err := test.NewKeys(4)
s.Require().NoError(err)
- gov, err := test.NewGovernance(pubKeys, time.Second, ConfigRoundShift)
+ gov, err := test.NewGovernance(pubKeys, lambdaBA, ConfigRoundShift)
s.Require().NoError(err)
prvKey := prvKeys[0]
_, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn)
+ go con.Run(&types.Block{})
hash := common.NewRandomHash()
auths := make([]*Authenticator, 0, len(prvKeys))
for _, prvKey := range prvKeys {
@@ -574,8 +572,13 @@ func (s *ConsensusTestSuite) TestSyncBA() {
s.Require().NoError(auth.SignVote(vote))
baResult.Votes = append(baResult.Votes, *vote)
}
+ // Make sure each agreement module is running. ProcessAgreementResult only
+ // works properly when agreement module is running:
+ // - the bias for round begin time would be 4 * lambda.
+ // - the ticker is 1 lambdaa.
+ time.Sleep(5 * lambdaBA)
s.Require().NoError(con.ProcessAgreementResult(baResult))
- aID := con.baModules[0].agreementID()
+ aID := con.baMgr.baModules[0].agreementID()
s.Equal(pos, aID)
// Test negative case.
diff --git a/core/lattice-data.go b/core/lattice-data.go
index 6fe810a..f1ab2de 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -362,6 +362,35 @@ func (data *latticeData) addFinalizedBlock(block *types.Block) (err error) {
return
}
+// isBindTip checks if a block's fields should follow up its parent block.
+func (data *latticeData) isBindTip(
+ pos types.Position, tip *types.Block) (bindTip bool, err error) {
+ if tip == nil {
+ return
+ }
+ if pos.Round < tip.Position.Round {
+ err = ErrInvalidRoundID
+ return
+ }
+ tipConfig := data.getConfig(tip.Position.Round)
+ if tip.Timestamp.After(tipConfig.roundEndTime) {
+ if pos.Round == tip.Position.Round {
+ err = ErrRoundNotSwitch
+ return
+ }
+ if pos.Round == tip.Position.Round+1 {
+ bindTip = true
+ }
+ } else {
+ if pos.Round != tip.Position.Round {
+ err = ErrInvalidRoundID
+ return
+ }
+ bindTip = true
+ }
+ return
+}
+
// prepareBlock setups fields of a block based on its ChainID and Round,
// including:
// - Acks
@@ -375,7 +404,6 @@ func (data *latticeData) prepareBlock(b *types.Block) error {
config *latticeDataConfig
acks common.Hashes
bindTip bool
- chainTip *types.Block
)
if config = data.getConfig(b.Position.Round); config == nil {
return ErrUnknownRoundID
@@ -388,30 +416,16 @@ func (data *latticeData) prepareBlock(b *types.Block) error {
b.Position.Height = 0
b.ParentHash = common.Hash{}
// Decide valid timestamp range.
- homeChain := data.chains[b.Position.ChainID]
- if homeChain.tip != nil {
- chainTip = homeChain.tip
- if b.Position.Round < chainTip.Position.Round {
- return ErrInvalidRoundID
- }
- chainTipConfig := data.getConfig(chainTip.Position.Round)
- if chainTip.Timestamp.After(chainTipConfig.roundEndTime) {
- if b.Position.Round == chainTip.Position.Round {
- return ErrRoundNotSwitch
- }
- if b.Position.Round == chainTip.Position.Round+1 {
- bindTip = true
- }
- } else {
- if b.Position.Round != chainTip.Position.Round {
- return ErrInvalidRoundID
- }
- bindTip = true
- }
+ chainTip := data.chains[b.Position.ChainID].tip
+ if chainTip != nil {
// TODO(mission): find a way to prevent us to assign a witness height
// from Jurassic period.
b.Witness.Height = chainTip.Witness.Height
}
+ bindTip, err := data.isBindTip(b.Position, chainTip)
+ if err != nil {
+ return err
+ }
// For blocks with continuous round ID, assign timestamp range based on
// parent block and bound config.
if bindTip {
@@ -461,40 +475,48 @@ func (data *latticeData) prepareBlock(b *types.Block) error {
// - ParentHash and Height from parent block. If there is no valid parent block
// (ex. Newly added chain or bootstrap), these fields would be setup as
// genesis block.
-func (data *latticeData) prepareEmptyBlock(b *types.Block) {
+func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) {
// emptyBlock has no proposer.
b.ProposerID = types.NodeID{}
- var acks common.Hashes
// Reset fields to make sure we got these information from parent block.
b.Position.Height = 0
- b.Position.Round = 0
b.ParentHash = common.Hash{}
b.Timestamp = time.Time{}
// Decide valid timestamp range.
- homeChain := data.chains[b.Position.ChainID]
- if homeChain.tip != nil {
- chainTip := homeChain.tip
+ config := data.getConfig(b.Position.Round)
+ chainTip := data.chains[b.Position.ChainID].tip
+ bindTip, err := data.isBindTip(b.Position, chainTip)
+ if err != nil {
+ return
+ }
+ if bindTip {
b.ParentHash = chainTip.Hash
- chainTipConfig := data.getConfig(chainTip.Position.Round)
- if chainTip.Timestamp.After(chainTipConfig.roundEndTime) {
- b.Position.Round = chainTip.Position.Round + 1
- } else {
- b.Position.Round = chainTip.Position.Round
- }
b.Position.Height = chainTip.Position.Height + 1
- b.Timestamp = chainTip.Timestamp.Add(chainTipConfig.minBlockTimeInterval)
+ b.Timestamp = chainTip.Timestamp.Add(config.minBlockTimeInterval)
b.Witness.Height = chainTip.Witness.Height
b.Witness.Data = make([]byte, len(chainTip.Witness.Data))
copy(b.Witness.Data, chainTip.Witness.Data)
- acks = append(acks, chainTip.Hash)
+ b.Acks = common.NewSortedHashes(common.Hashes{chainTip.Hash})
+ } else {
+ b.Timestamp = config.roundBeginTime
}
- b.Acks = common.NewSortedHashes(acks)
+ return
}
// TODO(mission): make more abstraction for this method.
// nextHeight returns the next height of a chain.
-func (data *latticeData) nextPosition(chainID uint32) types.Position {
- return data.chains[chainID].nextPosition()
+func (data *latticeData) nextHeight(
+ round uint64, chainID uint32) (uint64, error) {
+ chainTip := data.chains[chainID].tip
+ bindTip, err := data.isBindTip(
+ types.Position{Round: round, ChainID: chainID}, chainTip)
+ if err != nil {
+ return 0, err
+ }
+ if bindTip {
+ return chainTip.Position.Height + 1, nil
+ }
+ return 0, nil
}
// findBlock seeks blocks in memory or db.
@@ -609,21 +631,6 @@ func (s *chainStatus) addBlock(b *types.Block) {
s.tip = b
}
-// TODO(mission): change back to nextHeight.
-// nextPosition returns a valid position for new block in this chain.
-func (s *chainStatus) nextPosition() types.Position {
- if s.tip == nil {
- return types.Position{
- ChainID: s.ID,
- Height: 0,
- }
- }
- return types.Position{
- ChainID: s.ID,
- Height: s.tip.Position.Height + 1,
- }
-}
-
// purgeBlock purges a block from cache, make sure this block is already saved
// in blockdb.
func (s *chainStatus) purgeBlock(b *types.Block) error {
diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go
index 24f45e6..e939c81 100644
--- a/core/lattice-data_test.go
+++ b/core/lattice-data_test.go
@@ -550,11 +550,13 @@ func (s *LatticeDataTestSuite) TestPrepareBlock() {
req.Equal(b01.Position.Height, uint64(1))
}
-func (s *LatticeDataTestSuite) TestNextPosition() {
- // Test 'NextPosition' method when lattice is ready.
+func (s *LatticeDataTestSuite) TestNextHeight() {
+ // Test 'NextHeight' method when lattice is ready.
data, _ := s.genTestCase1()
- s.Equal(data.nextPosition(0), types.Position{ChainID: 0, Height: 4})
- // Test 'NextPosition' method when lattice is empty.
+ h, err := data.nextHeight(0, 0)
+ s.Require().NoError(err)
+ s.Require().Equal(h, uint64(4))
+ // Test 'NextHeight' method when lattice is empty.
// Setup a configuration that no restriction on block interval and
// round cutting.
genesisConfig := &types.Config{
@@ -563,7 +565,9 @@ func (s *LatticeDataTestSuite) TestNextPosition() {
MinBlockInterval: 1 * time.Second,
}
data = newLatticeData(nil, time.Now().UTC(), 0, genesisConfig)
- s.Equal(data.nextPosition(0), types.Position{ChainID: 0, Height: 0})
+ h, err = data.nextHeight(0, 0)
+ s.Require().NoError(err)
+ s.Require().Equal(h, uint64(0))
}
func (s *LatticeDataTestSuite) TestPrepareEmptyBlock() {
diff --git a/core/lattice.go b/core/lattice.go
index 7b66bd5..402a468 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -41,7 +41,6 @@ type Lattice struct {
pool blockPool
retryAdd bool
data *latticeData
- toSyncer *totalOrderingSyncer
toModule *totalOrdering
ctModule *consensusTimestamp
logger common.Logger
@@ -65,7 +64,6 @@ func NewLattice(
debug: debug,
pool: newBlockPool(cfg.NumChains),
data: newLatticeData(db, dMoment, round, cfg),
- toSyncer: newTotalOrderingSyncer(cfg.NumChains),
toModule: newTotalOrdering(dMoment, cfg),
ctModule: newConsensusTimestamp(dMoment, round, cfg.NumChains),
logger: logger,
@@ -102,7 +100,9 @@ func (l *Lattice) PrepareBlock(
func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) {
l.lock.RLock()
defer l.lock.RUnlock()
- l.data.prepareEmptyBlock(b)
+ if err = l.data.prepareEmptyBlock(b); err != nil {
+ return
+ }
if b.Hash, err = hashBlock(b); err != nil {
return
}
@@ -237,40 +237,37 @@ func (l *Lattice) ProcessBlock(
return
}
- for _, blockToSyncer := range inLattice {
- toTotalOrdering := l.toSyncer.processBlock(blockToSyncer)
- // Perform total ordering for each block added to lattice.
- for _, b = range toTotalOrdering {
- toDelivered, deliveredMode, err = l.toModule.processBlock(b)
- if err != nil {
- // All errors from total ordering is serious, should panic.
- panic(err)
- }
- if len(toDelivered) == 0 {
- continue
- }
- hashes := make(common.Hashes, len(toDelivered))
- for idx := range toDelivered {
- hashes[idx] = toDelivered[idx].Hash
- }
- if l.debug != nil {
- l.debug.TotalOrderingDelivered(hashes, deliveredMode)
- }
- // Perform consensus timestamp module.
- if err = l.ctModule.processBlocks(toDelivered); err != nil {
- return
- }
- delivered = append(delivered, toDelivered...)
+ for _, b = range inLattice {
+ toDelivered, deliveredMode, err = l.toModule.processBlock(b)
+ if err != nil {
+ // All errors from total ordering is serious, should panic.
+ panic(err)
+ }
+ if len(toDelivered) == 0 {
+ continue
+ }
+ hashes := make(common.Hashes, len(toDelivered))
+ for idx := range toDelivered {
+ hashes[idx] = toDelivered[idx].Hash
+ }
+ if l.debug != nil {
+ l.debug.TotalOrderingDelivered(hashes, deliveredMode)
+ }
+ // Perform consensus timestamp module.
+ if err = l.ctModule.processBlocks(toDelivered); err != nil {
+ return
}
+ delivered = append(delivered, toDelivered...)
}
return
}
-// NextPosition returns expected position of incoming block for specified chain.
-func (l *Lattice) NextPosition(chainID uint32) types.Position {
+// NextHeight returns expected height of incoming block for specified chain and
+// given round.
+func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) {
l.lock.RLock()
defer l.lock.RUnlock()
- return l.data.nextPosition(chainID)
+ return l.data.nextHeight(round, chainID)
}
// PurgeBlocks purges blocks' cache in memory, this is called when the caller
@@ -308,5 +305,4 @@ func (l *Lattice) ProcessFinalizedBlock(b *types.Block) {
panic(err)
}
l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
- l.toSyncer.processFinalizedBlock(b)
}
diff --git a/core/test/app.go b/core/test/app.go
index a5d0270..e67d5c9 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -198,6 +198,9 @@ func (app *App) GetLatestDeliveredPosition() types.Position {
defer app.deliveredLock.RUnlock()
app.blocksLock.RLock()
defer app.blocksLock.RUnlock()
+ if len(app.DeliverSequence) == 0 {
+ return types.Position{}
+ }
return app.blocks[app.DeliverSequence[len(app.DeliverSequence)-1]].Position
}
diff --git a/core/test/governance.go b/core/test/governance.go
index f96e9e7..14c8177 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -33,6 +33,9 @@ import (
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)
+// TODO(mission): add a method to compare config/crs between governance
+// instances.
+
// Governance is an implementation of Goverance for testing purpose.
type Governance struct {
roundShift uint64
@@ -105,6 +108,13 @@ func (g *Governance) NotifyRoundHeight(round, height uint64) {
func() {
g.lock.Lock()
defer g.lock.Unlock()
+ // Check if there is any pending changes for previous rounds.
+ for r := range g.pendingConfigChanges {
+ if r < shiftedRound+1 {
+ panic(fmt.Errorf("pending change no longer applied: %v, now: %v",
+ r, shiftedRound+1))
+ }
+ }
for t, v := range g.pendingConfigChanges[shiftedRound+1] {
if err := g.stateModule.RequestChange(t, v); err != nil {
panic(err)
@@ -346,7 +356,7 @@ func (g *Governance) RegisterConfigChange(
}
g.lock.Lock()
defer g.lock.Unlock()
- if round <= uint64(len(g.configs)) {
+ if round < uint64(len(g.configs)) {
return errors.New(
"attempt to register state change for prepared rounds")
}
diff --git a/core/test/governance_test.go b/core/test/governance_test.go
index 07b0d46..01993f9 100644
--- a/core/test/governance_test.go
+++ b/core/test/governance_test.go
@@ -77,13 +77,13 @@ func (s *GovernanceTestSuite) TestRegisterChange() {
req.Equal(g.Configuration(4).NumChains, uint32(20))
// Unable to register change for prepared round.
req.Error(g.RegisterConfigChange(4, StateChangeNumChains, uint32(32)))
- // Unable to register change for next notified round.
- req.Error(g.RegisterConfigChange(5, StateChangeNumChains, uint32(32)))
// It's ok to make some change when condition is met.
+ req.NoError(g.RegisterConfigChange(5, StateChangeNumChains, uint32(32)))
req.NoError(g.RegisterConfigChange(6, StateChangeNumChains, uint32(32)))
req.NoError(g.RegisterConfigChange(7, StateChangeNumChains, uint32(40)))
// In local mode, state for round 6 would be ready after notified with
- // round 5.
+ // round 2.
+ g.NotifyRoundHeight(2, 0)
g.NotifyRoundHeight(3, 0)
// In local mode, state for round 7 would be ready after notified with
// round 6.
diff --git a/core/types/block-randomness.go b/core/types/block-randomness.go
index 6df245b..1c64d4a 100644
--- a/core/types/block-randomness.go
+++ b/core/types/block-randomness.go
@@ -25,9 +25,10 @@ import (
// AgreementResult describes an agremeent result.
type AgreementResult struct {
- BlockHash common.Hash `json:"block_hash"`
- Position Position `json:"position"`
- Votes []Vote `json:"votes"`
+ BlockHash common.Hash `json:"block_hash"`
+ Position Position `json:"position"`
+ Votes []Vote `json:"votes"`
+ IsEmptyBlock bool `json:"is_empty_block"`
}
func (r *AgreementResult) String() string {
diff --git a/core/utils.go b/core/utils.go
index f6be461..4e9cfdc 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -20,6 +20,7 @@ package core
import (
"errors"
"fmt"
+ "math/rand"
"os"
"sort"
"time"
@@ -114,6 +115,11 @@ func removeFromSortedUint32Slice(xs []uint32, x uint32) []uint32 {
return append(xs[:indexToRemove], xs[indexToRemove+1:]...)
}
+// pickBiasedTime returns a biased time based on a given range.
+func pickBiasedTime(base time.Time, biasedRange time.Duration) time.Time {
+ return base.Add(time.Duration(rand.Intn(int(biasedRange))))
+}
+
// HashConfigurationBlock returns the hash value of configuration block.
func HashConfigurationBlock(
notarySet map[types.NodeID]struct{},
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 6bc6c4b..8fd3fa4 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -95,6 +95,18 @@ func (s *ConsensusTestSuite) setupNodes(
return nodes
}
+func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) {
+ for ID, node := range nodes {
+ s.Require().NoError(node.app.Verify())
+ for otherID, otherNode := range nodes {
+ if ID == otherID {
+ continue
+ }
+ s.Require().NoError(node.app.Compare(otherNode.app))
+ }
+ }
+}
+
func (s *ConsensusTestSuite) TestSimple() {
// The simplest test case:
// - Node set is equals to DKG set and notary set for each chain in each
@@ -108,15 +120,18 @@ func (s *ConsensusTestSuite) TestSimple() {
dMoment = time.Now().UTC()
untilRound = uint64(5)
)
+ if testing.Short() {
+ untilRound = 2
+ }
prvKeys, pubKeys, err := test.NewKeys(peerCount)
req.NoError(err)
// Setup seed governance instance. Give a short latency to make this test
// run faster.
seedGov, err := test.NewGovernance(
- pubKeys, 30*time.Millisecond, core.ConfigRoundShift)
+ pubKeys, 100*time.Millisecond, core.ConfigRoundShift)
req.NoError(err)
req.NoError(seedGov.State().RequestChange(
- test.StateChangeRoundInterval, 25*time.Second))
+ test.StateChangeRoundInterval, 50*time.Second))
// A short round interval.
nodes := s.setupNodes(dMoment, prvKeys, seedGov)
for _, n := range nodes {
@@ -136,6 +151,79 @@ Loop:
// Oh ya.
break
}
+ s.verifyNodes(nodes)
+}
+
+func (s *ConsensusTestSuite) TestNumChainsChange() {
+ var (
+ req = s.Require()
+ peerCount = 4
+ dMoment = time.Now().UTC()
+ untilRound = uint64(6)
+ )
+ if testing.Short() {
+ // Short test won't test configuration change packed as payload of
+ // blocks and applied when delivered.
+ untilRound = 5
+ }
+ prvKeys, pubKeys, err := test.NewKeys(peerCount)
+ req.NoError(err)
+ // Setup seed governance instance.
+ seedGov, err := test.NewGovernance(
+ pubKeys, 100*time.Millisecond, core.ConfigRoundShift)
+ req.NoError(err)
+ // Setup configuration for round 0 and round 1.
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundInterval, 45*time.Second))
+ seedGov.CatchUpWithRound(1)
+ // Setup configuration for round 2.
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeNumChains, uint32(5)))
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundInterval, 55*time.Second))
+ seedGov.CatchUpWithRound(2)
+ // Setup configuration for round 3.
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeNumChains, uint32(6)))
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundInterval, 75*time.Second))
+ seedGov.CatchUpWithRound(3)
+ // Setup nodes.
+ nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+ // Pick master node, and register changes on it.
+ var pickedNode *node
+ for _, pickedNode = range nodes {
+ break
+ }
+ // Register configuration changes for round 4.
+ req.NoError(pickedNode.gov.RegisterConfigChange(
+ 4, test.StateChangeNumChains, uint32(4)))
+ req.NoError(pickedNode.gov.RegisterConfigChange(
+ 4, test.StateChangeRoundInterval, 45*time.Second))
+ // Register configuration changes for round 5.
+ req.NoError(pickedNode.gov.RegisterConfigChange(
+ 5, test.StateChangeNumChains, uint32(5)))
+ req.NoError(pickedNode.gov.RegisterConfigChange(
+ 5, test.StateChangeRoundInterval, 55*time.Second))
+ // Run test.
+ for _, n := range nodes {
+ go n.con.Run(&types.Block{})
+ }
+Loop:
+ for {
+ <-time.After(5 * time.Second)
+ s.T().Log("check latest position delivered by each node")
+ for _, n := range nodes {
+ latestPos := n.app.GetLatestDeliveredPosition()
+ s.T().Log("latestPos", n.con.ID, &latestPos)
+ if latestPos.Round < untilRound {
+ continue Loop
+ }
+ }
+ // Oh ya.
+ break
+ }
+ s.verifyNodes(nodes)
}
func TestConsensus(t *testing.T) {
diff --git a/integration_test/with_scheduler_test.go b/integration_test/with_scheduler_test.go
index a9f229a..6a94f01 100644
--- a/integration_test/with_scheduler_test.go
+++ b/integration_test/with_scheduler_test.go
@@ -112,28 +112,28 @@ func (s *WithSchedulerTestSuite) TestConfigurationChange() {
for _, pickedNode = range nodes {
break
}
- // Config changes for round 4, numChains from 4 to 7.
+ // Config changes for round 5, numChains from 4 to 7.
req.NoError(pickedNode.gov().RegisterConfigChange(
- 4, test.StateChangeNumChains, uint32(7)))
+ 5, test.StateChangeNumChains, uint32(7)))
req.NoError(pickedNode.gov().RegisterConfigChange(
- 4, test.StateChangeK, 3))
+ 5, test.StateChangeK, 3))
req.NoError(pickedNode.gov().RegisterConfigChange(
- 4, test.StateChangePhiRatio, float32(0.5)))
- // Config changes for round 5, numChains from 7 to 9.
+ 5, test.StateChangePhiRatio, float32(0.5)))
+ // Config changes for round 6, numChains from 7 to 9.
req.NoError(pickedNode.gov().RegisterConfigChange(
- 5, test.StateChangeNumChains, maxNumChains))
+ 6, test.StateChangeNumChains, maxNumChains))
req.NoError(pickedNode.gov().RegisterConfigChange(
- 5, test.StateChangeK, 0))
- // Config changes for round 6, numChains from 9 to 7.
+ 6, test.StateChangeK, 0))
+ // Config changes for round 7, numChains from 9 to 7.
req.NoError(pickedNode.gov().RegisterConfigChange(
- 6, test.StateChangeNumChains, uint32(7)))
+ 7, test.StateChangeNumChains, uint32(7)))
req.NoError(pickedNode.gov().RegisterConfigChange(
- 6, test.StateChangeK, 1))
- // Config changes for round 6, numChains from 7 to 5.
+ 7, test.StateChangeK, 1))
+ // Config changes for round 8, numChains from 7 to 5.
req.NoError(pickedNode.gov().RegisterConfigChange(
- 7, test.StateChangeNumChains, uint32(5)))
+ 8, test.StateChangeNumChains, uint32(5)))
req.NoError(pickedNode.gov().RegisterConfigChange(
- 7, test.StateChangeK, 1))
+ 8, test.StateChangeK, 1))
// Perform test.
sch.Run(4)
// Check results by comparing test.App instances.
diff --git a/simulation/simulation.go b/simulation/simulation.go
index 4e97900..dcb7225 100644
--- a/simulation/simulation.go
+++ b/simulation/simulation.go
@@ -44,7 +44,7 @@ func Run(cfg *config.Config) {
panic(fmt.Errorf("DKGSetSze should not be larger the node num"))
}
- dMoment := time.Now().UTC().Add(1 * time.Second)
+ dMoment := time.Now().UTC()
// init is a function to init a node.
init := func(serverEndpoint interface{}) {