Diffstat (limited to 'eth/downloader/downloader_test.go')
-rw-r--r-- | eth/downloader/downloader_test.go | 697 |
1 file changed, 437 insertions(+), 260 deletions(-)
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 5f10fb41f..f71c16237 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -2,7 +2,10 @@ package downloader
 
 import (
     "encoding/binary"
+    "errors"
+    "fmt"
     "math/big"
+    "sync/atomic"
     "testing"
     "time"
 
@@ -13,21 +16,29 @@ import (
 )
 
 var (
-    knownHash   = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
-    unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}
-    bannedHash  = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5}
+    knownHash   = common.Hash{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
+    unknownHash = common.Hash{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}
+    bannedHash  = common.Hash{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}
+
+    genesis = createBlock(1, common.Hash{}, knownHash)
 )
 
-func createHashes(start, amount int) (hashes []common.Hash) {
+// idCounter is used by the createHashes method to generate deterministic but unique hashes
+var idCounter = int64(2) // #1 is the genesis block
+
+// createHashes generates a batch of hashes rooted at a specific point in the chain.
+func createHashes(amount int, root common.Hash) (hashes []common.Hash) {
     hashes = make([]common.Hash, amount+1)
-    hashes[len(hashes)-1] = knownHash
+    hashes[len(hashes)-1] = root
 
-    for i := range hashes[:len(hashes)-1] {
-        binary.BigEndian.PutUint64(hashes[i][:8], uint64(start+i+2))
+    for i := 0; i < len(hashes)-1; i++ {
+        binary.BigEndian.PutUint64(hashes[i][:8], uint64(idCounter))
+        idCounter++
     }
     return
 }
 
+// createBlock assembles a new block at the given chain height.
 func createBlock(i int, parent, hash common.Hash) *types.Block {
     header := &types.Header{Number: big.NewInt(int64(i))}
     block := types.NewBlockWithHeader(header)
@@ -36,6 +47,11 @@ func createBlock(i int, parent, hash common.Hash) *types.Block {
     return block
 }
 
+// copyBlock makes a deep copy of a block suitable for local modifications.
+func copyBlock(block *types.Block) *types.Block {
+    return createBlock(int(block.Number().Int64()), block.ParentHeaderHash, block.HeaderHash)
+}
+
 func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
     blocks := make(map[common.Hash]*types.Block)
     for i := 0; i < len(hashes); i++ {
@@ -51,181 +67,171 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
 type downloadTester struct {
     downloader *Downloader
 
-    hashes []common.Hash                // Chain of hashes simulating
-    blocks map[common.Hash]*types.Block // Blocks associated with the hashes
-    chain  []common.Hash                // Block-chain being constructed
+    ownHashes  []common.Hash                           // Hash chain belonging to the tester
+    ownBlocks  map[common.Hash]*types.Block            // Blocks belonging to the tester
+    peerHashes map[string][]common.Hash                // Hash chain belonging to different test peers
+    peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
 
     maxHashFetch int // Overrides the maximum number of retrieved hashes
-
-    t            *testing.T
-    done         chan bool
-    activePeerId string
 }
 
-func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
+func newTester() *downloadTester {
     tester := &downloadTester{
-        t: t,
-
-        hashes: hashes,
-        blocks: blocks,
-        chain:  []common.Hash{knownHash},
-
-        done: make(chan bool),
+        ownHashes:  []common.Hash{knownHash},
+        ownBlocks:  map[common.Hash]*types.Block{knownHash: genesis},
+        peerHashes: make(map[string][]common.Hash),
+        peerBlocks: make(map[string]map[common.Hash]*types.Block),
     }
     var mux event.TypeMux
-    downloader := New(&mux, tester.hasBlock, tester.getBlock)
+    downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer)
     tester.downloader = downloader
 
     return tester
 }
 
-// sync is a simple wrapper around the downloader to start synchronisation and
-// block until it returns
-func (dl *downloadTester) sync(peerId string, head common.Hash) error {
-    dl.activePeerId = peerId
-    return dl.downloader.Synchronise(peerId, head)
+// sync starts synchronizing with a remote peer, blocking until it completes.
+func (dl *downloadTester) sync(id string) error {
+    err := dl.downloader.synchronise(id, dl.peerHashes[id][0])
+    for atomic.LoadInt32(&dl.downloader.processing) == 1 {
+        time.Sleep(time.Millisecond)
+    }
+    return err
+}
+
+// hasBlock checks if a block is present in the tester's canonical chain.
+func (dl *downloadTester) hasBlock(hash common.Hash) bool {
+    return dl.getBlock(hash) != nil
 }
 
-// syncTake is starts synchronising with a remote peer, but concurrently it also
-// starts fetching blocks that the downloader retrieved. IT blocks until both go
-// routines terminate.
-func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) {
-    // Start a block collector to take blocks as they become available
-    done := make(chan struct{})
-    took := []*Block{}
-    go func() {
-        for running := true; running; {
-            select {
-            case <-done:
-                running = false
-            default:
-                time.Sleep(time.Millisecond)
-            }
-            // Take a batch of blocks and accumulate
-            took = append(took, dl.downloader.TakeBlocks()...)
-        }
-        done <- struct{}{}
-    }()
-    // Start the downloading, sync the taker and return
-    err := dl.sync(peerId, head)
+// getBlock retrieves a block from the tester's canonical chain.
+func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
+    return dl.ownBlocks[hash]
+}
 
-    done <- struct{}{}
-    <-done
+// insertChain injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
+    for i, block := range blocks {
+        if _, ok := dl.ownBlocks[block.ParentHash()]; !ok {
+            return i, errors.New("unknown parent")
+        }
+        dl.ownHashes = append(dl.ownHashes, block.Hash())
+        dl.ownBlocks[block.Hash()] = block
+    }
+    return len(blocks), nil
+}
 
-    return took, err
+// newPeer registers a new block download source into the downloader.
+func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
+    return dl.newSlowPeer(id, hashes, blocks, 0)
 }
 
-func (dl *downloadTester) hasBlock(hash common.Hash) bool {
-    for _, h := range dl.chain {
-        if h == hash {
-            return true
+// newSlowPeer registers a new block download source into the downloader, with a
+// specific delay time on processing the network packets sent to it, simulating
+// potentially slow network IO.
+func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error {
+    err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay))
+    if err == nil {
+        // Assign the owned hashes and blocks to the peer (deep copy)
+        dl.peerHashes[id] = make([]common.Hash, len(hashes))
+        copy(dl.peerHashes[id], hashes)
+
+        dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
+        for hash, block := range blocks {
+            dl.peerBlocks[id][hash] = copyBlock(block)
         }
     }
-    return false
+    return err
 }
 
-func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
-    return dl.blocks[knownHash]
-}
-
-// getHashes retrieves a batch of hashes for reconstructing the chain.
-func (dl *downloadTester) getHashes(head common.Hash) error {
-    limit := MaxHashFetch
-    if dl.maxHashFetch > 0 {
-        limit = dl.maxHashFetch
-    }
-    // Gather the next batch of hashes
-    hashes := make([]common.Hash, 0, limit)
-    for i, hash := range dl.hashes {
-        if hash == head {
-            i++
-            for len(hashes) < cap(hashes) && i < len(dl.hashes) {
-                hashes = append(hashes, dl.hashes[i])
+// dropPeer simulates a hard peer removal from the connection pool.
+func (dl *downloadTester) dropPeer(id string) {
+    delete(dl.peerHashes, id)
+    delete(dl.peerBlocks, id)
+
+    dl.downloader.UnregisterPeer(id)
+}
+
+// peerGetHashesFn constructs a getHashes function associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of hashes from that particular peer.
+func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error {
+    return func(head common.Hash) error {
+        time.Sleep(delay)
+
+        limit := MaxHashFetch
+        if dl.maxHashFetch > 0 {
+            limit = dl.maxHashFetch
+        }
+        // Gather the next batch of hashes
+        hashes := dl.peerHashes[id]
+        result := make([]common.Hash, 0, limit)
+        for i, hash := range hashes {
+            if hash == head {
                 i++
+                for len(result) < cap(result) && i < len(hashes) {
+                    result = append(result, hashes[i])
+                    i++
+                }
+                break
             }
-            break
         }
+        // Delay delivery a bit to allow attacks to unfold
+        go func() {
+            time.Sleep(time.Millisecond)
+            dl.downloader.DeliverHashes(id, result)
+        }()
+        return nil
     }
-    // Delay delivery a bit to allow attacks to unfold
-    id := dl.activePeerId
-    go func() {
-        time.Sleep(time.Millisecond)
-        dl.downloader.DeliverHashes(id, hashes)
-    }()
-    return nil
 }
 
-func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
+// peerGetBlocksFn constructs a getBlocks function associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of blocks from that particular peer.
+func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error {
     return func(hashes []common.Hash) error {
-        blocks := make([]*types.Block, 0, len(hashes))
+        time.Sleep(delay)
+
+        blocks := dl.peerBlocks[id]
+        result := make([]*types.Block, 0, len(hashes))
         for _, hash := range hashes {
-            if block, ok := dl.blocks[hash]; ok {
-                blocks = append(blocks, block)
+            if block, ok := blocks[hash]; ok {
+                result = append(result, block)
             }
         }
-        go dl.downloader.DeliverBlocks(id, blocks)
+        go dl.downloader.DeliverBlocks(id, result)
         return nil
     }
 }
 
-// newPeer registers a new block download source into the syncer.
-func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) error {
-    return dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id))
-}
-
 // Tests that simple synchronization, without throttling from a good peer works.
 func TestSynchronisation(t *testing.T) {
     // Create a small enough block chain to download and the tester
     targetBlocks := blockCacheLimit - 15
-    hashes := createHashes(0, targetBlocks)
+    hashes := createHashes(targetBlocks, knownHash)
     blocks := createBlocksFromHashes(hashes)
 
-    tester := newTester(t, hashes, blocks)
-    tester.newPeer("peer", big.NewInt(10000), hashes[0])
+    tester := newTester()
+    tester.newPeer("peer", hashes, blocks)
 
     // Synchronise with the peer and make sure all blocks were retrieved
-    if err := tester.sync("peer", hashes[0]); err != nil {
-        t.Fatalf("failed to synchronise blocks: %v", err)
-    }
-    if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks {
-        t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
-    }
-}
-
-// Tests that the synchronized blocks can be correctly retrieved.
-func TestBlockTaking(t *testing.T) {
-    // Create a small enough block chain to download and the tester
-    targetBlocks := blockCacheLimit - 15
-    hashes := createHashes(0, targetBlocks)
-    blocks := createBlocksFromHashes(hashes)
-
-    tester := newTester(t, hashes, blocks)
-    tester.newPeer("peer", big.NewInt(10000), hashes[0])
-
-    // Synchronise with the peer and test block retrieval
-    if err := tester.sync("peer", hashes[0]); err != nil {
+    if err := tester.sync("peer"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
-    if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks {
-        t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks)
+    if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+        t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
     }
 }
 
 // Tests that an inactive downloader will not accept incoming hashes and blocks.
 func TestInactiveDownloader(t *testing.T) {
-    // Create a small enough block chain to download and the tester
-    targetBlocks := blockCacheLimit - 15
-    hashes := createHashes(0, targetBlocks)
-    blocks := createBlocksFromHashSet(createHashSet(hashes))
-
-    tester := newTester(t, nil, nil)
+    tester := newTester()
 
     // Check that neither hashes nor blocks are accepted
-    if err := tester.downloader.DeliverHashes("bad peer", hashes); err != errNoSyncActive {
+    if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive {
         t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
     }
-    if err := tester.downloader.DeliverBlocks("bad peer", blocks); err != errNoSyncActive {
+    if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive {
        t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
     }
 }
@@ -234,27 +240,27 @@ func TestInactiveDownloader(t *testing.T) {
 func TestCancel(t *testing.T) {
     // Create a small enough block chain to download and the tester
     targetBlocks := blockCacheLimit - 15
-    hashes := createHashes(0, targetBlocks)
+    hashes := createHashes(targetBlocks, knownHash)
     blocks := createBlocksFromHashes(hashes)
 
-    tester := newTester(t, hashes, blocks)
-    tester.newPeer("peer", big.NewInt(10000), hashes[0])
+    tester := newTester()
+    tester.newPeer("peer", hashes, blocks)
 
+    // Make sure canceling works with a pristine downloader
+    tester.downloader.Cancel()
+    hashCount, blockCount := tester.downloader.queue.Size()
+    if hashCount > 0 || blockCount > 0 {
+        t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
+    }
     // Synchronise with the peer, but cancel afterwards
-    if err := tester.sync("peer", hashes[0]); err != nil {
+    if err := tester.sync("peer"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
-    if !tester.downloader.Cancel() {
-        t.Fatalf("cancel operation failed")
-    }
-    // Make sure the queue reports empty and no blocks can be taken
-    hashCount, blockCount := tester.downloader.queue.Size()
+    tester.downloader.Cancel()
+    hashCount, blockCount = tester.downloader.queue.Size()
     if hashCount > 0 || blockCount > 0 {
         t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
     }
-    if took := tester.downloader.TakeBlocks(); len(took) != 0 {
-        t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0)
-    }
 }
 
 // Tests that if a large batch of blocks are being downloaded, it is throttled
@@ -262,98 +268,167 @@ func TestCancel(t *testing.T) {
 func TestThrottling(t *testing.T) {
     // Create a long block chain to download and the tester
     targetBlocks := 8 * blockCacheLimit
-    hashes := createHashes(0, targetBlocks)
+    hashes := createHashes(targetBlocks, knownHash)
     blocks := createBlocksFromHashes(hashes)
 
-    tester := newTester(t, hashes, blocks)
-    tester.newPeer("peer", big.NewInt(10000), hashes[0])
+    tester := newTester()
+    tester.newPeer("peer", hashes, blocks)
 
+    // Wrap the importer to allow stepping
+    done := make(chan int)
+    tester.downloader.insertChain = func(blocks types.Blocks) (int, error) {
+        n, err := tester.insertChain(blocks)
+        done <- n
+        return n, err
+    }
     // Start a synchronisation concurrently
     errc := make(chan error)
     go func() {
-        errc <- tester.sync("peer", hashes[0])
+        errc <- tester.sync("peer")
     }()
     // Iteratively take some blocks, always checking the retrieval count
-    for total := 0; total < targetBlocks; {
-        // Wait a bit for sync to complete
+    for len(tester.ownBlocks) < targetBlocks+1 {
+        // Wait a bit for sync to throttle itself
+        var cached int
         for start := time.Now(); time.Since(start) < 3*time.Second; {
             time.Sleep(25 * time.Millisecond)
-            if len(tester.downloader.queue.blockPool) == blockCacheLimit {
+
+            cached = len(tester.downloader.queue.blockPool)
+            if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 {
                 break
             }
         }
-        // Fetch the next batch of blocks
-        took := tester.downloader.TakeBlocks()
-        if len(took) != blockCacheLimit {
-            t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit)
+        // Make sure we filled up the cache, then exhaust it
+        time.Sleep(25 * time.Millisecond) // give it a chance to screw up
+        if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 {
+            t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit)
        }
-        total += len(took)
-        if total > targetBlocks {
-            t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks)
+        <-done // finish previous blocking import
+        for cached > maxBlockProcess {
+            cached -= <-done
        }
+        time.Sleep(25 * time.Millisecond) // yield to the insertion
+    }
+    <-done // finish the last blocking import
+
+    // Check that we haven't pulled more blocks than available
+    if len(tester.ownBlocks) > targetBlocks+1 {
+        t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1)
    }
     if err := <-errc; err != nil {
         t.Fatalf("block synchronization failed: %v", err)
     }
 }
 
+// Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
+func TestMultiSynchronisation(t *testing.T) {
+    // Create various peers with various parts of the chain
+    targetPeers := 16
+    targetBlocks := targetPeers*blockCacheLimit - 15
+
+    hashes := createHashes(targetBlocks, knownHash)
+    blocks := createBlocksFromHashes(hashes)
+
+    tester := newTester()
+    for i := 0; i < targetPeers; i++ {
+        id := fmt.Sprintf("peer #%d", i)
+        tester.newPeer(id, hashes[i*blockCacheLimit:], blocks)
+    }
+    // Synchronise with the middle peer and make sure half of the blocks were retrieved
+    id := fmt.Sprintf("peer #%d", targetPeers/2)
+    if err := tester.sync(id); err != nil {
+        t.Fatalf("failed to synchronise blocks: %v", err)
+    }
+    if imported := len(tester.ownBlocks); imported != len(tester.peerHashes[id]) {
+        t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(tester.peerHashes[id]))
+    }
+    // Synchronise with the best peer and make sure everything is retrieved
+    if err := tester.sync("peer #0"); err != nil {
+        t.Fatalf("failed to synchronise blocks: %v", err)
+    }
+    if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+        t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
+    }
+}
+
+// Tests that synchronising with a peer that is very slow at network IO does not
+// stall the other peers in the system.
+func TestSlowSynchronisation(t *testing.T) {
+    tester := newTester()
+
+    // Create a batch of blocks, with a slow and a full-speed peer
+    targetCycles := 2
+    targetBlocks := targetCycles*blockCacheLimit - 15
+    targetIODelay := 500 * time.Millisecond
+
+    hashes := createHashes(targetBlocks, knownHash)
+    blocks := createBlocksFromHashes(hashes)
+
+    tester.newSlowPeer("fast", hashes, blocks, 0)
+    tester.newSlowPeer("slow", hashes, blocks, targetIODelay)
+
+    // Try to sync with the peers (pull hashes from fast)
+    start := time.Now()
+    if err := tester.sync("fast"); err != nil {
+        t.Fatalf("failed to synchronise blocks: %v", err)
+    }
+    if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+        t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
+    }
+    // Check that the slow peer got hit at most once per block-cache-size import
+    limit := time.Duration(targetCycles+1) * targetIODelay
+    if delay := time.Since(start); delay >= limit {
+        t.Fatalf("synchronisation exceeded delay limit: have %v, want %v", delay, limit)
+    }
+}
+
 // Tests that if a peer returns an invalid chain with a block pointing to a non-
 // existing parent, it is correctly detected and handled.
 func TestNonExistingParentAttack(t *testing.T) {
+    tester := newTester()
+
     // Forge a single-link chain with a forged header
-    hashes := createHashes(0, 1)
+    hashes := createHashes(1, knownHash)
     blocks := createBlocksFromHashes(hashes)
+    tester.newPeer("valid", hashes, blocks)
 
-    forged := blocks[hashes[0]]
-    forged.ParentHeaderHash = unknownHash
+    hashes = createHashes(1, knownHash)
+    blocks = createBlocksFromHashes(hashes)
+    blocks[hashes[0]].ParentHeaderHash = unknownHash
+    tester.newPeer("attack", hashes, blocks)
 
     // Try and sync with the malicious node and check that it fails
-    tester := newTester(t, hashes, blocks)
-    tester.newPeer("attack", big.NewInt(10000), hashes[0])
-    if err := tester.sync("attack", hashes[0]); err != nil {
-        t.Fatalf("failed to synchronise blocks: %v", err)
-    }
-    bs := tester.downloader.TakeBlocks()
-    if len(bs) != 1 {
-        t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
+    if err := tester.sync("attack"); err == nil {
+        t.Fatalf("block synchronization succeeded")
     }
-    if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
-        t.Fatalf("tester knows about the unknown hash")
+    if tester.hasBlock(hashes[0]) {
+        t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]])
     }
-    tester.downloader.Cancel()
-
-    // Reconstruct a valid chain, and try to synchronize with it
-    forged.ParentHeaderHash = knownHash
-    tester.newPeer("valid", big.NewInt(20000), hashes[0])
-    if err := tester.sync("valid", hashes[0]); err != nil {
+    // Try to synchronize with the valid chain and make sure it succeeds
+    if err := tester.sync("valid"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
-    bs = tester.downloader.TakeBlocks()
-    if len(bs) != 1 {
-        t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
-    }
-    if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
-        t.Fatalf("tester doesn't know about the origin hash")
+    if !tester.hasBlock(tester.peerHashes["valid"][0]) {
+        t.Fatalf("tester didn't accept known-parent block: %v", tester.peerBlocks["valid"][hashes[0]])
     }
 }
 
 // Tests that if a malicious peer keeps sending us repeating hashes, we don't
 // loop indefinitely.
-func TestRepeatingHashAttack(t *testing.T) {
+func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid??
+    tester := newTester()
+
     // Create a valid chain, but drop the last link
-    hashes := createHashes(0, blockCacheLimit)
+    hashes := createHashes(blockCacheLimit, knownHash)
     blocks := createBlocksFromHashes(hashes)
-    forged := hashes[:len(hashes)-1]
+    tester.newPeer("valid", hashes, blocks)
+    tester.newPeer("attack", hashes[:len(hashes)-1], blocks)
 
     // Try and sync with the malicious node
-    tester := newTester(t, forged, blocks)
-    tester.newPeer("attack", big.NewInt(10000), forged[0])
-
     errc := make(chan error)
     go func() {
-        errc <- tester.sync("attack", hashes[0])
+        errc <- tester.sync("attack")
     }()
-
     // Make sure that syncing returns and does so with a failure
     select {
     case <-time.After(time.Second):
@@ -364,9 +439,7 @@ func TestRepeatingHashAttack(t *testing.T) {
         }
     }
     // Ensure that a valid chain can still pass sync
-    tester.hashes = hashes
-    tester.newPeer("valid", big.NewInt(20000), hashes[0])
-    if err := tester.sync("valid", hashes[0]); err != nil {
+    if err := tester.sync("valid"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
 }
@@ -374,23 +447,22 @@ func TestRepeatingHashAttack(t *testing.T) {
 // Tests that if a malicious peer returns a non-existent block hash, it should
 // eventually time out and the sync should be reattempted.
 func TestNonExistingBlockAttack(t *testing.T) {
+    tester := newTester()
+
     // Create a valid chain, but forge the last link
-    hashes := createHashes(0, blockCacheLimit)
+    hashes := createHashes(blockCacheLimit, knownHash)
     blocks := createBlocksFromHashes(hashes)
-    origin := hashes[len(hashes)/2]
+    tester.newPeer("valid", hashes, blocks)
 
     hashes[len(hashes)/2] = unknownHash
+    tester.newPeer("attack", hashes, blocks)
 
     // Try and sync with the malicious node and check that it fails
-    tester := newTester(t, hashes, blocks)
-    tester.newPeer("attack", big.NewInt(10000), hashes[0])
-    if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable {
+    if err := tester.sync("attack"); err != errPeersUnavailable {
         t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
     }
     // Ensure that a valid chain can still pass sync
-    hashes[len(hashes)/2] = origin
-    tester.newPeer("valid", big.NewInt(20000), hashes[0])
-    if err := tester.sync("valid", hashes[0]); err != nil {
+    if err := tester.sync("valid"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
 }
@@ -398,30 +470,28 @@ func TestNonExistingBlockAttack(t *testing.T) {
 // Tests that if a malicious peer is returning hashes in a weird order, that the
 // sync throttler doesn't choke on them waiting for the valid blocks.
 func TestInvalidHashOrderAttack(t *testing.T) {
+    tester := newTester()
+
     // Create a valid long chain, but reverse some hashes within
-    hashes := createHashes(0, 4*blockCacheLimit)
+    hashes := createHashes(4*blockCacheLimit, knownHash)
     blocks := createBlocksFromHashes(hashes)
+    tester.newPeer("valid", hashes, blocks)
 
     chunk1 := make([]common.Hash, blockCacheLimit)
     chunk2 := make([]common.Hash, blockCacheLimit)
     copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit])
     copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit])
 
-    reverse := make([]common.Hash, len(hashes))
-    copy(reverse, hashes)
-    copy(reverse[2*blockCacheLimit:], chunk1)
-    copy(reverse[blockCacheLimit:], chunk2)
+    copy(hashes[2*blockCacheLimit:], chunk1)
+    copy(hashes[blockCacheLimit:], chunk2)
+    tester.newPeer("attack", hashes, blocks)
 
     // Try and sync with the malicious node and check that it fails
-    tester := newTester(t, reverse, blocks)
-    tester.newPeer("attack", big.NewInt(10000), reverse[0])
-    if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
-        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
+    if err := tester.sync("attack"); err != errInvalidChain {
+        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
     }
     // Ensure that a valid chain can still pass sync
-    tester.hashes = hashes
-    tester.newPeer("valid", big.NewInt(20000), hashes[0])
-    if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+    if err := tester.sync("valid"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
 }
@@ -429,17 +499,24 @@ func TestInvalidHashOrderAttack(t *testing.T) {
 // Tests that if a malicious peer makes up a random hash chain and tries to push
 // indefinitely, it actually gets caught with it.
 func TestMadeupHashChainAttack(t *testing.T) {
+    tester := newTester()
+
     blockSoftTTL = 100 * time.Millisecond
     crossCheckCycle = 25 * time.Millisecond
 
     // Create a long chain of hashes without backing blocks
-    hashes := createHashes(0, 1024*blockCacheLimit)
+    hashes := createHashes(4*blockCacheLimit, knownHash)
+    blocks := createBlocksFromHashes(hashes)
+
+    tester.newPeer("valid", hashes, blocks)
+    tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil)
 
     // Try and sync with the malicious node and check that it fails
-    tester := newTester(t, hashes, nil)
-    tester.newPeer("attack", big.NewInt(10000), hashes[0])
-    if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
-        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+    if err := tester.sync("attack"); err != errCrossCheckFailed {
+        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
+    }
+    // Ensure that a valid chain can still pass sync
+    if err := tester.sync("valid"); err != nil {
+        t.Fatalf("failed to synchronise blocks: %v", err)
     }
 }
 
@@ -449,14 +526,14 @@ func TestMadeupHashChainAttack(t *testing.T) {
 // one by one prevents reliable block/parent verification.
 func TestMadeupHashChainDrippingAttack(t *testing.T) {
     // Create a random chain of hashes to drip
-    hashes := createHashes(0, 16*blockCacheLimit)
-    tester := newTester(t, hashes, nil)
+    hashes := createHashes(16*blockCacheLimit, knownHash)
+    tester := newTester()
 
     // Try and sync with the attacker, one hash at a time
     tester.maxHashFetch = 1
-    tester.newPeer("attack", big.NewInt(10000), hashes[0])
-    if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer {
-        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer)
+    tester.newPeer("attack", hashes, nil)
+    if err := tester.sync("attack"); err != errStallingPeer {
+        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
     }
 }
 
@@ -470,7 +547,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
     crossCheckCycle = 25 * time.Millisecond
 
     // Create a long chain of blocks and simulate an invalid chain by dropping every second
-    hashes := createHashes(0, 16*blockCacheLimit)
+    hashes := createHashes(16*blockCacheLimit, knownHash)
     blocks := createBlocksFromHashes(hashes)
 
     gapped := make([]common.Hash, len(hashes)/2)
@@ -478,18 +555,17 @@ func TestMadeupBlockChainAttack(t *testing.T) {
         gapped[i] = hashes[2*i]
     }
     // Try and sync with the malicious node and check that it fails
-    tester := newTester(t, gapped, blocks)
-    tester.newPeer("attack", big.NewInt(10000), gapped[0])
-    if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed {
-        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+    tester := newTester()
+    tester.newPeer("attack", gapped, blocks)
+    if err := tester.sync("attack"); err != errCrossCheckFailed {
+        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
     }
     // Ensure that a valid chain can still pass sync
     blockSoftTTL = defaultBlockTTL
     crossCheckCycle = defaultCrossCheckCycle
 
-    tester.hashes = hashes
-    tester.newPeer("valid", big.NewInt(20000), hashes[0])
-    if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+    tester.newPeer("valid", hashes, blocks)
+    if err := tester.sync("valid"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
 }
@@ -498,6 +574,8 @@ func TestMadeupBlockChainAttack(t *testing.T) {
 // attacker makes up valid hashes for random blocks, but also forges the block
 // parents to point to existing hashes.
 func TestMadeupParentBlockChainAttack(t *testing.T) {
+    tester := newTester()
+
     defaultBlockTTL := blockSoftTTL
     defaultCrossCheckCycle := crossCheckCycle
 
@@ -505,25 +583,24 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
     crossCheckCycle = 25 * time.Millisecond
 
     // Create a long chain of blocks and simulate an invalid chain by dropping every second
-    hashes := createHashes(0, 16*blockCacheLimit)
+    hashes := createHashes(16*blockCacheLimit, knownHash)
     blocks := createBlocksFromHashes(hashes)
-    forges := createBlocksFromHashes(hashes)
-    for hash, block := range forges {
-        block.ParentHeaderHash = hash // Simulate pointing to already known hash
+    tester.newPeer("valid", hashes, blocks)
+
+    for _, block := range blocks {
+        block.ParentHeaderHash = knownHash // Simulate pointing to already known hash
     }
+    tester.newPeer("attack", hashes, blocks)
+
 
     // Try and sync with the malicious node and check that it fails
-    tester := newTester(t, hashes, forges)
-    tester.newPeer("attack", big.NewInt(10000), hashes[0])
-    if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
-        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+    if err := tester.sync("attack"); err != errCrossCheckFailed {
+        t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
     }
     // Ensure that a valid chain can still pass sync
     blockSoftTTL = defaultBlockTTL
     crossCheckCycle = defaultCrossCheckCycle
 
-    tester.blocks = blocks
-    tester.newPeer("valid", big.NewInt(20000), hashes[0])
-    if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+    if err := tester.sync("valid"); err != nil {
         t.Fatalf("failed to synchronise blocks: %v", err)
     }
 }
@@ -532,68 +609,81 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
 // the downloader, it will not keep refetching the same chain indefinitely, but
 // gradually block pieces of it, until its head is also blocked.
 func TestBannedChainStarvationAttack(t *testing.T) {
-    // Construct a valid chain, but ban one of the hashes in it
-    hashes := createHashes(0, 8*blockCacheLimit)
-    hashes[len(hashes)/2+23] = bannedHash // weird index to have non multiple of ban chunk size
+    // Create the tester and ban the selected hash
+    tester := newTester()
+    tester.downloader.banned.Add(bannedHash)
 
+    // Construct a valid chain, fork it and ban the fork
+    hashes := createHashes(8*blockCacheLimit, knownHash)
     blocks := createBlocksFromHashes(hashes)
+    tester.newPeer("valid", hashes, blocks)
 
-    // Create the tester and ban the selected hash
-    tester := newTester(t, hashes, blocks)
-    tester.downloader.banned.Add(bannedHash)
+    fork := len(hashes)/2 - 23
+    hashes = append(createHashes(4*blockCacheLimit, bannedHash), hashes[fork:]...)
+    blocks = createBlocksFromHashes(hashes)
+    tester.newPeer("attack", hashes, blocks)
 
     // Iteratively try to sync, and verify that the banned hash list grows until
     // the head of the invalid chain is blocked too.
- tester.newPeer("attack", big.NewInt(10000), hashes[0]) for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if err := tester.sync("attack"); err != errInvalidChain { + if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead { + break + } + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Check that the ban list grew with at least 1 new item, or all banned bans := tester.downloader.banned.Size() if bans < banned+1 { - if tester.downloader.banned.Has(hashes[0]) { - break - } t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1) } banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", big.NewInt(10000), hashes[0]); err != errBannedHead { + if err := tester.newPeer("new attacker", hashes, blocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } - if peer := tester.downloader.peers.Peer("net attacker"); peer != nil { + if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { t.Fatalf("banned attacker registered: %v", peer) } + // Ensure that a valid chain can still pass sync + if err := tester.sync("valid"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } } // Tests that if a peer sends excessively many/large invalid chains that are // gradually banned, it will have an upper limit on the consumed memory and also // the origin bad hashes will not be evacuated. func TestBannedChainMemoryExhaustionAttack(t *testing.T) { + // Create the tester and ban the selected hash + tester := newTester() + tester.downloader.banned.Add(bannedHash) + // Reduce the test size a bit + defaultMaxBlockFetch := MaxBlockFetch + defaultMaxBannedHashes := maxBannedHashes + MaxBlockFetch = 4 maxBannedHashes = 256 // Construct a banned chain with more chunks than the ban limit - hashes := createHashes(0, maxBannedHashes*MaxBlockFetch) - hashes[len(hashes)-1] = bannedHash // weird index to have non multiple of ban chunk size - + hashes := createHashes(8*blockCacheLimit, knownHash) blocks := createBlocksFromHashes(hashes) + tester.newPeer("valid", hashes, blocks) - // Create the tester and ban the selected hash - tester := newTester(t, hashes, blocks) - tester.downloader.banned.Add(bannedHash) + fork := len(hashes)/2 - 23 + hashes = append(createHashes(maxBannedHashes*MaxBlockFetch, bannedHash), hashes[fork:]...) + blocks = createBlocksFromHashes(hashes) + tester.newPeer("attack", hashes, blocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. 
- tester.newPeer("attack", big.NewInt(10000), hashes[0]) for { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if err := tester.sync("attack"); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Short circuit if the entire chain was banned if tester.downloader.banned.Has(hashes[0]) { @@ -609,4 +699,91 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { } } } + // Ensure that a valid chain can still pass sync + MaxBlockFetch = defaultMaxBlockFetch + maxBannedHashes = defaultMaxBannedHashes + + if err := tester.sync("valid"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} + +// Tests that misbehaving peers are disconnected, whilst behaving ones are not. +func TestHashAttackerDropping(t *testing.T) { + // Define the disconnection requirement for individual hash fetch errors + tests := []struct { + result error + drop bool + }{ + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop + } + // Run the tests and check disconnection status + tester := newTester() + for i, tt := range tests { + // Register a new peer and ensure it's presence + id := fmt.Sprintf("test %d", i) + if err := tester.newPeer(id, []common.Hash{knownHash}, nil); err != nil { + t.Fatalf("test %d: failed to register new peer: %v", i, err) + } + if _, ok := tester.peerHashes[id]; !ok { + t.Fatalf("test %d: registered peer not found", i) + } + // Simulate a synchronisation and check the required result + tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } + + tester.downloader.Synchronise(id, knownHash) + if _, ok := tester.peerHashes[id]; !ok != tt.drop { + t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) + } + } +} + +// Tests that feeding bad blocks will result in a peer drop. 
+func TestBlockAttackerDropping(t *testing.T) {
+    // Define the disconnection requirement for individual block import errors
+    tests := []struct {
+        failure bool
+        drop    bool
+    }{{true, true}, {false, false}}
+
+    // Run the tests and check disconnection status
+    tester := newTester()
+    for i, tt := range tests {
+        // Register a new peer and ensure its presence
+        id := fmt.Sprintf("test %d", i)
+        if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil {
+            t.Fatalf("test %d: failed to register new peer: %v", i, err)
+        }
+        if _, ok := tester.peerHashes[id]; !ok {
+            t.Fatalf("test %d: registered peer not found", i)
+        }
+        // Assemble a good or bad block, depending on the test
+        raw := createBlock(1, knownHash, common.Hash{})
+        if tt.failure {
+            raw = createBlock(1, unknownHash, common.Hash{})
+        }
+        block := &Block{OriginPeer: id, RawBlock: raw}
+
+        // Simulate block processing and check the result
+        tester.downloader.queue.blockCache[0] = block
+        tester.downloader.process()
+        if _, ok := tester.peerHashes[id]; !ok != tt.drop {
+            t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop)
+        }
+    }
 }
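For orientation, here is a minimal sketch (not part of the commit) of how a test drives the reworked tester harness introduced by this diff. The test name below is hypothetical; every helper it calls (newTester, newPeer, sync, createHashes, createBlocksFromHashes, ownBlocks, blockCacheLimit, knownHash) comes from the code above, and the flow mirrors the updated TestSynchronisation: build a chain rooted at the genesis hash, register it under a simulated peer, synchronise, then count the imported blocks.

func TestHarnessUsageExample(t *testing.T) {
    // Build a short canonical chain rooted at the known genesis hash.
    targetBlocks := blockCacheLimit - 15
    hashes := createHashes(targetBlocks, knownHash)
    blocks := createBlocksFromHashes(hashes)

    // Register a simulated peer that owns the chain, then synchronise with it.
    tester := newTester()
    tester.newPeer("peer", hashes, blocks)

    if err := tester.sync("peer"); err != nil {
        t.Fatalf("failed to synchronise blocks: %v", err)
    }
    // The tester should now own the whole chain plus its genesis block.
    if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
        t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
    }
}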