aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-01-23 15:02:42 +0800
committerGitHub <noreply@github.com>2019-01-23 15:02:42 +0800
commit0e6dc8b38f7df249831aebd4928ec42b827038e3 (patch)
tree29966ef1a69ed7431dbac461049338056f0717da
parentca43bd9f99deead21dae71a749297dc6aa361898 (diff)
downloaddexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.tar
dexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.tar.gz
dexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.tar.bz2
dexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.tar.lz
dexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.tar.xz
dexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.tar.zst
dexon-consensus-0e6dc8b38f7df249831aebd4928ec42b827038e3.zip
core: optimize core (#428)
* core: Use a channel to process ba confirmed block * change the implementation of done() to react faster * Fix restart * Wait tipRound to change * fix corner case * Check for context
-rw-r--r--core/agreement-mgr.go137
-rw-r--r--core/agreement.go44
-rw-r--r--core/consensus.go39
-rw-r--r--core/lattice-data.go13
-rw-r--r--core/lattice.go7
5 files changed, 169 insertions, 71 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index 9e86369..7410977 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -419,7 +419,10 @@ Loop:
// Run BA for this round.
recv.roundValue.Store(currentRound)
recv.changeNotaryTime = roundEndTime
- recv.restartNotary <- types.Position{ChainID: math.MaxUint32}
+ recv.restartNotary <- types.Position{
+ Round: setting.recv.round(),
+ ChainID: math.MaxUint32,
+ }
if err := mgr.baRoutineForOneRound(&setting); err != nil {
mgr.logger.Error("BA routine failed",
"error", err,
@@ -435,6 +438,79 @@ func (mgr *agreementMgr) baRoutineForOneRound(
agr := setting.agr
recv := setting.recv
oldPos := agr.agreementID()
+ restart := func(restartPos types.Position) (breakLoop bool, err error) {
+ if !isStop(restartPos) {
+ if restartPos.Round > oldPos.Round {
+ for {
+ select {
+ case <-mgr.ctx.Done():
+ break
+ default:
+ }
+ tipRound := mgr.lattice.TipRound(setting.chainID)
+ if tipRound > restartPos.Round {
+ // It's a vary rare that this go routine sleeps for entire round.
+ break
+ } else if tipRound != restartPos.Round {
+ mgr.logger.Debug("Waiting lattice to change round...",
+ "pos", &restartPos)
+ } else {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ // This round is finished.
+ breakLoop = true
+ return
+ }
+ if restartPos.Older(&oldPos) {
+ // The restartNotary event is triggered by 'BlockConfirmed'
+ // of some older block.
+ return
+ }
+ }
+ var nextHeight uint64
+ var nextTime time.Time
+ for {
+ nextHeight, nextTime, err =
+ mgr.lattice.NextBlock(recv.round(), setting.chainID)
+ if err != nil {
+ mgr.logger.Debug("Error getting next height",
+ "error", err,
+ "round", recv.round(),
+ "chainID", setting.chainID)
+ err = nil
+ nextHeight = restartPos.Height
+ }
+ if isStop(oldPos) && nextHeight == 0 {
+ break
+ }
+ if isStop(restartPos) && nextHeight == 0 {
+ break
+ }
+ if nextHeight > restartPos.Height {
+ break
+ }
+ mgr.logger.Debug("Lattice not ready!!!",
+ "old", &oldPos, "restart", &restartPos, "next", nextHeight)
+ time.Sleep(100 * time.Millisecond)
+ }
+ nextPos := types.Position{
+ Round: recv.round(),
+ ChainID: setting.chainID,
+ Height: nextHeight,
+ }
+ oldPos = nextPos
+ var leader types.NodeID
+ leader, err = mgr.cache.GetLeaderNode(nextPos)
+ if err != nil {
+ return
+ }
+ time.Sleep(nextTime.Sub(time.Now()))
+ setting.ticker.Restart()
+ agr.restart(setting.notarySet, nextPos, leader, setting.crs)
+ return
+ }
Loop:
for {
select {
@@ -442,55 +518,30 @@ Loop:
break Loop
default:
}
- select {
- case restartPos := <-recv.restartNotary:
- if !isStop(restartPos) {
- if restartPos.Round > oldPos.Round {
- // This round is finished.
- break Loop
- }
- if restartPos.Older(&oldPos) {
- // The restartNotary event is triggered by 'BlockConfirmed'
- // of some older block.
- break
- }
- }
- var nextHeight uint64
- var nextTime time.Time
- for {
- nextHeight, nextTime, err =
- mgr.lattice.NextBlock(recv.round(), setting.chainID)
+ if agr.confirmed() {
+ // Block until receive restartPos
+ select {
+ case restartPos := <-recv.restartNotary:
+ breakLoop, err := restart(restartPos)
if err != nil {
- mgr.logger.Debug("Error getting next height",
- "error", err,
- "round", recv.round(),
- "chainID", setting.chainID)
- err = nil
- nextHeight = restartPos.Height
- }
- if isStop(restartPos) || nextHeight == 0 {
- break
+ return err
}
- if nextHeight > restartPos.Height {
- break
+ if breakLoop {
+ break Loop
}
- mgr.logger.Debug("Lattice not ready!!!",
- "old", &restartPos, "next", nextHeight)
- time.Sleep(100 * time.Millisecond)
- }
- nextPos := types.Position{
- Round: recv.round(),
- ChainID: setting.chainID,
- Height: nextHeight,
+ case <-mgr.ctx.Done():
+ break Loop
}
- oldPos = nextPos
- leader, err := mgr.cache.GetLeaderNode(nextPos)
+ }
+ select {
+ case restartPos := <-recv.restartNotary:
+ breakLoop, err := restart(restartPos)
if err != nil {
return err
}
- time.Sleep(nextTime.Sub(time.Now()))
- setting.ticker.Restart()
- agr.restart(setting.notarySet, nextPos, leader, setting.crs)
+ if breakLoop {
+ break Loop
+ }
default:
}
if agr.pullVotes() {
diff --git a/core/agreement.go b/core/agreement.go
index 97848c5..ebb9b02 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -29,6 +29,13 @@ import (
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
+// closedchan is a reusable closed channel.
+var closedchan = make(chan struct{})
+
+func init() {
+ close(closedchan)
+}
+
// Errors for agreement module.
var (
ErrInvalidVote = fmt.Errorf("invalid vote")
@@ -110,6 +117,7 @@ type agreement struct {
state agreementState
data *agreementData
aID *atomic.Value
+ doneChan chan struct{}
notarySet map[types.NodeID]struct{}
hasVoteFast bool
hasOutput bool
@@ -171,6 +179,10 @@ func (a *agreement) restart(
a.data.lockValue = nullBlockHash
a.data.lockIter = 0
a.data.isLeader = a.data.ID == leader
+ if a.doneChan != nil {
+ close(a.doneChan)
+ }
+ a.doneChan = make(chan struct{})
a.fastForward = make(chan uint64, 1)
a.hasVoteFast = false
a.hasOutput = false
@@ -401,6 +413,8 @@ func (a *agreement) processVote(vote *types.Vote) error {
a.hasOutput = true
a.data.recv.ConfirmBlock(hash,
a.data.votes[vote.Period][vote.Type])
+ close(a.doneChan)
+ a.doneChan = nil
}
return nil
}
@@ -462,24 +476,24 @@ func (a *agreement) processVote(vote *types.Vote) error {
func (a *agreement) done() <-chan struct{} {
a.lock.Lock()
defer a.lock.Unlock()
+ if a.doneChan == nil {
+ return closedchan
+ }
a.data.lock.Lock()
defer a.data.lock.Unlock()
- ch := make(chan struct{}, 1)
- if a.hasOutput {
- ch <- struct{}{}
- } else {
- select {
- case period := <-a.fastForward:
- if period <= a.data.period {
- break
- }
- a.data.setPeriod(period)
- a.state = newPreCommitState(a.data)
- ch <- struct{}{}
- default:
+ select {
+ case period := <-a.fastForward:
+ if period <= a.data.period {
+ break
}
- }
- return ch
+ a.data.setPeriod(period)
+ a.state = newPreCommitState(a.data)
+ close(a.doneChan)
+ a.doneChan = make(chan struct{})
+ return closedchan
+ default:
+ }
+ return a.doneChan
}
func (a *agreement) confirmed() bool {
diff --git a/core/consensus.go b/core/consensus.go
index afc8973..67a8b05 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -203,12 +203,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
"cur-position", &block.Position,
"chainID", recv.chainID)
recv.consensus.ccModule.registerBlock(block)
- if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block",
- "block", block,
- "error", err)
- return
- }
+ recv.consensus.processBlockChan <- block
parentHash = block.ParentHash
if block.Position.Height == 0 ||
recv.consensus.lattice.Exist(parentHash) {
@@ -235,12 +230,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
"result", result)
recv.consensus.network.BroadcastAgreementResult(result)
}
- if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block",
- "block", block,
- "error", err)
- return
- }
+ recv.consensus.processBlockChan <- block
// Clean the restartNotary channel so BA will not stuck by deadlock.
CleanChannelLoop:
for {
@@ -252,8 +242,8 @@ CleanChannelLoop:
}
newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
- recv.roundValue.Store(recv.round() + 1)
newPos.Round++
+ recv.roundValue.Store(newPos.Round)
}
recv.restartNotary <- newPos
}
@@ -409,6 +399,7 @@ type Consensus struct {
resetDeliveryGuardTicker chan struct{}
msgChan chan interface{}
waitGroup sync.WaitGroup
+ processBlockChan chan *types.Block
// Context of Dummy receiver during switching from syncer.
dummyCancel context.CancelFunc
@@ -578,6 +569,7 @@ func newConsensusForRound(
resetRandomnessTicker: make(chan struct{}),
resetDeliveryGuardTicker: make(chan struct{}),
msgChan: make(chan interface{}, 1024),
+ processBlockChan: make(chan *types.Block, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -647,6 +639,7 @@ func (con *Consensus) Run() {
go con.deliverNetworkMsg()
con.waitGroup.Add(1)
go con.processMsg()
+ go con.processBlockLoop()
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Take some time to bootstrap.
@@ -1242,6 +1235,26 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
return
}
+func (con *Consensus) processBlockLoop() {
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
+ case block := <-con.processBlockChan:
+ if err := con.processBlock(block); err != nil {
+ con.logger.Error("Error processing block",
+ "block", block,
+ "error", err)
+ }
+ }
+ }
+}
+
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
con.lock.Lock()
diff --git a/core/lattice-data.go b/core/lattice-data.go
index b9ad699..0bbe890 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -387,6 +387,19 @@ func (data *latticeData) addFinalizedBlock(block *types.Block) (err error) {
return
}
+func (data *latticeData) tipRound(chainID uint32) uint64 {
+ if tip := data.chains[chainID].tip; tip != nil {
+ tipConfig := data.getConfig(tip.Position.Round)
+ offset := uint64(0)
+ if tip.Timestamp.After(tipConfig.roundEndTime) {
+ offset++
+ }
+ return tip.Position.Round + offset
+ }
+ return uint64(0)
+
+}
+
// isBindTip checks if a block's fields should follow up its parent block.
func (data *latticeData) isBindTip(
pos types.Position, tip *types.Block) (bindTip bool, err error) {
diff --git a/core/lattice.go b/core/lattice.go
index d531639..de0e549 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -290,6 +290,13 @@ func (l *Lattice) NextBlock(round uint64, chainID uint32) (
return l.data.nextBlock(round, chainID)
}
+// TipRound returns the round of the tip of given chain.
+func (l *Lattice) TipRound(chainID uint32) uint64 {
+ l.lock.RLock()
+ defer l.lock.RUnlock()
+ return l.data.tipRound(chainID)
+}
+
// PurgeBlocks purges blocks' cache in memory, this is called when the caller
// makes sure those blocks are already saved in db.
func (l *Lattice) PurgeBlocks(blocks []*types.Block) error {