diff options
author | gary rong <garyrong0905@gmail.com> | 2018-08-23 21:02:57 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-08-23 21:02:57 +0800 |
commit | 40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2 (patch) | |
tree | 70d5c37338def1e3246a3117ac84abca793c476a | |
parent | c3f7e3be3b60df3edd168e80aa89ee2992932b0d (diff) | |
download | dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.gz dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.bz2 dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.lz dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.xz dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.zst dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.zip |
miner: fix state commit, track old work packages too (#17490)
* miner: commit state which is relative with sealing result
* consensus, core, miner, mobile: introduce sealHash interface
* miner: evict pending task with threshold
* miner: go fmt
-rw-r--r-- | consensus/clique/clique.go | 5 | ||||
-rw-r--r-- | consensus/consensus.go | 3 | ||||
-rw-r--r-- | consensus/ethash/consensus.go | 29 | ||||
-rw-r--r-- | consensus/ethash/ethash_test.go | 11 | ||||
-rw-r--r-- | consensus/ethash/sealer.go | 4 | ||||
-rw-r--r-- | consensus/ethash/sealer_test.go | 2 | ||||
-rw-r--r-- | core/types/block.go | 23 | ||||
-rw-r--r-- | miner/worker.go | 75 | ||||
-rw-r--r-- | mobile/types.go | 35 |
9 files changed, 113 insertions, 74 deletions
diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 085944701..3730c91f6 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -673,6 +673,11 @@ func CalcDifficulty(snap *Snapshot, signer common.Address) *big.Int { return new(big.Int).Set(diffNoTurn) } +// SealHash returns the hash of a block prior to it being sealed. +func (c *Clique) SealHash(header *types.Header) common.Hash { + return sigHash(header) +} + // Close implements consensus.Engine. It's a noop for clique as there is are no background threads. func (c *Clique) Close() error { return nil diff --git a/consensus/consensus.go b/consensus/consensus.go index 827175444..27799f13c 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -90,6 +90,9 @@ type Engine interface { // seal place on top. Seal(chain ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) + // SealHash returns the hash of a block prior to it being sealed. + SealHash(header *types.Header) common.Hash + // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have. CalcDifficulty(chain ChainReader, time uint64, parent *types.Header) *big.Int diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 86fd997ae..259cc5056 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -31,7 +31,9 @@ import ( "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" ) // Ethash proof-of-work protocol constants. @@ -495,7 +497,7 @@ func (ethash *Ethash) verifySeal(chain consensus.ChainReader, header *types.Head if fulldag { dataset := ethash.dataset(number, true) if dataset.generated() { - digest, result = hashimotoFull(dataset.dataset, header.HashNoNonce().Bytes(), header.Nonce.Uint64()) + digest, result = hashimotoFull(dataset.dataset, ethash.SealHash(header).Bytes(), header.Nonce.Uint64()) // Datasets are unmapped in a finalizer. Ensure that the dataset stays alive // until after the call to hashimotoFull so it's not unmapped while being used. @@ -513,7 +515,7 @@ func (ethash *Ethash) verifySeal(chain consensus.ChainReader, header *types.Head if ethash.config.PowMode == ModeTest { size = 32 * 1024 } - digest, result = hashimotoLight(size, cache.cache, header.HashNoNonce().Bytes(), header.Nonce.Uint64()) + digest, result = hashimotoLight(size, cache.cache, ethash.SealHash(header).Bytes(), header.Nonce.Uint64()) // Caches are unmapped in a finalizer. Ensure that the cache stays alive // until after the call to hashimotoLight so it's not unmapped while being used. @@ -552,6 +554,29 @@ func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header return types.NewBlock(header, txs, uncles, receipts), nil } +// SealHash returns the hash of a block prior to it being sealed. +func (ethash *Ethash) SealHash(header *types.Header) (hash common.Hash) { + hasher := sha3.NewKeccak256() + + rlp.Encode(hasher, []interface{}{ + header.ParentHash, + header.UncleHash, + header.Coinbase, + header.Root, + header.TxHash, + header.ReceiptHash, + header.Bloom, + header.Difficulty, + header.Number, + header.GasLimit, + header.GasUsed, + header.Time, + header.Extra, + }) + hasher.Sum(hash[:0]) + return hash +} + // Some weird constants to avoid constant memory allocs for them. var ( big8 = big.NewInt(8) diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index 87ac17c2b..b190d63d6 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -94,6 +94,7 @@ func TestRemoteSealer(t *testing.T) { } header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) + sealhash := ethash.SealHash(header) // Push new work. ethash.Seal(nil, block, nil) @@ -102,27 +103,29 @@ func TestRemoteSealer(t *testing.T) { work [3]string err error ) - if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() { t.Error("expect to return a mining work has same hash") } - if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + if res := api.SubmitWork(types.BlockNonce{}, sealhash, common.Hash{}); res { t.Error("expect to return false when submit a fake solution") } // Push new block with same block number to replace the original one. header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} block = types.NewBlockWithHeader(header) + sealhash = ethash.SealHash(header) ethash.Seal(nil, block, nil) - if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { + if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() { t.Error("expect to return the latest pushed work") } // Push block with higher block number. newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} newBlock := types.NewBlockWithHeader(newHead) + newSealhash := ethash.SealHash(newHead) ethash.Seal(nil, newBlock, nil) - if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { + if res := api.SubmitWork(types.BlockNonce{}, newSealhash, common.Hash{}); res { t.Error("expect to return false when submit a stale solution") } } diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index c3b2c86d1..a458c60f6 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -111,7 +111,7 @@ func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan s // Extract some data from the header var ( header = block.Header() - hash = header.HashNoNonce().Bytes() + hash = ethash.SealHash(header).Bytes() target = new(big.Int).Div(two256, header.Difficulty) number = header.Number.Uint64() dataset = ethash.dataset(number, false) @@ -213,7 +213,7 @@ func (ethash *Ethash) remote(notify []string) { // result[1], 32 bytes hex encoded seed hash used for DAG // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty makeWork := func(block *types.Block) { - hash := block.HashNoNonce() + hash := ethash.SealHash(block.Header()) currentWork[0] = hash.Hex() currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex() diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go index 6c7157a5a..d1b66f9cf 100644 --- a/consensus/ethash/sealer_test.go +++ b/consensus/ethash/sealer_test.go @@ -51,7 +51,7 @@ func TestRemoteNotify(t *testing.T) { ethash.Seal(nil, block, nil) select { case work := <-sink: - if want := header.HashNoNonce().Hex(); work[0] != want { + if want := ethash.SealHash(header).Hex(); work[0] != want { t.Errorf("work packet hash mismatch: have %s, want %s", work[0], want) } if want := common.BytesToHash(SeedHash(header.Number.Uint64())).Hex(); work[1] != want { diff --git a/core/types/block.go b/core/types/block.go index ae1b4299d..8a21bba1e 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -102,25 +102,6 @@ func (h *Header) Hash() common.Hash { return rlpHash(h) } -// HashNoNonce returns the hash which is used as input for the proof-of-work search. -func (h *Header) HashNoNonce() common.Hash { - return rlpHash([]interface{}{ - h.ParentHash, - h.UncleHash, - h.Coinbase, - h.Root, - h.TxHash, - h.ReceiptHash, - h.Bloom, - h.Difficulty, - h.Number, - h.GasLimit, - h.GasUsed, - h.Time, - h.Extra, - }) -} - // Size returns the approximate memory used by all internal contents. It is used // to approximate and limit the memory consumption of various caches. func (h *Header) Size() common.StorageSize { @@ -324,10 +305,6 @@ func (b *Block) Header() *Header { return CopyHeader(b.header) } // Body returns the non-header content of the block. func (b *Block) Body() *Body { return &Body{b.transactions, b.uncles} } -func (b *Block) HashNoNonce() common.Hash { - return b.header.HashNoNonce() -} - // Size returns the true RLP encoded storage size of the block, either by encoding // and returning it, or returning a previsouly cached value. func (b *Block) Size() common.StorageSize { diff --git a/miner/worker.go b/miner/worker.go index 8c3337ba4..18fb12e45 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -72,6 +72,9 @@ const ( // intervalAdjustBias is applied during the new resubmit interval calculation in favor of // increasing upper limit or decreasing lower limit so that the limit can be reachable. intervalAdjustBias = 200 * 1000.0 * 1000.0 + + // staleThreshold is the maximum distance of the acceptable stale block. + staleThreshold = 7 ) // environment is the worker's current environment and holds all of the current state information. @@ -150,6 +153,9 @@ type worker struct { coinbase common.Address extra []byte + pendingMu sync.RWMutex + pendingTasks map[common.Hash]*task + snapshotMu sync.RWMutex // The lock used to protect the block snapshot and state snapshot snapshotBlock *types.Block snapshotState *state.StateDB @@ -174,6 +180,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, chain: eth.BlockChain(), possibleUncles: make(map[common.Hash]*types.Block), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + pendingTasks: make(map[common.Hash]*task), txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), @@ -317,13 +324,25 @@ func (w *worker) newWorkLoop(recommit time.Duration) { } recommit = time.Duration(int64(next)) } + // clearPending cleans the stale pending tasks. + clearPending := func(number uint64) { + w.pendingMu.Lock() + for h, t := range w.pendingTasks { + if t.block.NumberU64()+staleThreshold <= number { + delete(w.pendingTasks, h) + } + } + w.pendingMu.Unlock() + } for { select { case <-w.startCh: + clearPending(w.chain.CurrentBlock().NumberU64()) commit(false, commitInterruptNewHead) - case <-w.chainHeadCh: + case head := <-w.chainHeadCh: + clearPending(head.Block.NumberU64()) commit(false, commitInterruptNewHead) case <-timer.C: @@ -454,28 +473,37 @@ func (w *worker) mainLoop() { // seal pushes a sealing task to consensus engine and submits the result. func (w *worker) seal(t *task, stop <-chan struct{}) { - var ( - err error - res *task - ) - if w.skipSealHook != nil && w.skipSealHook(t) { return } - - if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil { - log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(), - "elapsed", common.PrettyDuration(time.Since(t.createdAt))) - res = t - } else { - if err != nil { - log.Warn("Block sealing failed", "err", err) + // The reason for caching task first is: + // A previous sealing action will be canceled by subsequent actions, + // however, remote miner may submit a result based on the cancelled task. + // So we should only submit the pending state corresponding to the seal result. + // TODO(rjl493456442) Replace the seal-wait logic structure + w.pendingMu.Lock() + w.pendingTasks[w.engine.SealHash(t.block.Header())] = t + w.pendingMu.Unlock() + + if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil { + sealhash := w.engine.SealHash(block.Header()) + w.pendingMu.RLock() + task, exist := w.pendingTasks[sealhash] + w.pendingMu.RUnlock() + if !exist { + log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash()) + return } - res = nil - } - select { - case w.resultCh <- res: - case <-w.exitCh: + // Assemble sealing result + task.block = block + log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(), + "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + select { + case w.resultCh <- task: + case <-w.exitCh: + } + } else if err != nil { + log.Warn("Block sealing failed", "err", err) } } @@ -501,12 +529,13 @@ func (w *worker) taskLoop() { w.newTaskHook(task) } // Reject duplicate sealing work due to resubmitting. - if task.block.HashNoNonce() == prev { + sealHash := w.engine.SealHash(task.block.Header()) + if sealHash == prev { continue } interrupt() stopCh = make(chan struct{}) - prev = task.block.HashNoNonce() + prev = sealHash go w.seal(task, stopCh) case <-w.exitCh: interrupt() @@ -928,8 +957,8 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st } feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) - log.Info("Commit new mining work", "number", block.Number(), "uncles", len(uncles), "txs", w.current.tcount, - "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start))) + log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), + "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: log.Info("Worker has exited") diff --git a/mobile/types.go b/mobile/types.go index f32b4918f..b2780f307 100644 --- a/mobile/types.go +++ b/mobile/types.go @@ -168,25 +168,22 @@ func (b *Block) EncodeJSON() (string, error) { return string(data), err } -func (b *Block) GetParentHash() *Hash { return &Hash{b.block.ParentHash()} } -func (b *Block) GetUncleHash() *Hash { return &Hash{b.block.UncleHash()} } -func (b *Block) GetCoinbase() *Address { return &Address{b.block.Coinbase()} } -func (b *Block) GetRoot() *Hash { return &Hash{b.block.Root()} } -func (b *Block) GetTxHash() *Hash { return &Hash{b.block.TxHash()} } -func (b *Block) GetReceiptHash() *Hash { return &Hash{b.block.ReceiptHash()} } -func (b *Block) GetBloom() *Bloom { return &Bloom{b.block.Bloom()} } -func (b *Block) GetDifficulty() *BigInt { return &BigInt{b.block.Difficulty()} } -func (b *Block) GetNumber() int64 { return b.block.Number().Int64() } -func (b *Block) GetGasLimit() int64 { return int64(b.block.GasLimit()) } -func (b *Block) GetGasUsed() int64 { return int64(b.block.GasUsed()) } -func (b *Block) GetTime() int64 { return b.block.Time().Int64() } -func (b *Block) GetExtra() []byte { return b.block.Extra() } -func (b *Block) GetMixDigest() *Hash { return &Hash{b.block.MixDigest()} } -func (b *Block) GetNonce() int64 { return int64(b.block.Nonce()) } - -func (b *Block) GetHash() *Hash { return &Hash{b.block.Hash()} } -func (b *Block) GetHashNoNonce() *Hash { return &Hash{b.block.HashNoNonce()} } - +func (b *Block) GetParentHash() *Hash { return &Hash{b.block.ParentHash()} } +func (b *Block) GetUncleHash() *Hash { return &Hash{b.block.UncleHash()} } +func (b *Block) GetCoinbase() *Address { return &Address{b.block.Coinbase()} } +func (b *Block) GetRoot() *Hash { return &Hash{b.block.Root()} } +func (b *Block) GetTxHash() *Hash { return &Hash{b.block.TxHash()} } +func (b *Block) GetReceiptHash() *Hash { return &Hash{b.block.ReceiptHash()} } +func (b *Block) GetBloom() *Bloom { return &Bloom{b.block.Bloom()} } +func (b *Block) GetDifficulty() *BigInt { return &BigInt{b.block.Difficulty()} } +func (b *Block) GetNumber() int64 { return b.block.Number().Int64() } +func (b *Block) GetGasLimit() int64 { return int64(b.block.GasLimit()) } +func (b *Block) GetGasUsed() int64 { return int64(b.block.GasUsed()) } +func (b *Block) GetTime() int64 { return b.block.Time().Int64() } +func (b *Block) GetExtra() []byte { return b.block.Extra() } +func (b *Block) GetMixDigest() *Hash { return &Hash{b.block.MixDigest()} } +func (b *Block) GetNonce() int64 { return int64(b.block.Nonce()) } +func (b *Block) GetHash() *Hash { return &Hash{b.block.Hash()} } func (b *Block) GetHeader() *Header { return &Header{b.block.Header()} } func (b *Block) GetUncles() *Headers { return &Headers{b.block.Uncles()} } func (b *Block) GetTransactions() *Transactions { return &Transactions{b.block.Transactions()} } |