From 041eb53f043e6a4a7a9acab1ce46ecfd268fed57 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Tue, 4 Dec 2018 09:52:11 +0800 Subject: core: construct consensus from syncer (#352) --- core/agreement-mgr.go | 59 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 17 deletions(-) (limited to 'core/agreement-mgr.go') diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 6f50bfc..cbce6d2 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -89,11 +89,13 @@ type agreementMgr struct { lattice *Lattice ctx context.Context lastEndTime time.Time + initRound uint64 configs []*agreementMgrConfig baModules []*agreement waitGroup sync.WaitGroup pendingVotes map[uint64][]*types.Vote pendingBlocks map[uint64][]*types.Block + isRunning bool // This lock should be used when attempting to: // - add a new baModule. @@ -106,7 +108,8 @@ type agreementMgr struct { lock sync.RWMutex } -func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr { +func newAgreementMgr(con *Consensus, initRound uint64, + initRoundBeginTime time.Time) *agreementMgr { return &agreementMgr{ con: con, ID: con.ID, @@ -118,7 +121,33 @@ func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr { auth: con.authModule, lattice: con.lattice, ctx: con.ctx, - lastEndTime: dMoment, + initRound: initRound, + lastEndTime: initRoundBeginTime, + } +} + +func (mgr *agreementMgr) getConfig(round uint64) *agreementMgrConfig { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if round < mgr.initRound { + panic(ErrRoundOutOfRange) + } + roundIndex := round - mgr.initRound + if roundIndex >= uint64(len(mgr.configs)) { + return nil + } + return mgr.configs[roundIndex] +} + +func (mgr *agreementMgr) run() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + if mgr.isRunning { + return + } + mgr.isRunning = true + for i := uint32(0); i < uint32(len(mgr.baModules)); i++ { + go mgr.runBA(mgr.initRound, i) } } @@ -126,8 +155,7 @@ func (mgr *agreementMgr) appendConfig( round uint64, config *types.Config, crs common.Hash) (err error) { mgr.lock.Lock() defer mgr.lock.Unlock() - // TODO(mission): initiate this module from some round > 0. - if round != uint64(len(mgr.configs)) { + if round != uint64(len(mgr.configs))+mgr.initRound { return ErrRoundNotIncreasing } newConfig := &agreementMgrConfig{ @@ -156,7 +184,9 @@ func (mgr *agreementMgr) appendConfig( // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) - go mgr.runBA(round, i) + if mgr.isRunning { + go mgr.runBA(round, i) + } } return nil } @@ -169,7 +199,8 @@ func (mgr *agreementMgr) processVote(v *types.Vote) error { mgr.logger.Error("Process vote for unknown chain to BA", "position", &v.Position, "baChain", len(mgr.baModules), - "baRound", len(mgr.configs)) + "baRound", len(mgr.configs), + "initRound", mgr.initRound) return utils.ErrInvalidChainID } return mgr.baModules[v.Position.ChainID].processVote(v) @@ -182,7 +213,8 @@ func (mgr *agreementMgr) processBlock(b *types.Block) error { mgr.logger.Error("Process block for unknown chain to BA", "position", &b.Position, "baChain", len(mgr.baModules), - "baRound", len(mgr.configs)) + "baRound", len(mgr.configs), + "initRound", mgr.initRound) return utils.ErrInvalidChainID } return mgr.baModules[b.Position.ChainID].processBlock(b) @@ -196,7 +228,8 @@ func (mgr *agreementMgr) processAgreementResult( mgr.logger.Error("Process unknown result for unknown chain to BA", "position", &result.Position, "baChain", len(mgr.baModules), - "baRound", len(mgr.configs)) + "baRound", len(mgr.configs), + "initRound", mgr.initRound) return utils.ErrInvalidChainID } agreement := mgr.baModules[result.Position.ChainID] @@ -273,15 +306,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { // Wait until the configuartion for next round is ready. var config *agreementMgrConfig for { - config = func() *agreementMgrConfig { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if nextRound < uint64(len(mgr.configs)) { - return mgr.configs[nextRound] - } - return nil - }() - if config != nil { + if config = mgr.getConfig(nextRound); config != nil { break } else { mgr.logger.Info("round is not ready", "round", nextRound) -- cgit v1.2.3