From 55599ee95d4151a2502465e0afc7c47bd1acba77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 5 Feb 2018 18:40:32 +0200 Subject: core, trie: intermediate mempool between trie and database (#15857) This commit reduces database I/O by not writing every state trie to disk. --- core/bench_test.go | 4 +- core/block_validator.go | 9 +- core/block_validator_test.go | 8 +- core/blockchain.go | 334 ++++++++++++++++++++++++++++++++++--------- core/blockchain_test.go | 145 ++++++++++++++++--- core/chain_indexer.go | 3 + core/chain_makers.go | 9 +- core/chain_makers_test.go | 2 +- core/dao_test.go | 24 +++- core/genesis.go | 24 ++-- core/genesis_test.go | 6 +- core/state/database.go | 51 +++++-- core/state/iterator_test.go | 17 ++- core/state/state_object.go | 5 +- core/state/state_test.go | 6 +- core/state/statedb.go | 38 ++++- core/state/statedb_test.go | 14 +- core/state/sync_test.go | 44 +++--- core/tx_pool_test.go | 4 +- core/types/block.go | 9 ++ core/types/receipt.go | 13 ++ core/types/transaction.go | 2 + 22 files changed, 581 insertions(+), 190 deletions(-) (limited to 'core') diff --git a/core/bench_test.go b/core/bench_test.go index f976331d1..e23f0d19d 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -173,7 +173,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Time the insertion of the new chain. // State and blocks are stored in the same DB. - chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + chainman, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer chainman.Stop() b.ReportAllocs() b.ResetTimer() @@ -283,7 +283,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) { if err != nil { b.Fatalf("error opening database at %v: %v", dir, err) } - chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) + chain, err := NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) if err != nil { b.Fatalf("error creating chain: %v", err) } diff --git a/core/block_validator.go b/core/block_validator.go index 143728bb8..98958809b 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -50,11 +50,14 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin // validated at this point. func (v *BlockValidator) ValidateBody(block *types.Block) error { // Check whether the block's known, and if not, that it's linkable - if v.bc.HasBlockAndState(block.Hash()) { + if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) { return ErrKnownBlock } - if !v.bc.HasBlockAndState(block.ParentHash()) { - return consensus.ErrUnknownAncestor + if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { + if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { + return consensus.ErrUnknownAncestor + } + return consensus.ErrPrunedAncestor } // Header validity is known at this point, check the uncles and transactions header := block.Header() diff --git a/core/block_validator_test.go b/core/block_validator_test.go index e668601f3..e334b3c3c 100644 --- a/core/block_validator_test.go +++ b/core/block_validator_test.go @@ -42,7 +42,7 @@ func TestHeaderVerification(t *testing.T) { headers[i] = block.Header() } // Run the header checker for blocks one-by-one, checking for both valid and invalid nonces - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) defer chain.Stop() for i := 0; i < len(blocks); i++ { @@ -106,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) { var results <-chan error if valid { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } else { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } @@ -173,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) { defer runtime.GOMAXPROCS(old) // Start the verifications and immediately abort - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}) + chain, _ := NewBlockChain(testdb, nil, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}) defer chain.Stop() abort, results := chain.engine.VerifyHeaders(chain, headers, seals) diff --git a/core/blockchain.go b/core/blockchain.go index d5e139e31..8d141fddb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -42,6 +42,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" "github.com/hashicorp/golang-lru" + "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) var ( @@ -56,11 +57,20 @@ const ( maxFutureBlocks = 256 maxTimeFutureBlocks = 30 badBlockLimit = 10 + triesInMemory = 128 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. BlockChainVersion = 3 ) +// CacheConfig contains the configuration values for the trie caching/pruning +// that's resident in a blockchain. +type CacheConfig struct { + Disabled bool // Whether to disable trie write caching (archive node) + TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk +} + // BlockChain represents the canonical chain given a database with a genesis // block. The Blockchain manages chain imports, reverts, chain reorganisations. // @@ -76,10 +86,14 @@ const ( // included in the canonical one where as GetBlockByNumber always represents the // canonical chain. type BlockChain struct { - config *params.ChainConfig // chain & network configuration + chainConfig *params.ChainConfig // Chain & network configuration + cacheConfig *CacheConfig // Cache configuration for pruning + + db ethdb.Database // Low level persistent database to store final content in + triegc *prque.Prque // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping hc *HeaderChain - chainDb ethdb.Database rmLogsFeed event.Feed chainFeed event.Feed chainSideFeed event.Feed @@ -119,7 +133,13 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { +func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { + if cacheConfig == nil { + cacheConfig = &CacheConfig{ + TrieNodeLimit: 256 * 1024 * 1024, + TrieTimeLimit: 5 * time.Minute, + } + } bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -127,9 +147,11 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ - config: config, - chainDb: chainDb, - stateCache: state.NewDatabase(chainDb), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(), + stateCache: state.NewDatabase(db), quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -139,11 +161,11 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co vmConfig: vmConfig, badBlocks: badBlocks, } - bc.SetValidator(NewBlockValidator(config, bc, engine)) - bc.SetProcessor(NewStateProcessor(config, bc, engine)) + bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) + bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) var err error - bc.hc, err = NewHeaderChain(chainDb, config, engine, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) if err != nil { return nil, err } @@ -180,7 +202,7 @@ func (bc *BlockChain) getProcInterrupt() bool { // assumes that the chain manager mutex is held. func (bc *BlockChain) loadLastState() error { // Restore the last known head block - head := GetHeadBlockHash(bc.chainDb) + head := GetHeadBlockHash(bc.db) if head == (common.Hash{}) { // Corrupt or empty database, init from scratch log.Warn("Empty database, resetting chain") @@ -196,15 +218,17 @@ func (bc *BlockChain) loadLastState() error { // Make sure the state associated with the block is available if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { // Dangling block without a state associated, init from scratch - log.Warn("Head state missing, resetting chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) - return bc.Reset() + log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) + if err := bc.repair(¤tBlock); err != nil { + return err + } } // Everything seems to be fine, set as the head block bc.currentBlock = currentBlock // Restore the last known head header currentHeader := bc.currentBlock.Header() - if head := GetHeadHeaderHash(bc.chainDb); head != (common.Hash{}) { + if head := GetHeadHeaderHash(bc.db); head != (common.Hash{}) { if header := bc.GetHeaderByHash(head); header != nil { currentHeader = header } @@ -213,7 +237,7 @@ func (bc *BlockChain) loadLastState() error { // Restore the last known head fast block bc.currentFastBlock = bc.currentBlock - if head := GetHeadFastBlockHash(bc.chainDb); head != (common.Hash{}) { + if head := GetHeadFastBlockHash(bc.db); head != (common.Hash{}) { if block := bc.GetBlockByHash(head); block != nil { bc.currentFastBlock = block } @@ -243,7 +267,7 @@ func (bc *BlockChain) SetHead(head uint64) error { // Rewind the header chain, deleting all block bodies until then delFn := func(hash common.Hash, num uint64) { - DeleteBody(bc.chainDb, hash, num) + DeleteBody(bc.db, hash, num) } bc.hc.SetHead(head, delFn) currentHeader := bc.hc.CurrentHeader() @@ -275,10 +299,10 @@ func (bc *BlockChain) SetHead(head uint64) error { if bc.currentFastBlock == nil { bc.currentFastBlock = bc.genesisBlock } - if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil { + if err := WriteHeadBlockHash(bc.db, bc.currentBlock.Hash()); err != nil { log.Crit("Failed to reset head full block", "err", err) } - if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil { + if err := WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash()); err != nil { log.Crit("Failed to reset head fast block", "err", err) } return bc.loadLastState() @@ -292,7 +316,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if block == nil { return fmt.Errorf("non existent block [%x…]", hash[:4]) } - if _, err := trie.NewSecure(block.Root(), bc.chainDb, 0); err != nil { + if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB(), 0); err != nil { return err } // If all checks out, manually set the head block @@ -387,7 +411,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { log.Crit("Failed to write genesis block TD", "err", err) } - if err := WriteBlock(bc.chainDb, genesis); err != nil { + if err := WriteBlock(bc.db, genesis); err != nil { log.Crit("Failed to write genesis block", "err", err) } bc.genesisBlock = genesis @@ -400,6 +424,24 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { return nil } +// repair tries to repair the current blockchain by rolling back the current block +// until one with associated state is found. This is needed to fix incomplete db +// writes caused either by crashes/power outages, or simply non-committed tries. +// +// This method only rolls back the current block. The current header and current +// fast block are left intact. +func (bc *BlockChain) repair(head **types.Block) error { + for { + // Abort if we've rewound to a head block that does have associated state + if _, err := state.New((*head).Root(), bc.stateCache); err == nil { + log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash()) + return nil + } + // Otherwise rewind one block and recheck state availability there + (*head) = bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1) + } +} + // Export writes the active chain to the given writer. func (bc *BlockChain) Export(w io.Writer) error { return bc.ExportN(w, uint64(0), bc.currentBlock.NumberU64()) @@ -437,13 +479,13 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { // Note, this function assumes that the `mu` mutex is held! func (bc *BlockChain) insert(block *types.Block) { // If the block is on a side chain or an unknown one, force other heads onto it too - updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash() + updateHeads := GetCanonicalHash(bc.db, block.NumberU64()) != block.Hash() // Add the block to the canonical chain number scheme and mark as the head - if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil { + if err := WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()); err != nil { log.Crit("Failed to insert block number", "err", err) } - if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil { + if err := WriteHeadBlockHash(bc.db, block.Hash()); err != nil { log.Crit("Failed to insert head block hash", "err", err) } bc.currentBlock = block @@ -452,7 +494,7 @@ func (bc *BlockChain) insert(block *types.Block) { if updateHeads { bc.hc.SetCurrentHeader(block.Header()) - if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil { + if err := WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil { log.Crit("Failed to insert head fast block hash", "err", err) } bc.currentFastBlock = block @@ -472,7 +514,7 @@ func (bc *BlockChain) GetBody(hash common.Hash) *types.Body { body := cached.(*types.Body) return body } - body := GetBody(bc.chainDb, hash, bc.hc.GetBlockNumber(hash)) + body := GetBody(bc.db, hash, bc.hc.GetBlockNumber(hash)) if body == nil { return nil } @@ -488,7 +530,7 @@ func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue { if cached, ok := bc.bodyRLPCache.Get(hash); ok { return cached.(rlp.RawValue) } - body := GetBodyRLP(bc.chainDb, hash, bc.hc.GetBlockNumber(hash)) + body := GetBodyRLP(bc.db, hash, bc.hc.GetBlockNumber(hash)) if len(body) == 0 { return nil } @@ -502,21 +544,25 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool { if bc.blockCache.Contains(hash) { return true } - ok, _ := bc.chainDb.Has(blockBodyKey(hash, number)) + ok, _ := bc.db.Has(blockBodyKey(hash, number)) return ok } +// HasState checks if state trie is fully present in the database or not. +func (bc *BlockChain) HasState(hash common.Hash) bool { + _, err := bc.stateCache.OpenTrie(hash) + return err == nil +} + // HasBlockAndState checks if a block and associated state trie is fully present // in the database or not, caching it if present. -func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool { +func (bc *BlockChain) HasBlockAndState(hash common.Hash, number uint64) bool { // Check first that the block itself is known - block := bc.GetBlockByHash(hash) + block := bc.GetBlock(hash, number) if block == nil { return false } - // Ensure the associated state is also present - _, err := bc.stateCache.OpenTrie(block.Root()) - return err == nil + return bc.HasState(block.Root()) } // GetBlock retrieves a block from the database by hash and number, @@ -526,7 +572,7 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { if block, ok := bc.blockCache.Get(hash); ok { return block.(*types.Block) } - block := GetBlock(bc.chainDb, hash, number) + block := GetBlock(bc.db, hash, number) if block == nil { return nil } @@ -543,13 +589,18 @@ func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block { // GetBlockByNumber retrieves a block from the database by number, caching it // (associated with its hash) if found. func (bc *BlockChain) GetBlockByNumber(number uint64) *types.Block { - hash := GetCanonicalHash(bc.chainDb, number) + hash := GetCanonicalHash(bc.db, number) if hash == (common.Hash{}) { return nil } return bc.GetBlock(hash, number) } +// GetReceiptsByHash retrieves the receipts for all transactions in a given block. +func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { + return GetBlockReceipts(bc.db, hash, GetBlockNumber(bc.db, hash)) +} + // GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors. // [deprecated by eth/62] func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) { @@ -577,6 +628,12 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types. return uncles } +// TrieNode retrieves a blob of data associated with a trie node (or code hash) +// either from ephemeral in-memory cache, or from persistent storage. +func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) { + return bc.stateCache.TrieDB().Node(hash) +} + // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { @@ -589,6 +646,33 @@ func (bc *BlockChain) Stop() { atomic.StoreInt32(&bc.procInterrupt, 1) bc.wg.Wait() + + // Ensure the state of a recent block is also stored to disk before exiting. + // It is fine if this state does not exist (fast start/stop cycle), but it is + // advisable to leave an N block gap from the head so 1) a restart loads up + // the last N blocks as sync assistance to remote nodes; 2) a restart during + // a (small) reorg doesn't require deep reprocesses; 3) chain "repair" from + // missing states are constantly tested. + // + // This may be tuned a bit on mainnet if its too annoying to reprocess the last + // N blocks. + if !bc.cacheConfig.Disabled { + triedb := bc.stateCache.TrieDB() + if number := bc.CurrentBlock().NumberU64(); number >= triesInMemory { + recent := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - triesInMemory + 1) + + log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) + if err := triedb.Commit(recent.Root(), true); err != nil { + log.Error("Failed to commit recent state trie", "err", err) + } + } + for !bc.triegc.Empty() { + triedb.Dereference(bc.triegc.PopItem().(common.Hash), common.Hash{}) + } + if size := triedb.Size(); size != 0 { + log.Error("Dangling trie nodes after full cleanup") + } + } log.Info("Blockchain manager stopped") } @@ -633,11 +717,11 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { } if bc.currentFastBlock.Hash() == hash { bc.currentFastBlock = bc.GetBlock(bc.currentFastBlock.ParentHash(), bc.currentFastBlock.NumberU64()-1) - WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()) + WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash()) } if bc.currentBlock.Hash() == hash { bc.currentBlock = bc.GetBlock(bc.currentBlock.ParentHash(), bc.currentBlock.NumberU64()-1) - WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()) + WriteHeadBlockHash(bc.db, bc.currentBlock.Hash()) } } } @@ -696,7 +780,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ stats = struct{ processed, ignored int32 }{} start = time.Now() bytes = 0 - batch = bc.chainDb.NewBatch() + batch = bc.db.NewBatch() ) for i, block := range blockChain { receipts := receiptChain[i] @@ -714,7 +798,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ continue } // Compute all the non-consensus fields of the receipts - SetReceiptsData(bc.config, block, receipts) + SetReceiptsData(bc.chainConfig, block, receipts) // Write all the data out into the database if err := WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil { return i, fmt.Errorf("failed to write block body: %v", err) @@ -747,7 +831,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ head := blockChain[len(blockChain)-1] if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case if bc.GetTd(bc.currentFastBlock.Hash(), bc.currentFastBlock.NumberU64()).Cmp(td) < 0 { - if err := WriteHeadFastBlockHash(bc.chainDb, head.Hash()); err != nil { + if err := WriteHeadFastBlockHash(bc.db, head.Hash()); err != nil { log.Crit("Failed to update head fast block hash", "err", err) } bc.currentFastBlock = head @@ -758,15 +842,33 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ log.Info("Imported new block receipts", "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), - "bytes", bytes, "number", head.Number(), "hash", head.Hash(), + "size", common.StorageSize(bytes), "ignored", stats.ignored) return 0, nil } -// WriteBlock writes the block to the chain. -func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +var lastWrite uint64 + +// WriteBlockWithoutState writes only the block and its metadata to the database, +// but does not write any state. This is used to construct competing side forks +// up to the point where they exceed the canonical total difficulty. +func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) { + bc.wg.Add(1) + defer bc.wg.Done() + + if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil { + return err + } + if err := WriteBlock(bc.db, block); err != nil { + return err + } + return nil +} + +// WriteBlockWithState writes the block and all associated state to the database. +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -787,17 +889,73 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R return NonStatTy, err } // Write other block data using a batch. - batch := bc.chainDb.NewBatch() + batch := bc.db.NewBatch() if err := WriteBlock(batch, block); err != nil { return NonStatTy, err } - if _, err := state.CommitTo(batch, bc.config.IsEIP158(block.Number())); err != nil { + root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) + if err != nil { return NonStatTy, err } + triedb := bc.stateCache.TrieDB() + + // If we're running an archive node, always flush + if bc.cacheConfig.Disabled { + if err := triedb.Commit(root, false); err != nil { + return NonStatTy, err + } + } else { + // Full but not archive node, do proper garbage collection + triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(root, -float32(block.NumberU64())) + + if current := block.NumberU64(); current > triesInMemory { + // Find the next state trie we need to commit + header := bc.GetHeaderByNumber(current - triesInMemory) + chosen := header.Number.Uint64() + + // Only write to disk if we exceeded our memory allowance *and* also have at + // least a given number of tries gapped. + var ( + size = triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieNodeLimit) * 1024 * 1024 + ) + if size > limit || bc.gcproc > bc.cacheConfig.TrieTimeLimit { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < lastWrite+triesInMemory { + switch { + case size >= 2*limit: + log.Error("Trie memory critical, forcing to disk", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory) + case bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit: + log.Error("Trie timing critical, forcing to disk", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory) + case size > limit: + log.Warn("Trie memory at dangerous levels", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory) + case bc.gcproc > bc.cacheConfig.TrieTimeLimit: + log.Warn("Trie timing at dangerous levels", "time", bc.gcproc, "limit", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory) + } + } + // If optimum or critical limits reached, write to disk + if chosen >= lastWrite+triesInMemory || size >= 2*limit || bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { + triedb.Commit(header.Root, true) + lastWrite = chosen + bc.gcproc = 0 + } + } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + triedb.Dereference(root.(common.Hash), common.Hash{}) + } + } + } if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil { return NonStatTy, err } - // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -818,7 +976,7 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R return NonStatTy, err } // Write hash preimages - if err := WritePreimages(bc.chainDb, block.NumberU64(), state.Preimages()); err != nil { + if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil { return NonStatTy, err } status = CanonStatTy @@ -910,31 +1068,60 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty if err == nil { err = bc.Validator().ValidateBody(block) } - if err != nil { - if err == ErrKnownBlock { - stats.ignored++ - continue + switch { + case err == ErrKnownBlock: + stats.ignored++ + continue + + case err == consensus.ErrFutureBlock: + // Allow up to MaxFuture second in the future blocks. If this limit is exceeded + // the chain is discarded and processed at a later time if given. + max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) + if block.Time().Cmp(max) > 0 { + return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) } + bc.futureBlocks.Add(block.Hash(), block) + stats.queued++ + continue + + case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): + bc.futureBlocks.Add(block.Hash(), block) + stats.queued++ + continue - if err == consensus.ErrFutureBlock { - // Allow up to MaxFuture second in the future blocks. If this limit - // is exceeded the chain is discarded and processed at a later time - // if given. - max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time().Cmp(max) > 0 { - return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) + case err == consensus.ErrPrunedAncestor: + // Block competing with the canonical chain, store in the db, but don't process + // until the competitor TD goes above the canonical TD + localTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64()) + externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) + if localTd.Cmp(externTd) > 0 { + if err = bc.WriteBlockWithoutState(block, externTd); err != nil { + return i, events, coalescedLogs, err } - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ continue } + // Competitor chain beat canonical, gather all blocks from the common ancestor + var winner []*types.Block - if err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()) { - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue + parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) + for !bc.HasState(parent.Root()) { + winner = append(winner, parent) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + } + for j := 0; j < len(winner)/2; j++ { + winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] + } + // Import all the pruned blocks to make the state available + bc.chainmu.Unlock() + _, evs, logs, err := bc.insertChain(winner) + bc.chainmu.Lock() + events, coalescedLogs = evs, logs + + if err != nil { + return i, events, coalescedLogs, err } + case err != nil: bc.reportBlock(block, nil, err) return i, events, coalescedLogs, err } @@ -962,8 +1149,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty bc.reportBlock(block, receipts, err) return i, events, coalescedLogs, err } + proctime := time.Since(bstart) + // Write the block to the chain and get the status. - status, err := bc.WriteBlockAndState(block, receipts, state) + status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { return i, events, coalescedLogs, err } @@ -977,6 +1166,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block + // Only count canonical blocks for GC processing time + bc.gcproc += proctime + case SideStatTy: log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) @@ -986,7 +1178,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty } stats.processed++ stats.usedGas += usedGas - stats.report(chain, i) + stats.report(chain, i, bc.stateCache.TrieDB().Size()) } // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { @@ -1009,7 +1201,7 @@ const statsReportLimit = 8 * time.Second // report prints statistics if some number of blocks have been processed // or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int) { +func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { // Fetch the timings for the batch var ( now = mclock.Now() @@ -1024,7 +1216,7 @@ func (st *insertStats) report(chain []*types.Block, index int) { context := []interface{}{ "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), - "number", end.Number(), "hash", end.Hash(), + "number", end.Number(), "hash", end.Hash(), "cache", cache, } if st.queued > 0 { context = append(context, []interface{}{"queued", st.queued}...) @@ -1060,7 +1252,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // These logs are later announced as deleted. collectLogs = func(h common.Hash) { // Coalesce logs and set 'Removed'. - receipts := GetBlockReceipts(bc.chainDb, h, bc.hc.GetBlockNumber(h)) + receipts := GetBlockReceipts(bc.db, h, bc.hc.GetBlockNumber(h)) for _, receipt := range receipts { for _, log := range receipt.Logs { del := *log @@ -1129,7 +1321,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // insert the block in the canonical way, re-writing history bc.insert(newChain[i]) // write lookup entries for hash based transaction/receipt searches - if err := WriteTxLookupEntries(bc.chainDb, newChain[i]); err != nil { + if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil { return err } addedTxs = append(addedTxs, newChain[i].Transactions()...) @@ -1139,7 +1331,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // When transactions get deleted from the database that means the // receipts that were created in the fork must also be deleted for _, tx := range diff { - DeleteTxLookupEntry(bc.chainDb, tx.Hash()) + DeleteTxLookupEntry(bc.db, tx.Hash()) } if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) @@ -1231,7 +1423,7 @@ Hash: 0x%x Error: %v ############################## -`, bc.config, block.Number(), block.Hash(), receiptString, err)) +`, bc.chainConfig, block.Number(), block.Hash(), receiptString, err)) } // InsertHeaderChain attempts to insert the given header chain in to the local @@ -1338,7 +1530,7 @@ func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header { } // Config retrieves the blockchain's chain configuration. -func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } +func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index cbde3bcd2..635379161 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -46,7 +46,7 @@ func newTestBlockChain(fake bool) *BlockChain { if !fake { engine = ethash.NewTester() } - blockchain, err := NewBlockChain(db, gspec.Config, engine, vm.Config{}) + blockchain, err := NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) if err != nil { panic(err) } @@ -148,9 +148,9 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { return err } blockchain.mu.Lock() - WriteTd(blockchain.chainDb, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) - WriteBlock(blockchain.chainDb, block) - statedb.CommitTo(blockchain.chainDb, false) + WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) + WriteBlock(blockchain.db, block) + statedb.Commit(false) blockchain.mu.Unlock() } return nil @@ -166,8 +166,8 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) blockchain.mu.Lock() - WriteTd(blockchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) - WriteHeader(blockchain.chainDb, header) + WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) + WriteHeader(blockchain.db, header) blockchain.mu.Unlock() } return nil @@ -186,9 +186,9 @@ func TestLastBlock(t *testing.T) { bchain := newTestBlockChain(false) defer bchain.Stop() - block := makeBlockChain(bchain.CurrentBlock(), 1, ethash.NewFaker(), bchain.chainDb, 0)[0] + block := makeBlockChain(bchain.CurrentBlock(), 1, ethash.NewFaker(), bchain.db, 0)[0] bchain.insert(block) - if block.Hash() != GetHeadBlockHash(bchain.chainDb) { + if block.Hash() != GetHeadBlockHash(bchain.db) { t.Errorf("Write/Get HeadBlockHash failed") } } @@ -496,7 +496,7 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Create a new BlockChain and check that it rolled back the state. - ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), vm.Config{}) + ncm, err := NewBlockChain(bc.db, nil, bc.chainConfig, ethash.NewFaker(), vm.Config{}) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } @@ -609,7 +609,7 @@ func TestFastVsFullChains(t *testing.T) { // Import the chain as an archive node for the comparison baseline archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer archive.Stop() if n, err := archive.InsertChain(blocks); err != nil { @@ -618,7 +618,7 @@ func TestFastVsFullChains(t *testing.T) { // Fast import the chain as a non-archive node to test fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -696,7 +696,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) } @@ -709,7 +709,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a non-archive node and ensure all pointers are updated fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -730,7 +730,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { lightDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(lightDb) - light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + light, _ := NewBlockChain(lightDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } @@ -799,7 +799,7 @@ func TestChainTxReorgs(t *testing.T) { } }) // Import the chain. This runs all block validation rules. - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) if i, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert original chain[%d]: %v", i, err) } @@ -870,7 +870,7 @@ func TestLogReorgs(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() rmLogsCh := make(chan RemovedLogsEvent) @@ -917,7 +917,7 @@ func TestReorgSideEvent(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {}) @@ -992,7 +992,7 @@ func TestCanonicalBlockRetrieval(t *testing.T) { bc := newTestBlockChain(true) defer bc.Stop() - chain, _ := GenerateChain(bc.config, bc.genesisBlock, ethash.NewFaker(), bc.chainDb, 10, func(i int, gen *BlockGen) {}) + chain, _ := GenerateChain(bc.chainConfig, bc.genesisBlock, ethash.NewFaker(), bc.db, 10, func(i int, gen *BlockGen) {}) var pend sync.WaitGroup pend.Add(len(chain)) @@ -1003,14 +1003,14 @@ func TestCanonicalBlockRetrieval(t *testing.T) { // try to retrieve a block by its canonical hash and see if the block data can be retrieved. for { - ch := GetCanonicalHash(bc.chainDb, block.NumberU64()) + ch := GetCanonicalHash(bc.db, block.NumberU64()) if ch == (common.Hash{}) { continue // busy wait for canonical hash to be written } if ch != block.Hash() { t.Fatalf("unknown canonical hash, want %s, got %s", block.Hash().Hex(), ch.Hex()) } - fb := GetBlock(bc.chainDb, ch, block.NumberU64()) + fb := GetBlock(bc.db, ch, block.NumberU64()) if fb == nil { t.Fatalf("unable to retrieve block %d for canonical hash: %s", block.NumberU64(), ch.Hex()) } @@ -1043,7 +1043,7 @@ func TestEIP155Transition(t *testing.T) { genesis = gspec.MustCommit(db) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 4, func(i int, block *BlockGen) { @@ -1151,7 +1151,7 @@ func TestEIP161AccountRemoval(t *testing.T) { } genesis = gspec.MustCommit(db) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, block *BlockGen) { @@ -1226,7 +1226,7 @@ func TestBlockchainHeaderchainReorgConsistency(t *testing.T) { diskdb, _ := ethdb.NewMemDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, params.TestChainConfig, engine, vm.Config{}) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1245,3 +1245,102 @@ func TestBlockchainHeaderchainReorgConsistency(t *testing.T) { } } } + +// Tests that importing small side forks doesn't leave junk in the trie database +// cache (which would eventually cause memory issues). +func TestTrieForkGC(t *testing.T) { + // Generate a canonical chain to act as the main dataset + engine := ethash.NewFaker() + + db, _ := ethdb.NewMemDatabase() + genesis := new(Genesis).MustCommit(db) + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) + + // Generate a bunch of fork blocks, each side forking from the canonical chain + forks := make([]*types.Block, len(blocks)) + for i := 0; i < len(forks); i++ { + parent := genesis + if i > 0 { + parent = blocks[i-1] + } + fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) + forks[i] = fork[0] + } + // Import the canonical and fork chain side by side, forcing the trie cache to cache both + diskdb, _ := ethdb.NewMemDatabase() + new(Genesis).MustCommit(diskdb) + + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + for i := 0; i < len(blocks); i++ { + if _, err := chain.InsertChain(blocks[i : i+1]); err != nil { + t.Fatalf("block %d: failed to insert into chain: %v", i, err) + } + if _, err := chain.InsertChain(forks[i : i+1]); err != nil { + t.Fatalf("fork %d: failed to insert into chain: %v", i, err) + } + } + // Dereference all the recent tries and ensure no past trie is left in + for i := 0; i < triesInMemory; i++ { + chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root(), common.Hash{}) + chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root(), common.Hash{}) + } + if len(chain.stateCache.TrieDB().Nodes()) > 0 { + t.Fatalf("stale tries still alive after garbase collection") + } +} + +// Tests that doing large reorgs works even if the state associated with the +// forking point is not available any more. +func TestLargeReorgTrieGC(t *testing.T) { + // Generate the original common chain segment and the two competing forks + engine := ethash.NewFaker() + + db, _ := ethdb.NewMemDatabase() + genesis := new(Genesis).MustCommit(db) + + shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 64, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) + original, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) + competitor, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) }) + + // Import the shared chain and the original canonical one + diskdb, _ := ethdb.NewMemDatabase() + new(Genesis).MustCommit(diskdb) + + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + if _, err := chain.InsertChain(shared); err != nil { + t.Fatalf("failed to insert shared chain: %v", err) + } + if _, err := chain.InsertChain(original); err != nil { + t.Fatalf("failed to insert shared chain: %v", err) + } + // Ensure that the state associated with the forking point is pruned away + if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil { + t.Fatalf("common-but-old ancestor still cache") + } + // Import the competitor chain without exceeding the canonical's TD and ensure + // we have not processed any of the blocks (protection against malicious blocks) + if _, err := chain.InsertChain(competitor[:len(competitor)-2]); err != nil { + t.Fatalf("failed to insert competitor chain: %v", err) + } + for i, block := range competitor[:len(competitor)-2] { + if node, _ := chain.stateCache.TrieDB().Node(block.Root()); node != nil { + t.Fatalf("competitor %d: low TD chain became processed", i) + } + } + // Import the head of the competitor chain, triggering the reorg and ensure we + // successfully reprocess all the stashed away blocks. + if _, err := chain.InsertChain(competitor[len(competitor)-2:]); err != nil { + t.Fatalf("failed to finalize competitor chain: %v", err) + } + for i, block := range competitor[:len(competitor)-triesInMemory] { + if node, _ := chain.stateCache.TrieDB().Node(block.Root()); node != nil { + t.Fatalf("competitor %d: competing chain state missing", i) + } + } +} diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 7fb184aaa..158ed8324 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -203,6 +203,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainE if header.ParentHash != prevHash { // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then) // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly? + + // TODO(karalabe): This operation is expensive and might block, causing the event system to + // potentially also lock up. We need to do with on a different thread somehow. if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { c.newHead(h.Number.Uint64(), true) } diff --git a/core/chain_makers.go b/core/chain_makers.go index 5e264a994..6744428ff 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -166,7 +166,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse genblock := func(i int, parent *types.Block, statedb *state.StateDB) (*types.Block, types.Receipts) { // TODO(karalabe): This is needed for clique, which depends on multiple blocks. // It's nonetheless ugly to spin up a blockchain here. Get rid of this somehow. - blockchain, _ := NewBlockChain(db, config, engine, vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, config, engine, vm.Config{}) defer blockchain.Stop() b := &BlockGen{i: i, parent: parent, chain: blocks, chainReader: blockchain, statedb: statedb, config: config, engine: engine} @@ -192,10 +192,13 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse if b.engine != nil { block, _ := b.engine.Finalize(b.chainReader, b.header, statedb, b.txs, b.uncles, b.receipts) // Write state changes to db - _, err := statedb.CommitTo(db, config.IsEIP158(b.header.Number)) + root, err := statedb.Commit(config.IsEIP158(b.header.Number)) if err != nil { panic(fmt.Sprintf("state write error: %v", err)) } + if err := statedb.Database().TrieDB().Commit(root, false); err != nil { + panic(fmt.Sprintf("trie write error: %v", err)) + } return block, b.receipts } return nil, nil @@ -246,7 +249,7 @@ func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *B db, _ := ethdb.NewMemDatabase() genesis := gspec.MustCommit(db) - blockchain, _ := NewBlockChain(db, params.AllEthashProtocolChanges, engine, vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}) // Create and inject the requested chain if n == 0 { return db, blockchain, nil diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index a3b80da29..93be43ddc 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -79,7 +79,7 @@ func ExampleGenerateChain() { }) // Import the chain. This runs all block validation rules. - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() if i, err := blockchain.InsertChain(chain); err != nil { diff --git a/core/dao_test.go b/core/dao_test.go index 43e2982a5..e0a3e3ff3 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -45,7 +45,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { proConf.DAOForkBlock = forkBlock proConf.DAOForkSupport = true - proBc, _ := NewBlockChain(proDb, &proConf, ethash.NewFaker(), vm.Config{}) + proBc, _ := NewBlockChain(proDb, nil, &proConf, ethash.NewFaker(), vm.Config{}) defer proBc.Stop() conDb, _ := ethdb.NewMemDatabase() @@ -55,7 +55,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { conConf.DAOForkBlock = forkBlock conConf.DAOForkSupport = false - conBc, _ := NewBlockChain(conDb, &conConf, ethash.NewFaker(), vm.Config{}) + conBc, _ := NewBlockChain(conDb, nil, &conConf, ethash.NewFaker(), vm.Config{}) defer conBc.Stop() if _, err := proBc.InsertChain(prefix); err != nil { @@ -69,7 +69,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a pro-fork block, and try to feed into the no-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - bc, _ := NewBlockChain(db, &conConf, ethash.NewFaker(), vm.Config{}) + bc, _ := NewBlockChain(db, nil, &conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -79,6 +79,9 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import contra-fork chain for expansion: %v", err) } + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + t.Fatalf("failed to commit contra-fork head for expansion: %v", err) + } blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) if _, err := conBc.InsertChain(blocks); err == nil { t.Fatalf("contra-fork chain accepted pro-fork block: %v", blocks[0]) @@ -91,7 +94,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a no-fork block, and try to feed into the pro-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - bc, _ = NewBlockChain(db, &proConf, ethash.NewFaker(), vm.Config{}) + bc, _ = NewBlockChain(db, nil, &proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) @@ -101,6 +104,9 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import pro-fork chain for expansion: %v", err) } + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + t.Fatalf("failed to commit pro-fork head for expansion: %v", err) + } blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) if _, err := proBc.InsertChain(blocks); err == nil { t.Fatalf("pro-fork chain accepted contra-fork block: %v", blocks[0]) @@ -114,7 +120,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that contra-forkers accept pro-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - bc, _ := NewBlockChain(db, &conConf, ethash.NewFaker(), vm.Config{}) + bc, _ := NewBlockChain(db, nil, &conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -124,6 +130,9 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import contra-fork chain for expansion: %v", err) } + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + t.Fatalf("failed to commit contra-fork head for expansion: %v", err) + } blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) if _, err := conBc.InsertChain(blocks); err != nil { t.Fatalf("contra-fork chain didn't accept pro-fork block post-fork: %v", err) @@ -131,7 +140,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that pro-forkers accept contra-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - bc, _ = NewBlockChain(db, &proConf, ethash.NewFaker(), vm.Config{}) + bc, _ = NewBlockChain(db, nil, &proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) @@ -141,6 +150,9 @@ func TestDAOForkRangeExtradata(t *testing.T) { if _, err := bc.InsertChain(blocks); err != nil { t.Fatalf("failed to import pro-fork chain for expansion: %v", err) } + if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true); err != nil { + t.Fatalf("failed to commit pro-fork head for expansion: %v", err) + } blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) if _, err := proBc.InsertChain(blocks); err != nil { t.Fatalf("pro-fork chain didn't accept contra-fork block post-fork: %v", err) diff --git a/core/genesis.go b/core/genesis.go index e22985b80..b6ead2250 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -169,10 +169,9 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig // Check whether the genesis block is already written. if genesis != nil { - block, _ := genesis.ToBlock() - hash := block.Hash() + hash := genesis.ToBlock(nil).Hash() if hash != stored { - return genesis.Config, block.Hash(), &GenesisMismatchError{stored, hash} + return genesis.Config, hash, &GenesisMismatchError{stored, hash} } } @@ -220,9 +219,12 @@ func (g *Genesis) configOrDefault(ghash common.Hash) *params.ChainConfig { } } -// ToBlock creates the block and state of a genesis specification. -func (g *Genesis) ToBlock() (*types.Block, *state.StateDB) { - db, _ := ethdb.NewMemDatabase() +// ToBlock creates the genesis block and writes state of a genesis specification +// to the given database (or discards it if nil). +func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { + if db == nil { + db, _ = ethdb.NewMemDatabase() + } statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) for addr, account := range g.Alloc { statedb.AddBalance(addr, account.Balance) @@ -252,19 +254,19 @@ func (g *Genesis) ToBlock() (*types.Block, *state.StateDB) { if g.Difficulty == nil { head.Difficulty = params.GenesisDifficulty } - return types.NewBlock(head, nil, nil, nil), statedb + statedb.Commit(false) + statedb.Database().TrieDB().Commit(root, true) + + return types.NewBlock(head, nil, nil, nil) } // Commit writes the block and state of a genesis specification to the database. // The block is committed as the canonical head block. func (g *Genesis) Commit(db ethdb.Database) (*types.Block, error) { - block, statedb := g.ToBlock() + block := g.ToBlock(db) if block.Number().Sign() != 0 { return nil, fmt.Errorf("can't commit genesis block with number > 0") } - if _, err := statedb.CommitTo(db, false); err != nil { - return nil, fmt.Errorf("cannot write state: %v", err) - } if err := WriteTd(db, block.Hash(), block.NumberU64(), g.Difficulty); err != nil { return nil, err } diff --git a/core/genesis_test.go b/core/genesis_test.go index 2fe931b24..cd548d4b1 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -30,11 +30,11 @@ import ( ) func TestDefaultGenesisBlock(t *testing.T) { - block, _ := DefaultGenesisBlock().ToBlock() + block := DefaultGenesisBlock().ToBlock(nil) if block.Hash() != params.MainnetGenesisHash { t.Errorf("wrong mainnet genesis hash, got %v, want %v", block.Hash(), params.MainnetGenesisHash) } - block, _ = DefaultTestnetGenesisBlock().ToBlock() + block = DefaultTestnetGenesisBlock().ToBlock(nil) if block.Hash() != params.TestnetGenesisHash { t.Errorf("wrong testnet genesis hash, got %v, want %v", block.Hash(), params.TestnetGenesisHash) } @@ -118,7 +118,7 @@ func TestSetupGenesis(t *testing.T) { // Commit the 'old' genesis block with Homestead transition at #2. // Advance to block #4, past the homestead transition block of customg. genesis := oldcustomg.MustCommit(db) - bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}) + bc, _ := NewBlockChain(db, nil, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}) defer bc.Stop() bc.SetValidator(bproc{}) bc.InsertChain(makeBlockChainWithDiff(genesis, []int{2, 3, 4, 5}, 0)) diff --git a/core/state/database.go b/core/state/database.go index 946625e76..36926ec69 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -40,16 +40,23 @@ const ( // Database wraps access to tries and contract code. type Database interface { - // Accessing tries: // OpenTrie opens the main account trie. - // OpenStorageTrie opens the storage trie of an account. OpenTrie(root common.Hash) (Trie, error) + + // OpenStorageTrie opens the storage trie of an account. OpenStorageTrie(addrHash, root common.Hash) (Trie, error) - // Accessing contract code: - ContractCode(addrHash, codeHash common.Hash) ([]byte, error) - ContractCodeSize(addrHash, codeHash common.Hash) (int, error) + // CopyTrie returns an independent copy of the given trie. CopyTrie(Trie) Trie + + // ContractCode retrieves a particular contract's code. + ContractCode(addrHash, codeHash common.Hash) ([]byte, error) + + // ContractCodeSize retrieves a particular contracts code's size. + ContractCodeSize(addrHash, codeHash common.Hash) (int, error) + + // TrieDB retrieves the low level trie database used for data storage. + TrieDB() *trie.Database } // Trie is a Ethereum Merkle Trie. @@ -57,26 +64,33 @@ type Trie interface { TryGet(key []byte) ([]byte, error) TryUpdate(key, value []byte) error TryDelete(key []byte) error - CommitTo(trie.DatabaseWriter) (common.Hash, error) + Commit(onleaf trie.LeafCallback) (common.Hash, error) Hash() common.Hash NodeIterator(startKey []byte) trie.NodeIterator GetKey([]byte) []byte // TODO(fjl): remove this when SecureTrie is removed + Prove(key []byte, fromLevel uint, proofDb ethdb.Putter) error } // NewDatabase creates a backing store for state. The returned database is safe for -// concurrent use and retains cached trie nodes in memory. +// concurrent use and retains cached trie nodes in memory. The pool is an optional +// intermediate trie-node memory pool between the low level storage layer and the +// high level trie abstraction. func NewDatabase(db ethdb.Database) Database { csc, _ := lru.New(codeSizeCacheSize) - return &cachingDB{db: db, codeSizeCache: csc} + return &cachingDB{ + db: trie.NewDatabase(db), + codeSizeCache: csc, + } } type cachingDB struct { - db ethdb.Database + db *trie.Database mu sync.Mutex pastTries []*trie.SecureTrie codeSizeCache *lru.Cache } +// OpenTrie opens the main account trie. func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { db.mu.Lock() defer db.mu.Unlock() @@ -105,10 +119,12 @@ func (db *cachingDB) pushTrie(t *trie.SecureTrie) { } } +// OpenStorageTrie opens the storage trie of an account. func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) { return trie.NewSecure(root, db.db, 0) } +// CopyTrie returns an independent copy of the given trie. func (db *cachingDB) CopyTrie(t Trie) Trie { switch t := t.(type) { case cachedTrie: @@ -120,14 +136,16 @@ func (db *cachingDB) CopyTrie(t Trie) Trie { } } +// ContractCode retrieves a particular contract's code. func (db *cachingDB) ContractCode(addrHash, codeHash common.Hash) ([]byte, error) { - code, err := db.db.Get(codeHash[:]) + code, err := db.db.Node(codeHash) if err == nil { db.codeSizeCache.Add(codeHash, len(code)) } return code, err } +// ContractCodeSize retrieves a particular contracts code's size. func (db *cachingDB) ContractCodeSize(addrHash, codeHash common.Hash) (int, error) { if cached, ok := db.codeSizeCache.Get(codeHash); ok { return cached.(int), nil @@ -139,16 +157,25 @@ func (db *cachingDB) ContractCodeSize(addrHash, codeHash common.Hash) (int, erro return len(code), err } +// TrieDB retrieves any intermediate trie-node caching layer. +func (db *cachingDB) TrieDB() *trie.Database { + return db.db +} + // cachedTrie inserts its trie into a cachingDB on commit. type cachedTrie struct { *trie.SecureTrie db *cachingDB } -func (m cachedTrie) CommitTo(dbw trie.DatabaseWriter) (common.Hash, error) { - root, err := m.SecureTrie.CommitTo(dbw) +func (m cachedTrie) Commit(onleaf trie.LeafCallback) (common.Hash, error) { + root, err := m.SecureTrie.Commit(onleaf) if err == nil { m.db.pushTrie(m.SecureTrie) } return root, err } + +func (m cachedTrie) Prove(key []byte, fromLevel uint, proofDb ethdb.Putter) error { + return m.SecureTrie.Prove(key, fromLevel, proofDb) +} diff --git a/core/state/iterator_test.go b/core/state/iterator_test.go index ff66ba7a9..9e46c851c 100644 --- a/core/state/iterator_test.go +++ b/core/state/iterator_test.go @@ -21,12 +21,13 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" ) // Tests that the node iterator indeed walks over the entire database contents. func TestNodeIteratorCoverage(t *testing.T) { // Create some arbitrary test state to iterate - db, mem, root, _ := makeTestState() + db, root, _ := makeTestState() state, err := New(root, db) if err != nil { @@ -39,14 +40,18 @@ func TestNodeIteratorCoverage(t *testing.T) { hashes[it.Hash] = struct{}{} } } - - // Cross check the hashes and the database itself + // Cross check the iterated hashes and the database/nodepool content for hash := range hashes { - if _, err := mem.Get(hash.Bytes()); err != nil { - t.Errorf("failed to retrieve reported node %x: %v", hash, err) + if _, err := db.TrieDB().Node(hash); err != nil { + t.Errorf("failed to retrieve reported node %x", hash) + } + } + for _, hash := range db.TrieDB().Nodes() { + if _, ok := hashes[hash]; !ok { + t.Errorf("state entry not reported %x", hash) } } - for _, key := range mem.Keys() { + for _, key := range db.TrieDB().DiskDB().(*ethdb.MemDatabase).Keys() { if bytes.HasPrefix(key, []byte("secure-key-")) { continue } diff --git a/core/state/state_object.go b/core/state/state_object.go index b2378c69c..b2112bfae 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/trie" ) var emptyCodeHash = crypto.Keccak256(nil) @@ -238,12 +237,12 @@ func (self *stateObject) updateRoot(db Database) { // CommitTrie the storage trie of the object to dwb. // This updates the trie root. -func (self *stateObject) CommitTrie(db Database, dbw trie.DatabaseWriter) error { +func (self *stateObject) CommitTrie(db Database) error { self.updateTrie(db) if self.dbErr != nil { return self.dbErr } - root, err := self.trie.CommitTo(dbw) + root, err := self.trie.Commit(nil) if err == nil { self.data.Root = root } diff --git a/core/state/state_test.go b/core/state/state_test.go index bbae3685b..6d42d63d8 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -48,7 +48,7 @@ func (s *StateSuite) TestDump(c *checker.C) { // write some of them to the trie s.state.updateStateObject(obj1) s.state.updateStateObject(obj2) - s.state.CommitTo(s.db, false) + s.state.Commit(false) // check that dump contains the state objects that are in trie got := string(s.state.Dump()) @@ -97,7 +97,7 @@ func (s *StateSuite) TestNull(c *checker.C) { //value := common.FromHex("0x823140710bf13990e4500136726d8b55") var value common.Hash s.state.SetState(address, common.Hash{}, value) - s.state.CommitTo(s.db, false) + s.state.Commit(false) value = s.state.GetState(address, common.Hash{}) if !common.EmptyHash(value) { c.Errorf("expected empty hash. got %x", value) @@ -155,7 +155,7 @@ func TestSnapshot2(t *testing.T) { so0.deleted = false state.setStateObject(so0) - root, _ := state.CommitTo(db, false) + root, _ := state.Commit(false) state.Reset(root) // and one with deleted == true diff --git a/core/state/statedb.go b/core/state/statedb.go index 8e29104d5..776693e24 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -36,6 +36,14 @@ type revision struct { journalIndex int } +var ( + // emptyState is the known hash of an empty state trie entry. + emptyState = crypto.Keccak256Hash(nil) + + // emptyCode is the known hash of the empty EVM bytecode. + emptyCode = crypto.Keccak256Hash(nil) +) + // StateDBs within the ethereum protocol are used to store anything // within the merkle trie. StateDBs take care of caching and storing // nested states. It's the general query interface to retrieve: @@ -235,6 +243,11 @@ func (self *StateDB) GetState(a common.Address, b common.Hash) common.Hash { return common.Hash{} } +// Database retrieves the low level database supporting the lower level trie ops. +func (self *StateDB) Database() Database { + return self.db +} + // StorageTrie returns the storage trie of an account. // The return value is a copy and is nil for non-existent accounts. func (self *StateDB) StorageTrie(a common.Address) Trie { @@ -568,8 +581,8 @@ func (s *StateDB) clearJournalAndRefund() { s.refund = 0 } -// CommitTo writes the state to the given database. -func (s *StateDB) CommitTo(dbw trie.DatabaseWriter, deleteEmptyObjects bool) (root common.Hash, err error) { +// Commit writes the state to the underlying in-memory trie database. +func (s *StateDB) Commit(deleteEmptyObjects bool) (root common.Hash, err error) { defer s.clearJournalAndRefund() // Commit objects to the trie. @@ -583,13 +596,11 @@ func (s *StateDB) CommitTo(dbw trie.DatabaseWriter, deleteEmptyObjects bool) (ro case isDirty: // Write any contract code associated with the state object if stateObject.code != nil && stateObject.dirtyCode { - if err := dbw.Put(stateObject.CodeHash(), stateObject.code); err != nil { - return common.Hash{}, err - } + s.db.TrieDB().Insert(common.BytesToHash(stateObject.CodeHash()), stateObject.code) stateObject.dirtyCode = false } // Write any storage changes in the state object to its storage trie. - if err := stateObject.CommitTrie(s.db, dbw); err != nil { + if err := stateObject.CommitTrie(s.db); err != nil { return common.Hash{}, err } // Update the object in the main account trie. @@ -598,7 +609,20 @@ func (s *StateDB) CommitTo(dbw trie.DatabaseWriter, deleteEmptyObjects bool) (ro delete(s.stateObjectsDirty, addr) } // Write trie changes. - root, err = s.trie.CommitTo(dbw) + root, err = s.trie.Commit(func(leaf []byte, parent common.Hash) error { + var account Account + if err := rlp.DecodeBytes(leaf, &account); err != nil { + return nil + } + if account.Root != emptyState { + s.db.TrieDB().Reference(account.Root, parent) + } + code := common.BytesToHash(account.CodeHash) + if code != emptyCode { + s.db.TrieDB().Reference(code, parent) + } + return nil + }) log.Debug("Trie cache stats after commit", "misses", trie.CacheMisses(), "unloads", trie.CacheUnloads()) return root, err } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 5c80e3aa5..d9e3d9b79 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -97,10 +97,10 @@ func TestIntermediateLeaks(t *testing.T) { } // Commit and cross check the databases. - if _, err := transState.CommitTo(transDb, false); err != nil { + if _, err := transState.Commit(false); err != nil { t.Fatalf("failed to commit transition state: %v", err) } - if _, err := finalState.CommitTo(finalDb, false); err != nil { + if _, err := finalState.Commit(false); err != nil { t.Fatalf("failed to commit final state: %v", err) } for _, key := range finalDb.Keys() { @@ -122,8 +122,8 @@ func TestIntermediateLeaks(t *testing.T) { // https://github.com/ethereum/go-ethereum/pull/15549. func TestCopy(t *testing.T) { // Create a random state test to copy and modify "independently" - mem, _ := ethdb.NewMemDatabase() - orig, _ := New(common.Hash{}, NewDatabase(mem)) + db, _ := ethdb.NewMemDatabase() + orig, _ := New(common.Hash{}, NewDatabase(db)) for i := byte(0); i < 255; i++ { obj := orig.GetOrNewStateObject(common.BytesToAddress([]byte{i})) @@ -346,11 +346,10 @@ func (test *snapshotTest) run() bool { } action.fn(action, state) } - // Revert all snapshots in reverse order. Each revert must yield a state // that is equivalent to fresh state with all actions up the snapshot applied. for sindex--; sindex >= 0; sindex-- { - checkstate, _ := New(common.Hash{}, NewDatabase(db)) + checkstate, _ := New(common.Hash{}, state.Database()) for _, action := range test.actions[:test.snapshots[sindex]] { action.fn(action, checkstate) } @@ -409,7 +408,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error { func (s *StateSuite) TestTouchDelete(c *check.C) { s.state.GetOrNewStateObject(common.Address{}) - root, _ := s.state.CommitTo(s.db, false) + root, _ := s.state.Commit(false) s.state.Reset(root) snapshot := s.state.Snapshot() @@ -417,7 +416,6 @@ func (s *StateSuite) TestTouchDelete(c *check.C) { if len(s.state.stateObjectsDirty) != 1 { c.Fatal("expected one dirty state object") } - s.state.RevertToSnapshot(snapshot) if len(s.state.stateObjectsDirty) != 0 { c.Fatal("expected no dirty state object") diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 06c572ea6..8f14a44e7 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -36,10 +36,10 @@ type testAccount struct { } // makeTestState create a sample test state to test node-wise reconstruction. -func makeTestState() (Database, *ethdb.MemDatabase, common.Hash, []*testAccount) { +func makeTestState() (Database, common.Hash, []*testAccount) { // Create an empty state - mem, _ := ethdb.NewMemDatabase() - db := NewDatabase(mem) + diskdb, _ := ethdb.NewMemDatabase() + db := NewDatabase(diskdb) state, _ := New(common.Hash{}, db) // Fill it with some arbitrary data @@ -61,10 +61,10 @@ func makeTestState() (Database, *ethdb.MemDatabase, common.Hash, []*testAccount) state.updateStateObject(obj) accounts = append(accounts, acc) } - root, _ := state.CommitTo(mem, false) + root, _ := state.Commit(false) // Return the generated state - return db, mem, root, accounts + return db, root, accounts } // checkStateAccounts cross references a reconstructed state with an expected @@ -96,7 +96,7 @@ func checkTrieConsistency(db ethdb.Database, root common.Hash) error { if v, _ := db.Get(root[:]); v == nil { return nil // Consider a non existent state consistent. } - trie, err := trie.New(root, db) + trie, err := trie.New(root, trie.NewDatabase(db)) if err != nil { return err } @@ -138,7 +138,7 @@ func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, func testIterativeStateSync(t *testing.T, batch int) { // Create a random state to copy - _, srcMem, srcRoot, srcAccounts := makeTestState() + srcDb, srcRoot, srcAccounts := makeTestState() // Create a destination state and sync with the scheduler dstDb, _ := ethdb.NewMemDatabase() @@ -148,9 +148,9 @@ func testIterativeStateSync(t *testing.T, batch int) { for len(queue) > 0 { results := make([]trie.SyncResult, len(queue)) for i, hash := range queue { - data, err := srcMem.Get(hash.Bytes()) + data, err := srcDb.TrieDB().Node(hash) if err != nil { - t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + t.Fatalf("failed to retrieve node data for %x", hash) } results[i] = trie.SyncResult{Hash: hash, Data: data} } @@ -170,7 +170,7 @@ func testIterativeStateSync(t *testing.T, batch int) { // partial results are returned, and the others sent only later. func TestIterativeDelayedStateSync(t *testing.T) { // Create a random state to copy - _, srcMem, srcRoot, srcAccounts := makeTestState() + srcDb, srcRoot, srcAccounts := makeTestState() // Create a destination state and sync with the scheduler dstDb, _ := ethdb.NewMemDatabase() @@ -181,9 +181,9 @@ func TestIterativeDelayedStateSync(t *testing.T) { // Sync only half of the scheduled nodes results := make([]trie.SyncResult, len(queue)/2+1) for i, hash := range queue[:len(results)] { - data, err := srcMem.Get(hash.Bytes()) + data, err := srcDb.TrieDB().Node(hash) if err != nil { - t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + t.Fatalf("failed to retrieve node data for %x", hash) } results[i] = trie.SyncResult{Hash: hash, Data: data} } @@ -207,7 +207,7 @@ func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomS func testIterativeRandomStateSync(t *testing.T, batch int) { // Create a random state to copy - _, srcMem, srcRoot, srcAccounts := makeTestState() + srcDb, srcRoot, srcAccounts := makeTestState() // Create a destination state and sync with the scheduler dstDb, _ := ethdb.NewMemDatabase() @@ -221,9 +221,9 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { // Fetch all the queued nodes in a random order results := make([]trie.SyncResult, 0, len(queue)) for hash := range queue { - data, err := srcMem.Get(hash.Bytes()) + data, err := srcDb.TrieDB().Node(hash) if err != nil { - t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + t.Fatalf("failed to retrieve node data for %x", hash) } results = append(results, trie.SyncResult{Hash: hash, Data: data}) } @@ -247,7 +247,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { // partial results are returned (Even those randomly), others sent only later. func TestIterativeRandomDelayedStateSync(t *testing.T) { // Create a random state to copy - _, srcMem, srcRoot, srcAccounts := makeTestState() + srcDb, srcRoot, srcAccounts := makeTestState() // Create a destination state and sync with the scheduler dstDb, _ := ethdb.NewMemDatabase() @@ -263,9 +263,9 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { for hash := range queue { delete(queue, hash) - data, err := srcMem.Get(hash.Bytes()) + data, err := srcDb.TrieDB().Node(hash) if err != nil { - t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + t.Fatalf("failed to retrieve node data for %x", hash) } results = append(results, trie.SyncResult{Hash: hash, Data: data}) @@ -292,9 +292,9 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { // the database. func TestIncompleteStateSync(t *testing.T) { // Create a random state to copy - _, srcMem, srcRoot, srcAccounts := makeTestState() + srcDb, srcRoot, srcAccounts := makeTestState() - checkTrieConsistency(srcMem, srcRoot) + checkTrieConsistency(srcDb.TrieDB().DiskDB().(ethdb.Database), srcRoot) // Create a destination state and sync with the scheduler dstDb, _ := ethdb.NewMemDatabase() @@ -306,9 +306,9 @@ func TestIncompleteStateSync(t *testing.T) { // Fetch a batch of state nodes results := make([]trie.SyncResult, len(queue)) for i, hash := range queue { - data, err := srcMem.Get(hash.Bytes()) + data, err := srcDb.TrieDB().Node(hash) if err != nil { - t.Fatalf("failed to retrieve node data for %x: %v", hash, err) + t.Fatalf("failed to retrieve node data for %x", hash) } results[i] = trie.SyncResult{Hash: hash, Data: data} } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index cd11f2ba2..158b9776b 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -78,8 +78,8 @@ func pricedTransaction(nonce uint64, gaslimit uint64, gasprice *big.Int, key *ec } func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { - db, _ := ethdb.NewMemDatabase() - statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + diskdb, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, state.NewDatabase(diskdb)) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} key, _ := crypto.GenerateKey() diff --git a/core/types/block.go b/core/types/block.go index ffe317342..92b868d9d 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -25,6 +25,7 @@ import ( "sort" "sync/atomic" "time" + "unsafe" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -121,6 +122,12 @@ func (h *Header) HashNoNonce() common.Hash { }) } +// Size returns the approximate memory used by all internal contents. It is used +// to approximate and limit the memory consumption of various caches. +func (h *Header) Size() common.StorageSize { + return common.StorageSize(unsafe.Sizeof(*h)) + common.StorageSize(len(h.Extra)+(h.Difficulty.BitLen()+h.Number.BitLen()+h.Time.BitLen())/8) +} + func rlpHash(x interface{}) (h common.Hash) { hw := sha3.NewKeccak256() rlp.Encode(hw, x) @@ -322,6 +329,8 @@ func (b *Block) HashNoNonce() common.Hash { return b.header.HashNoNonce() } +// Size returns the true RLP encoded storage size of the block, either by encoding +// and returning it, or returning a previsouly cached value. func (b *Block) Size() common.StorageSize { if size := b.size.Load(); size != nil { return size.(common.StorageSize) diff --git a/core/types/receipt.go b/core/types/receipt.go index 208d54aaa..f945f6f6a 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "unsafe" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -136,6 +137,18 @@ func (r *Receipt) statusEncoding() []byte { return r.PostState } +// Size returns the approximate memory used by all internal contents. It is used +// to approximate and limit the memory consumption of various caches. +func (r *Receipt) Size() common.StorageSize { + size := common.StorageSize(unsafe.Sizeof(*r)) + common.StorageSize(len(r.PostState)) + + size += common.StorageSize(len(r.Logs)) * common.StorageSize(unsafe.Sizeof(Log{})) + for _, log := range r.Logs { + size += common.StorageSize(len(log.Topics)*common.HashLength + len(log.Data)) + } + return size +} + // String implements the Stringer interface. func (r *Receipt) String() string { if len(r.PostState) == 0 { diff --git a/core/types/transaction.go b/core/types/transaction.go index a7ed211e4..5660582ba 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -206,6 +206,8 @@ func (tx *Transaction) Hash() common.Hash { return v } +// Size returns the true RLP encoded storage size of the transaction, either by +// encoding and returning it, or returning a previsouly cached value. func (tx *Transaction) Size() common.StorageSize { if size := tx.size.Load(); size != nil { return size.(common.StorageSize) -- cgit v1.2.3