Diffstat (limited to 'core')
-rw-r--r--  core/consensus.go         | 42
-rw-r--r--  core/syncer/consensus.go  | 24
-rw-r--r--  core/total-ordering.go    | 20
-rw-r--r--  core/utils.go             | 20
4 files changed, 78 insertions(+), 28 deletions(-)
diff --git a/core/consensus.go b/core/consensus.go
index 44ce43f..0d4a38a 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -601,7 +601,11 @@ func (con *Consensus) prepare(initBlock *types.Block) error {
// Setup lattice module.
initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger)
if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil {
- return err
+ if err == ErrRoundNotIncreasing {
+ err = nil
+ } else {
+ return err
+ }
}
// Register events.
dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
@@ -732,23 +736,37 @@ func (con *Consensus) initialRound(
}()
})
}
+ // checkCRS returns a checker that reports whether the CRS for the given
+ // round is ready.
+ checkCRS := func(round uint64) func() bool {
+ return func() bool {
+ nextCRS := con.gov.CRS(round)
+ if (nextCRS != common.Hash{}) {
+ return true
+ }
+ con.logger.Info("CRS is not ready yet. Try again later...",
+ "nodeID", con.ID,
+ "round", round)
+ return false
+ }
+ }
// Initiate BA modules.
con.event.RegisterTime(
startTime.Add(config.RoundInterval/2+config.LambdaDKG),
func(time.Time) {
go func(nextRound uint64) {
- for (con.gov.CRS(nextRound) == common.Hash{}) {
- con.logger.Info("CRS is not ready yet. Try again later...",
- "nodeID", con.ID,
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Info("unable to prepare CRS for baMgr",
"round", nextRound)
- time.Sleep(500 * time.Millisecond)
+ return
}
// Notify BA for new round.
nextConfig := utils.GetConfigWithPanic(
con.gov, nextRound, con.logger)
- con.logger.Debug("Calling Governance.CRS",
- "round", nextRound)
- nextCRS := utils.GetCRSWithPanic(con.gov, nextRound, con.logger)
+ nextCRS := utils.GetCRSWithPanic(
+ con.gov, nextRound, con.logger)
+ con.logger.Info("appendConfig for baMgr", "round", nextRound)
if err := con.baMgr.appendConfig(
nextRound, nextConfig, nextCRS); err != nil {
panic(err)
@@ -761,11 +779,11 @@ func (con *Consensus) initialRound(
go func(nextRound uint64) {
// Normally, gov.CRS returns a non-nil value. This guards against unexpected
// network fluctuation and ensures robustness.
- for (con.gov.CRS(nextRound) == common.Hash{}) {
- con.logger.Info("CRS is not ready yet. Try again later...",
- "nodeID", con.ID,
+ if !checkWithCancel(
+ con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+ con.logger.Info("unable to prepare CRS for DKG set",
"round", nextRound)
- time.Sleep(500 * time.Millisecond)
+ return
}
nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
if err != nil {
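
The first hunk above tolerates ErrRoundNotIncreasing from Lattice.AppendConfig instead of failing prepare. A minimal standalone sketch of that sentinel-error tolerance, using a stand-in lattice type and error value rather than the real core package:

package main

import (
	"errors"
	"fmt"
)

// errRoundNotIncreasing stands in for core.ErrRoundNotIncreasing.
var errRoundNotIncreasing = errors.New("round not increasing")

type lattice struct{ lastRound uint64 }

// appendConfig rejects rounds that do not advance past the last known round.
func (l *lattice) appendConfig(round uint64) error {
	if round <= l.lastRound {
		return errRoundNotIncreasing
	}
	l.lastRound = round
	return nil
}

// prepare treats a config that was already appended as benign, while still
// propagating every other error to the caller.
func prepare(l *lattice, round uint64) error {
	err := l.appendConfig(round)
	if err == errRoundNotIncreasing {
		err = nil
	}
	return err
}

func main() {
	l := &lattice{lastRound: 2}
	fmt.Println(prepare(l, 2)) // <nil>: duplicate append is ignored
	fmt.Println(prepare(l, 3)) // <nil>: appended normally
}
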
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 43a9e21..32bbab3 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -446,9 +446,10 @@ func (con *Consensus) SyncBlocks(
return false, err
}
if syncBlock != nil {
- con.logger.Info("deliver set found", syncBlock)
+ con.logger.Info("deliver set found", "block", syncBlock)
// New lattice with the round of syncBlock.
con.initConsensusObj(syncBlock)
+ con.setupConfigs(blocks)
// Process blocks from syncBlock to blocks' last block.
b := blocks[len(blocks)-1]
blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
@@ -586,7 +587,7 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) {
con.resizeByNumChains(curMaxNumChains)
// Notify core.Lattice for new configs.
if con.lattice != nil {
- for con.latticeLastRound+1 <= maxRound {
+ for con.latticeLastRound+1 <= untilRound {
con.latticeLastRound++
if err := con.lattice.AppendConfig(
con.latticeLastRound,
@@ -714,14 +715,12 @@ func (con *Consensus) startCRSMonitor() {
select {
case <-con.ctx.Done():
return
- case <-time.After(1 * time.Second):
- // Notify agreement modules for the latest round that CRS is
- // available if the round is not notified yet.
- var crsRound = lastNotifiedRound
- for (con.gov.CRS(crsRound+1) != common.Hash{}) {
- crsRound++
- }
- notifyNewCRS(crsRound)
+ case <-time.After(500 * time.Millisecond):
+ }
+ // Notify agreement modules for the latest round that CRS is
+ // available if the round is not notified yet.
+ if (con.gov.CRS(lastNotifiedRound+1) != common.Hash{}) {
+ notifyNewCRS(lastNotifiedRound + 1)
}
}
}()
@@ -732,7 +731,10 @@ func (con *Consensus) stopAgreement() {
con.lock.Lock()
defer con.lock.Unlock()
for _, a := range con.agreements {
- close(a.inputChan)
+ if a.inputChan != nil {
+ close(a.inputChan)
+ a.inputChan = nil
+ }
}
}()
con.agreementWaitGroup.Wait()
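
The stopAgreement change guards against closing an agreement's input channel twice. A minimal sketch of that close-once idiom, with stand-in types rather than the actual syncer structs:

package main

import (
	"fmt"
	"sync"
)

type agreement struct {
	inputChan chan struct{}
}

type consensus struct {
	lock       sync.Mutex
	agreements map[uint32]*agreement
}

// stopAgreement closes every input channel exactly once; clearing the field
// makes a second call a no-op instead of a panic on double close.
func (c *consensus) stopAgreement() {
	c.lock.Lock()
	defer c.lock.Unlock()
	for _, a := range c.agreements {
		if a.inputChan != nil {
			close(a.inputChan)
			a.inputChan = nil
		}
	}
}

func main() {
	c := &consensus{agreements: map[uint32]*agreement{
		0: {inputChan: make(chan struct{})},
	}}
	c.stopAgreement()
	c.stopAgreement() // safe: already-closed channels are skipped
	fmt.Println("stopped twice without panicking")
}
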
diff --git a/core/total-ordering.go b/core/total-ordering.go
index 3bf6946..744471a 100644
--- a/core/total-ordering.go
+++ b/core/total-ordering.go
@@ -19,6 +19,7 @@ package core
import (
"errors"
+ "fmt"
"math"
"sort"
"sync"
@@ -1017,7 +1018,7 @@ func (to *totalOrdering) generateDeliverSet() (
chainID, otherChainID uint32
info, otherInfo *totalOrderingCandidateInfo
precedings = make(map[uint32]struct{})
- cfg = to.configs[to.curRound-to.configs[0].roundID]
+ cfg = to.getCurrentConfig()
)
mode = TotalOrderingModeNormal
to.globalVector.updateCandidateInfo(to.dirtyChainIDs, to.objCache)
@@ -1162,7 +1163,7 @@ func (to *totalOrdering) flushBlocks(
b *types.Block) (flushed []*types.Block, mode uint32, err error) {
mode = TotalOrderingModeFlush
- cfg := to.configs[to.curRound-to.configs[0].roundID]
+ cfg := to.getCurrentConfig()
if cfg.isLastBlock(b) {
to.flushReadyChains[b.Position.ChainID] = struct{}{}
}
@@ -1216,7 +1217,7 @@ func (to *totalOrdering) flushBlocks(
to.globalVector.cachedCandidateInfo = nil
to.switchRound()
// Force picking new candidates.
- numChains := to.configs[to.curRound-to.configs[0].roundID].numChains
+ numChains := to.getCurrentConfig().numChains
to.output(map[common.Hash]struct{}{}, numChains)
return
}
@@ -1226,7 +1227,7 @@ func (to *totalOrdering) deliverBlocks() (
delivered []*types.Block, mode uint32, err error) {
hashes, mode := to.generateDeliverSet()
- cfg := to.configs[to.curRound-to.configs[0].roundID]
+ cfg := to.getCurrentConfig()
// Output precedings.
delivered = to.output(hashes, cfg.numChains)
// Check if any block in delivered set is the last block in this round, if
@@ -1271,12 +1272,21 @@ func (to *totalOrdering) deliverBlocks() (
return
}
+func (to *totalOrdering) getCurrentConfig() *totalOrderingConfig {
+ cfgIdx := to.curRound - to.configs[0].roundID
+ if cfgIdx >= uint64(len(to.configs)) {
+ panic(fmt.Errorf("total ordering config is not ready: %v, %v, %v",
+ to.curRound, to.configs[0].roundID, len(to.configs)))
+ }
+ return to.configs[cfgIdx]
+}
+
// processBlock is the entry point of totalOrdering.
func (to *totalOrdering) processBlock(
b *types.Block) ([]*types.Block, uint32, error) {
// NOTE: Block b is assumed to arrive in topological order, i.e., all of the
// blocks it acks are in or after the total ordering stage.
- cfg := to.configs[to.curRound-to.configs[0].roundID]
+ cfg := to.getCurrentConfig()
to.pendings[b.Hash] = b
to.buildBlockRelation(b)
isOldest, err := to.updateVectors(b)
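
The new getCurrentConfig centralizes the curRound-based indexing and fails fast with a descriptive panic when the config for the current round has not been appended yet. A standalone sketch of the same bounds-checked lookup, with simplified stand-in types:

package main

import "fmt"

type roundConfig struct {
	roundID   uint64
	numChains uint32
}

type ordering struct {
	curRound uint64
	configs  []*roundConfig // configs[i].roundID == configs[0].roundID + i
}

// currentConfig panics with a descriptive error instead of a bare
// index-out-of-range when the config for curRound is missing.
func (o *ordering) currentConfig() *roundConfig {
	idx := o.curRound - o.configs[0].roundID
	if idx >= uint64(len(o.configs)) {
		panic(fmt.Errorf("config is not ready: %v, %v, %v",
			o.curRound, o.configs[0].roundID, len(o.configs)))
	}
	return o.configs[idx]
}

func main() {
	o := &ordering{
		curRound: 5,
		configs:  []*roundConfig{{roundID: 4, numChains: 4}, {roundID: 5, numChains: 8}},
	}
	fmt.Println(o.currentConfig().numChains) // 8
}
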
diff --git a/core/utils.go b/core/utils.go
index 838369c..0c2d155 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -18,6 +18,7 @@
package core
import (
+ "context"
"errors"
"fmt"
"math/rand"
@@ -214,3 +215,22 @@ func isTravisCI() bool {
func getDKGThreshold(config *types.Config) int {
return int(config.DKGSetSize/3) + 1
}
+
+// checkWithCancel is a helper that runs checker periodically until it
+// succeeds or the parent context is cancelled.
+func checkWithCancel(parentCtx context.Context, interval time.Duration,
+ checker func() bool) (ret bool) {
+ ctx, cancel := context.WithCancel(parentCtx)
+ defer cancel()
+Loop:
+ for {
+ select {
+ case <-ctx.Done():
+ break Loop
+ case <-time.After(interval):
+ }
+ if ret = checker(); ret {
+ return
+ }
+ }
+ return
+}
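
A minimal usage sketch of the new checkWithCancel helper; the checkers and timeouts below are illustrative, not taken from the patch (the helper body is copied from core/utils.go above):

package main

import (
	"context"
	"fmt"
	"time"
)

// checkWithCancel, as introduced in core/utils.go above: poll checker every
// interval until it returns true or parentCtx is cancelled.
func checkWithCancel(parentCtx context.Context, interval time.Duration,
	checker func() bool) (ret bool) {
	ctx, cancel := context.WithCancel(parentCtx)
	defer cancel()
Loop:
	for {
		select {
		case <-ctx.Done():
			break Loop
		case <-time.After(interval):
		}
		if ret = checker(); ret {
			return
		}
	}
	return
}

func main() {
	// A checker that becomes true after three polls.
	polls := 0
	checker := func() bool {
		polls++
		return polls >= 3
	}
	// Succeeds: the condition turns true before the context is cancelled.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(checkWithCancel(ctx, 100*time.Millisecond, checker)) // true
	// Gives up: the context is cancelled before the condition can hold.
	ctx2, cancel2 := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel2()
	fmt.Println(checkWithCancel(ctx2, 100*time.Millisecond,
		func() bool { return false })) // false
}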