diff options
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 111 |
1 files changed, 60 insertions, 51 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 843247c71..dd3edeb45 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -62,6 +62,9 @@ var ( blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) + blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) + blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + ErrNoGenesis = errors.New("Genesis not found in chain") ) @@ -87,10 +90,11 @@ const ( // CacheConfig contains the configuration values for the trie caching/pruning // that's resident in a blockchain. type CacheConfig struct { - Disabled bool // Whether to disable trie write caching (archive node) - TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory - TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk - TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks + TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk + TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node) + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk } // BlockChain represents the canonical chain given a database with a genesis @@ -126,7 +130,6 @@ type BlockChain struct { genesisBlock *types.Block chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock checkpoint int // checkpoint counts towards the new checkpoint currentBlock atomic.Value // Current head of the block chain @@ -145,10 +148,11 @@ type BlockChain struct { procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup // chain processing wait group for shutting down - engine consensus.Engine - processor Processor // block processor interface - validator Validator // block and state validator interface - vmConfig vm.Config + engine consensus.Engine + validator Validator // Block and state validator interface + prefetcher Prefetcher // Block state prefetcher interface + processor Processor // Block transaction processor interface + vmConfig vm.Config badBlocks *lru.Cache // Bad block cache shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. @@ -189,8 +193,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par vmConfig: vmConfig, badBlocks: badBlocks, } - bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) - bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) + bc.validator = NewBlockValidator(chainConfig, bc, engine) + bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) + bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) @@ -381,31 +386,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block { return bc.currentFastBlock.Load().(*types.Block) } -// SetProcessor sets the processor required for making state modifications. -func (bc *BlockChain) SetProcessor(processor Processor) { - bc.procmu.Lock() - defer bc.procmu.Unlock() - bc.processor = processor -} - -// SetValidator sets the validator which is used to validate incoming blocks. -func (bc *BlockChain) SetValidator(validator Validator) { - bc.procmu.Lock() - defer bc.procmu.Unlock() - bc.validator = validator -} - // Validator returns the current validator. func (bc *BlockChain) Validator() Validator { - bc.procmu.RLock() - defer bc.procmu.RUnlock() return bc.validator } // Processor returns the current processor. func (bc *BlockChain) Processor() Processor { - bc.procmu.RLock() - defer bc.procmu.RUnlock() return bc.processor } @@ -722,7 +709,7 @@ func (bc *BlockChain) Stop() { // - HEAD: So we don't need to reprocess any blocks in the general case // - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle // - HEAD-127: So we have a hard limit on the number of blocks reexecuted - if !bc.cacheConfig.Disabled { + if !bc.cacheConfig.TrieDirtyDisabled { triedb := bc.stateCache.TrieDB() for _, offset := range []uint64{0, 1, triesInMemory - 1} { @@ -982,7 +969,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. triedb := bc.stateCache.TrieDB() // If we're running an archive node, always flush - if bc.cacheConfig.Disabled { + if bc.cacheConfig.TrieDirtyDisabled { if err := triedb.Commit(root, false); err != nil { return NonStatTy, err } @@ -1147,7 +1134,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { - // If the chain is terminating, don't even bother starting u + // If the chain is terminating, don't even bother starting up if atomic.LoadInt32(&bc.procInterrupt) == 1 { return 0, nil, nil, nil } @@ -1175,7 +1162,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] defer close(abort) // Peek the error for the first block to decide the directing import logic - it := newInsertIterator(chain, results, bc.Validator()) + it := newInsertIterator(chain, results, bc.validator) block, err := it.next() @@ -1238,54 +1225,76 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - state, err := state.New(parent.Root, bc.stateCache) + statedb, err := state.New(parent.Root, bc.stateCache) if err != nil { return it.index, events, coalescedLogs, err } - // Process block using the parent state as reference point. + // If we have a followup block, run that against the current state to pre-cache + // transactions and probabilistically some of the account/storage trie nodes. + var followupInterrupt uint32 + + if !bc.cacheConfig.TrieCleanNoPrefetch { + if followup, err := it.peek(); followup != nil && err == nil { + go func(start time.Time) { + throwaway, _ := state.New(parent.Root, bc.stateCache) + bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) + + blockPrefetchExecuteTimer.Update(time.Since(start)) + if atomic.LoadUint32(&followupInterrupt) == 1 { + blockPrefetchInterruptMeter.Mark(1) + } + }(time.Now()) + } + } + // Process block using the parent state as reference point substart := time.Now() - receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) + receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) + atomic.StoreUint32(&followupInterrupt, 1) return it.index, events, coalescedLogs, err } // Update the metrics touched during block processing - accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them - storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them - accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them - storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them - triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation - trieproc := state.AccountReads + state.AccountUpdates - trieproc += state.StorageReads + state.StorageUpdates + triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation + trieproc := statedb.AccountReads + statedb.AccountUpdates + trieproc += statedb.StorageReads + statedb.StorageUpdates blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) // Validate the state using the default validator substart = time.Now() - if err := bc.Validator().ValidateState(block, state, receipts, usedGas); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) + atomic.StoreUint32(&followupInterrupt, 1) return it.index, events, coalescedLogs, err } proctime := time.Since(start) // Update the metrics touched during block validation - accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them - storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them + storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them - blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash)) + blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)) // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState(block, receipts, state) + status, err := bc.writeBlockWithState(block, receipts, statedb) if err != nil { + atomic.StoreUint32(&followupInterrupt, 1) return it.index, events, coalescedLogs, err } + atomic.StoreUint32(&followupInterrupt, 1) + // Update the metrics touched during block commit - accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them - storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them + accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them + storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them - blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits) + blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits) blockInsertTimer.UpdateSince(start) switch status { |