path: root/core/consensus.go
author    Mission Liao <mission.liao@dexon.org>  2018-12-04 09:52:11 +0800
committer GitHub <noreply@github.com>  2018-12-04 09:52:11 +0800
commit    041eb53f043e6a4a7a9acab1ce46ecfd268fed57 (patch)
tree      90ed112f4dbdab02f7a0542c8db40e9060b5ef32 /core/consensus.go
parent    2e119344b3ecddd2cf07094c89249ab631901c4f (diff)
core: construct consensus from syncer (#352)
Diffstat (limited to 'core/consensus.go')
-rw-r--r--  core/consensus.go  181
1 file changed, 142 insertions(+), 39 deletions(-)
diff --git a/core/consensus.go b/core/consensus.go
index 253c9a5..efd3ab7 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -58,6 +58,8 @@ var (
"incorrect vote proposer")
ErrCRSNotReady = fmt.Errorf(
"CRS not ready")
+ ErrConfigurationNotReady = fmt.Errorf(
+ "Configuration not ready")
)
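
Both new sentinel errors are plain values built with fmt.Errorf, so callers match them by identity (==). The nil guard this commit repeats around Governance.Configuration could be factored as below; getConfigOrError is a hypothetical helper written as if inside package core, not part of this commit:

    // getConfigOrError mirrors the nil checks added in this commit:
    // Governance.Configuration returns nil until the configuration for
    // the requested round has been registered.
    func getConfigOrError(gov Governance, round uint64) (*types.Config, error) {
        cfg := gov.Configuration(round)
        if cfg == nil {
            return nil, ErrConfigurationNotReady
        }
        return cfg, nil
    }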
// consensusBAReceiver implements agreementReceiver.
@@ -338,11 +340,10 @@ type Consensus struct {
toSyncer *totalOrderingSyncer
// Interfaces.
- db blockdb.BlockDatabase
- app Application
- gov Governance
- network Network
- tickerObj Ticker
+ db blockdb.BlockDatabase
+ app Application
+ gov Governance
+ network Network
// Misc.
dMoment time.Time
@@ -367,15 +368,19 @@ func NewConsensus(
logger common.Logger) *Consensus {
// TODO(w): load latest blockHeight from DB, and use config at that height.
- var round uint64
- logger.Debug("Calling Governance.Configuration", "round", round)
- config := gov.Configuration(round)
nodeSetCache := utils.NewNodeSetCache(gov)
- logger.Debug("Calling Governance.CRS", "round", round)
// Setup auth module.
authModule := NewAuthenticator(prv)
// Check if the application implement Debug interface.
debugApp, _ := app.(Debug)
+ // Get configuration for genesis round.
+ var round uint64
+ logger.Debug("Calling Governance.Configuration", "round", round)
+ config := gov.Configuration(round)
+ if config == nil {
+ logger.Error("Unable to get configuration", "round", round)
+ return nil
+ }
// Init lattice.
lattice := NewLattice(
dMoment, round, config, authModule, app, debugApp, db, logger)
@@ -405,7 +410,6 @@ func NewConsensus(
gov: gov,
db: db,
network: network,
- tickerObj: newTicker(gov, round, TickerBA),
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
@@ -416,12 +420,100 @@ func NewConsensus(
logger: logger,
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, dMoment)
+ con.baMgr = newAgreementMgr(con, round, dMoment)
+ if err := con.prepare(&types.Block{}); err != nil {
+ panic(err)
+ }
return con
}
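
NewConsensus now runs prepare with an empty genesis block during construction and reports a missing genesis configuration by returning nil. A minimal bootstrap sketch, assuming the caller has already built the app, gov, db, network, prv, and logger dependencies:

    con := NewConsensus(dMoment, app, gov, db, network, prv, logger)
    if con == nil {
        // Governance has no configuration for round 0 yet; NewConsensus
        // signals this by returning nil rather than panicking.
        return
    }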
-// Run starts running DEXON Consensus.
-func (con *Consensus) Run(initBlock *types.Block) {
+// NewConsensusFromSyncer constructs a Consensus instance from information
+// provided by the syncer.
+//
+// The caller must provide an initial block for the newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block
+// delivered to the syncer.
+func NewConsensusFromSyncer(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
+ app Application,
+ gov Governance,
+ db blockdb.BlockDatabase,
+ networkModule Network,
+ prv crypto.PrivateKey,
+ latticeModule *Lattice,
+ blocks []*types.Block,
+ randomnessResults []*types.BlockRandomnessResult,
+ logger common.Logger) (*Consensus, error) {
+ // Setup the cache for node sets.
+ nodeSetCache := utils.NewNodeSetCache(gov)
+ // Setup auth module.
+ authModule := NewAuthenticator(prv)
+ // Init configuration chain.
+ ID := types.NewNodeID(prv.PublicKey())
+ recv := &consensusDKGReceiver{
+ ID: ID,
+ gov: gov,
+ authModule: authModule,
+ nodeSetCache: nodeSetCache,
+ network: networkModule,
+ logger: logger,
+ }
+ cfgModule := newConfigurationChain(
+ ID,
+ recv,
+ gov,
+ nodeSetCache,
+ logger)
+ recv.cfgModule = cfgModule
+ // Setup Consensus instance.
+ con := &Consensus{
+ ID: ID,
+ ccModule: newCompactionChain(gov),
+ lattice: latticeModule,
+ app: app,
+ gov: gov,
+ db: db,
+ network: networkModule,
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ dMoment: initRoundBeginTime,
+ nodeSetCache: nodeSetCache,
+ authModule: authModule,
+ event: common.NewEvent(),
+ logger: logger,
+ }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime)
+ // Bootstrap the consensus instance.
+ if err := con.prepare(initBlock); err != nil {
+ return nil, err
+ }
+ // Dump all BA-confirmed blocks to the consensus instance.
+ for _, b := range blocks {
+ con.app.BlockConfirmed(*b)
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ // Dump all randomness results to the consensus instance.
+ for _, r := range randomnessResults {
+ if err := con.ProcessBlockRandomnessResult(r); err != nil {
+ con.logger.Error("failed to process randomness result when syncing",
+ "result", r)
+ continue
+ }
+ }
+ return con, nil
+}
+
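
A sketch of the intended handoff from the syncer. The accessors on syncerModule are hypothetical names for "the last finalized block you delivered to the syncer" and the blocks and randomness results it buffered; they are not part of this commit:

    initBlock := syncerModule.LastFinalizedBlock()       // hypothetical
    blocks, randomness := syncerModule.BufferedResults() // hypothetical
    con, err := NewConsensusFromSyncer(
        initBlock, initRoundBeginTime,
        app, gov, db, networkModule, prv,
        latticeModule, blocks, randomness, logger)
    if err != nil {
        return err
    }
    go con.Run()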
+// prepare the Consensus instance to be ready for blocks after 'initBlock'.
+// 'initBlock' could be either:
+// - an empty block
+// - the last finalized block
+func (con *Consensus) prepare(initBlock *types.Block) error {
// The block passed from the full node should already be delivered to or
// known by the full node, so we don't have to notify it.
con.roundToNotify = initBlock.Position.Round + 1
@@ -430,36 +522,33 @@ func (con *Consensus) Run(initBlock *types.Block) {
initConfig := con.gov.Configuration(initRound)
// Setup context.
con.ccModule.init(initBlock)
- // TODO(jimmy-dexon): change AppendConfig to add config for specific round.
- for i := uint64(0); i <= initRound+1; i++ {
- con.logger.Debug("Calling Governance.Configuration", "round", i)
- cfg := con.gov.Configuration(i)
- // 0 round is already given to core.Lattice module when constructing.
- if i > 0 {
- if err := con.lattice.AppendConfig(i, cfg); err != nil {
- panic(err)
- }
- }
- // Corresponding CRS might not be ready for next round to initRound.
- if i < initRound+1 {
- con.logger.Debug("Calling Governance.CRS", "round", i)
- crs := con.gov.CRS(i)
- if (crs == common.Hash{}) {
- panic(ErrCRSNotReady)
- }
- if err := con.baMgr.appendConfig(i, cfg, crs); err != nil {
- panic(err)
- }
- }
+ // Setup agreementMgr module.
+ con.logger.Debug("Calling Governance.Configuration", "round", initRound)
+ initCfg := con.gov.Configuration(initRound)
+ if initCfg == nil {
+ return ErrConfigurationNotReady
+ }
+ con.logger.Debug("Calling Governance.CRS", "round", initRound)
+ initCRS := con.gov.CRS(initRound)
+ if (initCRS == common.Hash{}) {
+ return ErrCRSNotReady
+ }
+ if err := con.baMgr.appendConfig(initRound, initCfg, initCRS); err != nil {
+ return err
+ }
+ // Setup lattice module.
+ initPlusOneCfg := con.gov.Configuration(initRound + 1)
+ if initPlusOneCfg == nil {
+ return ErrConfigurationNotReady
}
+ if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil {
+ return err
+ }
+ // Register events.
dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
if err != nil {
- panic(err)
+ return err
}
- con.logger.Debug("Calling Network.ReceiveChan")
- go con.processMsg(con.network.ReceiveChan())
- // Sleep until dMoment come.
- time.Sleep(con.dMoment.Sub(time.Now().UTC()))
if _, exist := dkgSet[con.ID]; exist {
con.logger.Info("Selected as DKG set", "round", initRound)
con.cfgModule.registerDKG(initRound, int(initConfig.DKGSetSize)/3+1)
@@ -469,6 +558,18 @@ func (con *Consensus) Run(initBlock *types.Block) {
})
}
con.initialRound(con.dMoment, initRound, initConfig)
+ return nil
+}
+
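
Since prepare returns ErrConfigurationNotReady and ErrCRSNotReady instead of panicking, a caller can poll until governance holds the configuration and CRS for the initial round plus the configuration for the round after it. A hedged sketch; the one-second interval is an arbitrary assumption:

    for {
        con, err := NewConsensusFromSyncer(initBlock, beginTime, app, gov,
            db, networkModule, prv, latticeModule, blocks, randomness, logger)
        if err == ErrConfigurationNotReady || err == ErrCRSNotReady {
            time.Sleep(time.Second) // wait for governance data to land on-chain
            continue
        }
        if err != nil {
            return err
        }
        go con.Run()
        return nil
    }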
+// Run starts running DEXON Consensus.
+func (con *Consensus) Run() {
+ // Launch BA routines.
+ con.baMgr.run()
+ // Launch network handler.
+ con.logger.Debug("Calling Network.ReceiveChan")
+ go con.processMsg(con.network.ReceiveChan())
+ // Sleep until dMoment comes.
+ time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Block until done.
select {
case <-con.ctx.Done():
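
Run now sleeps until dMoment and then parks on the consensus context, so the lifecycle splits into construction, launch, and shutdown. A sketch, assuming the existing Stop method is what cancels con.ctx:

    go con.Run() // BA routines and the network handler start inside Run
    // ... later, on shutdown:
    con.Stop()   // assumed to cancel con.ctx, unblocking Run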
@@ -705,6 +806,8 @@ MessageLoop:
case *types.BlockRandomnessResult:
if err := con.ProcessBlockRandomnessResult(val); err != nil {
con.logger.Error("Failed to process block randomness result",
+ "hash", val.BlockHash.String()[:6],
+ "position", &val.Position,
"error", err)
}
case *typesDKG.PrivateShare: