From 51db5975cc5fb88db6a0dba1826b534fd4df29d7 Mon Sep 17 00:00:00 2001 From: gary rong Date: Fri, 3 Aug 2018 16:33:37 +0800 Subject: consensus/ethash: move remote agent logic to ethash internal (#15853) * consensus/ethash: start remote ggoroutine to handle remote mining * consensus/ethash: expose remote miner api * consensus/ethash: expose submitHashrate api * miner, ethash: push empty block to sealer without waiting execution * consensus, internal: add getHashrate API for ethash * consensus: add three method for consensus interface * miner: expose consensus engine running status to miner * eth, miner: specify etherbase when miner created * miner: commit new work when consensus engine is started * consensus, miner: fix some logics * all: delete useless interfaces * consensus: polish a bit --- miner/agent.go | 30 +++----- miner/miner.go | 32 ++------ miner/remote_agent.go | 202 -------------------------------------------------- miner/worker.go | 125 +++++++++++++++++-------------- 4 files changed, 87 insertions(+), 302 deletions(-) delete mode 100644 miner/remote_agent.go (limited to 'miner') diff --git a/miner/agent.go b/miner/agent.go index e3cebbd2e..95d835bd7 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -18,7 +18,6 @@ package miner import ( "sync" - "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -36,24 +35,31 @@ type CpuAgent struct { chain consensus.ChainReader engine consensus.Engine - isMining int32 // isMining indicates whether the agent is currently mining + started int32 // started indicates whether the agent is currently started } func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { - miner := &CpuAgent{ + agent := &CpuAgent{ chain: chain, engine: engine, stop: make(chan struct{}, 1), workCh: make(chan *Work, 1), } - return miner + return agent } func (self *CpuAgent) Work() chan<- *Work { return self.workCh } func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch } +func (self *CpuAgent) Start() { + if !atomic.CompareAndSwapInt32(&self.started, 0, 1) { + return // agent already started + } + go self.update() +} + func (self *CpuAgent) Stop() { - if !atomic.CompareAndSwapInt32(&self.isMining, 1, 0) { + if !atomic.CompareAndSwapInt32(&self.started, 1, 0) { return // agent already stopped } self.stop <- struct{}{} @@ -68,13 +74,6 @@ done: } } -func (self *CpuAgent) Start() { - if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) { - return // agent already started - } - go self.update() -} - func (self *CpuAgent) update() { out: for { @@ -110,10 +109,3 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { self.returnCh <- nil } } - -func (self *CpuAgent) GetHashRate() int64 { - if pow, ok := self.engine.(consensus.PoW); ok { - return int64(pow.Hashrate()) - } - return 0 -} diff --git a/miner/miner.go b/miner/miner.go index d9256e978..4c5717c8a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -44,12 +44,9 @@ type Backend interface { // Miner creates blocks and searches for proof-of-work values. type Miner struct { - mux *event.TypeMux - - worker *worker - + mux *event.TypeMux + worker *worker coinbase common.Address - mining int32 eth Backend engine consensus.Engine @@ -62,7 +59,7 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con eth: eth, mux: mux, engine: engine, - worker: newWorker(config, engine, common.Address{}, eth, mux), + worker: newWorker(config, engine, eth, mux), canStart: 1, } miner.Register(NewCpuAgent(eth.BlockChain(), engine)) @@ -111,23 +108,16 @@ func (self *Miner) Start(coinbase common.Address) { log.Info("Network syncing, will start miner afterwards") return } - atomic.StoreInt32(&self.mining, 1) - - log.Info("Starting mining operation") self.worker.start() self.worker.commitNewWork() } func (self *Miner) Stop() { self.worker.stop() - atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.shouldStart, 0) } func (self *Miner) Register(agent Agent) { - if self.Mining() { - agent.Start() - } self.worker.register(agent) } @@ -136,22 +126,14 @@ func (self *Miner) Unregister(agent Agent) { } func (self *Miner) Mining() bool { - return atomic.LoadInt32(&self.mining) > 0 + return self.worker.isRunning() } -func (self *Miner) HashRate() (tot int64) { +func (self *Miner) HashRate() uint64 { if pow, ok := self.engine.(consensus.PoW); ok { - tot += int64(pow.Hashrate()) - } - // do we care this might race? is it worth we're rewriting some - // aspects of the worker/locking up agents so we can get an accurate - // hashrate? - for agent := range self.worker.agents { - if _, ok := agent.(*CpuAgent); !ok { - tot += agent.GetHashRate() - } + return uint64(pow.Hashrate()) } - return + return 0 } func (self *Miner) SetExtra(extra []byte) error { diff --git a/miner/remote_agent.go b/miner/remote_agent.go deleted file mode 100644 index 287e7530c..000000000 --- a/miner/remote_agent.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package miner - -import ( - "errors" - "math/big" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/ethash" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -type hashrate struct { - ping time.Time - rate uint64 -} - -type RemoteAgent struct { - mu sync.Mutex - - quitCh chan struct{} - workCh chan *Work - returnCh chan<- *Result - - chain consensus.ChainReader - engine consensus.Engine - currentWork *Work - work map[common.Hash]*Work - - hashrateMu sync.RWMutex - hashrate map[common.Hash]hashrate - - running int32 // running indicates whether the agent is active. Call atomically -} - -func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent { - return &RemoteAgent{ - chain: chain, - engine: engine, - work: make(map[common.Hash]*Work), - hashrate: make(map[common.Hash]hashrate), - } -} - -func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) { - a.hashrateMu.Lock() - defer a.hashrateMu.Unlock() - - a.hashrate[id] = hashrate{time.Now(), rate} -} - -func (a *RemoteAgent) Work() chan<- *Work { - return a.workCh -} - -func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) { - a.returnCh = returnCh -} - -func (a *RemoteAgent) Start() { - if !atomic.CompareAndSwapInt32(&a.running, 0, 1) { - return - } - a.quitCh = make(chan struct{}) - a.workCh = make(chan *Work, 1) - go a.loop(a.workCh, a.quitCh) -} - -func (a *RemoteAgent) Stop() { - if !atomic.CompareAndSwapInt32(&a.running, 1, 0) { - return - } - close(a.quitCh) - close(a.workCh) -} - -// GetHashRate returns the accumulated hashrate of all identifier combined -func (a *RemoteAgent) GetHashRate() (tot int64) { - a.hashrateMu.RLock() - defer a.hashrateMu.RUnlock() - - // this could overflow - for _, hashrate := range a.hashrate { - tot += int64(hashrate.rate) - } - return -} - -func (a *RemoteAgent) GetWork() ([3]string, error) { - a.mu.Lock() - defer a.mu.Unlock() - - var res [3]string - - if a.currentWork != nil { - block := a.currentWork.Block - - res[0] = block.HashNoNonce().Hex() - seedHash := ethash.SeedHash(block.NumberU64()) - res[1] = common.BytesToHash(seedHash).Hex() - // Calculate the "target" to be returned to the external miner - n := big.NewInt(1) - n.Lsh(n, 255) - n.Div(n, block.Difficulty()) - n.Lsh(n, 1) - res[2] = common.BytesToHash(n.Bytes()).Hex() - - a.work[block.HashNoNonce()] = a.currentWork - return res, nil - } - return res, errors.New("No work available yet, don't panic.") -} - -// SubmitWork tries to inject a pow solution into the remote agent, returning -// whether the solution was accepted or not (not can be both a bad pow as well as -// any other error, like no work pending). -func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool { - a.mu.Lock() - defer a.mu.Unlock() - - // Make sure the work submitted is present - work := a.work[hash] - if work == nil { - log.Info("Work submitted but none pending", "hash", hash) - return false - } - // Make sure the Engine solutions is indeed valid - result := work.Block.Header() - result.Nonce = nonce - result.MixDigest = mixDigest - - if err := a.engine.VerifySeal(a.chain, result); err != nil { - log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) - return false - } - block := work.Block.WithSeal(result) - - // Solutions seems to be valid, return to the miner and notify acceptance - a.returnCh <- &Result{work, block} - delete(a.work, hash) - - return true -} - -// loop monitors mining events on the work and quit channels, updating the internal -// state of the remote miner until a termination is requested. -// -// Note, the reason the work and quit channels are passed as parameters is because -// RemoteAgent.Start() constantly recreates these channels, so the loop code cannot -// assume data stability in these member fields. -func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-quitCh: - return - case work := <-workCh: - a.mu.Lock() - a.currentWork = work - a.mu.Unlock() - case <-ticker.C: - // cleanup - a.mu.Lock() - for hash, work := range a.work { - if time.Since(work.createdAt) > 7*(12*time.Second) { - delete(a.work, hash) - } - } - a.mu.Unlock() - - a.hashrateMu.Lock() - for id, hashrate := range a.hashrate { - if time.Since(hashrate.ping) > 10*time.Second { - delete(a.hashrate, id) - } - } - a.hashrateMu.Unlock() - } - } -} diff --git a/miner/worker.go b/miner/worker.go index 34329f849..f1194fa18 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -55,9 +55,8 @@ const ( type Agent interface { Work() chan<- *Work SetReturnCh(chan<- *Result) - Stop() Start() - GetHashRate() int64 + Stop() } // Work is the workers current environment and holds @@ -102,7 +101,6 @@ type worker struct { chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -128,11 +126,11 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - mining int32 - atWork int32 + atWork int32 // The number of in-flight consensus engine work. + running int32 // The indicator whether the consensus engine is running or not. } -func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { worker := &worker{ config: config, engine: engine, @@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), - coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } @@ -176,62 +173,51 @@ func (self *worker) setExtra(extra []byte) { } func (self *worker) pending() (*types.Block, *state.StateDB) { - if atomic.LoadInt32(&self.mining) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock, self.snapshotState.Copy() - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block, self.current.state.Copy() + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock, self.snapshotState.Copy() } func (self *worker) pendingBlock() *types.Block { - if atomic.LoadInt32(&self.mining) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock } func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() - - atomic.StoreInt32(&self.mining, 1) - - // spin up agents + atomic.StoreInt32(&self.running, 1) for agent := range self.agents { agent.Start() } } func (self *worker) stop() { - self.wg.Wait() - self.mu.Lock() defer self.mu.Unlock() - if atomic.LoadInt32(&self.mining) == 1 { - for agent := range self.agents { - agent.Stop() - } + + atomic.StoreInt32(&self.running, 0) + for agent := range self.agents { + agent.Stop() } - atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.atWork, 0) } +func (self *worker) isRunning() bool { + return atomic.LoadInt32(&self.running) == 1 +} + func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() self.agents[agent] = struct{}{} agent.SetReturnCh(self.recv) + if self.isRunning() { + agent.Start() + } } func (self *worker) unregister(agent Agent) { @@ -266,7 +252,7 @@ func (self *worker) update() { // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if atomic.LoadInt32(&self.mining) == 0 { + if !self.isRunning() { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { @@ -343,9 +329,6 @@ func (self *worker) wait() { // push sends a new work task to currently live miner agents. func (self *worker) push(work *Work) { - if atomic.LoadInt32(&self.mining) != 1 { - return - } for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) if ch := agent.Work(); ch != nil { @@ -416,8 +399,12 @@ func (self *worker) commitNewWork() { Extra: self.extra, Time: big.NewInt(tstamp), } - // Only set the coinbase if we are mining (avoid spurious block rewards) - if atomic.LoadInt32(&self.mining) == 1 { + // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) + if self.isRunning() { + if self.coinbase == (common.Address{}) { + log.Error("Refusing to mine without etherbase") + return + } header.Coinbase = self.coinbase } if err := self.engine.Prepare(self.chain, header); err != nil { @@ -448,13 +435,6 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(work.state) } - pending, err := self.eth.TxPool().Pending() - if err != nil { - log.Error("Failed to fetch pending transactions", "err", err) - return - } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - work.commitTransactions(self.mux, txs, self.chain, self.coinbase) // compute uncles for the new block. var ( @@ -478,17 +458,41 @@ func (self *worker) commitNewWork() { for _, hash := range badUncles { delete(self.possibleUncles, hash) } - // Create the new block to seal with the consensus engine + + // Create an empty block based on temporary copied state for sealing in advance without waiting block + // execution finished. + if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil { + log.Error("Failed to finalize block for temporary sealing", "err", err) + } else { + // Push empty work in advance without applying pending transaction. + // The reason is transactions execution can cost a lot and sealer need to + // take advantage of this part time. + if self.isRunning() { + log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) + self.push(work) + } + } + + // Fill the block with all available pending transactions. + pending, err := self.eth.TxPool().Pending() + if err != nil { + log.Error("Failed to fetch pending transactions", "err", err) + return + } + txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) + work.commitTransactions(self.mux, txs, self.chain, self.coinbase) + + // Create the full block to seal with the consensus engine if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. - if atomic.LoadInt32(&self.mining) == 1 { - log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + if self.isRunning() { + log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) + self.push(work) } - self.push(work) self.updateSnapshot() } @@ -511,10 +515,19 @@ func (self *worker) updateSnapshot() { self.snapshotMu.Lock() defer self.snapshotMu.Unlock() + var uncles []*types.Header + self.current.uncles.Each(func(item interface{}) bool { + if header, ok := item.(*types.Header); ok { + uncles = append(uncles, header) + return true + } + return false + }) + self.snapshotBlock = types.NewBlock( self.current.header, self.current.txs, - nil, + uncles, self.current.receipts, ) self.snapshotState = self.current.state.Copy() -- cgit v1.2.3