From 55599ee95d4151a2502465e0afc7c47bd1acba77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 5 Feb 2018 18:40:32 +0200 Subject: core, trie: intermediate mempool between trie and database (#15857) This commit reduces database I/O by not writing every state trie to disk. --- core/blockchain_test.go | 145 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 23 deletions(-) (limited to 'core/blockchain_test.go') diff --git a/core/blockchain_test.go b/core/blockchain_test.go index cbde3bcd2..635379161 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -46,7 +46,7 @@ func newTestBlockChain(fake bool) *BlockChain { if !fake { engine = ethash.NewTester() } - blockchain, err := NewBlockChain(db, gspec.Config, engine, vm.Config{}) + blockchain, err := NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) if err != nil { panic(err) } @@ -148,9 +148,9 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { return err } blockchain.mu.Lock() - WriteTd(blockchain.chainDb, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) - WriteBlock(blockchain.chainDb, block) - statedb.CommitTo(blockchain.chainDb, false) + WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) + WriteBlock(blockchain.db, block) + statedb.Commit(false) blockchain.mu.Unlock() } return nil @@ -166,8 +166,8 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) blockchain.mu.Lock() - WriteTd(blockchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) - WriteHeader(blockchain.chainDb, header) + WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) + WriteHeader(blockchain.db, header) blockchain.mu.Unlock() } return nil @@ -186,9 +186,9 @@ func TestLastBlock(t *testing.T) { bchain := newTestBlockChain(false) defer bchain.Stop() - block := makeBlockChain(bchain.CurrentBlock(), 1, ethash.NewFaker(), bchain.chainDb, 0)[0] + block := makeBlockChain(bchain.CurrentBlock(), 1, ethash.NewFaker(), bchain.db, 0)[0] bchain.insert(block) - if block.Hash() != GetHeadBlockHash(bchain.chainDb) { + if block.Hash() != GetHeadBlockHash(bchain.db) { t.Errorf("Write/Get HeadBlockHash failed") } } @@ -496,7 +496,7 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Create a new BlockChain and check that it rolled back the state. - ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), vm.Config{}) + ncm, err := NewBlockChain(bc.db, nil, bc.chainConfig, ethash.NewFaker(), vm.Config{}) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } @@ -609,7 +609,7 @@ func TestFastVsFullChains(t *testing.T) { // Import the chain as an archive node for the comparison baseline archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer archive.Stop() if n, err := archive.InsertChain(blocks); err != nil { @@ -618,7 +618,7 @@ func TestFastVsFullChains(t *testing.T) { // Fast import the chain as a non-archive node to test fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -696,7 +696,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) } @@ -709,7 +709,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a non-archive node and ensure all pointers are updated fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + fast, _ := NewBlockChain(fastDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -730,7 +730,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { lightDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(lightDb) - light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), vm.Config{}) + light, _ := NewBlockChain(lightDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } @@ -799,7 +799,7 @@ func TestChainTxReorgs(t *testing.T) { } }) // Import the chain. This runs all block validation rules. - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) if i, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert original chain[%d]: %v", i, err) } @@ -870,7 +870,7 @@ func TestLogReorgs(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() rmLogsCh := make(chan RemovedLogsEvent) @@ -917,7 +917,7 @@ func TestReorgSideEvent(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {}) @@ -992,7 +992,7 @@ func TestCanonicalBlockRetrieval(t *testing.T) { bc := newTestBlockChain(true) defer bc.Stop() - chain, _ := GenerateChain(bc.config, bc.genesisBlock, ethash.NewFaker(), bc.chainDb, 10, func(i int, gen *BlockGen) {}) + chain, _ := GenerateChain(bc.chainConfig, bc.genesisBlock, ethash.NewFaker(), bc.db, 10, func(i int, gen *BlockGen) {}) var pend sync.WaitGroup pend.Add(len(chain)) @@ -1003,14 +1003,14 @@ func TestCanonicalBlockRetrieval(t *testing.T) { // try to retrieve a block by its canonical hash and see if the block data can be retrieved. for { - ch := GetCanonicalHash(bc.chainDb, block.NumberU64()) + ch := GetCanonicalHash(bc.db, block.NumberU64()) if ch == (common.Hash{}) { continue // busy wait for canonical hash to be written } if ch != block.Hash() { t.Fatalf("unknown canonical hash, want %s, got %s", block.Hash().Hex(), ch.Hex()) } - fb := GetBlock(bc.chainDb, ch, block.NumberU64()) + fb := GetBlock(bc.db, ch, block.NumberU64()) if fb == nil { t.Fatalf("unable to retrieve block %d for canonical hash: %s", block.NumberU64(), ch.Hex()) } @@ -1043,7 +1043,7 @@ func TestEIP155Transition(t *testing.T) { genesis = gspec.MustCommit(db) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 4, func(i int, block *BlockGen) { @@ -1151,7 +1151,7 @@ func TestEIP161AccountRemoval(t *testing.T) { } genesis = gspec.MustCommit(db) ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) + blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 3, func(i int, block *BlockGen) { @@ -1226,7 +1226,7 @@ func TestBlockchainHeaderchainReorgConsistency(t *testing.T) { diskdb, _ := ethdb.NewMemDatabase() new(Genesis).MustCommit(diskdb) - chain, err := NewBlockChain(diskdb, params.TestChainConfig, engine, vm.Config{}) + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}) if err != nil { t.Fatalf("failed to create tester chain: %v", err) } @@ -1245,3 +1245,102 @@ func TestBlockchainHeaderchainReorgConsistency(t *testing.T) { } } } + +// Tests that importing small side forks doesn't leave junk in the trie database +// cache (which would eventually cause memory issues). +func TestTrieForkGC(t *testing.T) { + // Generate a canonical chain to act as the main dataset + engine := ethash.NewFaker() + + db, _ := ethdb.NewMemDatabase() + genesis := new(Genesis).MustCommit(db) + blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) + + // Generate a bunch of fork blocks, each side forking from the canonical chain + forks := make([]*types.Block, len(blocks)) + for i := 0; i < len(forks); i++ { + parent := genesis + if i > 0 { + parent = blocks[i-1] + } + fork, _ := GenerateChain(params.TestChainConfig, parent, engine, db, 1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) + forks[i] = fork[0] + } + // Import the canonical and fork chain side by side, forcing the trie cache to cache both + diskdb, _ := ethdb.NewMemDatabase() + new(Genesis).MustCommit(diskdb) + + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + for i := 0; i < len(blocks); i++ { + if _, err := chain.InsertChain(blocks[i : i+1]); err != nil { + t.Fatalf("block %d: failed to insert into chain: %v", i, err) + } + if _, err := chain.InsertChain(forks[i : i+1]); err != nil { + t.Fatalf("fork %d: failed to insert into chain: %v", i, err) + } + } + // Dereference all the recent tries and ensure no past trie is left in + for i := 0; i < triesInMemory; i++ { + chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root(), common.Hash{}) + chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root(), common.Hash{}) + } + if len(chain.stateCache.TrieDB().Nodes()) > 0 { + t.Fatalf("stale tries still alive after garbase collection") + } +} + +// Tests that doing large reorgs works even if the state associated with the +// forking point is not available any more. +func TestLargeReorgTrieGC(t *testing.T) { + // Generate the original common chain segment and the two competing forks + engine := ethash.NewFaker() + + db, _ := ethdb.NewMemDatabase() + genesis := new(Genesis).MustCommit(db) + + shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 64, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) }) + original, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{2}) }) + competitor, _ := GenerateChain(params.TestChainConfig, shared[len(shared)-1], engine, db, 2*triesInMemory+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) }) + + // Import the shared chain and the original canonical one + diskdb, _ := ethdb.NewMemDatabase() + new(Genesis).MustCommit(diskdb) + + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + if _, err := chain.InsertChain(shared); err != nil { + t.Fatalf("failed to insert shared chain: %v", err) + } + if _, err := chain.InsertChain(original); err != nil { + t.Fatalf("failed to insert shared chain: %v", err) + } + // Ensure that the state associated with the forking point is pruned away + if node, _ := chain.stateCache.TrieDB().Node(shared[len(shared)-1].Root()); node != nil { + t.Fatalf("common-but-old ancestor still cache") + } + // Import the competitor chain without exceeding the canonical's TD and ensure + // we have not processed any of the blocks (protection against malicious blocks) + if _, err := chain.InsertChain(competitor[:len(competitor)-2]); err != nil { + t.Fatalf("failed to insert competitor chain: %v", err) + } + for i, block := range competitor[:len(competitor)-2] { + if node, _ := chain.stateCache.TrieDB().Node(block.Root()); node != nil { + t.Fatalf("competitor %d: low TD chain became processed", i) + } + } + // Import the head of the competitor chain, triggering the reorg and ensure we + // successfully reprocess all the stashed away blocks. + if _, err := chain.InsertChain(competitor[len(competitor)-2:]); err != nil { + t.Fatalf("failed to finalize competitor chain: %v", err) + } + for i, block := range competitor[:len(competitor)-triesInMemory] { + if node, _ := chain.stateCache.TrieDB().Node(block.Root()); node != nil { + t.Fatalf("competitor %d: competing chain state missing", i) + } + } +} -- cgit v1.2.3