From 51db5975cc5fb88db6a0dba1826b534fd4df29d7 Mon Sep 17 00:00:00 2001 From: gary rong Date: Fri, 3 Aug 2018 16:33:37 +0800 Subject: consensus/ethash: move remote agent logic to ethash internal (#15853) * consensus/ethash: start remote ggoroutine to handle remote mining * consensus/ethash: expose remote miner api * consensus/ethash: expose submitHashrate api * miner, ethash: push empty block to sealer without waiting execution * consensus, internal: add getHashrate API for ethash * consensus: add three method for consensus interface * miner: expose consensus engine running status to miner * eth, miner: specify etherbase when miner created * miner: commit new work when consensus engine is started * consensus, miner: fix some logics * all: delete useless interfaces * consensus: polish a bit --- consensus/ethash/algorithm_test.go | 1 + consensus/ethash/api.go | 117 +++++++++++++++++++++++++++++ consensus/ethash/ethash.go | 132 ++++++++++++++++++++++++++++---- consensus/ethash/ethash_test.go | 92 +++++++++++++++++++++++ consensus/ethash/sealer.go | 149 ++++++++++++++++++++++++++++++++++++- 5 files changed, 475 insertions(+), 16 deletions(-) create mode 100644 consensus/ethash/api.go (limited to 'consensus/ethash') diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index f0c6465fd..e7625f7c0 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -730,6 +730,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}) + defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) } diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go new file mode 100644 index 000000000..a04ea235d --- /dev/null +++ b/consensus/ethash/api.go @@ -0,0 +1,117 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package ethash + +import ( + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" +) + +var errEthashStopped = errors.New("ethash stopped") + +// API exposes ethash related methods for the RPC interface. +type API struct { + ethash *Ethash // Make sure the mode of ethash is normal. +} + +// GetWork returns a work package for external miner. +// +// The work package consists of 3 strings: +// result[0] - 32 bytes hex encoded current block header pow-hash +// result[1] - 32 bytes hex encoded seed hash used for DAG +// result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +func (api *API) GetWork() ([3]string, error) { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return [3]string{}, errors.New("not supported") + } + + var ( + workCh = make(chan [3]string, 1) + errc = make(chan error, 1) + ) + + select { + case api.ethash.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: + case <-api.ethash.exitCh: + return [3]string{}, errEthashStopped + } + + select { + case work := <-workCh: + return work, nil + case err := <-errc: + return [3]string{}, err + } +} + +// SubmitWork can be used by external miner to submit their POW solution. +// It returns an indication if the work was accepted. +// Note either an invalid solution, a stale work a non-existent work will return false. +func (api *API) SubmitWork(nonce types.BlockNonce, hash, digest common.Hash) bool { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return false + } + + var errc = make(chan error, 1) + + select { + case api.ethash.submitWorkCh <- &mineResult{ + nonce: nonce, + mixDigest: digest, + hash: hash, + errc: errc, + }: + case <-api.ethash.exitCh: + return false + } + + err := <-errc + return err == nil +} + +// SubmitHashrate can be used for remote miners to submit their hash rate. +// This enables the node to report the combined hash rate of all miners +// which submit work through this node. +// +// It accepts the miner hash rate and an identifier which must be unique +// between nodes. +func (api *API) SubmitHashRate(rate hexutil.Uint64, id common.Hash) bool { + if api.ethash.config.PowMode != ModeNormal && api.ethash.config.PowMode != ModeTest { + return false + } + + var done = make(chan struct{}, 1) + + select { + case api.ethash.submitRateCh <- &hashrate{done: done, rate: uint64(rate), id: id}: + case <-api.ethash.exitCh: + return false + } + + // Block until hash rate submitted successfully. + <-done + + return true +} + +// GetHashrate returns the current hashrate for local CPU miner and remote miner. +func (api *API) GetHashrate() uint64 { + return uint64(api.ethash.Hashrate()) +} diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index f79dd6c36..0cb3059b9 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -33,7 +33,9 @@ import ( "unsafe" mmap "github.com/edsrzf/mmap-go" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" @@ -389,6 +391,30 @@ type Config struct { PowMode Mode } +// mineResult wraps the pow solution parameters for the specified block. +type mineResult struct { + nonce types.BlockNonce + mixDigest common.Hash + hash common.Hash + + errc chan error +} + +// hashrate wraps the hash rate submitted by the remote sealer. +type hashrate struct { + id common.Hash + ping time.Time + rate uint64 + + done chan struct{} +} + +// sealWork wraps a seal work package for remote sealer. +type sealWork struct { + errc chan error + res chan [3]string +} + // Ethash is a consensus engine based on proof-of-work implementing the ethash // algorithm. type Ethash struct { @@ -403,15 +429,25 @@ type Ethash struct { update chan struct{} // Notification channel to update mining parameters hashrate metrics.Meter // Meter tracking the average hashrate + // Remote sealer related fields + workCh chan *types.Block // Notification channel to push new work to remote sealer + resultCh chan *types.Block // Channel used by mining threads to return result + fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work + submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result + fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. + submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate + // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration fakeFail uint64 // Block number which fails PoW check even in fake mode fakeDelay time.Duration // Time delay to sleep for before returning from verify - lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + lock sync.Mutex // Ensures thread safety for the in-memory caches and mining fields + closeOnce sync.Once // Ensures exit channel will not be closed twice. + exitCh chan chan error // Notification channel to exiting backend threads } -// New creates a full sized ethash PoW scheme. +// New creates a full sized ethash PoW scheme and starts a background thread for remote mining. func New(config Config) *Ethash { if config.CachesInMem <= 0 { log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) @@ -423,19 +459,43 @@ func New(config Config) *Ethash { if config.DatasetDir != "" && config.DatasetsOnDisk > 0 { log.Info("Disk storage enabled for ethash DAGs", "dir", config.DatasetDir, "count", config.DatasetsOnDisk) } - return &Ethash{ - config: config, - caches: newlru("cache", config.CachesInMem, newCache), - datasets: newlru("dataset", config.DatasetsInMem, newDataset), - update: make(chan struct{}), - hashrate: metrics.NewMeter(), + ethash := &Ethash{ + config: config, + caches: newlru("cache", config.CachesInMem, newCache), + datasets: newlru("dataset", config.DatasetsInMem, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + exitCh: make(chan chan error), } + go ethash.remote() + return ethash } // NewTester creates a small sized ethash PoW scheme useful only for testing // purposes. func NewTester() *Ethash { - return New(Config{CachesInMem: 1, PowMode: ModeTest}) + ethash := &Ethash{ + config: Config{PowMode: ModeTest}, + caches: newlru("cache", 1, newCache), + datasets: newlru("dataset", 1, newDataset), + update: make(chan struct{}), + hashrate: metrics.NewMeter(), + workCh: make(chan *types.Block), + resultCh: make(chan *types.Block), + fetchWorkCh: make(chan *sealWork), + submitWorkCh: make(chan *mineResult), + fetchRateCh: make(chan chan uint64), + submitRateCh: make(chan *hashrate), + exitCh: make(chan chan error), + } + go ethash.remote() + return ethash } // NewFaker creates a ethash consensus engine with a fake PoW scheme that accepts @@ -489,6 +549,22 @@ func NewShared() *Ethash { return &Ethash{shared: sharedEthash} } +// Close closes the exit channel to notify all backend threads exiting. +func (ethash *Ethash) Close() error { + var err error + ethash.closeOnce.Do(func() { + // Short circuit if the exit channel is not allocated. + if ethash.exitCh == nil { + return + } + errc := make(chan error) + ethash.exitCh <- errc + err = <-errc + close(ethash.exitCh) + }) + return err +} + // cache tries to retrieve a verification cache for the specified block number // by first checking against a list of in-memory caches, then against caches // stored on disk, and finally generating one if none can be found. @@ -561,14 +637,44 @@ func (ethash *Ethash) SetThreads(threads int) { // Hashrate implements PoW, returning the measured rate of the search invocations // per second over the last minute. +// Note the returned hashrate includes local hashrate, but also includes the total +// hashrate of all remote miner. func (ethash *Ethash) Hashrate() float64 { - return ethash.hashrate.Rate1() + // Short circuit if we are run the ethash in normal/test mode. + if ethash.config.PowMode != ModeNormal && ethash.config.PowMode != ModeTest { + return ethash.hashrate.Rate1() + } + var res = make(chan uint64, 1) + + select { + case ethash.fetchRateCh <- res: + case <-ethash.exitCh: + // Return local hashrate only if ethash is stopped. + return ethash.hashrate.Rate1() + } + + // Gather total submitted hash rate of remote sealers. + return ethash.hashrate.Rate1() + float64(<-res) } -// APIs implements consensus.Engine, returning the user facing RPC APIs. Currently -// that is empty. +// APIs implements consensus.Engine, returning the user facing RPC APIs. func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { - return nil + // In order to ensure backward compatibility, we exposes ethash RPC APIs + // to both eth and ethash namespaces. + return []rpc.API{ + { + Namespace: "eth", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }, + { + Namespace: "ethash", + Version: "1.0", + Service: &API{ethash}, + Public: true, + }, + } } // SeedHash is the seed to use for generating a verification cache and the mining diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index 31116da43..ccdd30fb0 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -23,7 +23,10 @@ import ( "os" "sync" "testing" + "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" ) @@ -32,6 +35,7 @@ func TestTestMode(t *testing.T) { head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} ethash := NewTester() + defer ethash.Close() block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil) if err != nil { t.Fatalf("failed to seal block: %v", err) @@ -52,6 +56,7 @@ func TestCacheFileEvict(t *testing.T) { } defer os.RemoveAll(tmpdir) e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}) + defer e.Close() workers := 8 epochs := 100 @@ -77,3 +82,90 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { e.VerifySeal(nil, head) } } + +func TestRemoteSealer(t *testing.T) { + ethash := NewTester() + defer ethash.Close() + api := &API{ethash} + if _, err := api.GetWork(); err != errNoMiningWork { + t.Error("expect to return an error indicate there is no mining work") + } + + head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(head) + + // Push new work. + ethash.Seal(nil, block, nil) + + var ( + work [3]string + err error + ) + if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + t.Error("expect to return a mining work has same hash") + } + + if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + t.Error("expect to return false when submit a fake solution") + } + + // Push new block with same block number to replace the original one. + head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} + block = types.NewBlockWithHeader(head) + ethash.Seal(nil, block, nil) + + if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + t.Error("expect to return the latest pushed work") + } + + // Push block with higher block number. + newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} + newBlock := types.NewBlockWithHeader(newHead) + ethash.Seal(nil, newBlock, nil) + + if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + t.Error("expect to return false when submit a stale solution") + } +} + +func TestHashRate(t *testing.T) { + var ( + ethash = NewTester() + api = &API{ethash} + hashrate = []hexutil.Uint64{100, 200, 300} + expect uint64 + ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} + ) + + defer ethash.Close() + + if tot := ethash.Hashrate(); tot != 0 { + t.Error("expect the result should be zero") + } + + for i := 0; i < len(hashrate); i += 1 { + if res := api.SubmitHashRate(hashrate[i], ids[i]); !res { + t.Error("remote miner submit hashrate failed") + } + expect += uint64(hashrate[i]) + } + if tot := ethash.Hashrate(); tot != float64(expect) { + t.Error("expect total hashrate should be same") + } +} + +func TestClosedRemoteSealer(t *testing.T) { + ethash := NewTester() + // Make sure exit channel has been listened + time.Sleep(1 * time.Second) + ethash.Close() + + api := &API{ethash} + if _, err := api.GetWork(); err != errEthashStopped { + t.Error("expect to return an error to indicate ethash is stopped") + } + + if res := api.SubmitHashRate(hexutil.Uint64(100), common.HexToHash("a")); res { + t.Error("expect to return false when submit hashrate to a stopped ethash") + } +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index b5e742d8b..a9449d406 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -18,11 +18,13 @@ package ethash import ( crand "crypto/rand" + "errors" "math" "math/big" "math/rand" "runtime" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -30,6 +32,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +var ( + errNoMiningWork = errors.New("no mining work available yet") + errInvalidSealResult = errors.New("invalid or stale proof-of-work solution") +) + // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { @@ -45,7 +52,6 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) - found := make(chan *types.Block) ethash.lock.Lock() threads := ethash.threads @@ -64,12 +70,16 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop if threads < 0 { threads = 0 // Allows disabling local mining without extra logic around local/remote } + // Push new work to remote sealer + if ethash.workCh != nil { + ethash.workCh <- block + } var pend sync.WaitGroup for i := 0; i < threads; i++ { pend.Add(1) go func(id int, nonce uint64) { defer pend.Done() - ethash.mine(block, id, nonce, abort, found) + ethash.mine(block, id, nonce, abort, ethash.resultCh) }(i, uint64(ethash.rand.Int63())) } // Wait until sealing is terminated or a nonce is found @@ -78,7 +88,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop case <-stop: // Outside abort, stop all miner threads close(abort) - case result = <-found: + case result = <-ethash.resultCh: // One of the threads found a block, abort all others close(abort) case <-ethash.update: @@ -150,3 +160,136 @@ search: // during sealing so it's not unmapped while being read. runtime.KeepAlive(dataset) } + +// remote starts a standalone goroutine to handle remote mining related stuff. +func (ethash *Ethash) remote() { + var ( + works = make(map[common.Hash]*types.Block) + rates = make(map[common.Hash]hashrate) + currentWork *types.Block + ) + + // getWork returns a work package for external miner. + // + // The work package consists of 3 strings: + // result[0], 32 bytes hex encoded current block header pow-hash + // result[1], 32 bytes hex encoded seed hash used for DAG + // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty + getWork := func() ([3]string, error) { + var res [3]string + if currentWork == nil { + return res, errNoMiningWork + } + res[0] = currentWork.HashNoNonce().Hex() + res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() + + // Calculate the "target" to be returned to the external sealer. + n := big.NewInt(1) + n.Lsh(n, 255) + n.Div(n, currentWork.Difficulty()) + n.Lsh(n, 1) + res[2] = common.BytesToHash(n.Bytes()).Hex() + + // Trace the seal work fetched by remote sealer. + works[currentWork.HashNoNonce()] = currentWork + return res, nil + } + + // submitWork verifies the submitted pow solution, returning + // whether the solution was accepted or not (not can be both a bad pow as well as + // any other error, like no pending work or stale mining result). + submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, hash common.Hash) bool { + // Make sure the work submitted is present + block := works[hash] + if block == nil { + log.Info("Work submitted but none pending", "hash", hash) + return false + } + + // Verify the correctness of submitted result. + header := block.Header() + header.Nonce = nonce + header.MixDigest = mixDigest + if err := ethash.VerifySeal(nil, header); err != nil { + log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err) + return false + } + + // Make sure the result channel is created. + if ethash.resultCh == nil { + log.Warn("Ethash result channel is empty, submitted mining result is rejected") + return false + } + + // Solutions seems to be valid, return to the miner and notify acceptance. + select { + case ethash.resultCh <- block.WithSeal(header): + delete(works, hash) + return true + default: + log.Info("Work submitted is stale", "hash", hash) + return false + } + } + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case block := <-ethash.workCh: + if currentWork != nil && block.ParentHash() != currentWork.ParentHash() { + // Start new round mining, throw out all previous work. + works = make(map[common.Hash]*types.Block) + } + // Update current work with new received block. + // Note same work can be past twice, happens when changing CPU threads. + currentWork = block + + case work := <-ethash.fetchWorkCh: + // Return current mining work to remote miner. + miningWork, err := getWork() + if err != nil { + work.errc <- err + } else { + work.res <- miningWork + } + + case result := <-ethash.submitWorkCh: + // Verify submitted PoW solution based on maintained mining blocks. + if submitWork(result.nonce, result.mixDigest, result.hash) { + result.errc <- nil + } else { + result.errc <- errInvalidSealResult + } + + case result := <-ethash.submitRateCh: + // Trace remote sealer's hash rate by submitted value. + rates[result.id] = hashrate{rate: result.rate, ping: time.Now()} + close(result.done) + + case req := <-ethash.fetchRateCh: + // Gather all hash rate submitted by remote sealer. + var total uint64 + for _, rate := range rates { + // this could overflow + total += rate.rate + } + req <- total + + case <-ticker.C: + // Clear stale submitted hash rate. + for id, rate := range rates { + if time.Since(rate.ping) > 10*time.Second { + delete(rates, id) + } + } + + case errc := <-ethash.exitCh: + // Exit remote loop if ethash is closed and return relevant error. + errc <- nil + log.Trace("Ethash remote sealer is exiting") + return + } + } +} -- cgit v1.2.3