diff options
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 286 |
1 files changed, 87 insertions, 199 deletions
diff --git a/core/consensus.go b/core/consensus.go index e09ee25..49874d3 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -56,6 +56,8 @@ var ( "incorrect vote position") ErrIncorrectVoteProposer = fmt.Errorf( "incorrect vote proposer") + ErrCRSNotReady = fmt.Errorf( + "CRS not ready") ) // consensusBAReceiver implements agreementReceiver. @@ -103,20 +105,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { func (recv *consensusBAReceiver) ConfirmBlock( hash common.Hash, votes map[types.NodeID]*types.Vote) { var block *types.Block - if (hash == common.Hash{}) { + isEmptyBlockConfirmed := hash == common.Hash{} + if isEmptyBlockConfirmed { aID := recv.agreementModule.agreementID() recv.consensus.logger.Info("Empty block is confirmed", "position", &aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.chainID) + block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID) if err != nil { recv.consensus.logger.Error("Propose empty block failed", "error", err) return } } else { var exist bool - block, exist = recv.consensus.baModules[recv.chainID]. - findCandidateBlockNoLock(hash) + block, exist = recv.agreementModule.findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", "hash", hash, @@ -150,9 +152,10 @@ func (recv *consensusBAReceiver) ConfirmBlock( voteList = append(voteList, *vote) } result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + IsEmptyBlock: isEmptyBlockConfirmed, } recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult", "result", result) @@ -273,8 +276,7 @@ type Consensus struct { authModule *Authenticator // BA. - baModules []*agreement - receivers []*consensusBAReceiver + baMgr *agreementMgr baConfirmedBlock map[common.Hash]chan<- *types.Block // DKG. @@ -365,49 +367,8 @@ func NewConsensus( event: common.NewEvent(), logger: logger, } - - validLeader := func(block *types.Block) (bool, error) { - if block.Timestamp.After(time.Now()) { - return false, nil - } - if err := lattice.SanityCheck(block); err != nil { - if err == ErrRetrySanityCheckLater { - return false, nil - } - return false, err - } - logger.Debug("Calling Application.VerifyBlock", "block", block) - switch app.VerifyBlock(block) { - case types.VerifyInvalidBlock: - return false, ErrInvalidBlock - case types.VerifyRetryLater: - return false, nil - default: - } - return true, nil - } - - con.baModules = make([]*agreement, config.NumChains) - con.receivers = make([]*consensusBAReceiver, config.NumChains) - for i := uint32(0); i < config.NumChains; i++ { - chainID := i - recv := &consensusBAReceiver{ - consensus: con, - chainID: chainID, - restartNotary: make(chan bool, 1), - } - agreementModule := newAgreement( - con.ID, - recv, - newLeaderSelector(validLeader, logger), - con.authModule, - ) - // Hacky way to make agreement module self contained. - recv.agreementModule = agreementModule - recv.changeNotaryTime = dMoment - con.baModules[chainID] = agreementModule - con.receivers[chainID] = recv - } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + con.baMgr = newAgreementMgr(con, dMoment) return con } @@ -420,14 +381,27 @@ func (con *Consensus) Run(initBlock *types.Block) { con.logger.Debug("Calling Governance.Configuration", "round", initRound) initConfig := con.gov.Configuration(initRound) // Setup context. - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) con.ccModule.init(initBlock) // TODO(jimmy-dexon): change AppendConfig to add config for specific round. - for i := uint64(0); i <= initRound; i++ { - con.logger.Debug("Calling Governance.Configuration", "round", i+1) - cfg := con.gov.Configuration(i + 1) - if err := con.lattice.AppendConfig(i+1, cfg); err != nil { - panic(err) + for i := uint64(0); i <= initRound+1; i++ { + con.logger.Debug("Calling Governance.Configuration", "round", i) + cfg := con.gov.Configuration(i) + // 0 round is already given to core.Lattice module when constructing. + if i > 0 { + if err := con.lattice.AppendConfig(i, cfg); err != nil { + panic(err) + } + } + // Corresponding CRS might not be ready for next round to initRound. + if i < initRound+1 { + con.logger.Debug("Calling Governance.CRS", "round", i) + crs := con.gov.CRS(i) + if (crs == common.Hash{}) { + panic(ErrCRSNotReady) + } + if err := con.baMgr.appendConfig(i, cfg, crs); err != nil { + panic(err) + } } } dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) @@ -447,103 +421,9 @@ func (con *Consensus) Run(initBlock *types.Block) { }) } con.initialRound(con.dMoment, initRound, initConfig) - ticks := make([]chan struct{}, 0, initConfig.NumChains) - for i := uint32(0); i < initConfig.NumChains; i++ { - tick := make(chan struct{}) - ticks = append(ticks, tick) - // TODO(jimmy-dexon): this is a temporary solution to offset BA time. - // The complelete solution should be delivered along with config change. - offset := time.Duration(i*uint32(4)/initConfig.NumChains) * - initConfig.LambdaBA - go func(chainID uint32, offset time.Duration) { - time.Sleep(offset) - con.runBA(chainID, tick) - }(i, offset) - } - - // Reset ticker. - <-con.tickerObj.Tick() - <-con.tickerObj.Tick() - for { - <-con.tickerObj.Tick() - for _, tick := range ticks { - select { - case tick <- struct{}{}: - default: - } - } - } -} - -func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) { - // TODO(jimmy-dexon): move this function inside agreement. - agreement := con.baModules[chainID] - recv := con.receivers[chainID] - recv.restartNotary <- true - nIDs := make(map[types.NodeID]struct{}) - crs := common.Hash{} - // Reset ticker - <-tick -BALoop: - for { - select { - case <-con.ctx.Done(): - break BALoop - default: - } - select { - case newNotary := <-recv.restartNotary: - if newNotary { - con.logger.Debug("Calling Governance.CRS", "round", recv.round) - crs = con.gov.CRS(recv.round) - if (crs == common.Hash{}) { - // Governance is out-of-sync. - continue BALoop - } - configForNewRound := con.gov.Configuration(recv.round) - recv.changeNotaryTime = - recv.changeNotaryTime.Add(configForNewRound.RoundInterval) - nodes, err := con.nodeSetCache.GetNodeSet(recv.round) - if err != nil { - panic(err) - } - con.logger.Debug("Calling Governance.Configuration", - "round", recv.round) - nIDs = nodes.GetSubSet( - int(configForNewRound.NotarySetSize), - types.NewNotarySetTarget(crs, chainID)) - } - nextPos := con.lattice.NextPosition(chainID) - nextPos.Round = recv.round - agreement.restart(nIDs, nextPos, crs) - default: - } - if agreement.pullVotes() { - pos := agreement.agreementID() - con.logger.Debug("Calling Network.PullVotes for syncing votes", - "position", &pos) - con.network.PullVotes(pos) - } - err := agreement.nextState() - if err != nil { - con.logger.Error("Failed to proceed to next state", - "nodeID", con.ID.String(), - "error", err) - break BALoop - } - for i := 0; i < agreement.clocks(); i++ { - // Priority select for agreement.done(). - select { - case <-agreement.done(): - continue BALoop - default: - } - select { - case <-agreement.done(): - continue BALoop - case <-tick: - } - } + // Block until done. + select { + case <-con.ctx.Done(): } } @@ -622,6 +502,7 @@ func (con *Consensus) initialRound( con.logger.Error("Error getting DKG set", "round", round, "error", err) curDkgSet = make(map[types.NodeID]struct{}) } + // Initiate CRS routine. if _, exist := curDkgSet[con.ID]; exist { con.event.RegisterTime(startTime.Add(config.RoundInterval/2), func(time.Time) { @@ -630,7 +511,31 @@ func (con *Consensus) initialRound( }() }) } - + // Initiate BA modules. + con.event.RegisterTime( + startTime.Add(config.RoundInterval/2+config.LambdaDKG), + func(time.Time) { + go func(nextRound uint64) { + for (con.gov.CRS(nextRound) == common.Hash{}) { + con.logger.Info("CRS is not ready yet. Try again later...", + "nodeID", con.ID, + "round", nextRound) + time.Sleep(500 * time.Millisecond) + } + // Notify BA for new round. + con.logger.Debug("Calling Governance.Configuration", + "round", nextRound) + nextConfig := con.gov.Configuration(nextRound) + con.logger.Debug("Calling Governance.CRS", + "round", nextRound) + nextCRS := con.gov.CRS(nextRound) + if err := con.baMgr.appendConfig( + nextRound, nextConfig, nextCRS); err != nil { + panic(err) + } + }(round + 1) + }) + // Initiate DKG for this round. con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG), func(time.Time) { go func(nextRound uint64) { @@ -670,6 +575,7 @@ func (con *Consensus) initialRound( }) }(round + 1) }) + // Prepare lattice module for next round and next "initialRound" routine. con.event.RegisterTime(startTime.Add(config.RoundInterval), func(time.Time) { // Change round. @@ -685,9 +591,7 @@ func (con *Consensus) initialRound( // Stop the Consensus core. func (con *Consensus) Stop() { - for _, a := range con.baModules { - a.stop() - } + con.baMgr.stop() con.event.Reset() con.ctxCancel() } @@ -785,9 +689,10 @@ func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block { } func (con *Consensus) proposeEmptyBlock( - chainID uint32) (*types.Block, error) { + round uint64, chainID uint32) (*types.Block, error) { block := &types.Block{ Position: types.Position{ + Round: round, ChainID: chainID, }, } @@ -799,15 +704,9 @@ func (con *Consensus) proposeEmptyBlock( // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - if vote.Position.ChainID >= uint32(len(con.baModules)) { - return nil - } - if isStop(con.baModules[vote.Position.ChainID].agreementID()) { - return nil - } v := vote.Clone() - err = con.baModules[v.Position.ChainID].processVote(v) - return err + err = con.baMgr.processVote(v) + return } // ProcessAgreementResult processes the randomness request. @@ -826,8 +725,14 @@ func (con *Consensus) ProcessAgreementResult( return ErrIncorrectVoteProposer } for _, vote := range rand.Votes { - if vote.BlockHash != rand.BlockHash { - return ErrIncorrectVoteBlockHash + if rand.IsEmptyBlock { + if (vote.BlockHash != common.Hash{}) { + return ErrIncorrectVoteBlockHash + } + } else { + if vote.BlockHash != rand.BlockHash { + return ErrIncorrectVoteBlockHash + } } if vote.Type != types.VoteCom { return ErrIncorrectVoteType @@ -847,29 +752,8 @@ func (con *Consensus) ProcessAgreementResult( } } // Syncing BA Module. - agreement := con.baModules[rand.Position.ChainID] - aID := agreement.agreementID() - if isStop(aID) { - return nil - } - if rand.Position.Newer(&aID) { - con.logger.Info("Syncing BA", "position", &rand.Position) - nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round) - if err != nil { - return err - } - con.logger.Debug("Calling Network.PullBlocks for syncing BA", - "hash", rand.BlockHash) - con.network.PullBlocks(common.Hashes{rand.BlockHash}) - con.logger.Debug("Calling Governance.CRS", "round", rand.Position.Round) - crs := con.gov.CRS(rand.Position.Round) - nIDs := nodes.GetSubSet( - int(con.gov.Configuration(rand.Position.Round).NotarySetSize), - types.NewNotarySetTarget(crs, rand.Position.ChainID)) - for _, vote := range rand.Votes { - agreement.processVote(&vote) - } - agreement.restart(nIDs, rand.Position, crs) + if err := con.baMgr.processAgreementResult(rand); err != nil { + return err } // Calculating randomness. if rand.Position.Round == 0 { @@ -950,9 +834,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { - return err - } + err = con.baMgr.processBlock(b) return } @@ -1064,9 +946,15 @@ func (con *Consensus) prepareBlock(b *types.Block, if err = con.lattice.PrepareBlock(b, proposeTime); err != nil { return } - con.logger.Debug("Calling Governance.CRS", "round", 0) - if err = - con.authModule.SignCRS(b, con.gov.CRS(b.Position.Round)); err != nil { + con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round) + crs := con.gov.CRS(b.Position.Round) + if crs.Equal(common.Hash{}) { + con.logger.Error("CRS for round is not ready, unable to prepare block", + "position", &b.Position) + err = ErrCRSNotReady + return + } + if err = con.authModule.SignCRS(b, crs); err != nil { return } return |