aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/agreement-mgr.go59
-rw-r--r--core/consensus.go181
-rw-r--r--core/consensus_test.go2
-rw-r--r--core/syncer/agreement.go4
-rw-r--r--core/syncer/consensus.go42
5 files changed, 225 insertions, 63 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 6f50bfc..cbce6d2 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -89,11 +89,13 @@ type agreementMgr struct {
lattice *Lattice
ctx context.Context
lastEndTime time.Time
+ initRound uint64
configs []*agreementMgrConfig
baModules []*agreement
waitGroup sync.WaitGroup
pendingVotes map[uint64][]*types.Vote
pendingBlocks map[uint64][]*types.Block
+ isRunning bool
// This lock should be used when attempting to:
// - add a new baModule.
@@ -106,7 +108,8 @@ type agreementMgr struct {
lock sync.RWMutex
}
-func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr {
+func newAgreementMgr(con *Consensus, initRound uint64,
+ initRoundBeginTime time.Time) *agreementMgr {
return &agreementMgr{
con: con,
ID: con.ID,
@@ -118,7 +121,33 @@ func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr {
auth: con.authModule,
lattice: con.lattice,
ctx: con.ctx,
- lastEndTime: dMoment,
+ initRound: initRound,
+ lastEndTime: initRoundBeginTime,
+ }
+}
+
+func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig {
+ mgr.lock.RLock()
+ defer mgr.lock.RUnlock()
+ if round < mgr.initRound {
+ panic(ErrRoundOutOfRange)
+ }
+ roundIndex := round - mgr.initRound
+ if roundIndex >= uint64(len(mgr.configs)) {
+ return nil
+ }
+ return mgr.configs[roundIndex]
+}
+
+func (mgr *agreementMgr) run() {
+ mgr.lock.Lock()
+ defer mgr.lock.Unlock()
+ if mgr.isRunning {
+ return
+ }
+ mgr.isRunning = true
+ for i := uint32(0); i < uint32(len(mgr.baModules)); i++ {
+ go mgr.runBA(mgr.initRound, i)
}
}
@@ -126,8 +155,7 @@ func (mgr *agreementMgr) appendConfig(
round uint64, config *types.Config, crs common.Hash) (err error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
- // TODO(mission): initiate this module from some round > 0.
- if round != uint64(len(mgr.configs)) {
+ if round != uint64(len(mgr.configs))+mgr.initRound {
return ErrRoundNotIncreasing
}
newConfig := &agreementMgrConfig{
@@ -156,7 +184,9 @@ func (mgr *agreementMgr) appendConfig(
// Hacky way to make agreement module self contained.
recv.agreementModule = agrModule
mgr.baModules = append(mgr.baModules, agrModule)
- go mgr.runBA(round, i)
+ if mgr.isRunning {
+ go mgr.runBA(round, i)
+ }
}
return nil
}
@@ -169,7 +199,8 @@ func (mgr *agreementMgr) processVote(v *types.Vote) error {
mgr.logger.Error("Process vote for unknown chain to BA",
"position", &v.Position,
"baChain", len(mgr.baModules),
- "baRound", len(mgr.configs))
+ "baRound", len(mgr.configs),
+ "initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
return mgr.baModules[v.Position.ChainID].processVote(v)
@@ -182,7 +213,8 @@ func (mgr *agreementMgr) processBlock(b *types.Block) error {
mgr.logger.Error("Process block for unknown chain to BA",
"position", &b.Position,
"baChain", len(mgr.baModules),
- "baRound", len(mgr.configs))
+ "baRound", len(mgr.configs),
+ "initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
return mgr.baModules[b.Position.ChainID].processBlock(b)
@@ -196,7 +228,8 @@ func (mgr *agreementMgr) processAgreementResult(
mgr.logger.Error("Process unknown result for unknown chain to BA",
"position", &result.Position,
"baChain", len(mgr.baModules),
- "baRound", len(mgr.configs))
+ "baRound", len(mgr.configs),
+ "initRound", mgr.initRound)
return utils.ErrInvalidChainID
}
agreement := mgr.baModules[result.Position.ChainID]
@@ -273,15 +306,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
// Wait until the configuartion for next round is ready.
var config *agreementMgrConfig
for {
- config = func() *agreementMgrConfig {
- mgr.lock.RLock()
- defer mgr.lock.RUnlock()
- if nextRound < uint64(len(mgr.configs)) {
- return mgr.configs[nextRound]
- }
- return nil
- }()
- if config != nil {
+ if config = mgr.getConfig(nextRound); config != nil {
break
} else {
mgr.logger.Info("round is not ready", "round", nextRound)
diff --git a/core/consensus.go b/core/consensus.go
index 253c9a5..efd3ab7 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -58,6 +58,8 @@ var (
"incorrect vote proposer")
ErrCRSNotReady = fmt.Errorf(
"CRS not ready")
+ ErrConfigurationNotReady = fmt.Errorf(
+ "Configuration not ready")
)
// consensusBAReceiver implements agreementReceiver.
@@ -338,11 +340,10 @@ type Consensus struct {
toSyncer *totalOrderingSyncer
// Interfaces.
- db blockdb.BlockDatabase
- app Application
- gov Governance
- network Network
- tickerObj Ticker
+ db blockdb.BlockDatabase
+ app Application
+ gov Governance
+ network Network
// Misc.
dMoment time.Time
@@ -367,15 +368,19 @@ func NewConsensus(
logger common.Logger) *Consensus {
// TODO(w): load latest blockHeight from DB, and use config at that height.
- var round uint64
- logger.Debug("Calling Governance.Configuration", "round", round)
- config := gov.Configuration(round)
nodeSetCache := utils.NewNodeSetCache(gov)
- logger.Debug("Calling Governance.CRS", "round", round)
// Setup auth module.
authModule := NewAuthenticator(prv)
// Check if the application implement Debug interface.
debugApp, _ := app.(Debug)
+ // Get configuration for genesis round.
+ var round uint64
+ logger.Debug("Calling Governance.Configuration", "round", round)
+ config := gov.Configuration(round)
+ if config == nil {
+ logger.Error("Unable to get configuration", "round", round)
+ return nil
+ }
// Init lattice.
lattice := NewLattice(
dMoment, round, config, authModule, app, debugApp, db, logger)
@@ -405,7 +410,6 @@ func NewConsensus(
gov: gov,
db: db,
network: network,
- tickerObj: newTicker(gov, round, TickerBA),
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
@@ -416,12 +420,100 @@ func NewConsensus(
logger: logger,
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, dMoment)
+ con.baMgr = newAgreementMgr(con, round, dMoment)
+ if err := con.prepare(&types.Block{}); err != nil {
+ panic(err)
+ }
return con
}
-// Run starts running DEXON Consensus.
-func (con *Consensus) Run(initBlock *types.Block) {
+// NewConsensusFromSyncer constructs an Consensus instance from information
+// provided from syncer.
+//
+// You need to provide the initial block for this newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block you
+// delivered to syncer.
+func NewConsensusFromSyncer(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
+ app Application,
+ gov Governance,
+ db blockdb.BlockDatabase,
+ networkModule Network,
+ prv crypto.PrivateKey,
+ latticeModule *Lattice,
+ blocks []*types.Block,
+ randomnessResults []*types.BlockRandomnessResult,
+ logger common.Logger) (*Consensus, error) {
+ // Setup the cache for node sets.
+ nodeSetCache := utils.NewNodeSetCache(gov)
+ // Setup auth module.
+ authModule := NewAuthenticator(prv)
+ // Init configuration chain.
+ ID := types.NewNodeID(prv.PublicKey())
+ recv := &consensusDKGReceiver{
+ ID: ID,
+ gov: gov,
+ authModule: authModule,
+ nodeSetCache: nodeSetCache,
+ network: networkModule,
+ logger: logger,
+ }
+ cfgModule := newConfigurationChain(
+ ID,
+ recv,
+ gov,
+ nodeSetCache,
+ logger)
+ recv.cfgModule = cfgModule
+ // Setup Consensus instance.
+ con := &Consensus{
+ ID: ID,
+ ccModule: newCompactionChain(gov),
+ lattice: latticeModule,
+ app: app,
+ gov: gov,
+ db: db,
+ network: networkModule,
+ baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
+ dkgReady: sync.NewCond(&sync.Mutex{}),
+ cfgModule: cfgModule,
+ dMoment: initRoundBeginTime,
+ nodeSetCache: nodeSetCache,
+ authModule: authModule,
+ event: common.NewEvent(),
+ logger: logger,
+ }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime)
+ // Bootstrap the consensus instance.
+ if err := con.prepare(initBlock); err != nil {
+ return nil, err
+ }
+ // Dump all BA-confirmed blocks to the consensus instance.
+ for _, b := range blocks {
+ con.app.BlockConfirmed(*b)
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ // Dump all randomness result to the consensus instance.
+ for _, r := range randomnessResults {
+ if err := con.ProcessBlockRandomnessResult(r); err != nil {
+ con.logger.Error("failed to process randomness result when syncing",
+ "result", r)
+ continue
+ }
+ }
+ return con, nil
+}
+
+// prepare the Consensus instance to be ready for blocks after 'initBlock'.
+// 'initBlock' could be either:
+// - an empty block
+// - the last finalized block
+func (con *Consensus) prepare(initBlock *types.Block) error {
// The block past from full node should be delivered already or known by
// full node. We don't have to notify it.
con.roundToNotify = initBlock.Position.Round + 1
@@ -430,36 +522,33 @@ func (con *Consensus) Run(initBlock *types.Block) {
initConfig := con.gov.Configuration(initRound)
// Setup context.
con.ccModule.init(initBlock)
- // TODO(jimmy-dexon): change AppendConfig to add config for specific round.
- for i := uint64(0); i <= initRound+1; i++ {
- con.logger.Debug("Calling Governance.Configuration", "round", i)
- cfg := con.gov.Configuration(i)
- // 0 round is already given to core.Lattice module when constructing.
- if i > 0 {
- if err := con.lattice.AppendConfig(i, cfg); err != nil {
- panic(err)
- }
- }
- // Corresponding CRS might not be ready for next round to initRound.
- if i < initRound+1 {
- con.logger.Debug("Calling Governance.CRS", "round", i)
- crs := con.gov.CRS(i)
- if (crs == common.Hash{}) {
- panic(ErrCRSNotReady)
- }
- if err := con.baMgr.appendConfig(i, cfg, crs); err != nil {
- panic(err)
- }
- }
+ // Setup agreementMgr module.
+ con.logger.Debug("Calling Governance.Configuration", "round", initRound)
+ initCfg := con.gov.Configuration(initRound)
+ if initCfg == nil {
+ return ErrConfigurationNotReady
+ }
+ con.logger.Debug("Calling Governance.CRS", "round", initRound)
+ initCRS := con.gov.CRS(initRound)
+ if (initCRS == common.Hash{}) {
+ return ErrCRSNotReady
+ }
+ if err := con.baMgr.appendConfig(initRound, initCfg, initCRS); err != nil {
+ return err
+ }
+ // Setup lattice module.
+ initPlusOneCfg := con.gov.Configuration(initRound + 1)
+ if initPlusOneCfg == nil {
+ return ErrConfigurationNotReady
}
+ if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil {
+ return err
+ }
+ // Register events.
dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
if err != nil {
- panic(err)
+ return err
}
- con.logger.Debug("Calling Network.ReceiveChan")
- go con.processMsg(con.network.ReceiveChan())
- // Sleep until dMoment come.
- time.Sleep(con.dMoment.Sub(time.Now().UTC()))
if _, exist := dkgSet[con.ID]; exist {
con.logger.Info("Selected as DKG set", "round", initRound)
con.cfgModule.registerDKG(initRound, int(initConfig.DKGSetSize)/3+1)
@@ -469,6 +558,18 @@ func (con *Consensus) Run(initBlock *types.Block) {
})
}
con.initialRound(con.dMoment, initRound, initConfig)
+ return nil
+}
+
+// Run starts running DEXON Consensus.
+func (con *Consensus) Run() {
+ // Launch BA routines.
+ con.baMgr.run()
+ // Launch network handler.
+ con.logger.Debug("Calling Network.ReceiveChan")
+ go con.processMsg(con.network.ReceiveChan())
+ // Sleep until dMoment come.
+ time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Block until done.
select {
case <-con.ctx.Done():
@@ -705,6 +806,8 @@ MessageLoop:
case *types.BlockRandomnessResult:
if err := con.ProcessBlockRandomnessResult(val); err != nil {
con.logger.Error("Failed to process block randomness result",
+ "hash", val.BlockHash.String()[:6],
+ "position", &val.Position,
"error", err)
}
case *typesDKG.PrivateShare:
diff --git a/core/consensus_test.go b/core/consensus_test.go
index dab5440..7d9c653 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -553,7 +553,7 @@ func (s *ConsensusTestSuite) TestSyncBA() {
s.Require().NoError(err)
prvKey := prvKeys[0]
_, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn)
- go con.Run(&types.Block{})
+ go con.Run()
hash := common.NewRandomHash()
auths := make([]*Authenticator, 0, len(prvKeys))
for _, prvKey := range prvKeys {
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index 89b8c8d..fee4624 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -105,6 +105,10 @@ func (a *agreement) processBlock(b *types.Block) {
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
// Cache those results that CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
+ a.logger.Info("agreement result already confirmed", "result", r)
+ return
+ }
if r.Position.Round > a.latestCRSRound {
pendingsForRound, exists := a.pendings[r.Position.Round]
if !exists {
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index d84f168..ce6fdc3 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -54,6 +54,7 @@ type Consensus struct {
lattice *core.Lattice
latticeLastRound uint64
+ randomnessResults []*types.BlockRandomnessResult
blocks []types.ByPosition
agreements []*agreement
configs []*types.Config
@@ -175,11 +176,11 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) {
// con.blocks are all in the same round, for avoiding config change while
// syncing.
func (con *Consensus) ensureAgreementOverlapRound() bool {
+ con.lock.Lock()
+ defer con.lock.Unlock()
if con.agreementRoundCut > 0 {
return true
}
- con.lock.Lock()
- defer con.lock.Unlock()
// Clean empty blocks on tips of chains.
for idx, bs := range con.blocks {
for len(bs) > 0 && con.isEmptyBlock(bs[0]) {
@@ -412,10 +413,30 @@ func (con *Consensus) SyncBlocks(
con.moduleWaitGroup.Wait()
// Stop agreements.
con.stopAgreement()
- // TODO: flush all blocks in con.blocks into core.Consensus, and build
+ // flush all blocks in con.blocks into core.Consensus, and build
// core.Consensus from syncer.
- con.logger.Info("syncer.Consensus synced")
- return &core.Consensus{}, nil
+ lastBlock := blocks[len(blocks)-1]
+ con.logger.Info("syncer.Consensus synced", "last-block", lastBlock)
+ confirmedBlocks := []*types.Block{}
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for _, bs := range con.blocks {
+ confirmedBlocks = append(confirmedBlocks, bs...)
+ }
+ }()
+ return core.NewConsensusFromSyncer(
+ lastBlock,
+ con.roundBeginTimes[lastBlock.Position.Round],
+ con.app,
+ con.gov,
+ con.db,
+ con.network,
+ con.prv,
+ con.lattice,
+ confirmedBlocks,
+ con.randomnessResults,
+ con.logger)
}
return nil, nil
}
@@ -470,7 +491,7 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) {
con.resizeByNumChains(curMaxNumChains)
// Notify core.Lattice for new configs.
if con.lattice != nil {
- for con.latticeLastRound+1 < uint64(len(con.configs)) {
+ for con.latticeLastRound+1 <= maxRound {
con.latticeLastRound++
if err := con.lattice.AppendConfig(
con.latticeLastRound,
@@ -546,6 +567,15 @@ func (con *Consensus) startNetwork() {
pos = v.Position
case *types.AgreementResult:
pos = v.Position
+ case *types.BlockRandomnessResult:
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if v.Position.Round >= con.agreementRoundCut {
+ con.randomnessResults = append(con.randomnessResults, v)
+ }
+ }()
+ continue Loop
default:
continue Loop
}