diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/agreement-mgr.go | 51 | ||||
-rw-r--r-- | core/agreement.go | 4 | ||||
-rw-r--r-- | core/consensus.go | 81 |
3 files changed, 45 insertions, 91 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 423174c..3e0851d 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -22,7 +22,6 @@ import ( "errors" "math" "sync" - "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -89,6 +88,7 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, } type baRoundSetting struct { + round uint64 notarySet map[types.NodeID]struct{} ticker Ticker crs common.Hash @@ -132,13 +132,9 @@ func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) { voteFilter: utils.NewVoteFilter(), } mgr.recv = &consensusBAReceiver{ - consensus: con, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, - changeNotaryHeightValue: &atomic.Value{}, + consensus: con, + restartNotary: make(chan types.Position, 1), } - mgr.recv.updateRound(uint64(0)) - mgr.recv.changeNotaryHeightValue.Store(uint64(0)) return mgr, nil } @@ -201,13 +197,6 @@ func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { } if lastCfg.RoundID() == e.Round { mgr.configs[len(mgr.configs)-1].ExtendLength() - // It's not an atomic operation to update an atomic value based - // on another. However, it's the best way so far to extend - // length of round without refactoring. - if mgr.recv.round() == e.Round { - mgr.recv.changeNotaryHeightValue.Store( - mgr.configs[len(mgr.configs)-1].RoundEndHeight()) - } } else if lastCfg.RoundID()+1 == e.Round { mgr.configs = append(mgr.configs, newAgreementMgrConfig( lastCfg, e.Config, e.CRS)) @@ -366,6 +355,7 @@ func (mgr *agreementMgr) runBA(initRound uint64) { } setting.crs = curConfig.crs setting.notarySet = notarySet + setting.round = nextRound _, isNotary = setting.notarySet[mgr.ID] if isNotary { mgr.logger.Info("Selected as notary set", @@ -395,14 +385,24 @@ Loop: } mgr.recv.isNotary = checkRound() // Run BA for this round. - mgr.recv.updateRound(currentRound) - mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight()) mgr.recv.restartNotary <- types.Position{ - Round: mgr.recv.round(), + Round: currentRound, Height: math.MaxUint64, } mgr.voteFilter = utils.NewVoteFilter() mgr.recv.emptyBlockHashMap = &sync.Map{} + if currentRound >= DKGDelayRound && mgr.recv.isNotary { + var err error + mgr.recv.npks, mgr.recv.psigSigner, err = + mgr.con.cfgModule.getDKGInfo(currentRound, false) + if err != nil { + mgr.logger.Warn("cannot get dkg info", + "round", currentRound, "error", err) + } + } else { + mgr.recv.npks = nil + mgr.recv.psigSigner = nil + } if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, @@ -419,7 +419,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( oldPos := agr.agreementID() restart := func(restartPos types.Position) (breakLoop bool, err error) { if !isStop(restartPos) { - if restartPos.Round > oldPos.Round { + if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() { for { select { case <-mgr.ctx.Done(): @@ -427,14 +427,12 @@ func (mgr *agreementMgr) baRoutineForOneRound( default: } tipRound := mgr.bcModule.tipRound() - if tipRound > restartPos.Round { - // It's a vary rare that this go routine sleeps for entire round. + if tipRound > setting.round { break - } else if tipRound != restartPos.Round { - mgr.logger.Debug("Waiting blockChain to change round...", - "pos", restartPos) } else { - break + mgr.logger.Debug("Waiting blockChain to change round...", + "curRound", setting.round, + "tipRound", tipRound) } time.Sleep(100 * time.Millisecond) } @@ -459,9 +457,6 @@ func (mgr *agreementMgr) baRoutineForOneRound( default: } nextHeight, nextTime = mgr.bcModule.nextBlock() - if isStop(oldPos) && nextHeight == 0 { - break - } if isStop(restartPos) { break } @@ -473,7 +468,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round(), + Round: setting.round, Height: nextHeight, } oldPos = nextPos diff --git a/core/agreement.go b/core/agreement.go index f2a9e3d..3745848 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -399,6 +399,9 @@ func (a *agreement) prepareVote(vote *types.Vote) (err error) { } func (a *agreement) updateFilter(filter *utils.VoteFilter) { + if isStop(a.agreementID()) { + return + } a.lock.RLock() defer a.lock.RUnlock() a.data.lock.RLock() @@ -417,6 +420,7 @@ func (a *agreement) processVote(vote *types.Vote) error { return err } aID := a.agreementID() + // Agreement module has stopped. if isStop(aID) { // Hacky way to not drop first votes for genesis height. diff --git a/core/consensus.go b/core/consensus.go index 57b3038..9702231 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -22,7 +22,6 @@ import ( "encoding/hex" "fmt" "sync" - "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" @@ -60,23 +59,13 @@ var ( // consensusBAReceiver implements agreementReceiver. type consensusBAReceiver struct { - consensus *Consensus - agreementModule *agreement - changeNotaryHeightValue *atomic.Value - roundValue *atomic.Value - emptyBlockHashMap *sync.Map - isNotary bool - restartNotary chan types.Position - npks *typesDKG.NodePublicKeys - psigSigner *dkgShareSecret -} - -func (recv *consensusBAReceiver) round() uint64 { - return recv.roundValue.Load().(uint64) -} - -func (recv *consensusBAReceiver) changeNotaryHeight() uint64 { - return recv.changeNotaryHeightValue.Load().(uint64) + consensus *Consensus + agreementModule *agreement + emptyBlockHashMap *sync.Map + isNotary bool + restartNotary chan types.Position + npks *typesDKG.NodePublicKeys + psigSigner *dkgShareSecret } func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) ( @@ -99,17 +88,13 @@ func (recv *consensusBAReceiver) emptyBlockHash(pos types.Position) ( } func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool { - if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { + if vote.Position.Round >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { - if recv.npks == nil || recv.npks.Round != vote.Position.Round { - var err error - recv.npks, _, err = - recv.consensus.cfgModule.getDKGInfo(vote.Position.Round, true) - if err != nil || recv.npks == nil { - recv.consensus.logger.Warn("cannot get npks", - "round", vote.Position.Round, "error", err) - return false - } + if recv.npks == nil { + return false + } + if vote.Position.Round != recv.npks.Round { + return false } pubKey, exist := recv.npks.PublicKeys[vote.ProposerID] if !exist { @@ -138,11 +123,9 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return } - if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { + if recv.psigSigner != nil && + vote.BlockHash != types.SkipBlockHash { if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { - if recv.psigSigner == nil { - return - } if vote.BlockHash == types.NullBlockHash { hash, err := recv.emptyBlockHash(vote.Position) if err != nil { @@ -272,7 +255,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( if vote.BlockHash != hash { continue } - if recv.round() >= DKGDelayRound { + if block.Position.Round >= DKGDelayRound { ID, exist := recv.npks.IDMap[vote.ProposerID] if !exist { continue @@ -282,7 +265,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( } voteList = append(voteList, *vote) } - if recv.round() >= DKGDelayRound { + if block.Position.Round >= DKGDelayRound { rand, err := cryptoDKG.RecoverSignature(psigs, IDs) if err != nil { recv.consensus.logger.Warn("Unable to recover randomness", @@ -385,23 +368,7 @@ CleanChannelLoop: break CleanChannelLoop } } - newPos := block.Position - changeNotaryHeight := recv.changeNotaryHeight() - if block.Position.Height+1 >= changeNotaryHeight { - recv.consensus.logger.Info("Round will change", - "block", block, - "change-height", changeNotaryHeight) - newPos.Round++ - recv.updateRound(newPos.Round) - } - currentRound := recv.round() - if block.Position.Height > changeNotaryHeight && - block.Position.Round < currentRound { - panic(fmt.Errorf( - "round not switch when confirming: %s, %d, should switch at %d, %s", - block, currentRound, changeNotaryHeight, newPos)) - } - recv.restartNotary <- newPos + recv.restartNotary <- block.Position } func (recv *consensusBAReceiver) PullBlocks(hashes common.Hashes) { @@ -420,18 +387,6 @@ func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) { recv.consensus.gov.ReportForkBlock(b1, b2) } -func (recv *consensusBAReceiver) updateRound(round uint64) { - recv.roundValue.Store(round) - var err error - _, recv.psigSigner, err = - recv.consensus.cfgModule.getDKGInfo(round, false) - if err != nil { - recv.consensus.logger.Warn("cannot get dkg info", - "round", round, "error", err) - recv.psigSigner = nil - } -} - // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { ID types.NodeID |