aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go63
1 files changed, 33 insertions, 30 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index 413f16caa..3a27b5fc1 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -99,7 +99,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash {
block := recv.consensus.proposeBlock(recv.chainID, recv.round())
if block == nil {
recv.consensus.logger.Error("unable to propose block")
- return nullBlockHash
+ return types.NullBlockHash
}
go func() {
if err := recv.consensus.preProcessBlock(block); err != nil {
@@ -203,12 +203,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
"cur-position", &block.Position,
"chainID", recv.chainID)
recv.consensus.ccModule.registerBlock(block)
- if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block",
- "block", block,
- "error", err)
- return
- }
+ recv.consensus.processBlockChan <- block
parentHash = block.ParentHash
if block.Position.Height == 0 ||
recv.consensus.lattice.Exist(parentHash) {
@@ -235,12 +230,7 @@ func (recv *consensusBAReceiver) ConfirmBlock(
"result", result)
recv.consensus.network.BroadcastAgreementResult(result)
}
- if err := recv.consensus.processBlock(block); err != nil {
- recv.consensus.logger.Error("Failed to process block",
- "block", block,
- "error", err)
- return
- }
+ recv.consensus.processBlockChan <- block
// Clean the restartNotary channel so BA will not stuck by deadlock.
CleanChannelLoop:
for {
@@ -252,8 +242,8 @@ CleanChannelLoop:
}
newPos := block.Position
if block.Timestamp.After(recv.changeNotaryTime) {
- recv.roundValue.Store(recv.round() + 1)
newPos.Round++
+ recv.roundValue.Store(newPos.Round)
}
recv.restartNotary <- newPos
}
@@ -398,7 +388,7 @@ type Consensus struct {
dMoment time.Time
nodeSetCache *utils.NodeSetCache
round uint64
- roundToNotify uint64
+ roundForNewConfig uint64
lock sync.RWMutex
ctx context.Context
ctxCancel context.CancelFunc
@@ -409,6 +399,7 @@ type Consensus struct {
resetDeliveryGuardTicker chan struct{}
msgChan chan interface{}
waitGroup sync.WaitGroup
+ processBlockChan chan *types.Block
// Context of Dummy receiver during switching from syncer.
dummyCancel context.CancelFunc
@@ -577,7 +568,8 @@ func newConsensusForRound(
logger: logger,
resetRandomnessTicker: make(chan struct{}),
resetDeliveryGuardTicker: make(chan struct{}),
- msgChan: make(chan interface{}, 10240),
+ msgChan: make(chan interface{}, 1024),
+ processBlockChan: make(chan *types.Block, 1024),
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
@@ -594,7 +586,7 @@ func newConsensusForRound(
func (con *Consensus) prepare(initBlock *types.Block) error {
// The block past from full node should be delivered already or known by
// full node. We don't have to notify it.
- con.roundToNotify = initBlock.Position.Round + 1
+ con.roundForNewConfig = initBlock.Position.Round + 1
initRound := initBlock.Position.Round
initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger)
// Setup context.
@@ -647,6 +639,7 @@ func (con *Consensus) Run() {
go con.deliverNetworkMsg()
con.waitGroup.Add(1)
go con.processMsg()
+ go con.processBlockLoop()
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
// Take some time to bootstrap.
@@ -1192,13 +1185,13 @@ func (con *Consensus) deliverBlock(b *types.Block) {
con.cfgModule.untouchTSigHash(b.Hash)
con.logger.Debug("Calling Application.BlockDelivered", "block", b)
con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone())
- if b.Position.Round == con.roundToNotify {
+ if b.Position.Round == con.roundForNewConfig {
// Get configuration for the round next to next round. Configuration
// for that round should be ready at this moment and is required for
// lattice module. This logic is related to:
// - roundShift
// - notifyGenesisRound
- futureRound := con.roundToNotify + 1
+ futureRound := con.roundForNewConfig + 1
futureConfig := utils.GetConfigWithPanic(con.gov, futureRound, con.logger)
con.logger.Debug("Append Config", "round", futureRound)
if err := con.lattice.AppendConfig(
@@ -1208,14 +1201,7 @@ func (con *Consensus) deliverBlock(b *types.Block) {
"error", err)
panic(err)
}
- // Only the first block delivered of that round would
- // trigger this noitification.
- con.logger.Debug("Calling Governance.NotifyRoundHeight",
- "round", con.roundToNotify,
- "height", b.Finalization.Height)
- con.gov.NotifyRoundHeight(
- con.roundToNotify, b.Finalization.Height)
- con.roundToNotify++
+ con.roundForNewConfig++
}
if con.debugApp != nil {
con.debugApp.BlockReady(b.Hash)
@@ -1242,11 +1228,28 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
return
}
+func (con *Consensus) processBlockLoop() {
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
+ select {
+ case <-con.ctx.Done():
+ return
+ case block := <-con.processBlockChan:
+ if err := con.processBlock(block); err != nil {
+ con.logger.Error("Error processing block",
+ "block", block,
+ "error", err)
+ }
+ }
+ }
+}
+
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
- if err = con.db.PutBlock(*block); err != nil && err != db.ErrBlockExists {
- return
- }
con.lock.Lock()
defer con.lock.Unlock()
// Block processed by lattice can be out-of-order. But the output of lattice