From 0e6dc8b38f7df249831aebd4928ec42b827038e3 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Wed, 23 Jan 2019 15:02:42 +0800 Subject: core: optimize core (#428) * core: Use a channel to process ba confirmed block * change the implementation of done() to react faster * Fix restart * Wait tipRound to change * fix corner case * Check for context --- core/agreement-mgr.go | 137 ++++++++++++++++++++++++++++++++++---------------- core/agreement.go | 44 ++++++++++------ core/consensus.go | 39 +++++++++----- core/lattice-data.go | 13 +++++ core/lattice.go | 7 +++ 5 files changed, 169 insertions(+), 71 deletions(-) (limited to 'core') diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 9e86369..7410977 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -419,7 +419,10 @@ Loop: // Run BA for this round. recv.roundValue.Store(currentRound) recv.changeNotaryTime = roundEndTime - recv.restartNotary <- types.Position{ChainID: math.MaxUint32} + recv.restartNotary <- types.Position{ + Round: setting.recv.round(), + ChainID: math.MaxUint32, + } if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, @@ -435,6 +438,79 @@ func (mgr *agreementMgr) baRoutineForOneRound( agr := setting.agr recv := setting.recv oldPos := agr.agreementID() + restart := func(restartPos types.Position) (breakLoop bool, err error) { + if !isStop(restartPos) { + if restartPos.Round > oldPos.Round { + for { + select { + case <-mgr.ctx.Done(): + break + default: + } + tipRound := mgr.lattice.TipRound(setting.chainID) + if tipRound > restartPos.Round { + // It's very rare that this goroutine sleeps for an entire round. + break + } else if tipRound != restartPos.Round { + mgr.logger.Debug("Waiting lattice to change round...", + "pos", &restartPos) + } else { + break + } + time.Sleep(100 * time.Millisecond) + } + // This round is finished. 
+ breakLoop = true + return + } + if restartPos.Older(&oldPos) { + // The restartNotary event is triggered by 'BlockConfirmed' + // of some older block. + return + } + } + var nextHeight uint64 + var nextTime time.Time + for { + nextHeight, nextTime, err = + mgr.lattice.NextBlock(recv.round(), setting.chainID) + if err != nil { + mgr.logger.Debug("Error getting next height", + "error", err, + "round", recv.round(), + "chainID", setting.chainID) + err = nil + nextHeight = restartPos.Height + } + if isStop(oldPos) && nextHeight == 0 { + break + } + if isStop(restartPos) && nextHeight == 0 { + break + } + if nextHeight > restartPos.Height { + break + } + mgr.logger.Debug("Lattice not ready!!!", + "old", &oldPos, "restart", &restartPos, "next", nextHeight) + time.Sleep(100 * time.Millisecond) + } + nextPos := types.Position{ + Round: recv.round(), + ChainID: setting.chainID, + Height: nextHeight, + } + oldPos = nextPos + var leader types.NodeID + leader, err = mgr.cache.GetLeaderNode(nextPos) + if err != nil { + return + } + time.Sleep(nextTime.Sub(time.Now())) + setting.ticker.Restart() + agr.restart(setting.notarySet, nextPos, leader, setting.crs) + return + } Loop: for { select { @@ -442,55 +518,30 @@ Loop: break Loop default: } - select { - case restartPos := <-recv.restartNotary: - if !isStop(restartPos) { - if restartPos.Round > oldPos.Round { - // This round is finished. - break Loop - } - if restartPos.Older(&oldPos) { - // The restartNotary event is triggered by 'BlockConfirmed' - // of some older block. 
- break - } - } - var nextHeight uint64 - var nextTime time.Time - for { - nextHeight, nextTime, err = - mgr.lattice.NextBlock(recv.round(), setting.chainID) + if agr.confirmed() { + // Block until receive restartPos + select { + case restartPos := <-recv.restartNotary: + breakLoop, err := restart(restartPos) if err != nil { - mgr.logger.Debug("Error getting next height", - "error", err, - "round", recv.round(), - "chainID", setting.chainID) - err = nil - nextHeight = restartPos.Height - } - if isStop(restartPos) || nextHeight == 0 { - break + return err } - if nextHeight > restartPos.Height { - break + if breakLoop { + break Loop } - mgr.logger.Debug("Lattice not ready!!!", - "old", &restartPos, "next", nextHeight) - time.Sleep(100 * time.Millisecond) - } - nextPos := types.Position{ - Round: recv.round(), - ChainID: setting.chainID, - Height: nextHeight, + case <-mgr.ctx.Done(): + break Loop } - oldPos = nextPos - leader, err := mgr.cache.GetLeaderNode(nextPos) + } + select { + case restartPos := <-recv.restartNotary: + breakLoop, err := restart(restartPos) if err != nil { return err } - time.Sleep(nextTime.Sub(time.Now())) - setting.ticker.Restart() - agr.restart(setting.notarySet, nextPos, leader, setting.crs) + if breakLoop { + break Loop + } default: } if agr.pullVotes() { diff --git a/core/agreement.go b/core/agreement.go index 97848c5..ebb9b02 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -29,6 +29,13 @@ import ( "github.com/dexon-foundation/dexon-consensus/core/utils" ) +// closedchan is a reusable closed channel. +var closedchan = make(chan struct{}) + +func init() { + close(closedchan) +} + // Errors for agreement module. 
var ( ErrInvalidVote = fmt.Errorf("invalid vote") @@ -110,6 +117,7 @@ type agreement struct { state agreementState data *agreementData aID *atomic.Value + doneChan chan struct{} notarySet map[types.NodeID]struct{} hasVoteFast bool hasOutput bool @@ -171,6 +179,10 @@ func (a *agreement) restart( a.data.lockValue = nullBlockHash a.data.lockIter = 0 a.data.isLeader = a.data.ID == leader + if a.doneChan != nil { + close(a.doneChan) + } + a.doneChan = make(chan struct{}) a.fastForward = make(chan uint64, 1) a.hasVoteFast = false a.hasOutput = false @@ -401,6 +413,8 @@ func (a *agreement) processVote(vote *types.Vote) error { a.hasOutput = true a.data.recv.ConfirmBlock(hash, a.data.votes[vote.Period][vote.Type]) + close(a.doneChan) + a.doneChan = nil } return nil } @@ -462,24 +476,24 @@ func (a *agreement) processVote(vote *types.Vote) error { func (a *agreement) done() <-chan struct{} { a.lock.Lock() defer a.lock.Unlock() + if a.doneChan == nil { + return closedchan + } a.data.lock.Lock() defer a.data.lock.Unlock() - ch := make(chan struct{}, 1) - if a.hasOutput { - ch <- struct{}{} - } else { - select { - case period := <-a.fastForward: - if period <= a.data.period { - break - } - a.data.setPeriod(period) - a.state = newPreCommitState(a.data) - ch <- struct{}{} - default: + select { + case period := <-a.fastForward: + if period <= a.data.period { + break } - } - return ch + a.data.setPeriod(period) + a.state = newPreCommitState(a.data) + close(a.doneChan) + a.doneChan = make(chan struct{}) + return closedchan + default: + } + return a.doneChan } func (a *agreement) confirmed() bool { diff --git a/core/consensus.go b/core/consensus.go index afc8973..67a8b05 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -203,12 +203,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( "cur-position", &block.Position, "chainID", recv.chainID) recv.consensus.ccModule.registerBlock(block) - if err := recv.consensus.processBlock(block); err != nil { - 
recv.consensus.logger.Error("Failed to process block", - "block", block, - "error", err) - return - } + recv.consensus.processBlockChan <- block parentHash = block.ParentHash if block.Position.Height == 0 || recv.consensus.lattice.Exist(parentHash) { @@ -235,12 +230,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( "result", result) recv.consensus.network.BroadcastAgreementResult(result) } - if err := recv.consensus.processBlock(block); err != nil { - recv.consensus.logger.Error("Failed to process block", - "block", block, - "error", err) - return - } + recv.consensus.processBlockChan <- block // Clean the restartNotary channel so BA will not stuck by deadlock. CleanChannelLoop: for { @@ -252,8 +242,8 @@ CleanChannelLoop: } newPos := block.Position if block.Timestamp.After(recv.changeNotaryTime) { - recv.roundValue.Store(recv.round() + 1) newPos.Round++ + recv.roundValue.Store(newPos.Round) } recv.restartNotary <- newPos } @@ -409,6 +399,7 @@ type Consensus struct { resetDeliveryGuardTicker chan struct{} msgChan chan interface{} waitGroup sync.WaitGroup + processBlockChan chan *types.Block // Context of Dummy receiver during switching from syncer. dummyCancel context.CancelFunc @@ -578,6 +569,7 @@ func newConsensusForRound( resetRandomnessTicker: make(chan struct{}), resetDeliveryGuardTicker: make(chan struct{}), msgChan: make(chan interface{}, 1024), + processBlockChan: make(chan *types.Block, 1024), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime) @@ -647,6 +639,7 @@ func (con *Consensus) Run() { go con.deliverNetworkMsg() con.waitGroup.Add(1) go con.processMsg() + go con.processBlockLoop() // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) // Take some time to bootstrap. 
@@ -1242,6 +1235,26 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) { return } +func (con *Consensus) processBlockLoop() { + for { + select { + case <-con.ctx.Done(): + return + default: + } + select { + case <-con.ctx.Done(): + return + case block := <-con.processBlockChan: + if err := con.processBlock(block); err != nil { + con.logger.Error("Error processing block", + "block", block, + "error", err) + } + } + } +} + // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { con.lock.Lock() diff --git a/core/lattice-data.go b/core/lattice-data.go index b9ad699..0bbe890 100644 --- a/core/lattice-data.go +++ b/core/lattice-data.go @@ -387,6 +387,19 @@ func (data *latticeData) addFinalizedBlock(block *types.Block) (err error) { return } +func (data *latticeData) tipRound(chainID uint32) uint64 { + if tip := data.chains[chainID].tip; tip != nil { + tipConfig := data.getConfig(tip.Position.Round) + offset := uint64(0) + if tip.Timestamp.After(tipConfig.roundEndTime) { + offset++ + } + return tip.Position.Round + offset + } + return uint64(0) + +} + // isBindTip checks if a block's fields should follow up its parent block. func (data *latticeData) isBindTip( pos types.Position, tip *types.Block) (bindTip bool, err error) { diff --git a/core/lattice.go b/core/lattice.go index d531639..de0e549 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -290,6 +290,13 @@ func (l *Lattice) NextBlock(round uint64, chainID uint32) ( return l.data.nextBlock(round, chainID) } +// TipRound returns the round of the tip of given chain. +func (l *Lattice) TipRound(chainID uint32) uint64 { + l.lock.RLock() + defer l.lock.RUnlock() + return l.data.tipRound(chainID) +} + // PurgeBlocks purges blocks' cache in memory, this is called when the caller // makes sure those blocks are already saved in db. func (l *Lattice) PurgeBlocks(blocks []*types.Block) error { -- cgit v1.2.3