From 51db5975cc5fb88db6a0dba1826b534fd4df29d7 Mon Sep 17 00:00:00 2001 From: gary rong Date: Fri, 3 Aug 2018 16:33:37 +0800 Subject: consensus/ethash: move remote agent logic to ethash internal (#15853) * consensus/ethash: start remote ggoroutine to handle remote mining * consensus/ethash: expose remote miner api * consensus/ethash: expose submitHashrate api * miner, ethash: push empty block to sealer without waiting execution * consensus, internal: add getHashrate API for ethash * consensus: add three method for consensus interface * miner: expose consensus engine running status to miner * eth, miner: specify etherbase when miner created * miner: commit new work when consensus engine is started * consensus, miner: fix some logics * all: delete useless interfaces * consensus: polish a bit --- cmd/geth/consolecmd_test.go | 2 +- consensus/clique/clique.go | 5 + consensus/consensus.go | 3 + consensus/ethash/algorithm_test.go | 1 + consensus/ethash/api.go | 117 +++++++++++++++++++++ consensus/ethash/ethash.go | 132 +++++++++++++++++++++--- consensus/ethash/ethash_test.go | 92 +++++++++++++++++ consensus/ethash/sealer.go | 149 ++++++++++++++++++++++++++- eth/api.go | 46 +-------- eth/backend.go | 3 +- internal/web3ext/web3ext.go | 29 ++++++ les/backend.go | 1 + miner/agent.go | 30 ++---- miner/miner.go | 32 ++---- miner/remote_agent.go | 202 ------------------------------------- miner/worker.go | 125 +++++++++++++---------- 16 files changed, 608 insertions(+), 361 deletions(-) create mode 100644 consensus/ethash/api.go delete mode 100644 miner/remote_agent.go diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index 8d8b10f8f..34ba87702 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -31,7 +31,7 @@ import ( ) const ( - ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 shh:1.0 txpool:1.0 web3:1.0" + ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 ethash:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 shh:1.0 txpool:1.0 web3:1.0" httpAPIs = "eth:1.0 net:1.0 rpc:1.0 web3:1.0" ) diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 8968f500f..59bb3d40b 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -672,6 +672,11 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { return new(big.Int).Set(diffNoTurn) } +// Close implements consensus.Engine. It's a noop for clique as there is are no background threads. +func (c *Clique) Close() error { + return nil +} + // APIs implements consensus.Engine, returning the user facing RPC API to allow // controlling the signer voting. func (c *Clique) APIs(chain consensus.ChainReader) []rpc.API { diff --git a/consensus/consensus.go b/consensus/consensus.go index 5774af1a7..827175444 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -96,6 +96,9 @@ type Engine interface { // APIs returns the RPC APIs this consensus engine provides. APIs(chain ChainReader) []rpc.API + + // Close terminates any background threads maintained by the consensus engine. + Close() error } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index f0c6465fd..e7625f7c0 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -730,6 +730,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}) + defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) } diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go new file mode 100644 index 000000000..a04ea235d --- /dev/null +++ b/consensus/ethash/api.go @@ -0,0 +1,117 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package ethash + +import ( + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" +) + +var errEthashStopped = errors.New("ethash stopped") + +// API exposes ethash related methods for the RPC interface. +type API struct { + ethash *Ethash // Make sure the mode of ethash is normal. +} + +// GetWork returns a work package for external miner. +// +// The work package consists of 3 strings: +// result[0] - 32 bytes hex encoded current block header pow-hash +// result[1] - 32 bytes hex encoded seed hash used for DAG +// result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +func (api *API) GetWork() ([3]string, error) { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return [3]string{}, errors.New("not supported") + } + + var ( + workCh = make(chan [3]string, 1) + errc = make(chan error, 1) + ) + + select { + case api.ethash.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: + case <-api.ethash.exitCh: + return [3]string{}, errEthashStopped + } + + select { + case work := <-workCh: + return work, nil + case err := <-errc: + return [3]string{}, err + } +} + +// SubmitWork can be used by external miner to submit their POW solution. +// It returns an indication if the work was accepted. +// Note either an invalid solution, a stale work a non-existent work will return false. +func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) bool { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return false + } + + var errc = make(chan error, 1) + + select { + case api.ethash.submitWorkCh <- &mineResult{ + nonce: nonce, + mixDigest: digest, + hash: hash, + errc: errc, + }: + case <-api.ethash.exitCh: + return false + } + + err := <-errc + return err == nil +} + +// SubmitHashrate can be used for remote miners to submit their hash rate. +// This enables the node to report the combined hash rate of all miners +// which submit work through this node. +// +// It accepts the miner hash rate and an identifier which must be unique +// between nodes. +func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return false + } + + var done = make(chan struct{}, 1) + + select { + case api.ethash.submitRateCh <- &hashrate{done: done, rate: uint64(rate), id: id}: + case <-api.ethash.exitCh: + return false + } + + // Block until hash rate submitted successfully. + <-done + + return true +} + +// GetHashrate returns the current hashrate for local CPU miner and remote miner. +func (api *API) GetHashrate() uint64 { + return uint64(api.ethash.Hashrate()) +} diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index f79dd6c36..0cb3059b9 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -33,7 +33,9 @@ import ( "unsafe" mmap "github.com/edsrzf/mmap-go" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" @@ -389,6 +391,30 @@ type Config struct { PowMode Mode } +// mineResult wraps the pow solution parameters for the specified block. +type mineResult struct { + nonce types.BlockNonce + mixDigest common.Hash + hash common.Hash + + errc chan error +} + +// hashrate wraps the hash rate submitted by the remote sealer. +type hashrate struct { + id common.Hash + ping time.Time + rate uint64 + + done chan struct{} +} + +// sealWork wraps a seal work package for remote sealer. +type sealWork struct { + errc chan error + res chan [3]string +} + // Ethash is a consensus engine based on proof-of-work implementing the ethash // algorithm. type Ethash struct { @@ -403,15 +429,25 @@ type Ethash struct { update chan struct{} // Notification channel to update mining parameters hashrate metrics.Meter // Meter tracking the average hashrate + // Remote sealer related fields + workCh chan *types.Block // Notification channel to push new work to remote sealer + resultCh chan *types.Block // Channel used by mining threads to return result + fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work + submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result + fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. + submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate + // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration fakeFail uint64 // Block number which fails PoW check even in fake mode fakeDelay time.Duration // Time delay to sleep for before returning from verify - lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + closeOnce sync.Once // Ensures exit channel will not be closed twice. + exitCh chan chan error // Notification channel to exiting backend threads } -// New creates a full sized ethash PoW scheme. +// New creates a full sized ethash PoW scheme and starts a background thread for remote mining. func New(config Config) *Ethash { if config.CachesInMem <= 0 { log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) @@ -423,19 +459,43 @@ func New(config Config) *Ethash { if config.DatasetDir != "" && config.DatasetsOnDisk > 0 { log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) } - return &Ethash{ - config: config, - caches: newlru("cache", config.CachesInMem, newCache), - datasets: newlru("dataset", config.DatasetsInMem, newDataset), - update: make(chan struct{}), - hashrate: metrics.NewMeter(), + ethash := &Ethash{ + config: config, + caches: newlru("cache", config.CachesInMem, newCache), + datasets: newlru("dataset", config.DatasetsInMem, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + exitCh: make(chan chan error), } + go ethash.remote() + return ethash } // NewTester creates a small sized ethash PoW scheme useful only for testing // purposes. func NewTester() *Ethash { - return New(Config{CachesInMem: 1, PowMode: ModeTest}) + ethash := &Ethash{ + config: Config{PowMode: ModeTest}, + caches: newlru("cache", 1, newCache), + datasets: newlru("dataset", 1, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + exitCh: make(chan chan error), + } + go ethash.remote() + return ethash } // NewFaker creates a ethash consensus engine with a fake PoW scheme that accepts @@ -489,6 +549,22 @@ func NewShared() *Ethash { return &Ethash{shared: sharedEthash} } +// Close closes the exit channel to notify all backend threads exiting. +func (ethash *Ethash) Close() error { + var err error + ethash.closeOnce.Do(func() { + // Short circuit if the exit channel is not allocated. + if ethash.exitCh == nil { + return + } + errc := make(chan error) + ethash.exitCh <- errc + err = <-errc + close(ethash.exitCh) + }) + return err +} + // cache tries to retrieve a verification cache for the specified block number // by first checking against a list of in-memory caches, then against caches // stored on disk, and finally generating one if none can be found. @@ -561,14 +637,44 @@ func (ethash *Ethash) SetThreads(threads int) { // Hashrate implements PoW, returning the measured rate of the search invocations // per second over the last minute. +// Note the returned hashrate includes local hashrate, but also includes the total +// hashrate of all remote miner. func (ethash *Ethash) Hashrate() float64 { - return ethash.hashrate.Rate1() + // Short circuit if we are run the ethash in normal/test mode. + if ethash.config.PowMode != ModeNormal && ethash.config.PowMode != ModeTest { + return ethash.hashrate.Rate1() + } + var res = make(chan uint64, 1) + + select { + case ethash.fetchRateCh <- res: + case <-ethash.exitCh: + // Return local hashrate only if ethash is stopped. + return ethash.hashrate.Rate1() + } + + // Gather total submitted hash rate of remote sealers. + return ethash.hashrate.Rate1() + float64(<-res) } -// APIs implements consensus.Engine, returning the user facing RPC APIs. Currently -// that is empty. +// APIs implements consensus.Engine, returning the user facing RPC APIs. func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { - return nil + // In order to ensure backward compatibility, we exposes ethash RPC APIs + // to both eth and ethash namespaces. + return []rpc.API{ + { + Namespace: "eth", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }, + { + Namespace: "ethash", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }, + } } // SeedHash is the seed to use for generating a verification cache and the mining diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index 31116da43..ccdd30fb0 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -23,7 +23,10 @@ import ( "os" "sync" "testing" + "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" ) @@ -32,6 +35,7 @@ func TestTestMode(t *testing.T) { head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} ethash := NewTester() + defer ethash.Close() block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil) if err != nil { t.Fatalf("failed to seal block: %v", err) @@ -52,6 +56,7 @@ func TestCacheFileEvict(t *testing.T) { } defer os.RemoveAll(tmpdir) e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}) + defer e.Close() workers := 8 epochs := 100 @@ -77,3 +82,90 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { e.VerifySeal(nil, head) } } + +func TestRemoteSealer(t *testing.T) { + ethash := NewTester() + defer ethash.Close() + api := &API{ethash} + if _, err := api.GetWork(); err != errNoMiningWork { + t.Error("expect to return an error indicate there is no mining work") + } + + head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(head) + + // Push new work. + ethash.Seal(nil, block, nil) + + var ( + work [3]string + err error + ) + if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + t.Error("expect to return a mining work has same hash") + } + + if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + t.Error("expect to return false when submit a fake solution") + } + + // Push new block with same block number to replace the original one. + head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} + block = types.NewBlockWithHeader(head) + ethash.Seal(nil, block, nil) + + if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + t.Error("expect to return the latest pushed work") + } + + // Push block with higher block number. + newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} + newBlock := types.NewBlockWithHeader(newHead) + ethash.Seal(nil, newBlock, nil) + + if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + t.Error("expect to return false when submit a stale solution") + } +} + +func TestHashRate(t *testing.T) { + var ( + ethash = NewTester() + api = &API{ethash} + hashrate = []hexutil.Uint64{100, 200, 300} + expect uint64 + ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} + ) + + defer ethash.Close() + + if tot := ethash.Hashrate(); tot != 0 { + t.Error("expect the result should be zero") + } + + for i := 0; i < len(hashrate); i += 1 { + if res := api.SubmitHashRate(hashrate[i], ids[i]); !res { + t.Error("remote miner submit hashrate failed") + } + expect += uint64(hashrate[i]) + } + if tot := ethash.Hashrate(); tot != float64(expect) { + t.Error("expect total hashrate should be same") + } +} + +func TestClosedRemoteSealer(t *testing.T) { + ethash := NewTester() + // Make sure exit channel has been listened + time.Sleep(1 * time.Second) + ethash.Close() + + api := &API{ethash} + if _, err := api.GetWork(); err != errEthashStopped { + t.Error("expect to return an error to indicate ethash is stopped") + } + + if res := api.SubmitHashRate(hexutil.Uint64(100), common.HexToHash("a")); res { + t.Error("expect to return false when submit hashrate to a stopped ethash") + } +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index b5e742d8b..a9449d406 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -18,11 +18,13 @@ package ethash import ( crand "crypto/rand" + "errors" "math" "math/big" "math/rand" "runtime" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -30,6 +32,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +var ( + errNoMiningWork = errors.New("no mining work available yet") + errInvalidSealResult = errors.New("invalid or stale proof-of-work solution") +) + // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { @@ -45,7 +52,6 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) - found := make(chan *types.Block) ethash.lock.Lock() threads := ethash.threads @@ -64,12 +70,16 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if threads < 0 { threads = 0 // Allows disabling local mining without extra logic around local/remote } + // Push new work to remote sealer + if ethash.workCh != nil { + ethash.workCh <- block + } var pend sync.WaitGroup for i := 0; i < threads; i++ { pend.Add(1) go func(id int, nonce uint64) { defer pend.Done() - ethash.mine(block, id, nonce, abort, found) + ethash.mine(block, id, nonce, abort, ethash.resultCh) }(i, uint64(ethash.rand.Int63())) } // Wait until sealing is terminated or a nonce is found @@ -78,7 +88,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop case <-stop: // Outside abort, stop all miner threads close(abort) - case result = <-found: + case result = <-ethash.resultCh: // One of the threads found a block, abort all others close(abort) case <-ethash.update: @@ -150,3 +160,136 @@ search: // during sealing so it's not unmapped while being read. runtime.KeepAlive(dataset) } + +// remote starts a standalone goroutine to handle remote mining related stuff. +func (ethash *Ethash) remote() { + var ( + works = make(map[common.Hash]*types.Block) + rates = make(map[common.Hash]hashrate) + currentWork *types.Block + ) + + // getWork returns a work package for external miner. + // + // The work package consists of 3 strings: + // result[0], 32 bytes hex encoded current block header pow-hash + // result[1], 32 bytes hex encoded seed hash used for DAG + // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty + getWork := func() ([3]string, error) { + var res [3]string + if currentWork == nil { + return res, errNoMiningWork + } + res[0] = currentWork.HashNoNonce().Hex() + res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() + + // Calculate the "target" to be returned to the external sealer. + n := big.NewInt(1) + n.Lsh(n, 255) + n.Div(n, currentWork.Difficulty()) + n.Lsh(n, 1) + res[2] = common.BytesToHash(n.Bytes()).Hex() + + // Trace the seal work fetched by remote sealer. + works[currentWork.HashNoNonce()] = currentWork + return res, nil + } + + // submitWork verifies the submitted pow solution, returning + // whether the solution was accepted or not (not can be both a bad pow as well as + // any other error, like no pending work or stale mining result). + submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, hash common.Hash) bool { + // Make sure the work submitted is present + block := works[hash] + if block == nil { + log.Info("Work submitted but none pending", "hash", hash) + return false + } + + // Verify the correctness of submitted result. + header := block.Header() + header.Nonce = nonce + header.MixDigest = mixDigest + if err := ethash.VerifySeal(nil, header); err != nil { + log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) + return false + } + + // Make sure the result channel is created. + if ethash.resultCh == nil { + log.Warn("Ethash result channel is empty, submitted mining result is rejected") + return false + } + + // Solutions seems to be valid, return to the miner and notify acceptance. + select { + case ethash.resultCh <- block.WithSeal(header): + delete(works, hash) + return true + default: + log.Info("Work submitted is stale", "hash", hash) + return false + } + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case block := <-ethash.workCh: + if currentWork != nil && block.ParentHash() != currentWork.ParentHash() { + // Start new round mining, throw out all previous work. + works = make(map[common.Hash]*types.Block) + } + // Update current work with new received block. + // Note same work can be past twice, happens when changing CPU threads. + currentWork = block + + case work := <-ethash.fetchWorkCh: + // Return current mining work to remote miner. + miningWork, err := getWork() + if err != nil { + work.errc <- err + } else { + work.res <- miningWork + } + + case result := <-ethash.submitWorkCh: + // Verify submitted PoW solution based on maintained mining blocks. + if submitWork(result.nonce, result.mixDigest, result.hash) { + result.errc <- nil + } else { + result.errc <- errInvalidSealResult + } + + case result := <-ethash.submitRateCh: + // Trace remote sealer's hash rate by submitted value. + rates[result.id] = hashrate{rate: result.rate, ping: time.Now()} + close(result.done) + + case req := <-ethash.fetchRateCh: + // Gather all hash rate submitted by remote sealer. + var total uint64 + for _, rate := range rates { + // this could overflow + total += rate.rate + } + req <- total + + case <-ticker.C: + // Clear stale submitted hash rate. + for id, rate := range rates { + if time.Since(rate.ping) > 10*time.Second { + delete(rates, id) + } + } + + case errc := <-ethash.exitCh: + // Exit remote loop if ethash is closed and return relevant error. + errc <- nil + log.Trace("Ethash remote sealer is exiting") + return + } + } +} diff --git a/eth/api.go b/eth/api.go index 0b6da456f..c1fbcb6d4 100644 --- a/eth/api.go +++ b/eth/api.go @@ -34,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" @@ -70,16 +69,12 @@ func (api *PublicEthereumAPI) Hashrate() hexutil.Uint64 { // PublicMinerAPI provides an API to control the miner. // It offers only methods that operate on data that pose no security risk when it is publicly accessible. type PublicMinerAPI struct { - e *Ethereum - agent *miner.RemoteAgent + e *Ethereum } // NewPublicMinerAPI create a new PublicMinerAPI instance. func NewPublicMinerAPI(e *Ethereum) *PublicMinerAPI { - agent := miner.NewRemoteAgent(e.BlockChain(), e.Engine()) - e.Miner().Register(agent) - - return &PublicMinerAPI{e, agent} + return &PublicMinerAPI{e} } // Mining returns an indication if this node is currently mining. @@ -87,37 +82,6 @@ func (api *PublicMinerAPI) Mining() bool { return api.e.IsMining() } -// SubmitWork can be used by external miner to submit their POW solution. It returns an indication if the work was -// accepted. Note, this is not an indication if the provided work was valid! -func (api *PublicMinerAPI) SubmitWork(nonce types.BlockNonce, solution, digest common.Hash) bool { - return api.agent.SubmitWork(nonce, digest, solution) -} - -// GetWork returns a work package for external miner. The work package consists of 3 strings -// result[0], 32 bytes hex encoded current block header pow-hash -// result[1], 32 bytes hex encoded seed hash used for DAG -// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty -func (api *PublicMinerAPI) GetWork() ([3]string, error) { - if !api.e.IsMining() { - if err := api.e.StartMining(false); err != nil { - return [3]string{}, err - } - } - work, err := api.agent.GetWork() - if err != nil { - return work, fmt.Errorf("mining not ready: %v", err) - } - return work, nil -} - -// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined -// hash rate of all miners which submit work through this node. It accepts the miner hash rate and an identifier which -// must be unique between nodes. -func (api *PublicMinerAPI) SubmitHashrate(hashrate hexutil.Uint64, id common.Hash) bool { - api.agent.SubmitHashrate(id, uint64(hashrate)) - return true -} - // PrivateMinerAPI provides private RPC methods to control the miner. // These methods can be abused by external users and must be considered insecure for use by untrusted users. type PrivateMinerAPI struct { @@ -132,7 +96,8 @@ func NewPrivateMinerAPI(e *Ethereum) *PrivateMinerAPI { // Start the miner with the given number of threads. If threads is nil the number // of workers started is equal to the number of logical CPUs that are usable by // this process. If mining is already running, this method adjust the number of -// threads allowed to use. +// threads allowed to use and updates the minimum price required by the transaction +// pool. func (api *PrivateMinerAPI) Start(threads *int) error { // Set the number of threads if the seal engine supports it if threads == nil { @@ -153,7 +118,6 @@ func (api *PrivateMinerAPI) Start(threads *int) error { api.e.lock.RLock() price := api.e.gasPrice api.e.lock.RUnlock() - api.e.txPool.SetGasPrice(price) return api.e.StartMining(true) } @@ -198,7 +162,7 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool { // GetHashrate returns the current hashrate of the miner. func (api *PrivateMinerAPI) GetHashrate() uint64 { - return uint64(api.e.miner.HashRate()) + return api.e.miner.HashRate() } // PrivateAdminAPI is the collection of Ethereum full node-related APIs diff --git a/eth/backend.go b/eth/backend.go index a18abdfb5..32946a0ab 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -166,6 +166,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } + eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) @@ -411,6 +412,7 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { func (s *Ethereum) Stop() error { s.bloomIndexer.Close() s.blockchain.Stop() + s.engine.Close() s.protocolManager.Stop() if s.lesServer != nil { s.lesServer.Stop() @@ -421,6 +423,5 @@ func (s *Ethereum) Stop() error { s.chainDb.Close() close(s.shutdownChan) - return nil } diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 89ebceec7..c2e0cd3f5 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -21,6 +21,7 @@ var Modules = map[string]string{ "admin": Admin_JS, "chequebook": Chequebook_JS, "clique": Clique_JS, + "ethash": Ethash_JS, "debug": Debug_JS, "eth": Eth_JS, "miner": Miner_JS, @@ -109,6 +110,34 @@ web3._extend({ }); ` +const Ethash_JS = ` +web3._extend({ + property: 'ethash', + methods: [ + new web3._extend.Method({ + name: 'getWork', + call: 'ethash_getWork', + params: 0 + }), + new web3._extend.Method({ + name: 'getHashrate', + call: 'ethash_getHashrate', + params: 0 + }), + new web3._extend.Method({ + name: 'submitWork', + call: 'ethash_submitWork', + params: 3, + }), + new web3._extend.Method({ + name: 'submitHashRate', + call: 'ethash_submitHashRate', + params: 2, + }), + ] +}); +` + const Admin_JS = ` web3._extend({ property: 'admin', diff --git a/les/backend.go b/les/backend.go index 35f67f29f..952d92cc2 100644 --- a/les/backend.go +++ b/les/backend.go @@ -248,6 +248,7 @@ func (s *LightEthereum) Stop() error { s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() + s.engine.Close() s.eventMux.Stop() diff --git a/miner/agent.go b/miner/agent.go index e3cebbd2e..95d835bd7 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -18,7 +18,6 @@ package miner import ( "sync" - "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -36,24 +35,31 @@ type CpuAgent struct { chain consensus.ChainReader engine consensus.Engine - isMining int32 // isMining indicates whether the agent is currently mining + started int32 // started indicates whether the agent is currently started } func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent { - miner := &CpuAgent{ + agent := &CpuAgent{ chain: chain, engine: engine, stop: make(chan struct{}, 1), workCh: make(chan *Work, 1), } - return miner + return agent } func (self *CpuAgent) Work() chan<- *Work { return self.workCh } func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch } +func (self *CpuAgent) Start() { + if !atomic.CompareAndSwapInt32(&self.started, 0, 1) { + return // agent already started + } + go self.update() +} + func (self *CpuAgent) Stop() { - if !atomic.CompareAndSwapInt32(&self.isMining, 1, 0) { + if !atomic.CompareAndSwapInt32(&self.started, 1, 0) { return // agent already stopped } self.stop <- struct{}{} @@ -68,13 +74,6 @@ done: } } -func (self *CpuAgent) Start() { - if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) { - return // agent already started - } - go self.update() -} - func (self *CpuAgent) update() { out: for { @@ -110,10 +109,3 @@ func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { self.returnCh <- nil } } - -func (self *CpuAgent) GetHashRate() int64 { - if pow, ok := self.engine.(consensus.PoW); ok { - return int64(pow.Hashrate()) - } - return 0 -} diff --git a/miner/miner.go b/miner/miner.go index d9256e978..4c5717c8a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -44,12 +44,9 @@ type Backend interface { // Miner creates blocks and searches for proof-of-work values. type Miner struct { - mux *event.TypeMux - - worker *worker - + mux *event.TypeMux + worker *worker coinbase common.Address - mining int32 eth Backend engine consensus.Engine @@ -62,7 +59,7 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con eth: eth, mux: mux, engine: engine, - worker: newWorker(config, engine, common.Address{}, eth, mux), + worker: newWorker(config, engine, eth, mux), canStart: 1, } miner.Register(NewCpuAgent(eth.BlockChain(), engine)) @@ -111,23 +108,16 @@ func (self *Miner) Start(coinbase common.Address) { log.Info("Network syncing, will start miner afterwards") return } - atomic.StoreInt32(&self.mining, 1) - - log.Info("Starting mining operation") self.worker.start() self.worker.commitNewWork() } func (self *Miner) Stop() { self.worker.stop() - atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.shouldStart, 0) } func (self *Miner) Register(agent Agent) { - if self.Mining() { - agent.Start() - } self.worker.register(agent) } @@ -136,22 +126,14 @@ func (self *Miner) Unregister(agent Agent) { } func (self *Miner) Mining() bool { - return atomic.LoadInt32(&self.mining) > 0 + return self.worker.isRunning() } -func (self *Miner) HashRate() (tot int64) { +func (self *Miner) HashRate() uint64 { if pow, ok := self.engine.(consensus.PoW); ok { - tot += int64(pow.Hashrate()) - } - // do we care this might race? is it worth we're rewriting some - // aspects of the worker/locking up agents so we can get an accurate - // hashrate? - for agent := range self.worker.agents { - if _, ok := agent.(*CpuAgent); !ok { - tot += agent.GetHashRate() - } + return uint64(pow.Hashrate()) } - return + return 0 } func (self *Miner) SetExtra(extra []byte) error { diff --git a/miner/remote_agent.go b/miner/remote_agent.go deleted file mode 100644 index 287e7530c..000000000 --- a/miner/remote_agent.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package miner - -import ( - "errors" - "math/big" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/ethash" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -type hashrate struct { - ping time.Time - rate uint64 -} - -type RemoteAgent struct { - mu sync.Mutex - - quitCh chan struct{} - workCh chan *Work - returnCh chan<- *Result - - chain consensus.ChainReader - engine consensus.Engine - currentWork *Work - work map[common.Hash]*Work - - hashrateMu sync.RWMutex - hashrate map[common.Hash]hashrate - - running int32 // running indicates whether the agent is active. Call atomically -} - -func NewRemoteAgent(chain consensus.ChainReader, engine consensus.Engine) *RemoteAgent { - return &RemoteAgent{ - chain: chain, - engine: engine, - work: make(map[common.Hash]*Work), - hashrate: make(map[common.Hash]hashrate), - } -} - -func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) { - a.hashrateMu.Lock() - defer a.hashrateMu.Unlock() - - a.hashrate[id] = hashrate{time.Now(), rate} -} - -func (a *RemoteAgent) Work() chan<- *Work { - return a.workCh -} - -func (a *RemoteAgent) SetReturnCh(returnCh chan<- *Result) { - a.returnCh = returnCh -} - -func (a *RemoteAgent) Start() { - if !atomic.CompareAndSwapInt32(&a.running, 0, 1) { - return - } - a.quitCh = make(chan struct{}) - a.workCh = make(chan *Work, 1) - go a.loop(a.workCh, a.quitCh) -} - -func (a *RemoteAgent) Stop() { - if !atomic.CompareAndSwapInt32(&a.running, 1, 0) { - return - } - close(a.quitCh) - close(a.workCh) -} - -// GetHashRate returns the accumulated hashrate of all identifier combined -func (a *RemoteAgent) GetHashRate() (tot int64) { - a.hashrateMu.RLock() - defer a.hashrateMu.RUnlock() - - // this could overflow - for _, hashrate := range a.hashrate { - tot += int64(hashrate.rate) - } - return -} - -func (a *RemoteAgent) GetWork() ([3]string, error) { - a.mu.Lock() - defer a.mu.Unlock() - - var res [3]string - - if a.currentWork != nil { - block := a.currentWork.Block - - res[0] = block.HashNoNonce().Hex() - seedHash := ethash.SeedHash(block.NumberU64()) - res[1] = common.BytesToHash(seedHash).Hex() - // Calculate the "target" to be returned to the external miner - n := big.NewInt(1) - n.Lsh(n, 255) - n.Div(n, block.Difficulty()) - n.Lsh(n, 1) - res[2] = common.BytesToHash(n.Bytes()).Hex() - - a.work[block.HashNoNonce()] = a.currentWork - return res, nil - } - return res, errors.New("No work available yet, don't panic.") -} - -// SubmitWork tries to inject a pow solution into the remote agent, returning -// whether the solution was accepted or not (not can be both a bad pow as well as -// any other error, like no work pending). -func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool { - a.mu.Lock() - defer a.mu.Unlock() - - // Make sure the work submitted is present - work := a.work[hash] - if work == nil { - log.Info("Work submitted but none pending", "hash", hash) - return false - } - // Make sure the Engine solutions is indeed valid - result := work.Block.Header() - result.Nonce = nonce - result.MixDigest = mixDigest - - if err := a.engine.VerifySeal(a.chain, result); err != nil { - log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) - return false - } - block := work.Block.WithSeal(result) - - // Solutions seems to be valid, return to the miner and notify acceptance - a.returnCh <- &Result{work, block} - delete(a.work, hash) - - return true -} - -// loop monitors mining events on the work and quit channels, updating the internal -// state of the remote miner until a termination is requested. -// -// Note, the reason the work and quit channels are passed as parameters is because -// RemoteAgent.Start() constantly recreates these channels, so the loop code cannot -// assume data stability in these member fields. -func (a *RemoteAgent) loop(workCh chan *Work, quitCh chan struct{}) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-quitCh: - return - case work := <-workCh: - a.mu.Lock() - a.currentWork = work - a.mu.Unlock() - case <-ticker.C: - // cleanup - a.mu.Lock() - for hash, work := range a.work { - if time.Since(work.createdAt) > 7*(12*time.Second) { - delete(a.work, hash) - } - } - a.mu.Unlock() - - a.hashrateMu.Lock() - for id, hashrate := range a.hashrate { - if time.Since(hashrate.ping) > 10*time.Second { - delete(a.hashrate, id) - } - } - a.hashrateMu.Unlock() - } - } -} diff --git a/miner/worker.go b/miner/worker.go index 34329f849..f1194fa18 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -55,9 +55,8 @@ const ( type Agent interface { Work() chan<- *Work SetReturnCh(chan<- *Result) - Stop() Start() - GetHashRate() int64 + Stop() } // Work is the workers current environment and holds @@ -102,7 +101,6 @@ type worker struct { chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -128,11 +126,11 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - mining int32 - atWork int32 + atWork int32 // The number of in-flight consensus engine work. + running int32 // The indicator whether the consensus engine is running or not. } -func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker { worker := &worker{ config: config, engine: engine, @@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), - coinbase: coinbase, agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } @@ -176,62 +173,51 @@ func (self *worker) setExtra(extra []byte) { } func (self *worker) pending() (*types.Block, *state.StateDB) { - if atomic.LoadInt32(&self.mining) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock, self.snapshotState.Copy() - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block, self.current.state.Copy() + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock, self.snapshotState.Copy() } func (self *worker) pendingBlock() *types.Block { - if atomic.LoadInt32(&self.mining) == 0 { - // return a snapshot to avoid contention on currentMu mutex - self.snapshotMu.RLock() - defer self.snapshotMu.RUnlock() - return self.snapshotBlock - } - - self.currentMu.Lock() - defer self.currentMu.Unlock() - return self.current.Block + // return a snapshot to avoid contention on currentMu mutex + self.snapshotMu.RLock() + defer self.snapshotMu.RUnlock() + return self.snapshotBlock } func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() - - atomic.StoreInt32(&self.mining, 1) - - // spin up agents + atomic.StoreInt32(&self.running, 1) for agent := range self.agents { agent.Start() } } func (self *worker) stop() { - self.wg.Wait() - self.mu.Lock() defer self.mu.Unlock() - if atomic.LoadInt32(&self.mining) == 1 { - for agent := range self.agents { - agent.Stop() - } + + atomic.StoreInt32(&self.running, 0) + for agent := range self.agents { + agent.Stop() } - atomic.StoreInt32(&self.mining, 0) atomic.StoreInt32(&self.atWork, 0) } +func (self *worker) isRunning() bool { + return atomic.LoadInt32(&self.running) == 1 +} + func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() self.agents[agent] = struct{}{} agent.SetReturnCh(self.recv) + if self.isRunning() { + agent.Start() + } } func (self *worker) unregister(agent Agent) { @@ -266,7 +252,7 @@ func (self *worker) update() { // Note all transactions received may not be continuous with transactions // already included in the current mining block. These transactions will // be automatically eliminated. - if atomic.LoadInt32(&self.mining) == 0 { + if !self.isRunning() { self.currentMu.Lock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { @@ -343,9 +329,6 @@ func (self *worker) wait() { // push sends a new work task to currently live miner agents. func (self *worker) push(work *Work) { - if atomic.LoadInt32(&self.mining) != 1 { - return - } for agent := range self.agents { atomic.AddInt32(&self.atWork, 1) if ch := agent.Work(); ch != nil { @@ -416,8 +399,12 @@ func (self *worker) commitNewWork() { Extra: self.extra, Time: big.NewInt(tstamp), } - // Only set the coinbase if we are mining (avoid spurious block rewards) - if atomic.LoadInt32(&self.mining) == 1 { + // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) + if self.isRunning() { + if self.coinbase == (common.Address{}) { + log.Error("Refusing to mine without etherbase") + return + } header.Coinbase = self.coinbase } if err := self.engine.Prepare(self.chain, header); err != nil { @@ -448,13 +435,6 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { misc.ApplyDAOHardFork(work.state) } - pending, err := self.eth.TxPool().Pending() - if err != nil { - log.Error("Failed to fetch pending transactions", "err", err) - return - } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - work.commitTransactions(self.mux, txs, self.chain, self.coinbase) // compute uncles for the new block. var ( @@ -478,17 +458,41 @@ func (self *worker) commitNewWork() { for _, hash := range badUncles { delete(self.possibleUncles, hash) } - // Create the new block to seal with the consensus engine + + // Create an empty block based on temporary copied state for sealing in advance without waiting block + // execution finished. + if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil { + log.Error("Failed to finalize block for temporary sealing", "err", err) + } else { + // Push empty work in advance without applying pending transaction. + // The reason is transactions execution can cost a lot and sealer need to + // take advantage of this part time. + if self.isRunning() { + log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) + self.push(work) + } + } + + // Fill the block with all available pending transactions. + pending, err := self.eth.TxPool().Pending() + if err != nil { + log.Error("Failed to fetch pending transactions", "err", err) + return + } + txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) + work.commitTransactions(self.mux, txs, self.chain, self.coinbase) + + // Create the full block to seal with the consensus engine if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. - if atomic.LoadInt32(&self.mining) == 1 { - log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + if self.isRunning() { + log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) + self.push(work) } - self.push(work) self.updateSnapshot() } @@ -511,10 +515,19 @@ func (self *worker) updateSnapshot() { self.snapshotMu.Lock() defer self.snapshotMu.Unlock() + var uncles []*types.Header + self.current.uncles.Each(func(item interface{}) bool { + if header, ok := item.(*types.Header); ok { + uncles = append(uncles, header) + return true + } + return false + }) + self.snapshotBlock = types.NewBlock( self.current.header, self.current.txs, - nil, + uncles, self.current.receipts, ) self.snapshotState = self.current.state.Copy() -- cgit v1.2.3