aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go203
1 files changed, 87 insertions, 116 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index 0d4a38a91..3353d1d60 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -387,6 +387,7 @@ type Consensus struct {
event *common.Event
logger common.Logger
nonFinalizedBlockDelivered bool
+ resetRandomnessTicker chan struct{}
}
// NewConsensus construct an Consensus instance.
@@ -398,8 +399,8 @@ func NewConsensus(
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
-
- return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+ return newConsensusForRound(
+ &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true)
}
// NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -412,19 +413,60 @@ func NewConsensusForSimulation(
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
+ return newConsensusForRound(
+ &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false)
+}
- return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+// NewConsensusFromSyncer constructs an Consensus instance from information
+// provided from syncer.
+//
+// You need to provide the initial block for this newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block you
+// delivered to syncer.
+func NewConsensusFromSyncer(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ networkModule Network,
+ prv crypto.PrivateKey,
+ latticeModule *Lattice,
+ blocks []*types.Block,
+ randomnessResults []*types.BlockRandomnessResult,
+ logger common.Logger) (*Consensus, error) {
+ // Setup Consensus instance.
+ con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
+ networkModule, prv, logger, latticeModule, true)
+ // Dump all BA-confirmed blocks to the consensus instance.
+ for _, b := range blocks {
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ // Dump all randomness result to the consensus instance.
+ for _, r := range randomnessResults {
+ if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
+ con.logger.Error("failed to process randomness result when syncing",
+ "result", r)
+ continue
+ }
+ }
+ return con, nil
}
// newConsensus creates a Consensus instance.
-func newConsensus(
- dMoment time.Time,
+func newConsensusForRound(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
app Application,
gov Governance,
db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger,
+ latticeModule *Lattice,
usingNonBlocking bool) *Consensus {
// TODO(w): load latest blockHeight from DB, and use config at that height.
@@ -436,12 +478,14 @@ func newConsensus(
if a, ok := app.(Debug); ok {
debugApp = a
}
- // Get configuration for genesis round.
- var round uint64
- config := utils.GetConfigWithPanic(gov, round, logger)
+ // Get configuration for bootstrap round.
+ initRound := initBlock.Position.Round
+ initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
// Init lattice.
- lattice := NewLattice(
- dMoment, round, config, signer, app, debugApp, db, logger)
+ if latticeModule == nil {
+ latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig,
+ signer, app, debugApp, db, logger)
+ }
// Init configuration chain.
ID := types.NewNodeID(prv.PublicKey())
recv := &consensusDKGReceiver{
@@ -452,13 +496,7 @@ func newConsensus(
network: network,
logger: logger,
}
- cfgModule := newConfigurationChain(
- ID,
- recv,
- gov,
- nodeSetCache,
- db,
- logger)
+ cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger)
recv.cfgModule = cfgModule
appModule := app
if usingNonBlocking {
@@ -468,7 +506,7 @@ func newConsensus(
con := &Consensus{
ID: ID,
ccModule: newCompactionChain(gov),
- lattice: lattice,
+ lattice: latticeModule,
app: appModule,
debugApp: debugApp,
gov: gov,
@@ -477,77 +515,6 @@ func newConsensus(
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
- dMoment: dMoment,
- nodeSetCache: nodeSetCache,
- signer: signer,
- event: common.NewEvent(),
- logger: logger,
- }
- con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, round, dMoment)
- if err := con.prepare(&types.Block{}); err != nil {
- panic(err)
- }
- return con
-}
-
-// NewConsensusFromSyncer constructs an Consensus instance from information
-// provided from syncer.
-//
-// You need to provide the initial block for this newly created Consensus
-// instance to bootstrap with. A proper choice is the last finalized block you
-// delivered to syncer.
-func NewConsensusFromSyncer(
- initBlock *types.Block,
- initRoundBeginTime time.Time,
- app Application,
- gov Governance,
- db db.Database,
- networkModule Network,
- prv crypto.PrivateKey,
- latticeModule *Lattice,
- blocks []*types.Block,
- randomnessResults []*types.BlockRandomnessResult,
- logger common.Logger) (*Consensus, error) {
- // Setup the cache for node sets.
- nodeSetCache := utils.NewNodeSetCache(gov)
- // Setup signer module.
- signer := utils.NewSigner(prv)
- // Init configuration chain.
- ID := types.NewNodeID(prv.PublicKey())
- recv := &consensusDKGReceiver{
- ID: ID,
- gov: gov,
- signer: signer,
- nodeSetCache: nodeSetCache,
- network: networkModule,
- logger: logger,
- }
- cfgModule := newConfigurationChain(
- ID,
- recv,
- gov,
- nodeSetCache,
- db,
- logger)
- recv.cfgModule = cfgModule
- // Check if the application implement Debug interface.
- var debugApp Debug
- if a, ok := app.(Debug); ok {
- debugApp = a
- }
- // Setup Consensus instance.
- con := &Consensus{
- ID: ID,
- ccModule: newCompactionChain(gov),
- lattice: latticeModule,
- app: newNonBlocking(app, debugApp),
- gov: gov,
- db: db,
- network: networkModule,
- baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
dMoment: initRoundBeginTime,
nodeSetCache: nodeSetCache,
signer: signer,
@@ -555,27 +522,11 @@ func NewConsensusFromSyncer(
logger: logger,
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime)
- // Bootstrap the consensus instance.
+ con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
if err := con.prepare(initBlock); err != nil {
- return nil, err
- }
- // Dump all BA-confirmed blocks to the consensus instance.
- for _, b := range blocks {
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
- }
- }
- // Dump all randomness result to the consensus instance.
- for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
- con.logger.Error("failed to process randomness result when syncing",
- "result", r)
- continue
- }
+ panic(err)
}
- return con, nil
+ return con
}
// prepare the Consensus instance to be ready for blocks after 'initBlock'.
@@ -634,6 +585,9 @@ func (con *Consensus) Run() {
go con.processMsg(con.network.ReceiveChan())
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Take some time to bootstrap.
+ time.Sleep(3 * time.Second)
+ go con.pullRandomness()
// Block until done.
select {
case <-con.ctx.Done():
@@ -673,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) {
con.logger.Debug("Calling Governance.CRS to check if already proposed",
"round", round+1)
if (con.gov.CRS(round+1) != common.Hash{}) {
- con.logger.Info("CRS already proposed", "round", round+1)
+ con.logger.Debug("CRS already proposed", "round", round+1)
return
}
con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
@@ -744,7 +698,7 @@ func (con *Consensus) initialRound(
if (nextCRS != common.Hash{}) {
return true
}
- con.logger.Info("CRS is not ready yet. Try again later...",
+ con.logger.Debug("CRS is not ready yet. Try again later...",
"nodeID", con.ID,
"round", round)
return false
@@ -757,7 +711,7 @@ func (con *Consensus) initialRound(
go func(nextRound uint64) {
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Info("unable to prepare CRS for baMgr",
+ con.logger.Debug("unable to prepare CRS for baMgr",
"round", nextRound)
return
}
@@ -781,7 +735,7 @@ func (con *Consensus) initialRound(
// unexpected network fluctuation and ensure the robustness.
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Info("unable to prepare CRS for DKG set",
+ con.logger.Debug("unable to prepare CRS for DKG set",
"round", nextRound)
return
}
@@ -1034,9 +988,7 @@ func (con *Consensus) ProcessBlockRandomnessResult(
}
if needBroadcast {
con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "hash", rand.BlockHash.String()[:6],
- "position", &rand.Position,
- "randomness", hex.EncodeToString(rand.Randomness))
+ "randomness", rand)
con.network.BroadcastRandomnessResult(rand)
}
return con.deliverFinalizedBlocks()
@@ -1051,8 +1003,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
return
}
+func (con *Consensus) pullRandomness() {
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-con.resetRandomnessTicker:
+ case <-time.After(1500 * time.Millisecond):
+ // TODO(jimmy): pulling period should be related to lambdaBA.
+ hashes := con.ccModule.pendingBlocksWithoutRandomness()
+ con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes)
+ con.network.PullRandomness(hashes)
+ }
+ }
+}
+
// deliverBlock deliver a block to application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
+ select {
+ case con.resetRandomnessTicker <- struct{}{}:
+ default:
+ }
if err := con.db.UpdateBlock(*b); err != nil {
panic(err)
}
@@ -1134,7 +1105,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
if con.nonFinalizedBlockDelivered {
panic(fmt.Errorf("attempting to skip finalized block: %s", b))
}
- con.logger.Info("skip delivery of finalized block",
+ con.logger.Debug("skip delivery of finalized block",
"block", b,
"finalization-height", b.Finalization.Height)
continue