aboutsummaryrefslogtreecommitdiffstats
path: root/miner
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2018-08-03 16:33:37 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-08-03 16:33:37 +0800
commit51db5975cc5fb88db6a0dba1826b534fd4df29d7 (patch)
tree930f5a66d52c9bdcecd5596d7630fb48b0982cfd /miner
parent70176cda0eedbb4ec9cde867e8f6cde63efa5a12 (diff)
downloaddexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.tar
dexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.tar.gz
dexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.tar.bz2
dexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.tar.lz
dexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.tar.xz
dexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.tar.zst
dexon-51db5975cc5fb88db6a0dba1826b534fd4df29d7.zip
consensus/ethash: move remote agent logic to ethash internal (#15853)
* consensus/ethash: start remote ggoroutine to handle remote mining * consensus/ethash: expose remote miner api * consensus/ethash: expose submitHashrate api * miner, ethash: push empty block to sealer without waiting execution * consensus, internal: add getHashrate API for ethash * consensus: add three method for consensus interface * miner: expose consensus engine running status to miner * eth, miner: specify etherbase when miner created * miner: commit new work when consensus engine is started * consensus, miner: fix some logics * all: delete useless interfaces * consensus: polish a bit
Diffstat (limited to 'miner')
-rw-r--r--miner/agent.go30
-rw-r--r--miner/miner.go32
-rw-r--r--miner/remote_agent.go202
-rw-r--r--miner/worker.go125
4 files changed, 87 insertions, 302 deletions
diff --git a/miner/agent.go b/miner/agent.go
index e3cebbd2e..95d835bd7 100644
--- a/miner/agent.go
+++ b/miner/agent.go
@@ -18,7 +18,6 @@ package miner
import (
"sync"
-
"sync/atomic"
"github.com/ethereum/go-ethereum/consensus"
@@ -36,24 +35,31 @@ type CpuAgent struct {
chain consensus.ChainReader
engine consensus.Engine
- isMining int32 // isMining indicates whether the agent is currently mining
+ started int32 // started indicates whether the agent is currently started
}
func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent {
- miner := &CpuAgent{
+ agent := &CpuAgent{
chain: chain,
engine: engine,
stop: make(chan struct{}, 1),
workCh: make(chan *Work, 1),
}
- return miner
+ return agent
}
func (self *CpuAgent) Work() chan<- *Work { return self.workCh }
func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }
+func (self *CpuAgent) Start() {
+ if !atomic.CompareAndSwapInt32(&self.started, 0, 1) {
+ return // agent already started
+ }
+ go self.update()
+}
+
func (self *CpuAgent) Stop() {
- if !atomic.CompareAndSwapInt32(&self.isMining, 1, 0) {
+ if !atomic.CompareAndSwapInt32(&self.started, 1, 0) {
return // agent already stopped
}
self.stop <- struct{}{}
@@ -68,13 +74,6 @@ done:
}
}
-func (self *CpuAgent) Start() {
- if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
- return // agent already started
- }
- go self.update()
-}
-
func (self *CpuAgent) update() {
out:
for {
@@ -110,10 +109,3 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
self.returnCh <- nil
}
}
-
-func (self *CpuAgent) GetHashRate() int64 {
- if pow, ok := self.engine.(consensus.PoW); ok {
- return int64(pow.Hashrate())
- }
- return 0
-}
diff --git a/miner/miner.go b/miner/miner.go
index d9256e978..4c5717c8a 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -44,12 +44,9 @@ type Backend interface {
// Miner creates blocks and searches for proof-of-work values.
type Miner struct {
- mux *event.TypeMux
-
- worker *worker
-
+ mux *event.TypeMux
+ worker *worker
coinbase common.Address
- mining int32
eth Backend
engine consensus.Engine
@@ -62,7 +59,7 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con
eth: eth,
mux: mux,
engine: engine,
- worker: newWorker(config, engine, common.Address{}, eth, mux),
+ worker: newWorker(config, engine, eth, mux),
canStart: 1,
}
miner.Register(NewCpuAgent(eth.BlockChain(), engine))
@@ -111,23 +108,16 @@ func (self *Miner) Start(coinbase common.Address) {
log.Info("Network syncing, will start miner afterwards")
return
}
- atomic.StoreInt32(&self.mining, 1)
-
- log.Info("Starting mining operation")
self.worker.start()
self.worker.commitNewWork()
}
func (self *Miner) Stop() {
self.worker.stop()
- atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.shouldStart, 0)
}
func (self *Miner) Register(agent Agent) {
- if self.Mining() {
- agent.Start()
- }
self.worker.register(agent)
}
@@ -136,22 +126,14 @@ func (self *Miner) Unregister(agent Agent) {
}
func (self *Miner) Mining() bool {
- return atomic.LoadInt32(&self.mining) > 0
+ return self.worker.isRunning()
}
-func (self *Miner) HashRate() (tot int64) {
+func (self *Miner) HashRate() uint64 {
if pow, ok := self.engine.(consensus.PoW); ok {
- tot += int64(pow.Hashrate())
- }
- // do we care this might race? is it worth we're rewriting some
- // aspects of the worker/locking up agents so we can get an accurate
- // hashrate?
- for agent := range self.worker.agents {
- if _, ok := agent.(*CpuAgent); !ok {
- tot += agent.GetHashRate()
- }
+ return uint64(pow.Hashrate())
}
- return
+ return 0
}
func (self *Miner) SetExtra(extra []byte) error {
diff --git a/miner/remote_agent.go b/miner/remote_agent.go
deleted file mode 100644
index 287e7530c..000000000
--- a/miner/remote_agent.go
+++ /dev/null
@@ -1,202 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package miner
-
-import (
- "errors"
- "math/big"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus"
- "github.com/ethereum/go-ethereum/consensus/ethash"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/log"
-)
-
-type hashrate struct {
- ping time.Time
- rate uint64
-}
-
-type RemoteAgent struct {
- mu sync.Mutex
-
- quitCh chan struct{}
- workCh chan *Work
- returnCh chan<- *Result
-
- chain consensus.ChainReader
- engine consensus.Engine
- currentWork *Work
- work map[common.Hash]*Work
-
- hashrateMu sync.RWMutex
- hashrate map[common.Hash]hashrate
-
- running int32 // running indicates whether the agent is active. Call atomically
-}
-
-func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent {
- return &RemoteAgent{
- chain: chain,
- engine: engine,
- work: make(map[common.Hash]*Work),
- hashrate: make(map[common.Hash]hashrate),
- }
-}
-
-func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) {
- a.hashrateMu.Lock()
- defer a.hashrateMu.Unlock()
-
- a.hashrate[id] = hashrate{time.Now(), rate}
-}
-
-func (a *RemoteAgent) Work() chan<- *Work {
- return a.workCh
-}
-
-func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) {
- a.returnCh = returnCh
-}
-
-func (a *RemoteAgent) Start() {
- if !atomic.CompareAndSwapInt32(&a.running, 0, 1) {
- return
- }
- a.quitCh = make(chan struct{})
- a.workCh = make(chan *Work, 1)
- go a.loop(a.workCh, a.quitCh)
-}
-
-func (a *RemoteAgent) Stop() {
- if !atomic.CompareAndSwapInt32(&a.running, 1, 0) {
- return
- }
- close(a.quitCh)
- close(a.workCh)
-}
-
-// GetHashRate returns the accumulated hashrate of all identifier combined
-func (a *RemoteAgent) GetHashRate() (tot int64) {
- a.hashrateMu.RLock()
- defer a.hashrateMu.RUnlock()
-
- // this could overflow
- for _, hashrate := range a.hashrate {
- tot += int64(hashrate.rate)
- }
- return
-}
-
-func (a *RemoteAgent) GetWork() ([3]string, error) {
- a.mu.Lock()
- defer a.mu.Unlock()
-
- var res [3]string
-
- if a.currentWork != nil {
- block := a.currentWork.Block
-
- res[0] = block.HashNoNonce().Hex()
- seedHash := ethash.SeedHash(block.NumberU64())
- res[1] = common.BytesToHash(seedHash).Hex()
- // Calculate the "target" to be returned to the external miner
- n := big.NewInt(1)
- n.Lsh(n, 255)
- n.Div(n, block.Difficulty())
- n.Lsh(n, 1)
- res[2] = common.BytesToHash(n.Bytes()).Hex()
-
- a.work[block.HashNoNonce()] = a.currentWork
- return res, nil
- }
- return res, errors.New("No work available yet, don't panic.")
-}
-
-// SubmitWork tries to inject a pow solution into the remote agent, returning
-// whether the solution was accepted or not (not can be both a bad pow as well as
-// any other error, like no work pending).
-func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
- a.mu.Lock()
- defer a.mu.Unlock()
-
- // Make sure the work submitted is present
- work := a.work[hash]
- if work == nil {
- log.Info("Work submitted but none pending", "hash", hash)
- return false
- }
- // Make sure the Engine solutions is indeed valid
- result := work.Block.Header()
- result.Nonce = nonce
- result.MixDigest = mixDigest
-
- if err := a.engine.VerifySeal(a.chain, result); err != nil {
- log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
- return false
- }
- block := work.Block.WithSeal(result)
-
- // Solutions seems to be valid, return to the miner and notify acceptance
- a.returnCh <- &Result{work, block}
- delete(a.work, hash)
-
- return true
-}
-
-// loop monitors mining events on the work and quit channels, updating the internal
-// state of the remote miner until a termination is requested.
-//
-// Note, the reason the work and quit channels are passed as parameters is because
-// RemoteAgent.Start() constantly recreates these channels, so the loop code cannot
-// assume data stability in these member fields.
-func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) {
- ticker := time.NewTicker(5 * time.Second)
- defer ticker.Stop()
-
- for {
- select {
- case <-quitCh:
- return
- case work := <-workCh:
- a.mu.Lock()
- a.currentWork = work
- a.mu.Unlock()
- case <-ticker.C:
- // cleanup
- a.mu.Lock()
- for hash, work := range a.work {
- if time.Since(work.createdAt) > 7*(12*time.Second) {
- delete(a.work, hash)
- }
- }
- a.mu.Unlock()
-
- a.hashrateMu.Lock()
- for id, hashrate := range a.hashrate {
- if time.Since(hashrate.ping) > 10*time.Second {
- delete(a.hashrate, id)
- }
- }
- a.hashrateMu.Unlock()
- }
- }
-}
diff --git a/miner/worker.go b/miner/worker.go
index 34329f849..f1194fa18 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -55,9 +55,8 @@ const (
type Agent interface {
Work() chan<- *Work
SetReturnCh(chan<- *Result)
- Stop()
Start()
- GetHashRate() int64
+ Stop()
}
// Work is the workers current environment and holds
@@ -102,7 +101,6 @@ type worker struct {
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
- wg sync.WaitGroup
agents map[Agent]struct{}
recv chan *Result
@@ -128,11 +126,11 @@ type worker struct {
unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
// atomic status counters
- mining int32
- atWork int32
+ atWork int32 // The number of in-flight consensus engine work.
+ running int32 // The indicator whether the consensus engine is running or not.
}
-func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
+func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
worker := &worker{
config: config,
engine: engine,
@@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
chain: eth.BlockChain(),
proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
- coinbase: coinbase,
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
}
@@ -176,62 +173,51 @@ func (self *worker) setExtra(extra []byte) {
}
func (self *worker) pending() (*types.Block, *state.StateDB) {
- if atomic.LoadInt32(&self.mining) == 0 {
- // return a snapshot to avoid contention on currentMu mutex
- self.snapshotMu.RLock()
- defer self.snapshotMu.RUnlock()
- return self.snapshotBlock, self.snapshotState.Copy()
- }
-
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- return self.current.Block, self.current.state.Copy()
+ // return a snapshot to avoid contention on currentMu mutex
+ self.snapshotMu.RLock()
+ defer self.snapshotMu.RUnlock()
+ return self.snapshotBlock, self.snapshotState.Copy()
}
func (self *worker) pendingBlock() *types.Block {
- if atomic.LoadInt32(&self.mining) == 0 {
- // return a snapshot to avoid contention on currentMu mutex
- self.snapshotMu.RLock()
- defer self.snapshotMu.RUnlock()
- return self.snapshotBlock
- }
-
- self.currentMu.Lock()
- defer self.currentMu.Unlock()
- return self.current.Block
+ // return a snapshot to avoid contention on currentMu mutex
+ self.snapshotMu.RLock()
+ defer self.snapshotMu.RUnlock()
+ return self.snapshotBlock
}
func (self *worker) start() {
self.mu.Lock()
defer self.mu.Unlock()
-
- atomic.StoreInt32(&self.mining, 1)
-
- // spin up agents
+ atomic.StoreInt32(&self.running, 1)
for agent := range self.agents {
agent.Start()
}
}
func (self *worker) stop() {
- self.wg.Wait()
-
self.mu.Lock()
defer self.mu.Unlock()
- if atomic.LoadInt32(&self.mining) == 1 {
- for agent := range self.agents {
- agent.Stop()
- }
+
+ atomic.StoreInt32(&self.running, 0)
+ for agent := range self.agents {
+ agent.Stop()
}
- atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.atWork, 0)
}
+func (self *worker) isRunning() bool {
+ return atomic.LoadInt32(&self.running) == 1
+}
+
func (self *worker) register(agent Agent) {
self.mu.Lock()
defer self.mu.Unlock()
self.agents[agent] = struct{}{}
agent.SetReturnCh(self.recv)
+ if self.isRunning() {
+ agent.Start()
+ }
}
func (self *worker) unregister(agent Agent) {
@@ -266,7 +252,7 @@ func (self *worker) update() {
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// be automatically eliminated.
- if atomic.LoadInt32(&self.mining) == 0 {
+ if !self.isRunning() {
self.currentMu.Lock()
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
@@ -343,9 +329,6 @@ func (self *worker) wait() {
// push sends a new work task to currently live miner agents.
func (self *worker) push(work *Work) {
- if atomic.LoadInt32(&self.mining) != 1 {
- return
- }
for agent := range self.agents {
atomic.AddInt32(&self.atWork, 1)
if ch := agent.Work(); ch != nil {
@@ -416,8 +399,12 @@ func (self *worker) commitNewWork() {
Extra: self.extra,
Time: big.NewInt(tstamp),
}
- // Only set the coinbase if we are mining (avoid spurious block rewards)
- if atomic.LoadInt32(&self.mining) == 1 {
+ // Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
+ if self.isRunning() {
+ if self.coinbase == (common.Address{}) {
+ log.Error("Refusing to mine without etherbase")
+ return
+ }
header.Coinbase = self.coinbase
}
if err := self.engine.Prepare(self.chain, header); err != nil {
@@ -448,13 +435,6 @@ func (self *worker) commitNewWork() {
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(work.state)
}
- pending, err := self.eth.TxPool().Pending()
- if err != nil {
- log.Error("Failed to fetch pending transactions", "err", err)
- return
- }
- txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
- work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
// compute uncles for the new block.
var (
@@ -478,17 +458,41 @@ func (self *worker) commitNewWork() {
for _, hash := range badUncles {
delete(self.possibleUncles, hash)
}
- // Create the new block to seal with the consensus engine
+
+ // Create an empty block based on temporary copied state for sealing in advance without waiting block
+ // execution finished.
+ if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil {
+ log.Error("Failed to finalize block for temporary sealing", "err", err)
+ } else {
+ // Push empty work in advance without applying pending transaction.
+ // The reason is transactions execution can cost a lot and sealer need to
+ // take advantage of this part time.
+ if self.isRunning() {
+ log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles))
+ self.push(work)
+ }
+ }
+
+ // Fill the block with all available pending transactions.
+ pending, err := self.eth.TxPool().Pending()
+ if err != nil {
+ log.Error("Failed to fetch pending transactions", "err", err)
+ return
+ }
+ txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
+ work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
+
+ // Create the full block to seal with the consensus engine
if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
log.Error("Failed to finalize block for sealing", "err", err)
return
}
// We only care about logging if we're actually mining.
- if atomic.LoadInt32(&self.mining) == 1 {
- log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
+ if self.isRunning() {
+ log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(work.Block.NumberU64() - 1)
+ self.push(work)
}
- self.push(work)
self.updateSnapshot()
}
@@ -511,10 +515,19 @@ func (self *worker) updateSnapshot() {
self.snapshotMu.Lock()
defer self.snapshotMu.Unlock()
+ var uncles []*types.Header
+ self.current.uncles.Each(func(item interface{}) bool {
+ if header, ok := item.(*types.Header); ok {
+ uncles = append(uncles, header)
+ return true
+ }
+ return false
+ })
+
self.snapshotBlock = types.NewBlock(
self.current.header,
self.current.txs,
- nil,
+ uncles,
self.current.receipts,
)
self.snapshotState = self.current.state.Copy()