diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/downloader/downloader.go | 397 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 380 | ||||
-rw-r--r-- | eth/handler_test.go | 3 | ||||
-rw-r--r-- | eth/metrics.go | 24 | ||||
-rw-r--r-- | eth/protocol.go | 25 | ||||
-rw-r--r-- | eth/protocol_test.go | 3 |
6 files changed, 28 insertions, 804 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6a6bce644..b28879ee6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -18,11 +18,9 @@ package downloader import ( - "bytes" "errors" "math" "math/big" - "math/rand" "sync" "sync/atomic" "time" @@ -37,8 +35,8 @@ import ( ) const ( - eth60 = 60 // Constant to check for old protocol support - eth61 = 61 // Constant to check for new protocol support + eth61 = 61 // Constant to check for old protocol support + eth62 = 62 // Constant to check for new protocol support ) var ( @@ -324,16 +322,8 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version) switch p.version { - case eth60: - // Old eth/60 version, use reverse hash retrieval algorithm - if err = d.fetchHashes60(p, hash); err != nil { - return err - } - if err = d.fetchBlocks60(); err != nil { - return err - } case eth61: - // New eth/61, use forward, concurrent hash and block retrieval algorithm + // Old eth/61, use forward, concurrent hash and block retrieval algorithm number, err := d.findAncestor(p) if err != nil { return err @@ -355,8 +345,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) return errBadPeer } - glog.V(logger.Debug).Infoln("Synchronization completed") - return nil } @@ -385,299 +373,6 @@ func (d *Downloader) Terminate() { d.cancel() } -// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash, -// up until it finds a common ancestor. If the source peer times out, alternative -// ones are tried for continuation. -func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error { - var ( - start = time.Now() - active = p // active peer will help determine the current active peer - head = common.Hash{} // common and last hash - - timeout = time.NewTimer(0) // timer to dump a non-responsive active peer - attempted = make(map[string]bool) // attempted peers will help with retries - crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks - ) - defer crossTicker.Stop() - defer timeout.Stop() - - glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) - <-timeout.C // timeout channel should be initially empty. - - getHashes := func(from common.Hash) { - go active.getRelHashes(from) - timeout.Reset(hashTTL) - } - - // Add the hash to the queue, and start hash retrieval. - d.queue.Insert([]common.Hash{h}, false) - getHashes(h) - - attempted[p.id] = true - for finished := false; !finished; { - select { - case <-d.cancelCh: - return errCancelHashFetch - - case hashPack := <-d.hashCh: - // Make sure the active peer is giving us the hashes - if hashPack.peerId != active.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) - break - } - timeout.Stop() - - // Make sure the peer actually gave something valid - if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) - return errEmptyHashSet - } - for index, hash := range hashPack.hashes { - if d.banned.Has(hash) { - glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) - - d.queue.Insert(hashPack.hashes[:index+1], false) - if err := d.banBlocks(active.id, hash); err != nil { - glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) - } - return errInvalidChain - } - } - // Determine if we're done fetching hashes (queue up all pending), and continue if not done - done, index := false, 0 - for index, head = range hashPack.hashes { - if d.hasBlock(head) || d.queue.GetBlock(head) != nil { - glog.V(logger.Debug).Infof("Found common hash %x", head[:4]) - hashPack.hashes = hashPack.hashes[:index] - done = true - break - } - } - // Insert all the new hashes, but only continue if got something useful - inserts := d.queue.Insert(hashPack.hashes, false) - if len(inserts) == 0 && !done { - glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) - return errBadPeer - } - if !done { - // Check that the peer is not stalling the sync - if len(inserts) < MinHashFetch { - return errStallingPeer - } - // Try and fetch a random block to verify the hash batch - // Skip the last hash as the cross check races with the next hash fetch - cross := rand.Intn(len(inserts) - 1) - origin, parent := inserts[cross], inserts[cross+1] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent) - - d.checks[origin] = &crossCheck{ - expire: time.Now().Add(blockSoftTTL), - parent: parent, - } - go active.getBlocks([]common.Hash{origin}) - - // Also fetch a fresh batch of hashes - getHashes(head) - continue - } - // We're done, prepare the download cache and proceed pulling the blocks - offset := uint64(0) - if block := d.getBlock(head); block != nil { - offset = block.NumberU64() + 1 - } - d.queue.Prepare(offset) - finished = true - - case blockPack := <-d.blockCh: - // Cross check the block with the random verifications - if blockPack.peerId != active.id || len(blockPack.blocks) != 1 { - continue - } - block := blockPack.blocks[0] - if check, ok := d.checks[block.Hash()]; ok { - if block.ParentHash() != check.parent { - return errCrossCheckFailed - } - delete(d.checks, block.Hash()) - } - - case <-crossTicker.C: - // Iterate over all the cross checks and fail the hash chain if they're not verified - for hash, check := range d.checks { - if time.Now().After(check.expire) { - glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) - return errCrossCheckFailed - } - } - - case <-timeout.C: - glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id) - - var p *peer // p will be set if a peer can be found - // Attempt to find a new peer by checking inclusion of peers best hash in our - // already fetched hash list. This can't guarantee 100% correctness but does - // a fair job. This is always either correct or false incorrect. - for _, peer := range d.peers.AllPeers() { - if d.queue.Has(peer.head) && !attempted[peer.id] { - p = peer - break - } - } - // if all peers have been tried, abort the process entirely or if the hash is - // the zero hash. - if p == nil || (head == common.Hash{}) { - return errTimeout - } - // set p to the active peer. this will invalidate any hashes that may be returned - // by our previous (delayed) peer. - active = p - getHashes(head) - glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) - } - } - glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start)) - - return nil -} - -// fetchBlocks60 iteratively downloads the entire schedules block-chain, taking -// any available peers, reserving a chunk of blocks for each, wait for delivery -// and periodically checking for timeouts. -func (d *Downloader) fetchBlocks60() error { - glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") - start := time.Now() - - // Start a ticker to continue throttled downloads and check for bad peers - ticker := time.NewTicker(20 * time.Millisecond) - defer ticker.Stop() - -out: - for { - select { - case <-d.cancelCh: - return errCancelBlockFetch - - case <-d.hashCh: - // Out of bounds hashes received, ignore them - - case blockPack := <-d.blockCh: - // Short circuit if it's a stale cross check - if len(blockPack.blocks) == 1 { - block := blockPack.blocks[0] - if _, ok := d.checks[block.Hash()]; ok { - delete(d.checks, block.Hash()) - break - } - } - // If the peer was previously banned and failed to deliver it's pack - // in a reasonable time frame, ignore it's message. - if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks, and demote in case of errors - err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) - switch err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above) - if len(blockPack.blocks) == 0 { - peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) - go d.process() - - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort - return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: stale delivery", peer) - - default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) - go d.process() - } - } - - case <-ticker.C: - // Short circuit if we lost all our peers - if d.peers.Len() == 0 { - return errNoPeers - } - // Check for block request timeouts and demote the responsible peers - badPeers := d.queue.Expire(blockHardTTL) - for _, pid := range badPeers { - if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) - } - } - // If there are unrequested hashes left start fetching from the available peers - if d.queue.Pending() > 0 { - // Throttle the download if block cache is full and waiting processing - if d.queue.Throttle() { - break - } - // Send a download request to all idle peers, until throttled - idlePeers := d.peers.IdlePeers() - for _, peer := range idlePeers { - // Short circuit if throttling activated since above - if d.queue.Throttle() { - break - } - // Get a possible chunk. If nil is returned no chunk - // could be returned due to no hashes available. - request := d.queue.Reserve(peer, peer.Capacity()) - if request == nil { - continue - } - if glog.V(logger.Detail) { - glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) - } - // Fetch the chunk and check for error. If the peer was somehow - // already fetching a chunk due to a bug, it will be returned to - // the queue - if err := peer.Fetch(request); err != nil { - glog.V(logger.Error).Infof("Peer %s received double work", peer.id) - d.queue.Cancel(request) - } - } - // Make sure that we have peers available for fetching. If all peers have been tried - // and all failed throw an error - if d.queue.InFlight() == 0 { - return errPeersUnavailable - } - - } else if d.queue.InFlight() == 0 { - // When there are no more queue and no more in flight, We can - // safely assume we're done. Another part of the process will check - // for parent errors and will re-request anything that's missing - break out - } - } - } - glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start)) - return nil -} - // findAncestor tries to locate the common ancestor block of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N blocks should already get us a match. @@ -1023,92 +718,6 @@ func (d *Downloader) fetchBlocks(from uint64) error { } } -// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes, -// and bans the head of the retrieved batch. -// -// This method only fetches one single batch as the goal is not ban an entire -// (potentially long) invalid chain - wasting a lot of time in the meanwhile -, -// but rather to gradually build up a blacklist if the peer keeps reconnecting. -func (d *Downloader) banBlocks(peerId string, head common.Hash) error { - glog.V(logger.Debug).Infof("Banning a batch out of %d blocks from %s", d.queue.Pending(), peerId) - - // Ask the peer being banned for a batch of blocks from the banning point - peer := d.peers.Peer(peerId) - if peer == nil { - return nil - } - request := d.queue.Reserve(peer, MaxBlockFetch) - if request == nil { - return nil - } - if err := peer.Fetch(request); err != nil { - return err - } - // Wait a bit for the reply to arrive, and ban if done so - timeout := time.After(blockHardTTL) - for { - select { - case <-d.cancelCh: - return errCancelBlockFetch - - case <-timeout: - return errTimeout - - case <-d.hashCh: - // Out of bounds hashes received, ignore them - - case blockPack := <-d.blockCh: - blocks := blockPack.blocks - - // Short circuit if it's a stale cross check - if len(blocks) == 1 { - block := blocks[0] - if _, ok := d.checks[block.Hash()]; ok { - delete(d.checks, block.Hash()) - break - } - } - // Short circuit if it's not from the peer being banned - if blockPack.peerId != peerId { - break - } - // Short circuit if no blocks were returned - if len(blocks) == 0 { - return errors.New("no blocks returned to ban") - } - // Reconstruct the original chain order and ensure we're banning the correct blocks - types.BlockBy(types.Number).Sort(blocks) - if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 { - return errors.New("head block not the banned one") - } - index := 0 - for _, block := range blocks[1:] { - if bytes.Compare(block.ParentHash().Bytes(), blocks[index].Hash().Bytes()) != 0 { - break - } - index++ - } - // Ban the head hash and phase out any excess - d.banned.Add(blocks[index].Hash()) - for d.banned.Size() > maxBannedHashes { - var evacuate common.Hash - - d.banned.Each(func(item interface{}) bool { - // Skip any hard coded bans - if core.BadHashes[item.(common.Hash)] { - return true - } - evacuate = item.(common.Hash) - return false - }) - d.banned.Remove(evacuate) - } - glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId) - return nil - } - } -} - // process takes blocks from the queue and tries to import them into the chain. // // The algorithmic flow is as follows: diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 61fc7827b..7e3456433 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -17,7 +17,6 @@ package downloader import ( - "crypto/rand" "errors" "fmt" "math/big" @@ -215,11 +214,6 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun // a particular peer in the download tester. The returned function can be used to // retrieve batches of hashes from the particularly requested peer. func (dl *downloadTester) peerGetAbsHashesFn(id string, version int, delay time.Duration) func(uint64, int) error { - // If the simulated peer runs eth/60, this message is not supported - if version == eth60 { - return func(uint64, int) error { return nil } - } - // Otherwise create a method to request the blocks by number return func(head uint64, count int) error { time.Sleep(delay) @@ -261,24 +255,6 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ } } -// Tests that simple synchronization, without throttling from a good peer works. -func TestSynchronisation60(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 - hashes, blocks := makeChain(targetBlocks, 0, genesis) - - tester := newTester() - tester.newPeer("peer", eth60, hashes, blocks) - - // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("peer", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - if imported := len(tester.ownBlocks); imported != targetBlocks+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) - } -} - // Tests that simple synchronization against a canonical chain works correctly. // In this test common ancestor lookup should be short circuited and not require // binary searching. @@ -301,7 +277,6 @@ func TestCanonicalSynchronisation61(t *testing.T) { // Tests that if a large batch of blocks are being downloaded, it is throttled // until the cached blocks are retrieved. -func TestThrottling60(t *testing.T) { testThrottling(t, eth60) } func TestThrottling61(t *testing.T) { testThrottling(t, eth61) } func testThrottling(t *testing.T, protocol int) { @@ -400,7 +375,6 @@ func TestInactiveDownloader(t *testing.T) { } // Tests that a canceled download wipes all previously accumulated state. -func TestCancel60(t *testing.T) { testCancel(t, eth60) } func TestCancel61(t *testing.T) { testCancel(t, eth61) } func testCancel(t *testing.T, protocol int) { @@ -432,7 +406,6 @@ func testCancel(t *testing.T, protocol int) { } // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). -func TestMultiSynchronisation60(t *testing.T) { testMultiSynchronisation(t, eth60) } func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, eth61) } func testMultiSynchronisation(t *testing.T, protocol int) { @@ -463,355 +436,6 @@ func testMultiSynchronisation(t *testing.T, protocol int) { } } -// Tests that synchronising with a peer who's very slow at network IO does not -// stall the other peers in the system. -func TestSlowSynchronisation60(t *testing.T) { - tester := newTester() - - // Create a batch of blocks, with a slow and a full speed peer - targetCycles := 2 - targetBlocks := targetCycles*blockCacheLimit - 15 - targetIODelay := time.Second - hashes, blocks := makeChain(targetBlocks, 0, genesis) - - tester.newSlowPeer("fast", eth60, hashes, blocks, 0) - tester.newSlowPeer("slow", eth60, hashes, blocks, targetIODelay) - - // Try to sync with the peers (pull hashes from fast) - start := time.Now() - if err := tester.sync("fast", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - if imported := len(tester.ownBlocks); imported != targetBlocks+1 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) - } - // Check that the slow peer got hit at most once per block-cache-size import - limit := time.Duration(targetCycles+1) * targetIODelay - if delay := time.Since(start); delay >= limit { - t.Fatalf("synchronisation exceeded delay limit: have %v, want %v", delay, limit) - } -} - -// Tests that if a peer returns an invalid chain with a block pointing to a non- -// existing parent, it is correctly detected and handled. -func TestNonExistingParentAttack60(t *testing.T) { - tester := newTester() - - // Forge a single-link chain with a forged header - hashes, blocks := makeChain(1, 0, genesis) - tester.newPeer("valid", eth60, hashes, blocks) - - wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil) - wrongblock.Td = blocks[hashes[0]].Td - hashes, blocks = makeChain(1, 0, wrongblock) - tester.newPeer("attack", eth60, hashes, blocks) - - // Try and sync with the malicious node and check that it fails - if err := tester.sync("attack", nil); err == nil { - t.Fatalf("block synchronization succeeded") - } - if tester.hasBlock(hashes[0]) { - t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]]) - } - // Try to synchronize with the valid chain and make sure it succeeds - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - if !tester.hasBlock(tester.peerHashes["valid"][0]) { - t.Fatalf("tester didn't accept known-parent block: %v", tester.peerBlocks["valid"][hashes[0]]) - } -} - -// Tests that if a malicious peers keeps sending us repeating hashes, we don't -// loop indefinitely. -func TestRepeatingHashAttack60(t *testing.T) { // TODO: Is this thing valid?? - tester := newTester() - - // Create a valid chain, but drop the last link - hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", eth60, hashes, blocks) - tester.newPeer("attack", eth60, hashes[:len(hashes)-1], blocks) - - // Try and sync with the malicious node - errc := make(chan error) - go func() { - errc <- tester.sync("attack", nil) - }() - // Make sure that syncing returns and does so with a failure - select { - case <-time.After(time.Second): - t.Fatalf("synchronisation blocked") - case err := <-errc: - if err == nil { - t.Fatalf("synchronisation succeeded") - } - } - // Ensure that a valid chain can still pass sync - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests that if a malicious peers returns a non-existent block hash, it should -// eventually time out and the sync reattempted. -func TestNonExistingBlockAttack60(t *testing.T) { - tester := newTester() - - // Create a valid chain, but forge the last link - hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", eth60, hashes, blocks) - - hashes[len(hashes)/2] = common.Hash{} - tester.newPeer("attack", eth60, hashes, blocks) - - // Try and sync with the malicious node and check that it fails - if err := tester.sync("attack", nil); err != errPeersUnavailable { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) - } - // Ensure that a valid chain can still pass sync - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests that if a malicious peer is returning hashes in a weird order, that the -// sync throttler doesn't choke on them waiting for the valid blocks. -func TestInvalidHashOrderAttack60(t *testing.T) { - tester := newTester() - - // Create a valid long chain, but reverse some hashes within - hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) - tester.newPeer("valid", eth60, hashes, blocks) - - chunk1 := make([]common.Hash, blockCacheLimit) - chunk2 := make([]common.Hash, blockCacheLimit) - copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit]) - copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit]) - - copy(hashes[2*blockCacheLimit:], chunk1) - copy(hashes[blockCacheLimit:], chunk2) - tester.newPeer("attack", eth60, hashes, blocks) - - // Try and sync with the malicious node and check that it fails - if err := tester.sync("attack", nil); err != errInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) - } - // Ensure that a valid chain can still pass sync - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests that if a malicious peer makes up a random hash chain and tries to push -// indefinitely, it actually gets caught with it. -func TestMadeupHashChainAttack60(t *testing.T) { - tester := newTester() - blockSoftTTL = 100 * time.Millisecond - crossCheckCycle = 25 * time.Millisecond - - // Create a long chain of hashes without backing blocks - hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) - - randomHashes := make([]common.Hash, 1024*blockCacheLimit) - for i := range randomHashes { - rand.Read(randomHashes[i][:]) - } - - tester.newPeer("valid", eth60, hashes, blocks) - tester.newPeer("attack", eth60, randomHashes, nil) - - // Try and sync with the malicious node and check that it fails - if err := tester.sync("attack", nil); err != errCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) - } - // Ensure that a valid chain can still pass sync - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests that if a malicious peer makes up a random hash chain, and tries to push -// indefinitely, one hash at a time, it actually gets caught with it. The reason -// this is separate from the classical made up chain attack is that sending hashes -// one by one prevents reliable block/parent verification. -func TestMadeupHashChainDrippingAttack60(t *testing.T) { - // Create a random chain of hashes to drip - randomHashes := make([]common.Hash, 16*blockCacheLimit) - for i := range randomHashes { - rand.Read(randomHashes[i][:]) - } - randomHashes[len(randomHashes)-1] = genesis.Hash() - tester := newTester() - - // Try and sync with the attacker, one hash at a time - tester.maxHashFetch = 1 - tester.newPeer("attack", eth60, randomHashes, nil) - if err := tester.sync("attack", nil); err != errStallingPeer { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) - } -} - -// Tests that if a malicious peer makes up a random block chain, and tried to -// push indefinitely, it actually gets caught with it. -func TestMadeupBlockChainAttack60(t *testing.T) { - defaultBlockTTL := blockSoftTTL - defaultCrossCheckCycle := crossCheckCycle - - blockSoftTTL = 100 * time.Millisecond - crossCheckCycle = 25 * time.Millisecond - - // Create a long chain of blocks and simulate an invalid chain by dropping every second - hashes, blocks := makeChain(16*blockCacheLimit, 0, genesis) - gapped := make([]common.Hash, len(hashes)/2) - for i := 0; i < len(gapped); i++ { - gapped[i] = hashes[2*i] - } - // Try and sync with the malicious node and check that it fails - tester := newTester() - tester.newPeer("attack", eth60, gapped, blocks) - if err := tester.sync("attack", nil); err != errCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) - } - // Ensure that a valid chain can still pass sync - blockSoftTTL = defaultBlockTTL - crossCheckCycle = defaultCrossCheckCycle - - tester.newPeer("valid", eth60, hashes, blocks) - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests that if one/multiple malicious peers try to feed a banned blockchain to -// the downloader, it will not keep refetching the same chain indefinitely, but -// gradually block pieces of it, until its head is also blocked. -func TestBannedChainStarvationAttack60(t *testing.T) { - n := 8 * blockCacheLimit - fork := n/2 - 23 - hashes, forkHashes, blocks, forkBlocks := makeChainFork(n, fork, genesis) - - // Create the tester and ban the selected hash. - tester := newTester() - tester.downloader.banned.Add(forkHashes[fork-1]) - tester.newPeer("valid", eth60, hashes, blocks) - tester.newPeer("attack", eth60, forkHashes, forkBlocks) - - // Iteratively try to sync, and verify that the banned hash list grows until - // the head of the invalid chain is blocked too. - for banned := tester.downloader.banned.Size(); ; { - // Try to sync with the attacker, check hash chain failure - if err := tester.sync("attack", nil); err != errInvalidChain { - if tester.downloader.banned.Has(forkHashes[0]) && err == errBannedHead { - break - } - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) - } - // Check that the ban list grew with at least 1 new item, or all banned - bans := tester.downloader.banned.Size() - if bans < banned+1 { - t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1) - } - banned = bans - } - // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", eth60, forkHashes, forkBlocks); err != errBannedHead { - t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) - } - if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { - t.Fatalf("banned attacker registered: %v", peer) - } - // Ensure that a valid chain can still pass sync - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests that if a peer sends excessively many/large invalid chains that are -// gradually banned, it will have an upper limit on the consumed memory and also -// the origin bad hashes will not be evacuated. -func TestBannedChainMemoryExhaustionAttack60(t *testing.T) { - // Construct a banned chain with more chunks than the ban limit - n := 8 * blockCacheLimit - fork := n/2 - 23 - hashes, forkHashes, blocks, forkBlocks := makeChainFork(n, fork, genesis) - - // Create the tester and ban the root hash of the fork. - tester := newTester() - tester.downloader.banned.Add(forkHashes[fork-1]) - - // Reduce the test size a bit - defaultMaxBlockFetch := MaxBlockFetch - defaultMaxBannedHashes := maxBannedHashes - - MaxBlockFetch = 4 - maxBannedHashes = 256 - - tester.newPeer("valid", eth60, hashes, blocks) - tester.newPeer("attack", eth60, forkHashes, forkBlocks) - - // Iteratively try to sync, and verify that the banned hash list grows until - // the head of the invalid chain is blocked too. - for { - // Try to sync with the attacker, check hash chain failure - if err := tester.sync("attack", nil); err != errInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) - } - // Short circuit if the entire chain was banned. - if tester.downloader.banned.Has(forkHashes[0]) { - break - } - // Otherwise ensure we never exceed the memory allowance and the hard coded bans are untouched - if bans := tester.downloader.banned.Size(); bans > maxBannedHashes { - t.Fatalf("ban cap exceeded: have %v, want max %v", bans, maxBannedHashes) - } - for hash := range core.BadHashes { - if !tester.downloader.banned.Has(hash) { - t.Fatalf("hard coded ban evacuated: %x", hash) - } - } - } - // Ensure that a valid chain can still pass sync - MaxBlockFetch = defaultMaxBlockFetch - maxBannedHashes = defaultMaxBannedHashes - - if err := tester.sync("valid", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } -} - -// Tests a corner case (potential attack) where a peer delivers both good as well -// as unrequested blocks to a hash request. This may trigger a different code -// path than the fully correct or fully invalid delivery, potentially causing -// internal state problems -// -// No, don't delete this test, it actually did happen! -func TestOverlappingDeliveryAttack60(t *testing.T) { - // Create an arbitrary batch of blocks ( < cache-size not to block) - targetBlocks := blockCacheLimit - 23 - hashes, blocks := makeChain(targetBlocks, 0, genesis) - - // Register an attacker that always returns non-requested blocks too - tester := newTester() - tester.newPeer("attack", eth60, hashes, blocks) - - rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks - tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error { - // Add a non requested hash the screw the delivery (genesis should be fine) - return rawGetBlocks(append(request, hashes[0])) - } - // Test that synchronisation can complete, check for import success - if err := tester.sync("attack", nil); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - start := time.Now() - for len(tester.ownHashes) != len(hashes) && time.Since(start) < time.Second { - time.Sleep(50 * time.Millisecond) - } - if len(tester.ownHashes) != len(hashes) { - t.Fatalf("chain length mismatch: have %v, want %v", len(tester.ownHashes), len(hashes)) - } -} - // Tests that a peer advertising an high TD doesn't get to stall the downloader // afterwards by not sending any useful hashes. func TestHighTDStarvationAttack61(t *testing.T) { @@ -850,7 +474,7 @@ func TestHashAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, eth60, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, eth61, []common.Hash{genesis.Hash()}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -882,7 +506,7 @@ func TestBlockAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, eth60, []common.Hash{common.Hash{}}, nil); err != nil { + if err := tester.newPeer(id, eth61, []common.Hash{common.Hash{}}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { diff --git a/eth/handler_test.go b/eth/handler_test.go index 63c94faa1..6400d4e78 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -19,7 +19,6 @@ import ( // Tests that hashes can be retrieved from a remote chain by hashes in reverse // order. -func TestGetBlockHashes60(t *testing.T) { testGetBlockHashes(t, 60) } func TestGetBlockHashes61(t *testing.T) { testGetBlockHashes(t, 61) } func testGetBlockHashes(t *testing.T, protocol int) { @@ -63,7 +62,6 @@ func testGetBlockHashes(t *testing.T, protocol int) { // Tests that hashes can be retrieved from a remote chain by numbers in forward // order. -func TestGetBlockHashesFromNumber60(t *testing.T) { testGetBlockHashesFromNumber(t, 60) } func TestGetBlockHashesFromNumber61(t *testing.T) { testGetBlockHashesFromNumber(t, 61) } func testGetBlockHashesFromNumber(t *testing.T, protocol int) { @@ -104,7 +102,6 @@ func testGetBlockHashesFromNumber(t *testing.T, protocol int) { } // Tests that blocks can be retrieved from a remote chain based on their hashes. -func TestGetBlocks60(t *testing.T) { testGetBlocks(t, 60) } func TestGetBlocks61(t *testing.T) { testGetBlocks(t, 61) } func testGetBlocks(t *testing.T, protocol int) { diff --git a/eth/metrics.go b/eth/metrics.go index 778747210..21002094c 100644 --- a/eth/metrics.go +++ b/eth/metrics.go @@ -95,19 +95,19 @@ func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) { // Account for the data traffic packets, traffic := miscInPacketsMeter, miscInTrafficMeter switch { - case (rw.version == eth60 || rw.version == eth61) && msg.Code == BlockHashesMsg: + case rw.version < eth62 && msg.Code == BlockHashesMsg: packets, traffic = reqHashInPacketsMeter, reqHashInTrafficMeter - case (rw.version == eth60 || rw.version == eth61) && msg.Code == BlocksMsg: + case rw.version < eth62 && msg.Code == BlocksMsg: packets, traffic = reqBlockInPacketsMeter, reqBlockInTrafficMeter - case rw.version == eth62 && msg.Code == BlockHeadersMsg: + case rw.version >= eth62 && msg.Code == BlockHeadersMsg: packets, traffic = reqBlockInPacketsMeter, reqBlockInTrafficMeter - case rw.version == eth62 && msg.Code == BlockBodiesMsg: + case rw.version >= eth62 && msg.Code == BlockBodiesMsg: packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter - case rw.version == eth63 && msg.Code == NodeDataMsg: + case rw.version >= eth63 && msg.Code == NodeDataMsg: packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter - case rw.version == eth63 && msg.Code == ReceiptsMsg: + case rw.version >= eth63 && msg.Code == ReceiptsMsg: packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter case msg.Code == NewBlockHashesMsg: @@ -127,19 +127,19 @@ func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error { // Account for the data traffic packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter switch { - case (rw.version == eth60 || rw.version == eth61) && msg.Code == BlockHashesMsg: + case rw.version < eth62 && msg.Code == BlockHashesMsg: packets, traffic = reqHashOutPacketsMeter, reqHashOutTrafficMeter - case (rw.version == eth60 || rw.version == eth61) && msg.Code == BlocksMsg: + case rw.version < eth62 && msg.Code == BlocksMsg: packets, traffic = reqBlockOutPacketsMeter, reqBlockOutTrafficMeter - case rw.version == eth62 && msg.Code == BlockHeadersMsg: + case rw.version >= eth62 && msg.Code == BlockHeadersMsg: packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter - case rw.version == eth62 && msg.Code == BlockBodiesMsg: + case rw.version >= eth62 && msg.Code == BlockBodiesMsg: packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter - case rw.version == eth63 && msg.Code == NodeDataMsg: + case rw.version >= eth63 && msg.Code == NodeDataMsg: packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter - case rw.version == eth63 && msg.Code == ReceiptsMsg: + case rw.version >= eth63 && msg.Code == ReceiptsMsg: packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter case msg.Code == NewBlockHashesMsg: diff --git a/eth/protocol.go b/eth/protocol.go index c16223ccf..49f096a3b 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -28,7 +28,6 @@ import ( // Constants to match up protocol versions and messages const ( - eth60 = 60 eth61 = 61 eth62 = 62 eth63 = 63 @@ -36,10 +35,10 @@ const ( ) // Supported versions of the eth protocol (first is primary). -var ProtocolVersions = []uint{eth64, eth63, eth62, eth61, eth60} +var ProtocolVersions = []uint{eth64, eth63, eth62, eth61} // Number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{15, 12, 8, 9, 8} +var ProtocolLengths = []uint64{15, 12, 8, 9} const ( NetworkId = 1 @@ -48,17 +47,15 @@ const ( // eth protocol message codes const ( - // Protocol messages belonging to eth/60 - StatusMsg = 0x00 - NewBlockHashesMsg = 0x01 - TxMsg = 0x02 - GetBlockHashesMsg = 0x03 - BlockHashesMsg = 0x04 - GetBlocksMsg = 0x05 - BlocksMsg = 0x06 - NewBlockMsg = 0x07 - - // Protocol messages belonging to eth/61 (extension of eth/60) + // Protocol messages belonging to eth/61 + StatusMsg = 0x00 + NewBlockHashesMsg = 0x01 + TxMsg = 0x02 + GetBlockHashesMsg = 0x03 + BlockHashesMsg = 0x04 + GetBlocksMsg = 0x05 + BlocksMsg = 0x06 + NewBlockMsg = 0x07 GetBlockHashesFromNumberMsg = 0x08 // Protocol messages belonging to eth/62 (new protocol from scratch) diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 263088099..bc3b5acfc 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -38,7 +38,6 @@ func init() { var testAccount = crypto.NewKey(rand.Reader) // Tests that handshake failures are detected and reported correctly. -func TestStatusMsgErrors60(t *testing.T) { testStatusMsgErrors(t, 60) } func TestStatusMsgErrors61(t *testing.T) { testStatusMsgErrors(t, 61) } func TestStatusMsgErrors62(t *testing.T) { testStatusMsgErrors(t, 62) } func TestStatusMsgErrors63(t *testing.T) { testStatusMsgErrors(t, 63) } @@ -93,7 +92,6 @@ func testStatusMsgErrors(t *testing.T, protocol int) { } // This test checks that received transactions are added to the local pool. -func TestRecvTransactions60(t *testing.T) { testRecvTransactions(t, 60) } func TestRecvTransactions61(t *testing.T) { testRecvTransactions(t, 61) } func TestRecvTransactions62(t *testing.T) { testRecvTransactions(t, 62) } func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) } @@ -123,7 +121,6 @@ func testRecvTransactions(t *testing.T, protocol int) { } // This test checks that pending transactions are sent. -func TestSendTransactions60(t *testing.T) { testSendTransactions(t, 60) } func TestSendTransactions61(t *testing.T) { testSendTransactions(t, 61) } func TestSendTransactions62(t *testing.T) { testSendTransactions(t, 62) } func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) } |