From 55599ee95d4151a2502465e0afc7c47bd1acba77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 5 Feb 2018 18:40:32 +0200 Subject: core, trie: intermediate mempool between trie and database (#15857) This commit reduces database I/O by not writing every state trie to disk. --- eth/downloader/downloader_test.go | 192 +++++++++----------------------------- 1 file changed, 46 insertions(+), 146 deletions(-) (limited to 'eth/downloader/downloader_test.go') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e9c7b6170..d94d55f11 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -45,8 +44,8 @@ var ( // Reduce some of the parameters to make the tester faster. func init() { MaxForkAncestry = uint64(10000) - blockCacheLimit = 1024 - fsCriticalTrials = 10 + blockCacheItems = 1024 + fsHeaderContCheck = 500 * time.Millisecond } // downloadTester is a test simulator for mocking out local block chain. @@ -223,7 +222,7 @@ func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool { } // HasBlockAndState checks if a block and associated state is present in the testers canonical chain. -func (dl *downloadTester) HasBlockAndState(hash common.Hash) bool { +func (dl *downloadTester) HasBlockAndState(hash common.Hash, number uint64) bool { block := dl.GetBlockByHash(hash) if block == nil { return false @@ -293,7 +292,7 @@ func (dl *downloadTester) CurrentFastBlock() *types.Block { func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error { // For now only check that the state trie is correct if block := dl.GetBlockByHash(hash); block != nil { - _, err := trie.NewSecure(block.Root(), dl.stateDb, 0) + _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb), 0) return err } return fmt.Errorf("non existent block: %x", hash[:4]) @@ -619,28 +618,22 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) { // number of items of the various chain components. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) { // Initialize the counters for the first fork - headers, blocks := lengths[0], lengths[0] + headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks - minReceipts, maxReceipts := lengths[0]-fsMinFullBlocks-fsPivotInterval, lengths[0]-fsMinFullBlocks - if minReceipts < 0 { - minReceipts = 1 - } - if maxReceipts < 0 { - maxReceipts = 1 + if receipts < 0 { + receipts = 1 } // Update the counters for each subsequent fork for _, length := range lengths[1:] { headers += length - common blocks += length - common - - minReceipts += length - common - fsMinFullBlocks - fsPivotInterval - maxReceipts += length - common - fsMinFullBlocks + receipts += length - common - fsMinFullBlocks } switch tester.downloader.mode { case FullSync: - minReceipts, maxReceipts = 1, 1 + receipts = 1 case LightSync: - blocks, minReceipts, maxReceipts = 1, 1, 1 + blocks, receipts = 1, 1 } if hs := len(tester.ownHeaders); hs != headers { t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers) @@ -648,11 +641,12 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng if bs := len(tester.ownBlocks); bs != blocks { t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks) } - if rs := len(tester.ownReceipts); rs < minReceipts || rs > maxReceipts { - t.Fatalf("synchronised receipts mismatch: have %v, want between [%v, %v]", rs, minReceipts, maxReceipts) + if rs := len(tester.ownReceipts); rs != receipts { + t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts) } // Verify the state trie too for fast syncs - if tester.downloader.mode == FastSync { + /*if tester.downloader.mode == FastSync { + pivot := uint64(0) var index int if pivot := int(tester.downloader.queue.fastSyncPivot); pivot < common { index = pivot @@ -660,11 +654,11 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng index = len(tester.ownHashes) - lengths[len(lengths)-1] + int(tester.downloader.queue.fastSyncPivot) } if index > 0 { - if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(tester.stateDb)); statedb == nil || err != nil { + if statedb, err := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, state.NewDatabase(trie.NewDatabase(tester.stateDb))); statedb == nil || err != nil { t.Fatalf("state reconstruction failed: %v", err) } } - } + }*/ } // Tests that simple synchronization against a canonical chain works correctly. @@ -684,7 +678,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -710,7 +704,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a long block chain to download and the tester - targetBlocks := 8 * blockCacheLimit + targetBlocks := 8 * blockCacheItems hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -745,9 +739,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { - if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { - cached = receipts - } + //if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot { + cached = receipts + //} } } frozen = int(atomic.LoadUint32(&blocked)) @@ -755,7 +749,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.downloader.queue.lock.Unlock() tester.lock.Unlock() - if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { + if cached == blockCacheItems || retrieved+cached+frozen == targetBlocks+1 { break } } @@ -765,8 +759,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.lock.RLock() retrieved = len(tester.ownBlocks) tester.lock.RUnlock() - if cached != blockCacheLimit && retrieved+cached+frozen != targetBlocks+1 { - t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheLimit, retrieved, frozen, targetBlocks+1) + if cached != blockCacheItems && retrieved+cached+frozen != targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) } // Permit the blocked blocks to import if atomic.LoadUint32(&blocked) > 0 { @@ -974,7 +968,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 if targetBlocks >= MaxHashFetch { targetBlocks = MaxHashFetch - 15 } @@ -1016,12 +1010,12 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Create various peers with various parts of the chain targetPeers := 8 - targetBlocks := targetPeers*blockCacheLimit - 15 + targetBlocks := targetPeers*blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) + tester.newPeer(id, protocol, hashes[i*blockCacheItems:], headers, blocks, receipts) } if err := tester.sync("peer #0", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) @@ -1045,7 +1039,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Create peers of every type @@ -1084,7 +1078,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a block chain to download - targetBlocks := 2*blockCacheLimit - 15 + targetBlocks := 2*blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) @@ -1110,8 +1104,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { bodiesNeeded++ } } - for hash, receipt := range receipts { - if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= tester.downloader.queue.fastSyncPivot { + for _, receipt := range receipts { + if mode == FastSync && len(receipt) > 0 { receiptsNeeded++ } } @@ -1139,7 +1133,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Attempt a full sync with an attacker feeding gapped headers @@ -1174,7 +1168,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Attempt a full sync with an attacker feeding shifted headers @@ -1208,7 +1202,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := 3*fsHeaderSafetyNet + fsPivotInterval + fsMinFullBlocks + targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Attempt to sync with an attacker that feeds junk during the fast sync phase. @@ -1248,7 +1242,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("withhold-attack", protocol, hashes, headers, blocks, receipts) missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1 - tester.downloader.fsPivotFails = 0 tester.downloader.syncInitHook = func(uint64, uint64) { for i := missing; i <= len(hashes); i++ { delete(tester.peerHeaders["withhold-attack"], hashes[len(hashes)-i]) @@ -1267,8 +1260,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Errorf("fast sync pivot block #%d not rolled back", head) } } - tester.downloader.fsPivotFails = fsCriticalTrials - // Synchronise with the valid peer and make sure sync succeeds. Since the last // rollback should also disable fast syncing for this process, verify that we // did a fresh full sync. Note, we can't assert anything about the receipts @@ -1383,7 +1374,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes @@ -1532,7 +1523,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes @@ -1609,7 +1600,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small block chain - targetBlocks := blockCacheLimit - 15 + targetBlocks := blockCacheItems - 15 hashes, headers, blocks, receipts := tester.makeChain(targetBlocks+3, 0, tester.genesis, nil, false) // Set a sync init hook to catch progress changes @@ -1697,6 +1688,7 @@ func TestDeliverHeadersHang(t *testing.T) { type floodingTestPeer struct { peer Peer tester *downloadTester + pend sync.WaitGroup } func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } @@ -1717,9 +1709,12 @@ func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int deliveriesDone := make(chan struct{}, 500) for i := 0; i < cap(deliveriesDone); i++ { peer := fmt.Sprintf("fake-peer%d", i) + ftp.pend.Add(1) + go func() { ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}}) deliveriesDone <- struct{}{} + ftp.pend.Done() }() } // Deliver the actual requested headers. @@ -1751,110 +1746,15 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { // Whenever the downloader requests headers, flood it with // a lot of unrequested header deliveries. tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{ - tester.downloader.peers.peers["peer"].peer, - tester, + peer: tester.downloader.peers.peers["peer"].peer, + tester: tester, } if err := tester.sync("peer", nil, mode); err != nil { - t.Errorf("sync failed: %v", err) + t.Errorf("test %d: sync failed: %v", i, err) } tester.terminate() - } -} - -// Tests that if fast sync aborts in the critical section, it can restart a few -// times before giving up. -// We use data driven subtests to manage this so that it will be parallel on its own -// and not with the other tests, avoiding intermittent failures. -func TestFastCriticalRestarts(t *testing.T) { - testCases := []struct { - protocol int - progress bool - }{ - {63, false}, - {64, false}, - {63, true}, - {64, true}, - } - for _, tc := range testCases { - t.Run(fmt.Sprintf("protocol %d progress %v", tc.protocol, tc.progress), func(t *testing.T) { - testFastCriticalRestarts(t, tc.protocol, tc.progress) - }) - } -} - -func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) { - t.Parallel() - - tester := newTester() - defer tester.terminate() - - // Create a large enough blockchin to actually fast sync on - targetBlocks := fsMinFullBlocks + 2*fsPivotInterval - 15 - hashes, headers, blocks, receipts := tester.makeChain(targetBlocks, 0, tester.genesis, nil, false) - - // Create a tester peer with a critical section header missing (force failures) - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) - delete(tester.peerHeaders["peer"], hashes[fsMinFullBlocks-1]) - tester.downloader.dropPeer = func(id string) {} // We reuse the same "faulty" peer throughout the test - - // Remove all possible pivot state roots and slow down replies (test failure resets later) - for i := 0; i < fsPivotInterval; i++ { - tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true - } - (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(500 * time.Millisecond) // Enough to reach the critical section - - // Synchronise with the peer a few times and make sure they fail until the retry limit - for i := 0; i < int(fsCriticalTrials)-1; i++ { - // Attempt a sync and ensure it fails properly - if err := tester.sync("peer", nil, FastSync); err == nil { - t.Fatalf("failing fast sync succeeded: %v", err) - } - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - - // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes - if i == 0 { - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - if tester.downloader.fsPivotLock == nil { - time.Sleep(400 * time.Millisecond) // Make sure the first huge timeout expires too - t.Fatalf("pivot block not locked in after critical section failure") - } - tester.lock.Lock() - tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]] - tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} - (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).setDelay(0) - tester.lock.Unlock() - } - } - // Return all nodes if we're testing fast sync progression - if progress { - tester.lock.Lock() - tester.peerMissingStates["peer"] = map[common.Hash]bool{} - tester.lock.Unlock() - - if err := tester.sync("peer", nil, FastSync); err != nil { - t.Fatalf("failed to synchronise blocks in progressed fast sync: %v", err) - } - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != 1 { - t.Fatalf("progressed pivot trial count mismatch: have %v, want %v", fails, 1) - } - assertOwnChain(t, tester, targetBlocks+1) - } else { - if err := tester.sync("peer", nil, FastSync); err == nil { - t.Fatalf("succeeded to synchronise blocks in failed fast sync") - } - time.Sleep(150 * time.Millisecond) // Make sure no in-flight requests remain - - if fails := atomic.LoadUint32(&tester.downloader.fsPivotFails); fails != fsCriticalTrials { - t.Fatalf("failed pivot trial count mismatch: have %v, want %v", fails, fsCriticalTrials) - } - } - // Retry limit exhausted, downloader will switch to full sync, should succeed - if err := tester.sync("peer", nil, FastSync); err != nil { - t.Fatalf("failed to synchronise blocks in slow sync: %v", err) + // Flush all goroutines to prevent messing with subsequent tests + tester.downloader.peers.peers["peer"].peer.(*floodingTestPeer).pend.Wait() } - // Note, we can't assert the chain here because the test asserter assumes sync - // completed using a single mode of operation, whereas fast-then-slow can result - // in arbitrary intermediate state that's not cleanly verifiable. } -- cgit v1.2.3