diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go | 203 |
1 files changed, 87 insertions, 116 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 0d4a38a91..3353d1d60 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -387,6 +387,7 @@ type Consensus struct { event *common.Event logger common.Logger nonFinalizedBlockDelivered bool + resetRandomnessTicker chan struct{} } // NewConsensus construct an Consensus instance. @@ -398,8 +399,8 @@ func NewConsensus( network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { - - return newConsensus(dMoment, app, gov, db, network, prv, logger, true) + return newConsensusForRound( + &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true) } // NewConsensusForSimulation creates an instance of Consensus for simulation, @@ -412,19 +413,60 @@ func NewConsensusForSimulation( network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { + return newConsensusForRound( + &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false) +} - return newConsensus(dMoment, app, gov, db, network, prv, logger, false) +// NewConsensusFromSyncer constructs an Consensus instance from information +// provided from syncer. +// +// You need to provide the initial block for this newly created Consensus +// instance to bootstrap with. A proper choice is the last finalized block you +// delivered to syncer. +func NewConsensusFromSyncer( + initBlock *types.Block, + initRoundBeginTime time.Time, + app Application, + gov Governance, + db db.Database, + networkModule Network, + prv crypto.PrivateKey, + latticeModule *Lattice, + blocks []*types.Block, + randomnessResults []*types.BlockRandomnessResult, + logger common.Logger) (*Consensus, error) { + // Setup Consensus instance. + con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db, + networkModule, prv, logger, latticeModule, true) + // Dump all BA-confirmed blocks to the consensus instance. + for _, b := range blocks { + con.ccModule.registerBlock(b) + if err := con.processBlock(b); err != nil { + return nil, err + } + } + // Dump all randomness result to the consensus instance. + for _, r := range randomnessResults { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { + con.logger.Error("failed to process randomness result when syncing", + "result", r) + continue + } + } + return con, nil } // newConsensus creates a Consensus instance. -func newConsensus( - dMoment time.Time, +func newConsensusForRound( + initBlock *types.Block, + initRoundBeginTime time.Time, app Application, gov Governance, db db.Database, network Network, prv crypto.PrivateKey, logger common.Logger, + latticeModule *Lattice, usingNonBlocking bool) *Consensus { // TODO(w): load latest blockHeight from DB, and use config at that height. @@ -436,12 +478,14 @@ func newConsensus( if a, ok := app.(Debug); ok { debugApp = a } - // Get configuration for genesis round. - var round uint64 - config := utils.GetConfigWithPanic(gov, round, logger) + // Get configuration for bootstrap round. + initRound := initBlock.Position.Round + initConfig := utils.GetConfigWithPanic(gov, initRound, logger) // Init lattice. - lattice := NewLattice( - dMoment, round, config, signer, app, debugApp, db, logger) + if latticeModule == nil { + latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig, + signer, app, debugApp, db, logger) + } // Init configuration chain. ID := types.NewNodeID(prv.PublicKey()) recv := &consensusDKGReceiver{ @@ -452,13 +496,7 @@ func newConsensus( network: network, logger: logger, } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - nodeSetCache, - db, - logger) + cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger) recv.cfgModule = cfgModule appModule := app if usingNonBlocking { @@ -468,7 +506,7 @@ func newConsensus( con := &Consensus{ ID: ID, ccModule: newCompactionChain(gov), - lattice: lattice, + lattice: latticeModule, app: appModule, debugApp: debugApp, gov: gov, @@ -477,77 +515,6 @@ func newConsensus( baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), dkgReady: sync.NewCond(&sync.Mutex{}), cfgModule: cfgModule, - dMoment: dMoment, - nodeSetCache: nodeSetCache, - signer: signer, - event: common.NewEvent(), - logger: logger, - } - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, round, dMoment) - if err := con.prepare(&types.Block{}); err != nil { - panic(err) - } - return con -} - -// NewConsensusFromSyncer constructs an Consensus instance from information -// provided from syncer. -// -// You need to provide the initial block for this newly created Consensus -// instance to bootstrap with. A proper choice is the last finalized block you -// delivered to syncer. -func NewConsensusFromSyncer( - initBlock *types.Block, - initRoundBeginTime time.Time, - app Application, - gov Governance, - db db.Database, - networkModule Network, - prv crypto.PrivateKey, - latticeModule *Lattice, - blocks []*types.Block, - randomnessResults []*types.BlockRandomnessResult, - logger common.Logger) (*Consensus, error) { - // Setup the cache for node sets. - nodeSetCache := utils.NewNodeSetCache(gov) - // Setup signer module. - signer := utils.NewSigner(prv) - // Init configuration chain. - ID := types.NewNodeID(prv.PublicKey()) - recv := &consensusDKGReceiver{ - ID: ID, - gov: gov, - signer: signer, - nodeSetCache: nodeSetCache, - network: networkModule, - logger: logger, - } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - nodeSetCache, - db, - logger) - recv.cfgModule = cfgModule - // Check if the application implement Debug interface. - var debugApp Debug - if a, ok := app.(Debug); ok { - debugApp = a - } - // Setup Consensus instance. - con := &Consensus{ - ID: ID, - ccModule: newCompactionChain(gov), - lattice: latticeModule, - app: newNonBlocking(app, debugApp), - gov: gov, - db: db, - network: networkModule, - baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, dMoment: initRoundBeginTime, nodeSetCache: nodeSetCache, signer: signer, @@ -555,27 +522,11 @@ func NewConsensusFromSyncer( logger: logger, } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime) - // Bootstrap the consensus instance. + con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime) if err := con.prepare(initBlock); err != nil { - return nil, err - } - // Dump all BA-confirmed blocks to the consensus instance. - for _, b := range blocks { - con.ccModule.registerBlock(b) - if err := con.processBlock(b); err != nil { - return nil, err - } - } - // Dump all randomness result to the consensus instance. - for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r, false); err != nil { - con.logger.Error("failed to process randomness result when syncing", - "result", r) - continue - } + panic(err) } - return con, nil + return con } // prepare the Consensus instance to be ready for blocks after 'initBlock'. @@ -634,6 +585,9 @@ func (con *Consensus) Run() { go con.processMsg(con.network.ReceiveChan()) // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) + // Take some time to bootstrap. + time.Sleep(3 * time.Second) + go con.pullRandomness() // Block until done. select { case <-con.ctx.Done(): @@ -673,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) { con.logger.Debug("Calling Governance.CRS to check if already proposed", "round", round+1) if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Info("CRS already proposed", "round", round+1) + con.logger.Debug("CRS already proposed", "round", round+1) return } con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", @@ -744,7 +698,7 @@ func (con *Consensus) initialRound( if (nextCRS != common.Hash{}) { return true } - con.logger.Info("CRS is not ready yet. Try again later...", + con.logger.Debug("CRS is not ready yet. Try again later...", "nodeID", con.ID, "round", round) return false @@ -757,7 +711,7 @@ func (con *Consensus) initialRound( go func(nextRound uint64) { if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Info("unable to prepare CRS for baMgr", + con.logger.Debug("unable to prepare CRS for baMgr", "round", nextRound) return } @@ -781,7 +735,7 @@ func (con *Consensus) initialRound( // unexpected network fluctuation and ensure the robustness. if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Info("unable to prepare CRS for DKG set", + con.logger.Debug("unable to prepare CRS for DKG set", "round", nextRound) return } @@ -1034,9 +988,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( } if needBroadcast { con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash.String()[:6], - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) + "randomness", rand) con.network.BroadcastRandomnessResult(rand) } return con.deliverFinalizedBlocks() @@ -1051,8 +1003,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { return } +func (con *Consensus) pullRandomness() { + for { + select { + case <-con.ctx.Done(): + return + case <-con.resetRandomnessTicker: + case <-time.After(1500 * time.Millisecond): + // TODO(jimmy): pulling period should be related to lambdaBA. + hashes := con.ccModule.pendingBlocksWithoutRandomness() + con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes) + con.network.PullRandomness(hashes) + } + } +} + // deliverBlock deliver a block to application layer. func (con *Consensus) deliverBlock(b *types.Block) { + select { + case con.resetRandomnessTicker <- struct{}{}: + default: + } if err := con.db.UpdateBlock(*b); err != nil { panic(err) } @@ -1134,7 +1105,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if con.nonFinalizedBlockDelivered { panic(fmt.Errorf("attempting to skip finalized block: %s", b)) } - con.logger.Info("skip delivery of finalized block", + con.logger.Debug("skip delivery of finalized block", "block", b, "finalization-height", b.Finalization.Height) continue |