diff options
Diffstat (limited to 'eth/downloader/downloader_test.go')
-rw-r--r-- | eth/downloader/downloader_test.go | 293 |
1 files changed, 231 insertions, 62 deletions
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7feca8782..c5fb00289 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -21,7 +21,7 @@ var ( genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0)) ) -// makeChain creates a chain of n blocks starting at and including +// makeChain creates a chain of n blocks starting at but not including // parent. the returned hash chain is ordered head->parent. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) { blocks := core.GenerateChain(parent, testdb, n, func(i int, gen *core.BlockGen) { @@ -42,7 +42,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common // h2[:f] are different but have a common suffix of length n-f. func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 map[common.Hash]*types.Block) { // Create the common suffix. - h, b := makeChain(n-f-1, 0, parent) + h, b := makeChain(n-f, 0, parent) // Create the forks. h1, b1 = makeChain(f, 1, b[h[0]]) h1 = append(h1, h[1:]...) @@ -75,7 +75,7 @@ func newTester() *downloadTester { peerHashes: make(map[string][]common.Hash), peerBlocks: make(map[string]map[common.Hash]*types.Block), } - tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer) + tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.insertChain, tester.dropPeer) return tester } @@ -99,6 +99,11 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.ownBlocks[hash] } +// headBlock retrieves the current head block from the canonical chain. +func (dl *downloadTester) headBlock() *types.Block { + return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1]) +} + // insertChain injects a new batch of blocks into the simulated chain. func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { for i, block := range blocks { @@ -112,15 +117,15 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { } // newPeer registers a new block download source into the downloader. -func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - return dl.newSlowPeer(id, hashes, blocks, 0) +func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { + return dl.newSlowPeer(id, version, hashes, blocks, 0) } // newSlowPeer registers a new block download source into the downloader, with a // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. -func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { - err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) +func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, version, delay), dl.peerGetBlocksFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -141,10 +146,10 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } -// peerGetBlocksFn constructs a getHashes function associated with a particular +// peerGetRelHashesFn constructs a GetHashes function associated with a specific // peer in the download tester. The returned function can be used to retrieve // batches of hashes from the particularly requested peer. -func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error { +func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) func(head common.Hash) error { return func(head common.Hash) error { time.Sleep(delay) @@ -174,13 +179,43 @@ func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(h } } +// peerGetAbsHashesFn constructs a GetHashesFromNumber function associated with +// a particular peer in the download tester. The returned function can be used to +// retrieve batches of hashes from the particularly requested peer. +func (dl *downloadTester) peerGetAbsHashesFn(id string, version int, delay time.Duration) func(uint64, int) error { + // If the simulated peer runs eth/60, this message is not supported + if version == eth60 { + return func(uint64, int) error { return nil } + } + // Otherwise create a method to request the blocks by number + return func(head uint64, count int) error { + time.Sleep(delay) + + limit := count + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } + // Gather the next batch of hashes + hashes := dl.peerHashes[id] + result := make([]common.Hash, 0, limit) + for i := 0; i < limit && len(hashes)-int(head)-1-i >= 0; i++ { + result = append(result, hashes[len(hashes)-int(head)-1-i]) + } + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, result) + }() + return nil + } +} + // peerGetBlocksFn constructs a getBlocks function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of blocks from the particularly requested peer. func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { time.Sleep(delay) - blocks := dl.peerBlocks[id] result := make([]*types.Block, 0, len(hashes)) for _, hash := range hashes { @@ -195,13 +230,13 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ } // Tests that simple synchronization, without throttling from a good peer works. -func TestSynchronisation(t *testing.T) { +func TestSynchronisation60(t *testing.T) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer"); err != nil { @@ -212,42 +247,79 @@ func TestSynchronisation(t *testing.T) { } } -// Tests that an inactive downloader will not accept incoming hashes and blocks. -func TestInactiveDownloader(t *testing.T) { +// Tests that simple synchronization against a canonical chain works correctly. +// In this test common ancestor lookup should be short circuited and not require +// binary searching. +func TestCanonicalSynchronisation(t *testing.T) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + tester := newTester() + tester.newPeer("peer", eth61, hashes, blocks) - // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) } - if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) } } -// Tests that a canceled download wipes all previously accumulated state. -func TestCancel(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 +// Tests that if a large batch of blocks are being downloaded, it is throttled +// until the cached blocks are retrieved. +func TestThrottling60(t *testing.T) { + // Create a long block chain to download and the tester + targetBlocks := 8 * blockCacheLimit hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) - // Make sure canceling works with a pristine downloader - tester.downloader.cancel() - hashCount, blockCount := tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + // Wrap the importer to allow stepping + done := make(chan int) + tester.downloader.insertChain = func(blocks types.Blocks) (int, error) { + n, err := tester.insertChain(blocks) + done <- n + return n, err } - // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + // Start a synchronisation concurrently + errc := make(chan error) + go func() { + errc <- tester.sync("peer") + }() + // Iteratively take some blocks, always checking the retrieval count + for len(tester.ownBlocks) < targetBlocks+1 { + // Wait a bit for sync to throttle itself + var cached int + for start := time.Now(); time.Since(start) < 3*time.Second; { + time.Sleep(25 * time.Millisecond) + + cached = len(tester.downloader.queue.blockPool) + if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 { + break + } + } + // Make sure we filled up the cache, then exhaust it + time.Sleep(25 * time.Millisecond) // give it a chance to screw up + if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit) + } + <-done // finish previous blocking import + for cached > maxBlockProcess { + cached -= <-done + } + time.Sleep(25 * time.Millisecond) // yield to the insertion } - tester.downloader.cancel() - hashCount, blockCount = tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + <-done // finish the last blocking import + + // Check that we haven't pulled more blocks than available + if len(tester.ownBlocks) > targetBlocks+1 { + t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) + } + if err := <-errc; err != nil { + t.Fatalf("block synchronization failed: %v", err) } } @@ -259,7 +331,7 @@ func TestThrottling(t *testing.T) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth61, hashes, blocks) // Wrap the importer to allow stepping done := make(chan int) @@ -307,6 +379,102 @@ func TestThrottling(t *testing.T) { } } +// Tests that simple synchronization against a forked chain works correctly. In +// this test common ancestor lookup should *not* be short circuited, and a full +// binary search should be executed. +func TestForkedSynchronisation(t *testing.T) { + // Create a long enough forked chain + common, fork := MaxHashFetch, 2*MaxHashFetch + hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) + + tester := newTester() + tester.newPeer("fork A", eth61, hashesA, blocksA) + tester.newPeer("fork B", eth61, hashesB, blocksB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("fork A"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != common+fork+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1) + } + // Synchronise with the second peer and make sure that fork is pulled too + if err := tester.sync("fork B"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != common+2*fork+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+2*fork+1) + } +} + +// Tests that an inactive downloader will not accept incoming hashes and blocks. +func TestInactiveDownloader(t *testing.T) { + tester := newTester() + + // Check that neither hashes nor blocks are accepted + if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel60(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", eth60, hashes, blocks) + + // Make sure canceling works with a pristine downloader + tester.downloader.cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 + if targetBlocks >= MaxHashFetch { + targetBlocks = MaxHashFetch - 15 + } + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", eth61, hashes, blocks) + + // Make sure canceling works with a pristine downloader + tester.downloader.cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } +} + // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). func TestMultiSynchronisation(t *testing.T) { // Create various peers with various parts of the chain @@ -317,7 +485,7 @@ func TestMultiSynchronisation(t *testing.T) { tester := newTester() for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, hashes[i*blockCacheLimit:], blocks) + tester.newPeer(id, eth60, hashes[i*blockCacheLimit:], blocks) } // Synchronise with the middle peer and make sure half of the blocks were retrieved id := fmt.Sprintf("peer #%d", targetPeers/2) @@ -347,8 +515,8 @@ func TestSlowSynchronisation(t *testing.T) { targetIODelay := time.Second hashes, blocks := makeChain(targetBlocks, 0, genesis) - tester.newSlowPeer("fast", hashes, blocks, 0) - tester.newSlowPeer("slow", hashes, blocks, targetIODelay) + tester.newSlowPeer("fast", eth60, hashes, blocks, 0) + tester.newSlowPeer("slow", eth60, hashes, blocks, targetIODelay) // Try to sync with the peers (pull hashes from fast) start := time.Now() @@ -370,13 +538,14 @@ func TestSlowSynchronisation(t *testing.T) { func TestNonExistingParentAttack(t *testing.T) { tester := newTester() + // Forge a single-link chain with a forged header hashes, blocks := makeChain(1, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil) wrongblock.Td = blocks[hashes[0]].Td hashes, blocks = makeChain(1, 0, wrongblock) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err == nil { @@ -401,8 +570,8 @@ func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid?? // Create a valid chain, but drop the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", hashes[:len(hashes)-1], blocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, hashes[:len(hashes)-1], blocks) // Try and sync with the malicious node errc := make(chan error) @@ -431,10 +600,10 @@ func TestNonExistingBlockAttack(t *testing.T) { // Create a valid chain, but forge the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) hashes[len(hashes)/2] = common.Hash{} - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errPeersUnavailable { @@ -453,7 +622,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Create a valid long chain, but reverse some hashes within hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) chunk1 := make([]common.Hash, blockCacheLimit) chunk2 := make([]common.Hash, blockCacheLimit) @@ -462,7 +631,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { copy(hashes[2*blockCacheLimit:], chunk1) copy(hashes[blockCacheLimit:], chunk2) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errInvalidChain { @@ -489,8 +658,8 @@ func TestMadeupHashChainAttack(t *testing.T) { rand.Read(randomHashes[i][:]) } - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, randomHashes, nil) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errCrossCheckFailed { @@ -517,7 +686,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("attack", eth60, randomHashes, nil) if err := tester.sync("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -540,7 +709,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { } // Try and sync with the malicious node and check that it fails tester := newTester() - tester.newPeer("attack", gapped, blocks) + tester.newPeer("attack", eth60, gapped, blocks) if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -548,13 +717,13 @@ func TestMadeupBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } -// tests that if one/multiple malicious peers try to feed a banned blockchain to +// Tests that if one/multiple malicious peers try to feed a banned blockchain to // the downloader, it will not keep refetching the same chain indefinitely, but // gradually block pieces of it, until its head is also blocked. func TestBannedChainStarvationAttack(t *testing.T) { @@ -565,8 +734,8 @@ func TestBannedChainStarvationAttack(t *testing.T) { // Create the tester and ban the selected hash. tester := newTester() tester.downloader.banned.Add(forkHashes[fork-1]) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -586,7 +755,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", forkHashes, forkBlocks); err != errBannedHead { + if err := tester.newPeer("new attacker", eth60, forkHashes, forkBlocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { @@ -618,8 +787,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { MaxBlockFetch = 4 maxBannedHashes = 256 - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -664,7 +833,7 @@ func TestOverlappingDeliveryAttack(t *testing.T) { // Register an attacker that always returns non-requested blocks too tester := newTester() - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error { @@ -712,7 +881,7 @@ func TestHashAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{genesis.Hash()}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -744,7 +913,7 @@ func TestBlockAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{common.Hash{}}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { |