aboutsummaryrefslogtreecommitdiffstats
path: root/miner/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'miner/worker.go')
-rw-r--r--miner/worker.go125
1 files changed, 69 insertions, 56 deletions
diff --git a/miner/worker.go b/miner/worker.go
index 34329f849..f1194fa18 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -55,9 +55,8 @@ const (
type Agent interface {
Work() chan<- *Work
SetReturnCh(chan<- *Result)
- Stop()
Start()
- GetHashRate() int64
+ Stop()
}
// Work is the workers current environment and holds
@@ -102,7 +101,6 @@ type worker struct {
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
- wg sync.WaitGroup
agents map[Agent]struct{}
recv chan *Result
@@ -128,11 +126,11 @@ type worker struct {
unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
// atomic status counters
- mining int32
- atWork int32
+ atWork int32 // The number of in-flight consensus engine work.
+ running int32 // The indicator whether the consensus engine is running or not.
}
-func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
+func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
worker := &worker{
config: config,
engine: engine,
@@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
chain: eth.BlockChain(),
proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
- coinbase: coinbase,
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
}
@@ -176,62 +173,51 @@ func (self *worker) setExtra(extra []byte) {
}
func (self *worker) pending() (*types.Block, *state.StateDB) {
- if atomic.LoadInt32(&self.mining) == 0 {
- // return a snapshot to avoid contention on currentMu mutex
- self.snapshotMu.RLock()
- defer self.snapshotMu.RUnlock()
- return self.snapshotBlock, self.snapshotState.Copy()
- }
-
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- return self.current.Block, self.current.state.Copy()
+ // return a snapshot to avoid contention on currentMu mutex
+ self.snapshotMu.RLock()
+ defer self.snapshotMu.RUnlock()
+ return self.snapshotBlock, self.snapshotState.Copy()
}
func (self *worker) pendingBlock() *types.Block {
- if atomic.LoadInt32(&self.mining) == 0 {
- // return a snapshot to avoid contention on currentMu mutex
- self.snapshotMu.RLock()
- defer self.snapshotMu.RUnlock()
- return self.snapshotBlock
- }
-
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- return self.current.Block
+ // return a snapshot to avoid contention on currentMu mutex
+ self.snapshotMu.RLock()
+ defer self.snapshotMu.RUnlock()
+ return self.snapshotBlock
}
func (self *worker) start() {
self.mu.Lock()
defer self.mu.Unlock()
-
- atomic.StoreInt32(&self.mining, 1)
-
- // spin up agents
+ atomic.StoreInt32(&self.running, 1)
for agent := range self.agents {
agent.Start()
}
}
func (self *worker) stop() {
- self.wg.Wait()
-
self.mu.Lock()
defer self.mu.Unlock()
- if atomic.LoadInt32(&self.mining) == 1 {
- for agent := range self.agents {
- agent.Stop()
- }
+
+ atomic.StoreInt32(&self.running, 0)
+ for agent := range self.agents {
+ agent.Stop()
}
- atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.atWork, 0)
}
+func (self *worker) isRunning() bool {
+ return atomic.LoadInt32(&self.running) == 1
+}
+
func (self *worker) register(agent Agent) {
self.mu.Lock()
defer self.mu.Unlock()
self.agents[agent] = struct{}{}
agent.SetReturnCh(self.recv)
+ if self.isRunning() {
+ agent.Start()
+ }
}
func (self *worker) unregister(agent Agent) {
@@ -266,7 +252,7 @@ func (self *worker) update() {
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// be automatically eliminated.
- if atomic.LoadInt32(&self.mining) == 0 {
+ if !self.isRunning() {
self.currentMu.Lock()
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
@@ -343,9 +329,6 @@ func (self *worker) wait() {
// push sends a new work task to currently live miner agents.
func (self *worker) push(work *Work) {
- if atomic.LoadInt32(&self.mining) != 1 {
- return
- }
for agent := range self.agents {
atomic.AddInt32(&self.atWork, 1)
if ch := agent.Work(); ch != nil {
@@ -416,8 +399,12 @@ func (self *worker) commitNewWork() {
Extra: self.extra,
Time: big.NewInt(tstamp),
}
- // Only set the coinbase if we are mining (avoid spurious block rewards)
- if atomic.LoadInt32(&self.mining) == 1 {
+ // Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
+ if self.isRunning() {
+ if self.coinbase == (common.Address{}) {
+ log.Error("Refusing to mine without etherbase")
+ return
+ }
header.Coinbase = self.coinbase
}
if err := self.engine.Prepare(self.chain, header); err != nil {
@@ -448,13 +435,6 @@ func (self *worker) commitNewWork() {
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(work.state)
}
- pending, err := self.eth.TxPool().Pending()
- if err != nil {
- log.Error("Failed to fetch pending transactions", "err", err)
- return
- }
- txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
- work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
// compute uncles for the new block.
var (
@@ -478,17 +458,41 @@ func (self *worker) commitNewWork() {
for _, hash := range badUncles {
delete(self.possibleUncles, hash)
}
- // Create the new block to seal with the consensus engine
+
+ // Create an empty block based on temporary copied state for sealing in advance without waiting block
+ // execution finished.
+ if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil {
+ log.Error("Failed to finalize block for temporary sealing", "err", err)
+ } else {
+ // Push empty work in advance without applying pending transaction.
+ // The reason is transactions execution can cost a lot and sealer need to
+ // take advantage of this part time.
+ if self.isRunning() {
+ log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles))
+ self.push(work)
+ }
+ }
+
+ // Fill the block with all available pending transactions.
+ pending, err := self.eth.TxPool().Pending()
+ if err != nil {
+ log.Error("Failed to fetch pending transactions", "err", err)
+ return
+ }
+ txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
+ work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
+
+ // Create the full block to seal with the consensus engine
if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
log.Error("Failed to finalize block for sealing", "err", err)
return
}
// We only care about logging if we're actually mining.
- if atomic.LoadInt32(&self.mining) == 1 {
- log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
+ if self.isRunning() {
+ log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(work.Block.NumberU64() - 1)
+ self.push(work)
}
- self.push(work)
self.updateSnapshot()
}
@@ -511,10 +515,19 @@ func (self *worker) updateSnapshot() {
self.snapshotMu.Lock()
defer self.snapshotMu.Unlock()
+ var uncles []*types.Header
+ self.current.uncles.Each(func(item interface{}) bool {
+ if header, ok := item.(*types.Header); ok {
+ uncles = append(uncles, header)
+ return true
+ }
+ return false
+ })
+
self.snapshotBlock = types.NewBlock(
self.current.header,
self.current.txs,
- nil,
+ uncles,
self.current.receipts,
)
self.snapshotState = self.current.state.Copy()