From 51db5975cc5fb88db6a0dba1826b534fd4df29d7 Mon Sep 17 00:00:00 2001 From: gary rong Date: Fri, 3 Aug 2018 16:33:37 +0800 Subject: consensus/ethash: move remote agent logic to ethash internal (#15853) * consensus/ethash: start remote ggoroutine to handle remote mining * consensus/ethash: expose remote miner api * consensus/ethash: expose submitHashrate api * miner, ethash: push empty block to sealer without waiting execution * consensus, internal: add getHashrate API for ethash * consensus: add three method for consensus interface * miner: expose consensus engine running status to miner * eth, miner: specify etherbase when miner created * miner: commit new work when consensus engine is started * consensus, miner: fix some logics * all: delete useless interfaces * consensus: polish a bit --- miner/worker.go | 125 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 69 insertions(+), 56 deletions(-) (limited to 'miner/worker.go') diff --git a/miner/worker.go b/miner/worker.go index 34329f849..f1194fa18 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -55,9 +55,8 @@ const ( type Agent interface { Work() chan<- *Work SetReturnCh(chan<- *Result) - Stop() Start() - GetHashRate() int64 + Stop() } // Work is the workers current environment and holds @@ -102,7 +101,6 @@ type worker struct { chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -128,11 +126,11 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - mining int32 - atWork int32 + atWork int32 // The number of in-flight consensus engine work. + running int32 // The indicator whether the consensus engine is running or not. } -func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { worker := &worker{ config: config, engine: engine, @@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), - coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } @@ -176,62 +173,51 @@ func (self *worker) setExtra(extra []byte) { } func (self *worker) pending() (*types.Block, *state.StateDB) { - if atomic.LoadInt32(&self.mining) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock, self.snapshotState.Copy() - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block, self.current.state.Copy() + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock, self.snapshotState.Copy() } func (self *worker) pendingBlock() *types.Block { - if atomic.LoadInt32(&self.mining) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock } func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() - - atomic.StoreInt32(&self.mining, 1) - - // spin up agents + atomic.StoreInt32(&self.running, 1) for agent := range self.agents { agent.Start() } } func (self *worker) stop() { - self.wg.Wait() - self.mu.Lock() defer self.mu.Unlock() - if atomic.LoadInt32(&self.mining) == 1 { - for agent := range self.agents { - agent.Stop() - } + + atomic.StoreInt32(&self.running, 0) + for agent := range self.agents { + agent.Stop() } - atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.atWork, 0) } +func (self *worker) isRunning() bool { + return atomic.LoadInt32(&self.running) == 1 +} + func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() self.agents[agent] = struct{}{} agent.SetReturnCh(self.recv) + if self.isRunning() { + agent.Start() + } } func (self *worker) unregister(agent Agent) { @@ -266,7 +252,7 @@ func (self *worker) update() { // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if atomic.LoadInt32(&self.mining) == 0 { + if !self.isRunning() { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { @@ -343,9 +329,6 @@ func (self *worker) wait() { // push sends a new work task to currently live miner agents. func (self *worker) push(work *Work) { - if atomic.LoadInt32(&self.mining) != 1 { - return - } for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) if ch := agent.Work(); ch != nil { @@ -416,8 +399,12 @@ func (self *worker) commitNewWork() { Extra: self.extra, Time: big.NewInt(tstamp), } - // Only set the coinbase if we are mining (avoid spurious block rewards) - if atomic.LoadInt32(&self.mining) == 1 { + // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) + if self.isRunning() { + if self.coinbase == (common.Address{}) { + log.Error("Refusing to mine without etherbase") + return + } header.Coinbase = self.coinbase } if err := self.engine.Prepare(self.chain, header); err != nil { @@ -448,13 +435,6 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(work.state) } - pending, err := self.eth.TxPool().Pending() - if err != nil { - log.Error("Failed to fetch pending transactions", "err", err) - return - } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - work.commitTransactions(self.mux, txs, self.chain, self.coinbase) // compute uncles for the new block. var ( @@ -478,17 +458,41 @@ func (self *worker) commitNewWork() { for _, hash := range badUncles { delete(self.possibleUncles, hash) } - // Create the new block to seal with the consensus engine + + // Create an empty block based on temporary copied state for sealing in advance without waiting block + // execution finished. + if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil { + log.Error("Failed to finalize block for temporary sealing", "err", err) + } else { + // Push empty work in advance without applying pending transaction. + // The reason is transactions execution can cost a lot and sealer need to + // take advantage of this part time. + if self.isRunning() { + log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) + self.push(work) + } + } + + // Fill the block with all available pending transactions. + pending, err := self.eth.TxPool().Pending() + if err != nil { + log.Error("Failed to fetch pending transactions", "err", err) + return + } + txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) + work.commitTransactions(self.mux, txs, self.chain, self.coinbase) + + // Create the full block to seal with the consensus engine if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. - if atomic.LoadInt32(&self.mining) == 1 { - log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + if self.isRunning() { + log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) + self.push(work) } - self.push(work) self.updateSnapshot() } @@ -511,10 +515,19 @@ func (self *worker) updateSnapshot() { self.snapshotMu.Lock() defer self.snapshotMu.Unlock() + var uncles []*types.Header + self.current.uncles.Each(func(item interface{}) bool { + if header, ok := item.(*types.Header); ok { + uncles = append(uncles, header) + return true + } + return false + }) + self.snapshotBlock = types.NewBlock( self.current.header, self.current.txs, - nil, + uncles, self.current.receipts, ) self.snapshotState = self.current.state.Copy() -- cgit v1.2.3