diff options
Diffstat (limited to 'core/syncer/agreement.go')
-rw-r--r-- | core/syncer/agreement.go | 66 |
1 files changed, 47 insertions, 19 deletions
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go index 9b351ea..eaad860 100644 --- a/core/syncer/agreement.go +++ b/core/syncer/agreement.go @@ -18,6 +18,9 @@ package syncer import ( + "context" + "time" + "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core" "github.com/dexon-foundation/dexon-consensus/core/types" @@ -37,16 +40,14 @@ type agreement struct { pendings map[uint64]map[common.Hash]*types.AgreementResult logger common.Logger confirmedBlocks map[common.Hash]struct{} + ctx context.Context + ctxCancel context.CancelFunc } // newAgreement creates a new agreement instance. -func newAgreement( - ch chan<- *types.Block, - pullChan chan<- common.Hash, - cache *utils.NodeSetCache, - logger common.Logger) *agreement { - - return &agreement{ +func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash, + cache *utils.NodeSetCache, logger common.Logger) *agreement { + a := &agreement{ cache: cache, inputChan: make(chan interface{}, 1000), outputChan: ch, @@ -58,11 +59,14 @@ func newAgreement( map[uint64]map[common.Hash]*types.AgreementResult), confirmedBlocks: make(map[common.Hash]struct{}), } + a.ctx, a.ctxCancel = context.WithCancel(context.Background()) + return a } // run starts the agreement, this does not start a new routine, go a new // routine explicitly in the caller. func (a *agreement) run() { + defer a.ctxCancel() for { select { case val, ok := <-a.inputChan: @@ -119,22 +123,35 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { return } if r.IsEmptyBlock { - // Empty block is also confirmed. b := &types.Block{ Position: r.Position, } + // Empty blocks should be confirmed directly, they won't be sent over + // the wire. a.confirm(b) - } else { - needPull := true - if bs, exist := a.blocks[r.Position]; exist { - if b, exist := bs[r.BlockHash]; exist { - a.confirm(b) - needPull = false - } + return + } + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + a.confirm(b) + return } - if needPull { - a.agreementResults[r.BlockHash] = struct{}{} - a.pullChan <- r.BlockHash + } + a.agreementResults[r.BlockHash] = struct{}{} +loop: + for { + select { + case a.pullChan <- r.BlockHash: + break loop + case <-a.ctx.Done(): + a.logger.Error("pull request is not sent", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("pull request is unable to send", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) } } } @@ -168,7 +185,18 @@ func (a *agreement) confirm(b *types.Block) { if _, exist := a.confirmedBlocks[b.Hash]; !exist { delete(a.blocks, b.Position) delete(a.agreementResults, b.Hash) - a.outputChan <- b + loop: + for { + select { + case a.outputChan <- b: + break loop + case <-a.ctx.Done(): + a.logger.Error("confirmed block is not sent", "block", b) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("agreement output channel is full", "block", b) + } + } a.confirmedBlocks[b.Hash] = struct{}{} } } |