aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2018-08-22 03:56:54 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-08-22 03:56:54 +0800
commitb2c644ffb5c283a171ddf3889693673939917541 (patch)
tree501d3c337f5fb59404fd8d530eb247cdcc2144eb
parent522cfc68ff496aee4205add982db049dc3092024 (diff)
downloaddexon-b2c644ffb5c283a171ddf3889693673939917541.tar
dexon-b2c644ffb5c283a171ddf3889693673939917541.tar.gz
dexon-b2c644ffb5c283a171ddf3889693673939917541.tar.bz2
dexon-b2c644ffb5c283a171ddf3889693673939917541.tar.lz
dexon-b2c644ffb5c283a171ddf3889693673939917541.tar.xz
dexon-b2c644ffb5c283a171ddf3889693673939917541.tar.zst
dexon-b2c644ffb5c283a171ddf3889693673939917541.zip
cmd, eth, miner: make recommit configurable (#17444)
* cmd, eth, miner: make recommit configurable * cmd, eth, les, miner: polish a bit * miner: filter duplicate sealing work * cmd: remove uncessary conversion * miner: avoid microptimization in favor of cleaner code
-rw-r--r--cmd/geth/main.go1
-rw-r--r--cmd/geth/usage.go1
-rw-r--r--cmd/utils/flags.go22
-rw-r--r--eth/api.go6
-rw-r--r--eth/backend.go8
-rw-r--r--eth/config.go15
-rw-r--r--eth/gen_config.go53
-rw-r--r--internal/web3ext/web3ext.go5
-rw-r--r--les/backend.go2
-rw-r--r--miner/miner.go10
-rw-r--r--miner/worker.go202
-rw-r--r--miner/worker_test.go106
12 files changed, 360 insertions, 71 deletions
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index a06386051..2e87bb820 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -106,6 +106,7 @@ var (
utils.MinerLegacyEtherbaseFlag,
utils.MinerExtraDataFlag,
utils.MinerLegacyExtraDataFlag,
+ utils.MinerRecommitIntervalFlag,
utils.NATFlag,
utils.NoDiscoverFlag,
utils.DiscoveryV5Flag,
diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go
index 9e18f7047..674c5d901 100644
--- a/cmd/geth/usage.go
+++ b/cmd/geth/usage.go
@@ -190,6 +190,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.MinerGasTargetFlag,
utils.MinerEtherbaseFlag,
utils.MinerExtraDataFlag,
+ utils.MinerRecommitIntervalFlag,
},
},
{
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index e3a8cc2ea..731765583 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -335,12 +335,12 @@ var (
MinerGasPriceFlag = BigFlag{
Name: "miner.gasprice",
Usage: "Minimal gas price for mining a transactions",
- Value: eth.DefaultConfig.GasPrice,
+ Value: eth.DefaultConfig.MinerGasPrice,
}
MinerLegacyGasPriceFlag = BigFlag{
Name: "gasprice",
Usage: "Minimal gas price for mining a transactions (deprecated, use --miner.gasprice)",
- Value: eth.DefaultConfig.GasPrice,
+ Value: eth.DefaultConfig.MinerGasPrice,
}
MinerEtherbaseFlag = cli.StringFlag{
Name: "miner.etherbase",
@@ -360,6 +360,11 @@ var (
Name: "extradata",
Usage: "Block extra data set by the miner (default = client version, deprecated, use --miner.extradata)",
}
+ MinerRecommitIntervalFlag = cli.DurationFlag{
+ Name: "miner.recommit",
+ Usage: "Time interval to recreate the block being mined.",
+ Value: 3 * time.Second,
+ }
// Account settings
UnlockedAccountFlag = cli.StringFlag{
Name: "unlock",
@@ -1124,16 +1129,19 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name)
}
if ctx.GlobalIsSet(MinerLegacyExtraDataFlag.Name) {
- cfg.ExtraData = []byte(ctx.GlobalString(MinerLegacyExtraDataFlag.Name))
+ cfg.MinerExtraData = []byte(ctx.GlobalString(MinerLegacyExtraDataFlag.Name))
}
if ctx.GlobalIsSet(MinerExtraDataFlag.Name) {
- cfg.ExtraData = []byte(ctx.GlobalString(MinerExtraDataFlag.Name))
+ cfg.MinerExtraData = []byte(ctx.GlobalString(MinerExtraDataFlag.Name))
}
if ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) {
- cfg.GasPrice = GlobalBig(ctx, MinerLegacyGasPriceFlag.Name)
+ cfg.MinerGasPrice = GlobalBig(ctx, MinerLegacyGasPriceFlag.Name)
}
if ctx.GlobalIsSet(MinerGasPriceFlag.Name) {
- cfg.GasPrice = GlobalBig(ctx, MinerGasPriceFlag.Name)
+ cfg.MinerGasPrice = GlobalBig(ctx, MinerGasPriceFlag.Name)
+ }
+ if ctx.GlobalIsSet(MinerRecommitIntervalFlag.Name) {
+ cfg.MinerRecommit = ctx.Duration(MinerRecommitIntervalFlag.Name)
}
if ctx.GlobalIsSet(VMEnableDebugFlag.Name) {
// TODO(fjl): force-enable this in --dev mode
@@ -1176,7 +1184,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.Genesis = core.DeveloperGenesisBlock(uint64(ctx.GlobalInt(DeveloperPeriodFlag.Name)), developer.Address)
if !ctx.GlobalIsSet(MinerGasPriceFlag.Name) && !ctx.GlobalIsSet(MinerLegacyGasPriceFlag.Name) {
- cfg.GasPrice = big.NewInt(1)
+ cfg.MinerGasPrice = big.NewInt(1)
}
}
// TODO(fjl): move trie cache generations into config
diff --git a/eth/api.go b/eth/api.go
index c1fbcb6d4..4b0ba8edb 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -25,6 +25,7 @@ import (
"math/big"
"os"
"strings"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -160,6 +161,11 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool {
return true
}
+// SetRecommitInterval updates the interval for miner sealing work recommitting.
+func (api *PrivateMinerAPI) SetRecommitInterval(interval int) {
+ api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond)
+}
+
// GetHashrate returns the current hashrate of the miner.
func (api *PrivateMinerAPI) GetHashrate() uint64 {
return api.e.miner.HashRate()
diff --git a/eth/backend.go b/eth/backend.go
index 588b78256..648175acf 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -127,7 +127,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
- gasPrice: config.GasPrice,
+ gasPrice: config.MinerGasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
@@ -167,13 +167,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
return nil, err
}
- eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
- eth.miner.SetExtra(makeExtraData(config.ExtraData))
+ eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit)
+ eth.miner.SetExtra(makeExtraData(config.MinerExtraData))
eth.APIBackend = &EthAPIBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
- gpoParams.Default = config.GasPrice
+ gpoParams.Default = config.MinerGasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
diff --git a/eth/config.go b/eth/config.go
index 0c82f2923..cbd02416b 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -48,7 +48,7 @@ var DefaultConfig = Config{
DatabaseCache: 768,
TrieCache: 256,
TrieTimeout: 60 * time.Minute,
- GasPrice: big.NewInt(18 * params.Shannon),
+ MinerGasPrice: big.NewInt(18 * params.Shannon),
TxPool: core.DefaultTxPoolConfig,
GPO: gasprice.Config{
@@ -95,11 +95,12 @@ type Config struct {
TrieTimeout time.Duration
// Mining-related options
- Etherbase common.Address `toml:",omitempty"`
- MinerThreads int `toml:",omitempty"`
- MinerNotify []string `toml:",omitempty"`
- ExtraData []byte `toml:",omitempty"`
- GasPrice *big.Int
+ Etherbase common.Address `toml:",omitempty"`
+ MinerThreads int `toml:",omitempty"`
+ MinerNotify []string `toml:",omitempty"`
+ MinerExtraData []byte `toml:",omitempty"`
+ MinerGasPrice *big.Int
+ MinerRecommit time.Duration
// Ethash options
Ethash ethash.Config
@@ -118,5 +119,5 @@ type Config struct {
}
type configMarshaling struct {
- ExtraData hexutil.Bytes
+ MinerExtraData hexutil.Bytes
}
diff --git a/eth/gen_config.go b/eth/gen_config.go
index 4f2e82d94..62556be7e 100644
--- a/eth/gen_config.go
+++ b/eth/gen_config.go
@@ -4,6 +4,7 @@ package eth
import (
"math/big"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -15,20 +16,26 @@ import (
var _ = (*configMarshaling)(nil)
+// MarshalTOML marshals as TOML.
func (c Config) MarshalTOML() (interface{}, error) {
type Config struct {
Genesis *core.Genesis `toml:",omitempty"`
NetworkId uint64
SyncMode downloader.SyncMode
+ NoPruning bool
LightServ int `toml:",omitempty"`
LightPeers int `toml:",omitempty"`
SkipBcVersionCheck bool `toml:"-"`
DatabaseHandles int `toml:"-"`
DatabaseCache int
+ TrieCache int
+ TrieTimeout time.Duration
Etherbase common.Address `toml:",omitempty"`
MinerThreads int `toml:",omitempty"`
- ExtraData hexutil.Bytes `toml:",omitempty"`
- GasPrice *big.Int
+ MinerNotify []string `toml:",omitempty"`
+ MinerExtraData hexutil.Bytes `toml:",omitempty"`
+ MinerGasPrice *big.Int
+ MinerRecommit time.Duration
Ethash ethash.Config
TxPool core.TxPoolConfig
GPO gasprice.Config
@@ -39,15 +46,20 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.Genesis = c.Genesis
enc.NetworkId = c.NetworkId
enc.SyncMode = c.SyncMode
+ enc.NoPruning = c.NoPruning
enc.LightServ = c.LightServ
enc.LightPeers = c.LightPeers
enc.SkipBcVersionCheck = c.SkipBcVersionCheck
enc.DatabaseHandles = c.DatabaseHandles
enc.DatabaseCache = c.DatabaseCache
+ enc.TrieCache = c.TrieCache
+ enc.TrieTimeout = c.TrieTimeout
enc.Etherbase = c.Etherbase
enc.MinerThreads = c.MinerThreads
- enc.ExtraData = c.ExtraData
- enc.GasPrice = c.GasPrice
+ enc.MinerNotify = c.MinerNotify
+ enc.MinerExtraData = c.MinerExtraData
+ enc.MinerGasPrice = c.MinerGasPrice
+ enc.MinerRecommit = c.MinerRecommit
enc.Ethash = c.Ethash
enc.TxPool = c.TxPool
enc.GPO = c.GPO
@@ -56,20 +68,26 @@ func (c Config) MarshalTOML() (interface{}, error) {
return &enc, nil
}
+// UnmarshalTOML unmarshals from TOML.
func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
type Config struct {
Genesis *core.Genesis `toml:",omitempty"`
NetworkId *uint64
SyncMode *downloader.SyncMode
+ NoPruning *bool
LightServ *int `toml:",omitempty"`
LightPeers *int `toml:",omitempty"`
SkipBcVersionCheck *bool `toml:"-"`
DatabaseHandles *int `toml:"-"`
DatabaseCache *int
+ TrieCache *int
+ TrieTimeout *time.Duration
Etherbase *common.Address `toml:",omitempty"`
MinerThreads *int `toml:",omitempty"`
- ExtraData *hexutil.Bytes `toml:",omitempty"`
- GasPrice *big.Int
+ MinerNotify []string `toml:",omitempty"`
+ MinerExtraData *hexutil.Bytes `toml:",omitempty"`
+ MinerGasPrice *big.Int
+ MinerRecommit *time.Duration
Ethash *ethash.Config
TxPool *core.TxPoolConfig
GPO *gasprice.Config
@@ -89,6 +107,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.SyncMode != nil {
c.SyncMode = *dec.SyncMode
}
+ if dec.NoPruning != nil {
+ c.NoPruning = *dec.NoPruning
+ }
if dec.LightServ != nil {
c.LightServ = *dec.LightServ
}
@@ -104,17 +125,29 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.DatabaseCache != nil {
c.DatabaseCache = *dec.DatabaseCache
}
+ if dec.TrieCache != nil {
+ c.TrieCache = *dec.TrieCache
+ }
+ if dec.TrieTimeout != nil {
+ c.TrieTimeout = *dec.TrieTimeout
+ }
if dec.Etherbase != nil {
c.Etherbase = *dec.Etherbase
}
if dec.MinerThreads != nil {
c.MinerThreads = *dec.MinerThreads
}
- if dec.ExtraData != nil {
- c.ExtraData = *dec.ExtraData
+ if dec.MinerNotify != nil {
+ c.MinerNotify = dec.MinerNotify
+ }
+ if dec.MinerExtraData != nil {
+ c.MinerExtraData = *dec.MinerExtraData
+ }
+ if dec.MinerGasPrice != nil {
+ c.MinerGasPrice = dec.MinerGasPrice
}
- if dec.GasPrice != nil {
- c.GasPrice = dec.GasPrice
+ if dec.MinerRecommit != nil {
+ c.MinerRecommit = *dec.MinerRecommit
}
if dec.Ethash != nil {
c.Ethash = *dec.Ethash
diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go
index 000e3728d..f4eb47a12 100644
--- a/internal/web3ext/web3ext.go
+++ b/internal/web3ext/web3ext.go
@@ -520,6 +520,11 @@ web3._extend({
inputFormatter: [web3._extend.utils.fromDecimal]
}),
new web3._extend.Method({
+ name: 'setRecommitInterval',
+ call: 'miner_setRecommitInterval',
+ params: 1,
+ }),
+ new web3._extend.Method({
name: 'getHashrate',
call: 'miner_getHashrate'
}),
diff --git a/les/backend.go b/les/backend.go
index d26c1470f..00025ba63 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -141,7 +141,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
leth.ApiBackend = &LesApiBackend{leth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
- gpoParams.Default = config.GasPrice
+ gpoParams.Default = config.MinerGasPrice
}
leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams)
return leth, nil
diff --git a/miner/miner.go b/miner/miner.go
index e350e456e..c5a0c9d62 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -20,6 +20,7 @@ package miner
import (
"fmt"
"sync/atomic"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
@@ -51,13 +52,13 @@ type Miner struct {
shouldStart int32 // should start indicates whether we should start after sync
}
-func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner {
+func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
- worker: newWorker(config, engine, eth, mux),
+ worker: newWorker(config, engine, eth, mux, recommit),
canStart: 1,
}
go miner.update()
@@ -144,6 +145,11 @@ func (self *Miner) SetExtra(extra []byte) error {
return nil
}
+// SetRecommitInterval sets the interval for sealing work resubmitting.
+func (self *Miner) SetRecommitInterval(interval time.Duration) {
+ self.worker.setRecommitInterval(interval)
+}
+
// Pending returns the currently pending block and associated state.
func (self *Miner) Pending() (*types.Block, *state.StateDB) {
return self.worker.pending()
diff --git a/miner/worker.go b/miner/worker.go
index 23cfaf225..c299ff9dc 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -51,12 +51,27 @@ const (
// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10
+ // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
+ resubmitAdjustChanSize = 10
+
// miningLogAtDepth is the number of confirmations before logging successful mining.
miningLogAtDepth = 5
- // blockRecommitInterval is the time interval to recreate the mining block with
+ // minRecommitInterval is the minimal time interval to recreate the mining block with
+ // any newly arrived transactions.
+ minRecommitInterval = 1 * time.Second
+
+ // maxRecommitInterval is the maximum time interval to recreate the mining block with
// any newly arrived transactions.
- blockRecommitInterval = 3 * time.Second
+ maxRecommitInterval = 15 * time.Second
+
+ // intervalAdjustRatio is the impact a single interval adjustment has on sealing work
+ // resubmitting interval.
+ intervalAdjustRatio = 0.1
+
+ // intervalAdjustBias is applied during the new resubmit interval calculation in favor of
+ // increasing upper limit or decreasing lower limit so that the limit can be reachable.
+ intervalAdjustBias = 200 * 1000.0 * 1000.0
)
// environment is the worker's current environment and holds all of the current state information.
@@ -89,11 +104,18 @@ const (
commitInterruptResubmit
)
+// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct {
interrupt *int32
noempty bool
}
+// intervalAdjust represents a resubmitting interval adjustment.
+type intervalAdjust struct {
+ ratio float64
+ inc bool
+}
+
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
@@ -112,11 +134,13 @@ type worker struct {
chainSideSub event.Subscription
// Channels
- newWorkCh chan *newWorkReq
- taskCh chan *task
- resultCh chan *task
- startCh chan struct{}
- exitCh chan struct{}
+ newWorkCh chan *newWorkReq
+ taskCh chan *task
+ resultCh chan *task
+ startCh chan struct{}
+ exitCh chan struct{}
+ resubmitIntervalCh chan time.Duration
+ resubmitAdjustCh chan *intervalAdjust
current *environment // An environment for current running cycle.
possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
@@ -132,30 +156,34 @@ type worker struct {
// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
+ newTxs int32 // New arrival transaction count since last sealing work submitting.
// Test hooks
- newTaskHook func(*task) // Method to call upon receiving a new sealing task
- skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
- fullTaskHook func() // Method to call before pushing the full sealing task
+ newTaskHook func(*task) // Method to call upon receiving a new sealing task.
+ skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
+ fullTaskHook func() // Method to call before pushing the full sealing task.
+ resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
}
-func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
+func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration) *worker {
worker := &worker{
- config: config,
- engine: engine,
- eth: eth,
- mux: mux,
- chain: eth.BlockChain(),
- possibleUncles: make(map[common.Hash]*types.Block),
- unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
- txsCh: make(chan core.NewTxsEvent, txChanSize),
- chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
- chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
- newWorkCh: make(chan *newWorkReq),
- taskCh: make(chan *task),
- resultCh: make(chan *task, resultQueueSize),
- exitCh: make(chan struct{}),
- startCh: make(chan struct{}, 1),
+ config: config,
+ engine: engine,
+ eth: eth,
+ mux: mux,
+ chain: eth.BlockChain(),
+ possibleUncles: make(map[common.Hash]*types.Block),
+ unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
+ chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
+ chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
+ newWorkCh: make(chan *newWorkReq),
+ taskCh: make(chan *task),
+ resultCh: make(chan *task, resultQueueSize),
+ exitCh: make(chan struct{}),
+ startCh: make(chan struct{}, 1),
+ resubmitIntervalCh: make(chan time.Duration),
+ resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
@@ -163,8 +191,14 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
+ // Sanitize recommit interval if the user-specified one is too short.
+ if recommit < minRecommitInterval {
+ log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
+ recommit = minRecommitInterval
+ }
+
go worker.mainLoop()
- go worker.newWorkLoop()
+ go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
@@ -188,6 +222,11 @@ func (w *worker) setExtra(extra []byte) {
w.extra = extra
}
+// setRecommitInterval updates the interval for miner sealing work recommitting.
+func (w *worker) setRecommitInterval(interval time.Duration) {
+ w.resubmitIntervalCh <- interval
+}
+
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
// return a snapshot to avoid contention on currentMu mutex
@@ -238,35 +277,94 @@ func (w *worker) close() {
}
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
-func (w *worker) newWorkLoop() {
- var interrupt *int32
+func (w *worker) newWorkLoop(recommit time.Duration) {
+ var (
+ interrupt *int32
+ minRecommit = recommit // minimal resubmit interval specified by user.
+ )
timer := time.NewTimer(0)
<-timer.C // discard the initial tick
- // recommit aborts in-flight transaction execution with given signal and resubmits a new one.
- recommit := func(noempty bool, s int32) {
+ // commit aborts in-flight transaction execution with given signal and resubmits a new one.
+ commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty}
- timer.Reset(blockRecommitInterval)
+ timer.Reset(recommit)
+ atomic.StoreInt32(&w.newTxs, 0)
+ }
+ // recalcRecommit recalculates the resubmitting interval upon feedback.
+ recalcRecommit := func(target float64, inc bool) {
+ var (
+ prev = float64(recommit.Nanoseconds())
+ next float64
+ )
+ if inc {
+ next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
+ // Recap if interval is larger than the maximum time interval
+ if next > float64(maxRecommitInterval.Nanoseconds()) {
+ next = float64(maxRecommitInterval.Nanoseconds())
+ }
+ } else {
+ next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
+ // Recap if interval is less than the user specified minimum
+ if next < float64(minRecommit.Nanoseconds()) {
+ next = float64(minRecommit.Nanoseconds())
+ }
+ }
+ recommit = time.Duration(int64(next))
}
for {
select {
case <-w.startCh:
- recommit(false, commitInterruptNewHead)
+ commit(false, commitInterruptNewHead)
case <-w.chainHeadCh:
- recommit(false, commitInterruptNewHead)
+ commit(false, commitInterruptNewHead)
case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) {
- recommit(true, commitInterruptResubmit)
+ // Short circuit if no new transaction arrives.
+ if atomic.LoadInt32(&w.newTxs) == 0 {
+ timer.Reset(recommit)
+ continue
+ }
+ commit(true, commitInterruptResubmit)
+ }
+
+ case interval := <-w.resubmitIntervalCh:
+ // Adjust resubmit interval explicitly by user.
+ if interval < minRecommitInterval {
+ log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
+ interval = minRecommitInterval
+ }
+ log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
+ minRecommit, recommit = interval, interval
+
+ if w.resubmitHook != nil {
+ w.resubmitHook(minRecommit, recommit)
+ }
+
+ case adjust := <-w.resubmitAdjustCh:
+ // Adjust resubmit interval by feedback.
+ if adjust.inc {
+ before := recommit
+ recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
+ log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
+ } else {
+ before := recommit
+ recalcRecommit(float64(minRecommit.Nanoseconds()), false)
+ log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
+ }
+
+ if w.resubmitHook != nil {
+ w.resubmitHook(minRecommit, recommit)
}
case <-w.exitCh:
@@ -339,6 +437,7 @@ func (w *worker) mainLoop() {
w.commitNewWork(nil, false)
}
}
+ atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
// System stopped
case <-w.exitCh:
@@ -383,7 +482,10 @@ func (w *worker) seal(t *task, stop <-chan struct{}) {
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
- var stopCh chan struct{}
+ var (
+ stopCh chan struct{}
+ prev common.Hash
+ )
// interrupt aborts the in-flight sealing task.
interrupt := func() {
@@ -398,8 +500,13 @@ func (w *worker) taskLoop() {
if w.newTaskHook != nil {
w.newTaskHook(task)
}
+ // Reject duplicate sealing work due to resubmitting.
+ if task.block.HashNoNonce() == prev {
+ continue
+ }
interrupt()
stopCh = make(chan struct{})
+ prev = task.block.HashNoNonce()
go w.seal(task, stopCh)
case <-w.exitCh:
interrupt()
@@ -414,11 +521,15 @@ func (w *worker) resultLoop() {
for {
select {
case result := <-w.resultCh:
+ // Short circuit when receiving empty result.
if result == nil {
continue
}
+ // Short circuit when receiving duplicate result caused by resubmitting.
block := result.block
-
+ if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
+ continue
+ }
// Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created.
for _, r := range result.receipts {
@@ -568,8 +679,18 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
- // TODO(rjl493456442) give feedback to newWorkLoop to adjust resubmit interval if it is too short.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
+ // Notify resubmit loop to increase resubmitting interval due to too frequent commits.
+ if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
+ ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
+ if ratio < 0.1 {
+ ratio = 0.1
+ }
+ w.resubmitAdjustCh <- &intervalAdjust{
+ ratio: ratio,
+ inc: true,
+ }
+ }
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
@@ -644,6 +765,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
}
+ // Notify resubmit loop to decrease resubmitting interval if current interval is larger
+ // than the user-specified one.
+ if interrupt != nil {
+ w.resubmitAdjustCh <- &intervalAdjust{inc: false}
+ }
return false
}
diff --git a/miner/worker_test.go b/miner/worker_test.go
index 34bb7f5f3..16708c18c 100644
--- a/miner/worker_test.go
+++ b/miner/worker_test.go
@@ -119,7 +119,7 @@ func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine)
backend.txPool.AddLocals(pendingTxs)
- w := newWorker(chainConfig, engine, backend, new(event.TypeMux))
+ w := newWorker(chainConfig, engine, backend, new(event.TypeMux), time.Second)
w.setEtherbase(testBankAddress)
return w, backend
}
@@ -327,7 +327,7 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en
}
}
b.txPool.AddLocals(newTxs)
- time.Sleep(3 * time.Second)
+ time.Sleep(time.Second)
select {
case <-taskCh:
@@ -335,3 +335,105 @@ func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, en
t.Error("new task timeout")
}
}
+
+func TestAdjustIntervalEthash(t *testing.T) {
+ testAdjustInterval(t, ethashChainConfig, ethash.NewFaker())
+}
+
+func TestAdjustIntervalClique(t *testing.T) {
+ testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase()))
+}
+
+func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
+ defer engine.Close()
+
+ w, _ := newTestWorker(t, chainConfig, engine)
+ defer w.close()
+
+ w.skipSealHook = func(task *task) bool {
+ return true
+ }
+ w.fullTaskHook = func() {
+ time.Sleep(100 * time.Millisecond)
+ }
+ var (
+ progress = make(chan struct{}, 10)
+ result = make([]float64, 0, 10)
+ index = 0
+ start = false
+ )
+ w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
+ // Short circuit if interval checking hasn't started.
+ if !start {
+ return
+ }
+ var wantMinInterval, wantRecommitInterval time.Duration
+
+ switch index {
+ case 0:
+ wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second
+ case 1:
+ origin := float64(3 * time.Second.Nanoseconds())
+ estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias)
+ wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(int(estimate))*time.Nanosecond
+ case 2:
+ estimate := result[index-1]
+ min := float64(3 * time.Second.Nanoseconds())
+ estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias)
+ wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(int(estimate))*time.Nanosecond
+ case 3:
+ wantMinInterval, wantRecommitInterval = time.Second, time.Second
+ }
+
+ // Check interval
+ if minInterval != wantMinInterval {
+ t.Errorf("resubmit min interval mismatch want %s has %s", wantMinInterval, minInterval)
+ }
+ if recommitInterval != wantRecommitInterval {
+ t.Errorf("resubmit interval mismatch want %s has %s", wantRecommitInterval, recommitInterval)
+ }
+ result = append(result, float64(recommitInterval.Nanoseconds()))
+ index += 1
+ progress <- struct{}{}
+ }
+ // Ensure worker has finished initialization
+ for {
+ b := w.pendingBlock()
+ if b != nil && b.NumberU64() == 1 {
+ break
+ }
+ }
+
+ w.start()
+
+ time.Sleep(time.Second)
+
+ start = true
+ w.setRecommitInterval(3 * time.Second)
+ select {
+ case <-progress:
+ case <-time.NewTimer(time.Second).C:
+ t.Error("interval reset timeout")
+ }
+
+ w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8}
+ select {
+ case <-progress:
+ case <-time.NewTimer(time.Second).C:
+ t.Error("interval reset timeout")
+ }
+
+ w.resubmitAdjustCh <- &intervalAdjust{inc: false}
+ select {
+ case <-progress:
+ case <-time.NewTimer(time.Second).C:
+ t.Error("interval reset timeout")
+ }
+
+ w.setRecommitInterval(500 * time.Millisecond)
+ select {
+ case <-progress:
+ case <-time.NewTimer(time.Second).C:
+ t.Error("interval reset timeout")
+ }
+}