aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2019-04-02 22:03:12 +0800
committerGitHub <noreply@github.com>2019-04-02 22:03:12 +0800
commite14f8a408c17fd6c57d769cd4635ad6cc8bde769 (patch)
treeba0d75fe1d1797be2322366e3d74cc54f77a6184 /core
parent88d7119ebb7bcb38e16f0feb571c98f2197d7cf3 (diff)
parented34a5e08a475fdf1b3116b1f17879411bfe411d (diff)
downloadgo-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.tar
go-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.tar.gz
go-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.tar.bz2
go-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.tar.lz
go-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.tar.xz
go-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.tar.zst
go-tangerine-e14f8a408c17fd6c57d769cd4635ad6cc8bde769.zip
Merge pull request #19328 from karalabe/preload
core: prefetch next block state concurrently
Diffstat (limited to 'core')
-rw-r--r--core/blockchain.go111
-rw-r--r--core/blockchain_insert.go43
-rw-r--r--core/blockchain_test.go2
-rw-r--r--core/state_prefetcher.go85
-rw-r--r--core/types.go17
-rw-r--r--core/vm/interpreter.go25
6 files changed, 202 insertions, 81 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 843247c71..dd3edeb45 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -62,6 +62,9 @@ var (
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
+ blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
+ blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
+
ErrNoGenesis = errors.New("Genesis not found in chain")
)
@@ -87,10 +90,11 @@ const (
// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain.
type CacheConfig struct {
- Disabled bool // Whether to disable trie write caching (archive node)
- TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
- TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
- TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
+ TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
+ TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
+ TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
+ TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
+ TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
}
// BlockChain represents the canonical chain given a database with a genesis
@@ -126,7 +130,6 @@ type BlockChain struct {
genesisBlock *types.Block
chainmu sync.RWMutex // blockchain insertion lock
- procmu sync.RWMutex // block processor lock
checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain
@@ -145,10 +148,11 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down
- engine consensus.Engine
- processor Processor // block processor interface
- validator Validator // block and state validator interface
- vmConfig vm.Config
+ engine consensus.Engine
+ validator Validator // Block and state validator interface
+ prefetcher Prefetcher // Block state prefetcher interface
+ processor Processor // Block transaction processor interface
+ vmConfig vm.Config
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
@@ -189,8 +193,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
vmConfig: vmConfig,
badBlocks: badBlocks,
}
- bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
- bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
+ bc.validator = NewBlockValidator(chainConfig, bc, engine)
+ bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
+ bc.processor = NewStateProcessor(chainConfig, bc, engine)
var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
@@ -381,31 +386,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block {
return bc.currentFastBlock.Load().(*types.Block)
}
-// SetProcessor sets the processor required for making state modifications.
-func (bc *BlockChain) SetProcessor(processor Processor) {
- bc.procmu.Lock()
- defer bc.procmu.Unlock()
- bc.processor = processor
-}
-
-// SetValidator sets the validator which is used to validate incoming blocks.
-func (bc *BlockChain) SetValidator(validator Validator) {
- bc.procmu.Lock()
- defer bc.procmu.Unlock()
- bc.validator = validator
-}
-
// Validator returns the current validator.
func (bc *BlockChain) Validator() Validator {
- bc.procmu.RLock()
- defer bc.procmu.RUnlock()
return bc.validator
}
// Processor returns the current processor.
func (bc *BlockChain) Processor() Processor {
- bc.procmu.RLock()
- defer bc.procmu.RUnlock()
return bc.processor
}
@@ -722,7 +709,7 @@ func (bc *BlockChain) Stop() {
// - HEAD: So we don't need to reprocess any blocks in the general case
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
- if !bc.cacheConfig.Disabled {
+ if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()
for _, offset := range []uint64{0, 1, triesInMemory - 1} {
@@ -982,7 +969,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb := bc.stateCache.TrieDB()
// If we're running an archive node, always flush
- if bc.cacheConfig.Disabled {
+ if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err
}
@@ -1147,7 +1134,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
- // If the chain is terminating, don't even bother starting u
+ // If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
}
@@ -1175,7 +1162,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
defer close(abort)
// Peek the error for the first block to decide the directing import logic
- it := newInsertIterator(chain, results, bc.Validator())
+ it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()
@@ -1238,54 +1225,76 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
- state, err := state.New(parent.Root, bc.stateCache)
+ statedb, err := state.New(parent.Root, bc.stateCache)
if err != nil {
return it.index, events, coalescedLogs, err
}
- // Process block using the parent state as reference point.
+ // If we have a followup block, run that against the current state to pre-cache
+ // transactions and probabilistically some of the account/storage trie nodes.
+ var followupInterrupt uint32
+
+ if !bc.cacheConfig.TrieCleanNoPrefetch {
+ if followup, err := it.peek(); followup != nil && err == nil {
+ go func(start time.Time) {
+ throwaway, _ := state.New(parent.Root, bc.stateCache)
+ bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
+
+ blockPrefetchExecuteTimer.Update(time.Since(start))
+ if atomic.LoadUint32(&followupInterrupt) == 1 {
+ blockPrefetchInterruptMeter.Mark(1)
+ }
+ }(time.Now())
+ }
+ }
+ // Process block using the parent state as reference point
substart := time.Now()
- receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
+ receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
+ atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
// Update the metrics touched during block processing
- accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them
- storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them
- accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them
- storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them
+ accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
+ storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
+ accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
+ storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
- triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation
- trieproc := state.AccountReads + state.AccountUpdates
- trieproc += state.StorageReads + state.StorageUpdates
+ triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
+ trieproc := statedb.AccountReads + statedb.AccountUpdates
+ trieproc += statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
// Validate the state using the default validator
substart = time.Now()
- if err := bc.Validator().ValidateState(block, state, receipts, usedGas); err != nil {
+ if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
+ atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
proctime := time.Since(start)
// Update the metrics touched during block validation
- accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them
- storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them
+ accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
+ storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
- blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash))
+ blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
// Write the block to the chain and get the status.
substart = time.Now()
- status, err := bc.writeBlockWithState(block, receipts, state)
+ status, err := bc.writeBlockWithState(block, receipts, statedb)
if err != nil {
+ atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
+ atomic.StoreUint32(&followupInterrupt, 1)
+
// Update the metrics touched during block commit
- accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them
- storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them
+ accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
+ storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
- blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits)
+ blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
blockInsertTimer.UpdateSince(start)
switch status {
diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go
index e2a385164..e4d758d4c 100644
--- a/core/blockchain_insert.go
+++ b/core/blockchain_insert.go
@@ -80,10 +80,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
// insertIterator is a helper to assist during chain import.
type insertIterator struct {
- chain types.Blocks
- results <-chan error
- index int
- validator Validator
+ chain types.Blocks // Chain of blocks being iterated over
+
+ results <-chan error // Verification result sink from the consensus engine
+ errors []error // Header verification errors for the blocks
+
+ index int // Current offset of the iterator
+ validator Validator // Validator to run if verification succeeds
}
// newInsertIterator creates a new iterator based on the given blocks, which are
@@ -92,6 +95,7 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
return &insertIterator{
chain: chain,
results: results,
+ errors: make([]error, 0, len(chain)),
index: -1,
validator: validator,
}
@@ -100,17 +104,44 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
// next returns the next block in the iterator, along with any potential validation
// error for that block. When the end is reached, it will return (nil, nil).
func (it *insertIterator) next() (*types.Block, error) {
+ // If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
it.index = len(it.chain)
return nil, nil
}
+ // Advance the iterator and wait for verification result if not yet done
it.index++
- if err := <-it.results; err != nil {
- return it.chain[it.index], err
+ if len(it.errors) <= it.index {
+ it.errors = append(it.errors, <-it.results)
+ }
+ if it.errors[it.index] != nil {
+ return it.chain[it.index], it.errors[it.index]
}
+ // Block header valid, run body validation and return
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}
+// peek returns the next block in the iterator, along with any potential validation
+// error for that block, but does **not** advance the iterator.
+//
+// Both header and body validation errors (nil too) is cached into the iterator
+// to avoid duplicating work on the following next() call.
+func (it *insertIterator) peek() (*types.Block, error) {
+ // If we reached the end of the chain, abort
+ if it.index+1 >= len(it.chain) {
+ return nil, nil
+ }
+ // Wait for verification result if not yet done
+ if len(it.errors) <= it.index+1 {
+ it.errors = append(it.errors, <-it.results)
+ }
+ if it.errors[it.index+1] != nil {
+ return it.chain[it.index+1], it.errors[it.index+1]
+ }
+ // Block header valid, ignore body validation since we don't have a parent anyway
+ return it.chain[it.index+1], nil
+}
+
// previous returns the previous header that was being processed, or nil.
func (it *insertIterator) previous() *types.Header {
if it.index < 1 {
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index d1681ce3b..c8cae969c 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -144,7 +144,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
- receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb, vm.Config{})
+ receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go
new file mode 100644
index 000000000..cb85a05b5
--- /dev/null
+++ b/core/state_prefetcher.go
@@ -0,0 +1,85 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
+// of an arbitrary state with the goal of prefetching potentially useful state
+// data from disk before the main block processor start executing.
+type statePrefetcher struct {
+ config *params.ChainConfig // Chain configuration options
+ bc *BlockChain // Canonical block chain
+ engine consensus.Engine // Consensus engine used for block rewards
+}
+
+// newStatePrefetcher initialises a new statePrefetcher.
+func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
+ return &statePrefetcher{
+ config: config,
+ bc: bc,
+ engine: engine,
+ }
+}
+
+// Prefetch processes the state changes according to the Ethereum rules by running
+// the transaction messages using the statedb, but any changes are discarded. The
+// only goal is to pre-cache transaction signatures and state trie nodes.
+func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
+ var (
+ header = block.Header()
+ gaspool = new(GasPool).AddGas(block.GasLimit())
+ )
+ // Iterate over and process the individual transactions
+ for i, tx := range block.Transactions() {
+ // If block precaching was interrupted, abort
+ if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
+ return
+ }
+ // Block precaching permitted to continue, execute the transaction
+ statedb.Prepare(tx.Hash(), block.Hash(), i)
+ if err := precacheTransaction(p.config, p.bc, nil, gaspool, statedb, header, tx, cfg); err != nil {
+ return // Ugh, something went horribly wrong, bail out
+ }
+ }
+}
+
+// precacheTransaction attempts to apply a transaction to the given state database
+// and uses the input parameters for its environment. The goal is not to execute
+// the transaction successfully, rather to warm up touched data slots.
+func precacheTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gaspool *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, cfg vm.Config) error {
+ // Convert the transaction into an executable message and pre-cache its sender
+ msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
+ if err != nil {
+ return err
+ }
+ // Create the EVM and execute the transaction
+ context := NewEVMContext(msg, header, bc, author)
+ vm := vm.NewEVM(context, statedb, config, cfg)
+
+ _, _, _, err = ApplyMessage(vm, msg, gaspool)
+ return err
+}
diff --git a/core/types.go b/core/types.go
index 5c963e665..4c5b74a49 100644
--- a/core/types.go
+++ b/core/types.go
@@ -25,7 +25,6 @@ import (
// Validator is an interface which defines the standard for block validation. It
// is only responsible for validating block contents, as the header validation is
// done by the specific consensus engines.
-//
type Validator interface {
// ValidateBody validates the given block's content.
ValidateBody(block *types.Block) error
@@ -35,12 +34,18 @@ type Validator interface {
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
}
+// Prefetcher is an interface for pre-caching transaction signatures and state.
+type Prefetcher interface {
+ // Prefetch processes the state changes according to the Ethereum rules by running
+ // the transaction messages using the statedb, but any changes are discarded. The
+ // only goal is to pre-cache transaction signatures and state trie nodes.
+ Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
+}
+
// Processor is an interface for processing blocks using a given initial state.
-//
-// Process takes the block to be processed and the statedb upon which the
-// initial state is based. It should return the receipts generated, amount
-// of gas used in the process and return an error if any of the internal rules
-// failed.
type Processor interface {
+ // Process processes the state changes according to the Ethereum rules by running
+ // the transaction messages using the statedb and applying any rewards to both
+ // the processor (coinbase) and any included uncles.
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
}
diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go
index 417665370..989f85f5d 100644
--- a/core/vm/interpreter.go
+++ b/core/vm/interpreter.go
@@ -28,24 +28,15 @@ import (
// Config are the configuration options for the Interpreter
type Config struct {
- // Debug enabled debugging Interpreter options
- Debug bool
- // Tracer is the op code logger
- Tracer Tracer
- // NoRecursion disabled Interpreter call, callcode,
- // delegate call and create.
- NoRecursion bool
- // Enable recording of SHA3/keccak preimages
- EnablePreimageRecording bool
- // JumpTable contains the EVM instruction table. This
- // may be left uninitialised and will be set to the default
- // table.
- JumpTable [256]operation
+ Debug bool // Enables debugging
+ Tracer Tracer // Opcode logger
+ NoRecursion bool // Disables call, callcode, delegate call and create
+ EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
- // Type of the EWASM interpreter
- EWASMInterpreter string
- // Type of the EVM interpreter
- EVMInterpreter string
+ JumpTable [256]operation // EVM instruction table, automatically populated if unset
+
+ EWASMInterpreter string // External EWASM interpreter options
+ EVMInterpreter string // External EVM interpreter options
}
// Interpreter is used to run Ethereum based contracts and will utilise the