diff options
Diffstat (limited to 'miner')
-rw-r--r-- | miner/worker.go | 75 |
1 files changed, 52 insertions, 23 deletions
diff --git a/miner/worker.go b/miner/worker.go index 8c3337ba4..18fb12e45 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -72,6 +72,9 @@ const ( // intervalAdjustBias is applied during the new resubmit interval calculation in favor of // increasing upper limit or decreasing lower limit so that the limit can be reachable. intervalAdjustBias = 200 * 1000.0 * 1000.0 + + // staleThreshold is the maximum distance of the acceptable stale block. + staleThreshold = 7 ) // environment is the worker's current environment and holds all of the current state information. @@ -150,6 +153,9 @@ type worker struct { coinbase common.Address extra []byte + pendingMu sync.RWMutex + pendingTasks map[common.Hash]*task + snapshotMu sync.RWMutex // The lock used to protect the block snapshot and state snapshot snapshotBlock *types.Block snapshotState *state.StateDB @@ -174,6 +180,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, chain: eth.BlockChain(), possibleUncles: make(map[common.Hash]*types.Block), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + pendingTasks: make(map[common.Hash]*task), txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), @@ -317,13 +324,25 @@ func (w *worker) newWorkLoop(recommit time.Duration) { } recommit = time.Duration(int64(next)) } + // clearPending cleans the stale pending tasks. + clearPending := func(number uint64) { + w.pendingMu.Lock() + for h, t := range w.pendingTasks { + if t.block.NumberU64()+staleThreshold <= number { + delete(w.pendingTasks, h) + } + } + w.pendingMu.Unlock() + } for { select { case <-w.startCh: + clearPending(w.chain.CurrentBlock().NumberU64()) commit(false, commitInterruptNewHead) - case <-w.chainHeadCh: + case head := <-w.chainHeadCh: + clearPending(head.Block.NumberU64()) commit(false, commitInterruptNewHead) case <-timer.C: @@ -454,28 +473,37 @@ func (w *worker) mainLoop() { // seal pushes a sealing task to consensus engine and submits the result. func (w *worker) seal(t *task, stop <-chan struct{}) { - var ( - err error - res *task - ) - if w.skipSealHook != nil && w.skipSealHook(t) { return } - - if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil { - log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(), - "elapsed", common.PrettyDuration(time.Since(t.createdAt))) - res = t - } else { - if err != nil { - log.Warn("Block sealing failed", "err", err) + // The reason for caching task first is: + // A previous sealing action will be canceled by subsequent actions, + // however, remote miner may submit a result based on the cancelled task. + // So we should only submit the pending state corresponding to the seal result. + // TODO(rjl493456442) Replace the seal-wait logic structure + w.pendingMu.Lock() + w.pendingTasks[w.engine.SealHash(t.block.Header())] = t + w.pendingMu.Unlock() + + if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil { + sealhash := w.engine.SealHash(block.Header()) + w.pendingMu.RLock() + task, exist := w.pendingTasks[sealhash] + w.pendingMu.RUnlock() + if !exist { + log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash()) + return } - res = nil - } - select { - case w.resultCh <- res: - case <-w.exitCh: + // Assemble sealing result + task.block = block + log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(), + "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + select { + case w.resultCh <- task: + case <-w.exitCh: + } + } else if err != nil { + log.Warn("Block sealing failed", "err", err) } } @@ -501,12 +529,13 @@ func (w *worker) taskLoop() { w.newTaskHook(task) } // Reject duplicate sealing work due to resubmitting. - if task.block.HashNoNonce() == prev { + sealHash := w.engine.SealHash(task.block.Header()) + if sealHash == prev { continue } interrupt() stopCh = make(chan struct{}) - prev = task.block.HashNoNonce() + prev = sealHash go w.seal(task, stopCh) case <-w.exitCh: interrupt() @@ -928,8 +957,8 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st } feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) - log.Info("Commit new mining work", "number", block.Number(), "uncles", len(uncles), "txs", w.current.tcount, - "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start))) + log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), + "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: log.Info("Worker has exited") |