diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/agreement-mgr.go | 59 | ||||
-rw-r--r-- | core/consensus.go | 181 | ||||
-rw-r--r-- | core/consensus_test.go | 2 | ||||
-rw-r--r-- | core/syncer/agreement.go | 4 | ||||
-rw-r--r-- | core/syncer/consensus.go | 42 |
5 files changed, 225 insertions, 63 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 6f50bfc..cbce6d2 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -89,11 +89,13 @@ type agreementMgr struct { lattice *Lattice ctx context.Context lastEndTime time.Time + initRound uint64 configs []*agreementMgrConfig baModules []*agreement waitGroup sync.WaitGroup pendingVotes map[uint64][]*types.Vote pendingBlocks map[uint64][]*types.Block + isRunning bool // This lock should be used when attempting to: // - add a new baModule. @@ -106,7 +108,8 @@ type agreementMgr struct { lock sync.RWMutex } -func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr { +func newAgreementMgr(con *Consensus, initRound uint64, + initRoundBeginTime time.Time) *agreementMgr { return &agreementMgr{ con: con, ID: con.ID, @@ -118,7 +121,33 @@ func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr { auth: con.authModule, lattice: con.lattice, ctx: con.ctx, - lastEndTime: dMoment, + initRound: initRound, + lastEndTime: initRoundBeginTime, + } +} + +func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if round < mgr.initRound { + panic(ErrRoundOutOfRange) + } + roundIndex := round - mgr.initRound + if roundIndex >= uint64(len(mgr.configs)) { + return nil + } + return mgr.configs[roundIndex] +} + +func (mgr *agreementMgr) run() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + if mgr.isRunning { + return + } + mgr.isRunning = true + for i := uint32(0); i < uint32(len(mgr.baModules)); i++ { + go mgr.runBA(mgr.initRound, i) } } @@ -126,8 +155,7 @@ func (mgr *agreementMgr) appendConfig( round uint64, config *types.Config, crs common.Hash) (err error) { mgr.lock.Lock() defer mgr.lock.Unlock() - // TODO(mission): initiate this module from some round > 0. - if round != uint64(len(mgr.configs)) { + if round != uint64(len(mgr.configs))+mgr.initRound { return ErrRoundNotIncreasing } newConfig := &agreementMgrConfig{ @@ -156,7 +184,9 @@ func (mgr *agreementMgr) appendConfig( // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) - go mgr.runBA(round, i) + if mgr.isRunning { + go mgr.runBA(round, i) + } } return nil } @@ -169,7 +199,8 @@ func (mgr *agreementMgr) processVote(v *types.Vote) error { mgr.logger.Error("Process vote for unknown chain to BA", "position", &v.Position, "baChain", len(mgr.baModules), - "baRound", len(mgr.configs)) + "baRound", len(mgr.configs), + "initRound", mgr.initRound) return utils.ErrInvalidChainID } return mgr.baModules[v.Position.ChainID].processVote(v) @@ -182,7 +213,8 @@ func (mgr *agreementMgr) processBlock(b *types.Block) error { mgr.logger.Error("Process block for unknown chain to BA", "position", &b.Position, "baChain", len(mgr.baModules), - "baRound", len(mgr.configs)) + "baRound", len(mgr.configs), + "initRound", mgr.initRound) return utils.ErrInvalidChainID } return mgr.baModules[b.Position.ChainID].processBlock(b) @@ -196,7 +228,8 @@ func (mgr *agreementMgr) processAgreementResult( mgr.logger.Error("Process unknown result for unknown chain to BA", "position", &result.Position, "baChain", len(mgr.baModules), - "baRound", len(mgr.configs)) + "baRound", len(mgr.configs), + "initRound", mgr.initRound) return utils.ErrInvalidChainID } agreement := mgr.baModules[result.Position.ChainID] @@ -273,15 +306,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Wait until the configuartion for next round is ready. var config *agreementMgrConfig for { - config = func() *agreementMgrConfig { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if nextRound < uint64(len(mgr.configs)) { - return mgr.configs[nextRound] - } - return nil - }() - if config != nil { + if config = mgr.getConfig(nextRound); config != nil { break } else { mgr.logger.Info("round is not ready", "round", nextRound) diff --git a/core/consensus.go b/core/consensus.go index 253c9a5..efd3ab7 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -58,6 +58,8 @@ var ( "incorrect vote proposer") ErrCRSNotReady = fmt.Errorf( "CRS not ready") + ErrConfigurationNotReady = fmt.Errorf( + "Configuration not ready") ) // consensusBAReceiver implements agreementReceiver. @@ -338,11 +340,10 @@ type Consensus struct { toSyncer *totalOrderingSyncer // Interfaces. - db blockdb.BlockDatabase - app Application - gov Governance - network Network - tickerObj Ticker + db blockdb.BlockDatabase + app Application + gov Governance + network Network // Misc. dMoment time.Time @@ -367,15 +368,19 @@ func NewConsensus( logger common.Logger) *Consensus { // TODO(w): load latest blockHeight from DB, and use config at that height. - var round uint64 - logger.Debug("Calling Governance.Configuration", "round", round) - config := gov.Configuration(round) nodeSetCache := utils.NewNodeSetCache(gov) - logger.Debug("Calling Governance.CRS", "round", round) // Setup auth module. authModule := NewAuthenticator(prv) // Check if the application implement Debug interface. debugApp, _ := app.(Debug) + // Get configuration for genesis round. + var round uint64 + logger.Debug("Calling Governance.Configuration", "round", round) + config := gov.Configuration(round) + if config == nil { + logger.Error("Unable to get configuration", "round", round) + return nil + } // Init lattice. lattice := NewLattice( dMoment, round, config, authModule, app, debugApp, db, logger) @@ -405,7 +410,6 @@ func NewConsensus( gov: gov, db: db, network: network, - tickerObj: newTicker(gov, round, TickerBA), baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), dkgReady: sync.NewCond(&sync.Mutex{}), cfgModule: cfgModule, @@ -416,12 +420,100 @@ func NewConsensus( logger: logger, } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, dMoment) + con.baMgr = newAgreementMgr(con, round, dMoment) + if err := con.prepare(&types.Block{}); err != nil { + panic(err) + } return con } -// Run starts running DEXON Consensus. -func (con *Consensus) Run(initBlock *types.Block) { +// NewConsensusFromSyncer constructs an Consensus instance from information +// provided from syncer. +// +// You need to provide the initial block for this newly created Consensus +// instance to bootstrap with. A proper choice is the last finalized block you +// delivered to syncer. +func NewConsensusFromSyncer( + initBlock *types.Block, + initRoundBeginTime time.Time, + app Application, + gov Governance, + db blockdb.BlockDatabase, + networkModule Network, + prv crypto.PrivateKey, + latticeModule *Lattice, + blocks []*types.Block, + randomnessResults []*types.BlockRandomnessResult, + logger common.Logger) (*Consensus, error) { + // Setup the cache for node sets. + nodeSetCache := utils.NewNodeSetCache(gov) + // Setup auth module. + authModule := NewAuthenticator(prv) + // Init configuration chain. + ID := types.NewNodeID(prv.PublicKey()) + recv := &consensusDKGReceiver{ + ID: ID, + gov: gov, + authModule: authModule, + nodeSetCache: nodeSetCache, + network: networkModule, + logger: logger, + } + cfgModule := newConfigurationChain( + ID, + recv, + gov, + nodeSetCache, + logger) + recv.cfgModule = cfgModule + // Setup Consensus instance. + con := &Consensus{ + ID: ID, + ccModule: newCompactionChain(gov), + lattice: latticeModule, + app: app, + gov: gov, + db: db, + network: networkModule, + baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), + dkgReady: sync.NewCond(&sync.Mutex{}), + cfgModule: cfgModule, + dMoment: initRoundBeginTime, + nodeSetCache: nodeSetCache, + authModule: authModule, + event: common.NewEvent(), + logger: logger, + } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime) + // Bootstrap the consensus instance. + if err := con.prepare(initBlock); err != nil { + return nil, err + } + // Dump all BA-confirmed blocks to the consensus instance. + for _, b := range blocks { + con.app.BlockConfirmed(*b) + con.ccModule.registerBlock(b) + if err := con.processBlock(b); err != nil { + return nil, err + } + } + // Dump all randomness result to the consensus instance. + for _, r := range randomnessResults { + if err := con.ProcessBlockRandomnessResult(r); err != nil { + con.logger.Error("failed to process randomness result when syncing", + "result", r) + continue + } + } + return con, nil +} + +// prepare the Consensus instance to be ready for blocks after 'initBlock'. +// 'initBlock' could be either: +// - an empty block +// - the last finalized block +func (con *Consensus) prepare(initBlock *types.Block) error { // The block past from full node should be delivered already or known by // full node. We don't have to notify it. con.roundToNotify = initBlock.Position.Round + 1 @@ -430,36 +522,33 @@ func (con *Consensus) Run(initBlock *types.Block) { initConfig := con.gov.Configuration(initRound) // Setup context. con.ccModule.init(initBlock) - // TODO(jimmy-dexon): change AppendConfig to add config for specific round. - for i := uint64(0); i <= initRound+1; i++ { - con.logger.Debug("Calling Governance.Configuration", "round", i) - cfg := con.gov.Configuration(i) - // 0 round is already given to core.Lattice module when constructing. - if i > 0 { - if err := con.lattice.AppendConfig(i, cfg); err != nil { - panic(err) - } - } - // Corresponding CRS might not be ready for next round to initRound. - if i < initRound+1 { - con.logger.Debug("Calling Governance.CRS", "round", i) - crs := con.gov.CRS(i) - if (crs == common.Hash{}) { - panic(ErrCRSNotReady) - } - if err := con.baMgr.appendConfig(i, cfg, crs); err != nil { - panic(err) - } - } + // Setup agreementMgr module. + con.logger.Debug("Calling Governance.Configuration", "round", initRound) + initCfg := con.gov.Configuration(initRound) + if initCfg == nil { + return ErrConfigurationNotReady + } + con.logger.Debug("Calling Governance.CRS", "round", initRound) + initCRS := con.gov.CRS(initRound) + if (initCRS == common.Hash{}) { + return ErrCRSNotReady + } + if err := con.baMgr.appendConfig(initRound, initCfg, initCRS); err != nil { + return err + } + // Setup lattice module. + initPlusOneCfg := con.gov.Configuration(initRound + 1) + if initPlusOneCfg == nil { + return ErrConfigurationNotReady } + if err := con.lattice.AppendConfig(initRound+1, initPlusOneCfg); err != nil { + return err + } + // Register events. dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) if err != nil { - panic(err) + return err } - con.logger.Debug("Calling Network.ReceiveChan") - go con.processMsg(con.network.ReceiveChan()) - // Sleep until dMoment come. - time.Sleep(con.dMoment.Sub(time.Now().UTC())) if _, exist := dkgSet[con.ID]; exist { con.logger.Info("Selected as DKG set", "round", initRound) con.cfgModule.registerDKG(initRound, int(initConfig.DKGSetSize)/3+1) @@ -469,6 +558,18 @@ func (con *Consensus) Run(initBlock *types.Block) { }) } con.initialRound(con.dMoment, initRound, initConfig) + return nil +} + +// Run starts running DEXON Consensus. +func (con *Consensus) Run() { + // Launch BA routines. + con.baMgr.run() + // Launch network handler. + con.logger.Debug("Calling Network.ReceiveChan") + go con.processMsg(con.network.ReceiveChan()) + // Sleep until dMoment come. + time.Sleep(con.dMoment.Sub(time.Now().UTC())) // Block until done. select { case <-con.ctx.Done(): @@ -705,6 +806,8 @@ MessageLoop: case *types.BlockRandomnessResult: if err := con.ProcessBlockRandomnessResult(val); err != nil { con.logger.Error("Failed to process block randomness result", + "hash", val.BlockHash.String()[:6], + "position", &val.Position, "error", err) } case *typesDKG.PrivateShare: diff --git a/core/consensus_test.go b/core/consensus_test.go index dab5440..7d9c653 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -553,7 +553,7 @@ func (s *ConsensusTestSuite) TestSyncBA() { s.Require().NoError(err) prvKey := prvKeys[0] _, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn) - go con.Run(&types.Block{}) + go con.Run() hash := common.NewRandomHash() auths := make([]*Authenticator, 0, len(prvKeys)) for _, prvKey := range prvKeys { diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go index 89b8c8d..fee4624 100644 --- a/core/syncer/agreement.go +++ b/core/syncer/agreement.go @@ -105,6 +105,10 @@ func (a *agreement) processBlock(b *types.Block) { func (a *agreement) processAgreementResult(r *types.AgreementResult) { // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[r.BlockHash]; exists { + a.logger.Info("agreement result already confirmed", "result", r) + return + } if r.Position.Round > a.latestCRSRound { pendingsForRound, exists := a.pendings[r.Position.Round] if !exists { diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index d84f168..ce6fdc3 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -54,6 +54,7 @@ type Consensus struct { lattice *core.Lattice latticeLastRound uint64 + randomnessResults []*types.BlockRandomnessResult blocks []types.ByPosition agreements []*agreement configs []*types.Config @@ -175,11 +176,11 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) { // con.blocks are all in the same round, for avoiding config change while // syncing. func (con *Consensus) ensureAgreementOverlapRound() bool { + con.lock.Lock() + defer con.lock.Unlock() if con.agreementRoundCut > 0 { return true } - con.lock.Lock() - defer con.lock.Unlock() // Clean empty blocks on tips of chains. for idx, bs := range con.blocks { for len(bs) > 0 && con.isEmptyBlock(bs[0]) { @@ -412,10 +413,30 @@ func (con *Consensus) SyncBlocks( con.moduleWaitGroup.Wait() // Stop agreements. con.stopAgreement() - // TODO: flush all blocks in con.blocks into core.Consensus, and build + // flush all blocks in con.blocks into core.Consensus, and build // core.Consensus from syncer. - con.logger.Info("syncer.Consensus synced") - return &core.Consensus{}, nil + lastBlock := blocks[len(blocks)-1] + con.logger.Info("syncer.Consensus synced", "last-block", lastBlock) + confirmedBlocks := []*types.Block{} + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, bs := range con.blocks { + confirmedBlocks = append(confirmedBlocks, bs...) + } + }() + return core.NewConsensusFromSyncer( + lastBlock, + con.roundBeginTimes[lastBlock.Position.Round], + con.app, + con.gov, + con.db, + con.network, + con.prv, + con.lattice, + confirmedBlocks, + con.randomnessResults, + con.logger) } return nil, nil } @@ -470,7 +491,7 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) { con.resizeByNumChains(curMaxNumChains) // Notify core.Lattice for new configs. if con.lattice != nil { - for con.latticeLastRound+1 < uint64(len(con.configs)) { + for con.latticeLastRound+1 <= maxRound { con.latticeLastRound++ if err := con.lattice.AppendConfig( con.latticeLastRound, @@ -546,6 +567,15 @@ func (con *Consensus) startNetwork() { pos = v.Position case *types.AgreementResult: pos = v.Position + case *types.BlockRandomnessResult: + func() { + con.lock.Lock() + defer con.lock.Unlock() + if v.Position.Round >= con.agreementRoundCut { + con.randomnessResults = append(con.randomnessResults, v) + } + }() + continue Loop default: continue Loop } |