author    Mission Liao <mission.liao@dexon.org>  2018-12-26 15:23:54 +0800
committer GitHub <noreply@github.com>            2018-12-26 15:23:54 +0800
commit    39c02fe0f7c81491ea897fafcf32595d280bbdbe (patch)
tree      1ac3d002de42bb7471624656713e331db55aaea2
parent    00416c9df2fec5398389863fb6f885a1fe11a6cc (diff)
core: fix stuffs (#383)
* Merge core.Consensus constructors
* Downgrade severity of logs
* Refine logic to add blocks from pool to lattice
* Add test.LaunchDummyReceiver
-rw-r--r--  core/agreement-mgr.go               |   2
-rw-r--r--  core/configuration-chain.go         |   4
-rw-r--r--  core/consensus.go                   | 181
-rw-r--r--  core/lattice.go                     |  17
-rw-r--r--  core/syncer/agreement.go            |   4
-rw-r--r--  core/syncer/consensus.go            |  12
-rw-r--r--  core/test/utils.go                  |  19
-rw-r--r--  integration_test/consensus_test.go  |  70
8 files changed, 128 insertions(+), 181 deletions(-)
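
The core of the change is merging the Consensus constructors: NewConsensus, NewConsensusForSimulation, and NewConsensusFromSyncer now all funnel into a single internal newConsensusForRound instead of duplicating their wiring. A minimal, self-contained sketch of the resulting shape; Block and Consensus here are trimmed stand-ins, and the real constructors also take app/gov/db/network/key/logger arguments omitted for brevity:

```go
package main

import (
	"fmt"
	"time"
)

// Block and Consensus are trimmed stand-ins for types.Block and
// core.Consensus; only what the sketch needs is kept.
type Block struct {
	Round uint64 // stands in for types.Block.Position.Round
}

type Consensus struct {
	initRound   uint64
	nonBlocking bool
}

// newConsensusForRound mirrors the merged internal constructor: every public
// entry point funnels through it, differing only in the initial block and
// the usingNonBlocking flag.
func newConsensusForRound(
	initBlock *Block, beginTime time.Time, usingNonBlocking bool) *Consensus {
	_ = beginTime // the real constructor also wires app/gov/db/network/logger
	return &Consensus{initRound: initBlock.Round, nonBlocking: usingNonBlocking}
}

// NewConsensus bootstraps from an empty genesis block with the non-blocking
// application wrapper enabled.
func NewConsensus(dMoment time.Time) *Consensus {
	return newConsensusForRound(&Block{}, dMoment, true)
}

// NewConsensusForSimulation is identical except it skips the wrapper.
func NewConsensusForSimulation(dMoment time.Time) *Consensus {
	return newConsensusForRound(&Block{}, dMoment, false)
}

// NewConsensusFromSyncer bootstraps from the last block delivered by syncer.
func NewConsensusFromSyncer(initBlock *Block, beginTime time.Time) *Consensus {
	return newConsensusForRound(initBlock, beginTime, true)
}

func main() {
	con := NewConsensusFromSyncer(&Block{Round: 3}, time.Now())
	fmt.Println("bootstrap round:", con.initRound, "non-blocking:", con.nonBlocking)
}
```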
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index a9fa21d..2b5c4bc 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -313,7 +313,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
if config = mgr.getConfig(nextRound); config != nil {
break
} else {
- mgr.logger.Info("round is not ready", "round", nextRound)
+ mgr.logger.Debug("round is not ready", "round", nextRound)
time.Sleep(1 * time.Second)
}
}
diff --git a/core/configuration-chain.go b/core/configuration-chain.go
index ad24e44..5c389a7 100644
--- a/core/configuration-chain.go
+++ b/core/configuration-chain.go
@@ -135,7 +135,7 @@ func (cc *configurationChain) runDKG(round uint64) error {
}
cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round)
for !cc.gov.IsDKGMPKReady(round) {
- cc.logger.Info("DKG MPKs are not ready yet. Try again later...",
+ cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
"nodeID", cc.ID)
cc.dkgLock.Unlock()
time.Sleep(500 * time.Millisecond)
@@ -206,7 +206,7 @@ func (cc *configurationChain) runDKG(round uint64) error {
// unexpected network fluctuation and ensure the robustness of DKG protocol.
cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
for !cc.gov.IsDKGFinal(round) {
- cc.logger.Info("DKG is not ready yet. Try again later...",
+ cc.logger.Debug("DKG is not ready yet. Try again later...",
"nodeID", cc.ID)
time.Sleep(500 * time.Millisecond)
}
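
Both hunks sit in busy-wait loops that poll Governance until DKG state becomes ready; since waiting several iterations is normal, the per-iteration message is demoted from Info to Debug. The idiom behind those loops, as a self-contained sketch (waitUntil is a hypothetical helper, not code from this repository):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil polls ready() until it reports true or ctx is cancelled. It
// names the idiom behind the runDKG loops above, which sleep 500ms between
// Governance checks.
func waitUntil(ctx context.Context, interval time.Duration, ready func() bool) bool {
	for !ready() {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(interval):
		}
	}
	return true
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	deadline := time.Now().Add(1 * time.Second)
	// The closure stands in for calls like gov.IsDKGMPKReady(round).
	ok := waitUntil(ctx, 500*time.Millisecond, func() bool {
		return time.Now().After(deadline)
	})
	fmt.Println("ready:", ok)
}
```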
diff --git a/core/consensus.go b/core/consensus.go
index 7f77df9..cab28cb 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -399,8 +399,8 @@ func NewConsensus(
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
-
- return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+ return newConsensusForRound(
+ &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true)
}
// NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -413,19 +413,60 @@ func NewConsensusForSimulation(
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
+ return newConsensusForRound(
+ &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false)
+}
- return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+// NewConsensusFromSyncer constructs a Consensus instance from information
+// provided by the syncer.
+//
+// You need to provide the initial block for this newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block you
+// delivered to syncer.
+func NewConsensusFromSyncer(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ networkModule Network,
+ prv crypto.PrivateKey,
+ latticeModule *Lattice,
+ blocks []*types.Block,
+ randomnessResults []*types.BlockRandomnessResult,
+ logger common.Logger) (*Consensus, error) {
+ // Setup Consensus instance.
+ con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
+ networkModule, prv, logger, latticeModule, true)
+ // Dump all BA-confirmed blocks to the consensus instance.
+ for _, b := range blocks {
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ // Dump all randomness results to the consensus instance.
+ for _, r := range randomnessResults {
+ if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
+ con.logger.Error("failed to process randomness result when syncing",
+ "result", r)
+ continue
+ }
+ }
+ return con, nil
}
// newConsensusForRound creates a Consensus instance.
-func newConsensus(
- dMoment time.Time,
+func newConsensusForRound(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
app Application,
gov Governance,
db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger,
+ latticeModule *Lattice,
usingNonBlocking bool) *Consensus {
// TODO(w): load latest blockHeight from DB, and use config at that height.
@@ -437,12 +478,14 @@ func newConsensus(
if a, ok := app.(Debug); ok {
debugApp = a
}
- // Get configuration for genesis round.
- var round uint64
- config := utils.GetConfigWithPanic(gov, round, logger)
+ // Get configuration for bootstrap round.
+ initRound := initBlock.Position.Round
+ initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
// Init lattice.
- lattice := NewLattice(
- dMoment, round, config, signer, app, debugApp, db, logger)
+ if latticeModule == nil {
+ latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig,
+ signer, app, debugApp, db, logger)
+ }
// Init configuration chain.
ID := types.NewNodeID(prv.PublicKey())
recv := &consensusDKGReceiver{
@@ -453,13 +496,7 @@ func newConsensus(
network: network,
logger: logger,
}
- cfgModule := newConfigurationChain(
- ID,
- recv,
- gov,
- nodeSetCache,
- db,
- logger)
+ cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger)
recv.cfgModule = cfgModule
appModule := app
if usingNonBlocking {
@@ -467,86 +504,14 @@ func newConsensus(
}
// Construct Consensus instance.
con := &Consensus{
- ID: ID,
- ccModule: newCompactionChain(gov),
- lattice: lattice,
- app: appModule,
- debugApp: debugApp,
- gov: gov,
- db: db,
- network: network,
- baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
- dMoment: dMoment,
- nodeSetCache: nodeSetCache,
- signer: signer,
- event: common.NewEvent(),
- logger: logger,
- resetRandomnessTicker: make(chan struct{}),
- }
- con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, round, dMoment)
- if err := con.prepare(&types.Block{}); err != nil {
- panic(err)
- }
- return con
-}
-
-// NewConsensusFromSyncer constructs an Consensus instance from information
-// provided from syncer.
-//
-// You need to provide the initial block for this newly created Consensus
-// instance to bootstrap with. A proper choice is the last finalized block you
-// delivered to syncer.
-func NewConsensusFromSyncer(
- initBlock *types.Block,
- initRoundBeginTime time.Time,
- app Application,
- gov Governance,
- db db.Database,
- networkModule Network,
- prv crypto.PrivateKey,
- latticeModule *Lattice,
- blocks []*types.Block,
- randomnessResults []*types.BlockRandomnessResult,
- logger common.Logger) (*Consensus, error) {
- // Setup the cache for node sets.
- nodeSetCache := utils.NewNodeSetCache(gov)
- // Setup signer module.
- signer := utils.NewSigner(prv)
- // Init configuration chain.
- ID := types.NewNodeID(prv.PublicKey())
- recv := &consensusDKGReceiver{
- ID: ID,
- gov: gov,
- signer: signer,
- nodeSetCache: nodeSetCache,
- network: networkModule,
- logger: logger,
- }
- cfgModule := newConfigurationChain(
- ID,
- recv,
- gov,
- nodeSetCache,
- db,
- logger)
- recv.cfgModule = cfgModule
- // Check if the application implement Debug interface.
- var debugApp Debug
- if a, ok := app.(Debug); ok {
- debugApp = a
- }
- // Setup Consensus instance.
- con := &Consensus{
ID: ID,
ccModule: newCompactionChain(gov),
lattice: latticeModule,
- app: newNonBlocking(app, debugApp),
+ app: appModule,
+ debugApp: debugApp,
gov: gov,
db: db,
- network: networkModule,
+ network: network,
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
@@ -557,27 +522,11 @@ func NewConsensusFromSyncer(
logger: logger,
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime)
- // Bootstrap the consensus instance.
+ con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
if err := con.prepare(initBlock); err != nil {
- return nil, err
- }
- // Dump all BA-confirmed blocks to the consensus instance.
- for _, b := range blocks {
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
- }
- }
- // Dump all randomness result to the consensus instance.
- for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
- con.logger.Error("failed to process randomness result when syncing",
- "result", r)
- continue
- }
+ panic(err)
}
- return con, nil
+ return con
}
// prepare the Consensus instance to be ready for blocks after 'initBlock'.
@@ -678,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) {
con.logger.Debug("Calling Governance.CRS to check if already proposed",
"round", round+1)
if (con.gov.CRS(round+1) != common.Hash{}) {
- con.logger.Info("CRS already proposed", "round", round+1)
+ con.logger.Debug("CRS already proposed", "round", round+1)
return
}
con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
@@ -749,7 +698,7 @@ func (con *Consensus) initialRound(
if (nextCRS != common.Hash{}) {
return true
}
- con.logger.Info("CRS is not ready yet. Try again later...",
+ con.logger.Debug("CRS is not ready yet. Try again later...",
"nodeID", con.ID,
"round", round)
return false
@@ -762,7 +711,7 @@ func (con *Consensus) initialRound(
go func(nextRound uint64) {
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Info("unable to prepare CRS for baMgr",
+ con.logger.Debug("unable to prepare CRS for baMgr",
"round", nextRound)
return
}
@@ -786,7 +735,7 @@ func (con *Consensus) initialRound(
// unexpected network fluctuation and ensure the robustness.
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Info("unable to prepare CRS for DKG set",
+ con.logger.Debug("unable to prepare CRS for DKG set",
"round", nextRound)
return
}
@@ -1158,7 +1107,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
if con.nonFinalizedBlockDelivered {
panic(fmt.Errorf("attempting to skip finalized block: %s", b))
}
- con.logger.Info("skip delivery of finalized block",
+ con.logger.Debug("skip delivery of finalized block",
"block", b,
"finalization-height", b.Finalization.Height)
continue
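
With the wiring shared, what remains specific to NewConsensusFromSyncer is replaying the state the syncer collected: a BA-confirmed block that fails to process aborts construction, while a bad randomness result is only logged and skipped. A trimmed, self-contained sketch of that asymmetry (Block, RandomnessResult, and newFromSyncer are hypothetical stand-ins for the real types and constructor):

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed stand-ins for types.Block and types.BlockRandomnessResult.
type Block struct{ Height uint64 }
type RandomnessResult struct{ Round uint64 }

type Consensus struct{ delivered []uint64 }

func (c *Consensus) processBlock(b *Block) error {
	if b == nil {
		return errors.New("nil block")
	}
	c.delivered = append(c.delivered, b.Height)
	return nil
}

func (c *Consensus) processRandomness(r *RandomnessResult) error {
	return nil // pretend verification succeeded
}

// newFromSyncer mirrors the replay phase of NewConsensusFromSyncer: build
// the instance for initBlock's round, then feed it every BA-confirmed block
// and randomness result gathered while catching up. Block failures are
// fatal; randomness failures are logged and skipped, as in the diff above.
func newFromSyncer(
	initBlock *Block, blocks []*Block, results []*RandomnessResult,
) (*Consensus, error) {
	con := &Consensus{delivered: []uint64{initBlock.Height}}
	for _, b := range blocks {
		if err := con.processBlock(b); err != nil {
			return nil, err
		}
	}
	for _, r := range results {
		if err := con.processRandomness(r); err != nil {
			fmt.Println("skipping bad randomness result:", err)
			continue
		}
	}
	return con, nil
}

func main() {
	last := &Block{Height: 10} // the last finalized block delivered to syncer
	con, err := newFromSyncer(last,
		[]*Block{{Height: 11}, {Height: 12}}, nil)
	fmt.Println(con.delivered, err)
}
```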
diff --git a/core/lattice.go b/core/lattice.go
index db19cf9..591c63d 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -193,17 +193,28 @@ func (l *Lattice) addBlockToLattice(
if err == nil {
var output []*types.Block
if output, err = l.data.addBlock(tip); err != nil {
- l.logger.Error("Sanity Check failed", "error", err)
- continue
+ // We should be able to add this block once its sanity
+ // check has passed.
+ l.logger.Error("Failed to add sanity-checked block",
+ "block", tip, "error", err)
+ panic(err)
}
hasOutput = true
outputBlocks = append(outputBlocks, output...)
+ l.pool.removeTip(i)
+ continue
}
if _, ok := err.(*ErrAckingBlockNotExists); ok {
+ l.logger.Debug("Pending block for lattice",
+ "pending", tip,
+ "last", l.data.chains[tip.Position.ChainID])
err = nil
continue
+ } else {
+ l.logger.Error("Unexpected sanity check error",
+ "block", tip, "error", err)
+ panic(err)
}
- l.pool.removeTip(i)
}
if !hasOutput {
break
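
The lattice change hardens addBlockToLattice: a block that passed the sanity check must be addable, so a failure there now panics instead of being skipped; a missing-ack error keeps the block pending in the pool; and any other sanity-check error is treated as fatal. The three branches, sketched with a hypothetical classifyTip helper (the real code type-asserts on *ErrAckingBlockNotExists, a struct type, rather than matching a sentinel):

```go
package main

import (
	"errors"
	"fmt"
)

// ErrAckingBlockNotExists stands in for the lattice error of the same name:
// the block acks a block the lattice has not seen yet.
var ErrAckingBlockNotExists = errors.New("acking block not exists")

// classifyTip names the three branches of the reworked loop above.
func classifyTip(sanityErr error) string {
	switch {
	case sanityErr == nil:
		return "add to lattice, then remove tip from pool"
	case errors.Is(sanityErr, ErrAckingBlockNotExists):
		return "keep pending in pool, retry on next pass"
	default:
		return "panic: unexpected sanity-check error"
	}
}

func main() {
	for _, err := range []error{
		nil, ErrAckingBlockNotExists, errors.New("invalid signature"),
	} {
		fmt.Printf("%v -> %s\n", err, classifyTip(err))
	}
}
```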
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index fee4624..32ea654 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -106,7 +106,7 @@ func (a *agreement) processBlock(b *types.Block) {
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
// Cache those results whose CRS is not ready yet.
if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
- a.logger.Info("agreement result already confirmed", "result", r)
+ a.logger.Debug("agreement result already confirmed", "result", r)
return
}
if r.Position.Round > a.latestCRSRound {
@@ -116,7 +116,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
a.pendings[r.Position.Round] = pendingsForRound
}
pendingsForRound[r.BlockHash] = r
- a.logger.Info("agreement result cached", "result", r)
+ a.logger.Debug("agreement result cached", "result", r)
return
}
if err := core.VerifyAgreementResult(r, a.cache); err != nil {
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 32bbab3..c767a6d 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -153,7 +153,7 @@ func (con *Consensus) checkIfValidated() bool {
if validatedChainCount == numChains {
return true
}
- con.logger.Info("not validated yet", "validated-chain", validatedChainCount)
+ con.logger.Debug("not validated yet", "validated-chain", validatedChainCount)
return false
}
@@ -197,7 +197,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
if overlapCount == numChains {
return true
}
- con.logger.Info("not synced yet",
+ con.logger.Debug("not synced yet",
"overlap-count", overlapCount,
"num-chain", numChains,
"last-block", blocks[len(blocks)-1])
@@ -262,7 +262,7 @@ func (con *Consensus) ensureAgreementOverlapRound() bool {
}
if tipRoundMap[r] == con.configs[r].NumChains {
con.agreementRoundCut = r
- con.logger.Info("agreement round cut found, round", r)
+ con.logger.Debug("agreement round cut found, round", r)
return true
}
}
@@ -411,7 +411,7 @@ func (con *Consensus) SyncBlocks(
"expected", tipHeight+1)
return false, ErrInvalidSyncingFinalizationHeight
}
- con.logger.Info("syncBlocks",
+ con.logger.Debug("syncBlocks",
"position", &blocks[0].Position,
"final height", blocks[0].Finalization.Height,
"len", len(blocks),
@@ -446,7 +446,7 @@ func (con *Consensus) SyncBlocks(
return false, err
}
if syncBlock != nil {
- con.logger.Info("deliver set found", "block", syncBlock)
+ con.logger.Debug("deliver set found", "block", syncBlock)
// New lattice with the round of syncBlock.
con.initConsensusObj(syncBlock)
con.setupConfigs(blocks)
@@ -700,7 +700,7 @@ func (con *Consensus) startCRSMonitor() {
if round == lastNotifiedRound {
return
}
- con.logger.Info("CRS is ready", "round", round)
+ con.logger.Debug("CRS is ready", "round", round)
con.lock.RLock()
defer con.lock.RUnlock()
lastNotifiedRound = round
diff --git a/core/test/utils.go b/core/test/utils.go
index 84c90a9..8a14ebf 100644
--- a/core/test/utils.go
+++ b/core/test/utils.go
@@ -18,6 +18,7 @@
package test
import (
+ "context"
"errors"
"fmt"
"math"
@@ -225,3 +226,21 @@ func VerifyDB(db db.Database) error {
}
return nil
}
+
+// LaunchDummyReceiver launches a goroutine to receive and drop messages from
+// a network module. An optional context can be passed to stop the goroutine.
+func LaunchDummyReceiver(
+ ctx context.Context, networkModule *Network) context.CancelFunc {
+ dummyCtx, dummyCancel := context.WithCancel(ctx)
+ go func() {
+ loop:
+ for {
+ select {
+ case <-dummyCtx.Done():
+ break loop
+ case <-networkModule.ReceiveChan():
+ }
+ }
+ }()
+ return dummyCancel
+}
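
LaunchDummyReceiver extracts a pattern the integration test previously open-coded twice: a goroutine that drains a node's receive channel so the channel never fills up and blocks senders. A self-contained usage sketch against a plain channel (launchDummyReceiver stands in for test.LaunchDummyReceiver, since test.Network is not reproduced here):

```go
package main

import (
	"context"
	"fmt"
)

// launchDummyReceiver is a stand-in for test.LaunchDummyReceiver that drains
// a plain channel instead of a test.Network's ReceiveChan().
func launchDummyReceiver(
	ctx context.Context, recv <-chan interface{}) context.CancelFunc {
	dummyCtx, dummyCancel := context.WithCancel(ctx)
	go func() {
		for {
			select {
			case <-dummyCtx.Done():
				return
			case <-recv: // drop the message so senders never block
			}
		}
	}()
	return dummyCancel
}

func main() {
	msgs := make(chan interface{}) // unbuffered: a send blocks until received
	cancel := launchDummyReceiver(context.Background(), msgs)
	defer cancel()
	msgs <- "ignored" // would block forever without the dummy receiver
	fmt.Println("sender was not blocked")
}
```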
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index c1b812b..d70ae4f 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -353,53 +353,27 @@ func (s *ConsensusTestSuite) TestSync() {
}
// Clean syncNode's network receive channel, or it might exceed the limit
// and block other go routines.
- dummyReceiverCtx, dummyReceiverCtxCancel := context.WithCancel(
- context.Background())
- go func() {
- Loop:
- for {
- select {
- case <-syncNode.network.ReceiveChan():
- case <-dummyReceiverCtx.Done():
- break Loop
- }
- }
- }()
- // Print status every 5 seconds so CI won't fail.
- monitorCtx, monitorCancel := context.WithCancel(
- context.Background())
- defer monitorCancel()
- go func() {
- for {
- select {
- case <-time.After(5 * time.Second):
- for _, n := range nodes {
- pos := n.app.GetLatestDeliveredPosition()
- fmt.Println("latestPos", n.ID, &pos)
- break
- }
- case <-monitorCtx.Done():
- return
- }
- }
- }()
+ dummyReceiverCtxCancel := test.LaunchDummyReceiver(
+ context.Background(), syncNode.network)
ReachAlive:
for {
+ // Check if any error happened or sleep for a period of time.
+ select {
+ case err := <-errChan:
+ req.NoError(err)
+ case <-time.After(5 * time.Second):
+ }
// If all nodes except syncNode have reached aliveRound, call syncNode's
// Run() and send it all blocks in one of normal node's compaction chain.
for id, n := range nodes {
if id == syncNode.ID {
continue
}
- if n.app.GetLatestDeliveredPosition().Round < aliveRound {
+ pos := n.app.GetLatestDeliveredPosition()
+ if pos.Round < aliveRound {
+ fmt.Println("latestPos", n.ID, &pos)
continue ReachAlive
}
- // Check if any error happened or sleep for a period of time.
- select {
- case err := <-errChan:
- req.NoError(err)
- case <-time.After(5 * time.Second):
- }
}
dummyReceiverCtxCancel()
break
@@ -446,34 +420,28 @@ ReachAlive:
}()
// Wait until all nodes reach 'untilRound'.
go func() {
+ n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition()
ReachFinished:
for {
+ fmt.Println("latestPos", n.ID, &pos)
time.Sleep(5 * time.Second)
- for _, n := range nodes {
+ for _, n = range nodes {
+ pos = n.app.GetLatestDeliveredPosition()
if n.ID == stoppedNode.ID {
if n.con == nil {
continue
}
- if n.app.GetLatestDeliveredPosition().Round < stopRound {
- continue
+ if pos.Round < stopRound {
+ continue ReachFinished
}
// Stop a node, we should still be able to proceed.
stoppedNode.con.Stop()
stoppedNode.con = nil
fmt.Println("one node stopped", stoppedNode.ID)
- // Initiate a dummy routine to consume the receive channel.
- go func() {
- for {
- select {
- case <-runnerCtx.Done():
- return
- case <-stoppedNode.network.ReceiveChan():
- }
- }
- }()
+ test.LaunchDummyReceiver(runnerCtx, stoppedNode.network)
continue
}
- if n.app.GetLatestDeliveredPosition().Round < untilRound {
+ if pos.Round < untilRound {
continue ReachFinished
}
}