aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer/agreement.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/syncer/agreement.go')
-rw-r--r--core/syncer/agreement.go66
1 files changed, 47 insertions, 19 deletions
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index 9b351ea..eaad860 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -18,6 +18,9 @@
package syncer
import (
+ "context"
+ "time"
+
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/types"
@@ -37,16 +40,14 @@ type agreement struct {
pendings map[uint64]map[common.Hash]*types.AgreementResult
logger common.Logger
confirmedBlocks map[common.Hash]struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc
}
// newAgreement creates a new agreement instance.
-func newAgreement(
- ch chan<- *types.Block,
- pullChan chan<- common.Hash,
- cache *utils.NodeSetCache,
- logger common.Logger) *agreement {
-
- return &agreement{
+func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache, logger common.Logger) *agreement {
+ a := &agreement{
cache: cache,
inputChan: make(chan interface{}, 1000),
outputChan: ch,
@@ -58,11 +59,14 @@ func newAgreement(
map[uint64]map[common.Hash]*types.AgreementResult),
confirmedBlocks: make(map[common.Hash]struct{}),
}
+ a.ctx, a.ctxCancel = context.WithCancel(context.Background())
+ return a
}
// run starts the agreement, this does not start a new routine, go a new
// routine explicitly in the caller.
func (a *agreement) run() {
+ defer a.ctxCancel()
for {
select {
case val, ok := <-a.inputChan:
@@ -119,22 +123,35 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
return
}
if r.IsEmptyBlock {
- // Empty block is also confirmed.
b := &types.Block{
Position: r.Position,
}
+ // Empty blocks should be confirmed directly, they won't be sent over
+ // the wire.
a.confirm(b)
- } else {
- needPull := true
- if bs, exist := a.blocks[r.Position]; exist {
- if b, exist := bs[r.BlockHash]; exist {
- a.confirm(b)
- needPull = false
- }
+ return
+ }
+ if bs, exist := a.blocks[r.Position]; exist {
+ if b, exist := bs[r.BlockHash]; exist {
+ a.confirm(b)
+ return
}
- if needPull {
- a.agreementResults[r.BlockHash] = struct{}{}
- a.pullChan <- r.BlockHash
+ }
+ a.agreementResults[r.BlockHash] = struct{}{}
+loop:
+ for {
+ select {
+ case a.pullChan <- r.BlockHash:
+ break loop
+ case <-a.ctx.Done():
+ a.logger.Error("pull request is not sent",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
+ return
+ case <-time.After(500 * time.Millisecond):
+ a.logger.Debug("pull request is unable to send",
+ "position", &r.Position,
+ "hash", r.BlockHash.String()[:6])
}
}
}
@@ -168,7 +185,18 @@ func (a *agreement) confirm(b *types.Block) {
if _, exist := a.confirmedBlocks[b.Hash]; !exist {
delete(a.blocks, b.Position)
delete(a.agreementResults, b.Hash)
- a.outputChan <- b
+ loop:
+ for {
+ select {
+ case a.outputChan <- b:
+ break loop
+ case <-a.ctx.Done():
+ a.logger.Error("confirmed block is not sent", "block", b)
+ return
+ case <-time.After(500 * time.Millisecond):
+ a.logger.Debug("agreement output channel is full", "block", b)
+ }
+ }
a.confirmedBlocks[b.Hash] = struct{}{}
}
}