aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader_test.go')
-rw-r--r--eth/downloader/downloader_test.go135
1 files changed, 101 insertions, 34 deletions
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ef6f74a6b..cfcc8a2ef 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
}
}
dl.lock.RUnlock()
-
- err := dl.downloader.synchronise(id, hash, td, mode)
- for {
- // If the queue is empty and processing stopped, break
- if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
- break
- }
- // Otherwise sleep a bit and retry
- time.Sleep(time.Millisecond)
- }
- return err
+ return dl.downloader.synchronise(id, hash, td, mode)
}
// hasHeader checks if a header is present in the testers canonical chain.
@@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis
func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
for start := time.Now(); time.Since(start) < time.Second; {
time.Sleep(25 * time.Millisecond)
- tester.lock.RLock()
- tester.downloader.queue.lock.RLock()
+ tester.lock.Lock()
+ tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
}
frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks)
- tester.downloader.queue.lock.RUnlock()
- tester.lock.RUnlock()
+ tester.downloader.queue.lock.Unlock()
+ tester.lock.Unlock()
if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
break
@@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(
func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither hashes nor blocks are accepted
@@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers and
// bodies.
func TestInactiveDownloader62(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers,
// bodies and receipts.
func TestInactiveDownloader63(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch {
@@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t,
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
@@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t,
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
tester := newTester()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
@@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
- {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
- {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()
@@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64,
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64,
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small block chain
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
@@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
}
}
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) }
+func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) }
+func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) }
+func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
+func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
+func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+ hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
+ fakeHeads := []*types.Header{{}, {}, {}, {}}
+ for i := 0; i < 200; i++ {
+ tester := newTester()
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+ // Whenever the downloader requests headers, flood it with
+ // a lot of unrequested header deliveries.
+ tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
+ deliveriesDone := make(chan struct{}, 500)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ peer := fmt.Sprintf("fake-peer%d", i)
+ go func() {
+ tester.downloader.DeliverHeaders(peer, fakeHeads)
+ deliveriesDone <- struct{}{}
+ }()
+ }
+ // Deliver the actual requested headers.
+ impl := tester.peerGetAbsHeadersFn("peer", 0)
+ go impl(from, count, skip, reverse)
+ // None of the extra deliveries should block.
+ timeout := time.After(5 * time.Second)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ select {
+ case <-deliveriesDone:
+ case <-timeout:
+ panic("blocked")
+ }
+ }
+ return nil
+ }
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Errorf("sync failed: %v", err)
+ }
+ }
+}