diff options
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 36 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 90 | ||||
-rw-r--r-- | eth/downloader/queue.go | 11 |
3 files changed, 88 insertions, 49 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 30be0dde5..426da9beb 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -23,20 +23,20 @@ var ( minDesiredPeerCount = 5 // Amount of peers desired to start syncing blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out - errLowTd = errors.New("peer's TD is too low") - ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer's unknown or unhealthy") - ErrBadPeer = errors.New("action from bad peer ignored") - errNoPeers = errors.New("no peers to keep download active") - ErrPendingQueue = errors.New("pending items in queue") - ErrTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errBlockNumberOverflow = errors.New("received block which overflows") - errCancelHashFetch = errors.New("hash fetching cancelled (requested)") - errCancelBlockFetch = errors.New("block downloading cancelled (requested)") - errNoSyncActive = errors.New("no sync active") + errLowTd = errors.New("peer's TD is too low") + ErrBusy = errors.New("busy") + errUnknownPeer = errors.New("peer's unknown or unhealthy") + ErrBadPeer = errors.New("action from bad peer ignored") + errNoPeers = errors.New("no peers to keep download active") + ErrPendingQueue = errors.New("pending items in queue") + ErrTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + ErrInvalidChain = errors.New("retrieved hash chain is invalid") + errCancelHashFetch = errors.New("hash fetching cancelled (requested)") + errCancelBlockFetch = errors.New("block downloading cancelled (requested)") + errNoSyncActive = errors.New("no sync active") ) type hashCheckFn func(common.Hash) bool @@ -334,8 +334,14 @@ out: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks, but drop the peer if invalid + // Deliver the received chunk of blocks if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { + if err == ErrInvalidChain { + // The hash chain is invalid (blocks are not ordered properly), abort + d.queue.Reset() + return err + } + // Peer did deliver, but some blocks were off, penalize glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) peer.Demote() break diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7888bd1e0..4b8ee93d2 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,9 +75,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types return tester } -func (dl *downloadTester) sync(peerId string, hash common.Hash) error { +// sync is a simple wrapper around the downloader to start synchronisation and +// block until it returns +func (dl *downloadTester) sync(peerId string, head common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, hash) + return dl.downloader.Synchronise(peerId, head) +} + +// syncTake is starts synchronising with a remote peer, but concurrently it also +// starts fetching blocks that the downloader retrieved. IT blocks until both go +// routines terminate. +func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) { + // Start a block collector to take blocks as they become available + done := make(chan struct{}) + took := []*types.Block{} + go func() { + for running := true; running; { + select { + case <-done: + running = false + default: + time.Sleep(time.Millisecond) + } + // Take a batch of blocks and accumulate + took = append(took, dl.downloader.TakeBlocks()...) + } + done <- struct{}{} + }() + // Start the downloading, sync the taker and return + err := dl.sync(peerId, head) + + done <- struct{}{} + <-done + + return took, err } func (dl *downloadTester) insertBlocks(blocks types.Blocks) { @@ -264,32 +295,7 @@ func TestThrottling(t *testing.T) { tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) // Concurrently download and take the blocks - errc := make(chan error, 1) - go func() { - errc <- tester.sync("peer1", hashes[0]) - }() - - done := make(chan struct{}) - took := []*types.Block{} - go func() { - for running := true; running; { - select { - case <-done: - running = false - default: - time.Sleep(time.Millisecond) - } - // Take a batch of blocks and accumulate - took = append(took, tester.downloader.TakeBlocks()...) - } - done <- struct{}{} - }() - - // Synchronise the two threads and verify - err := <-errc - done <- struct{}{} - <-done - + took, err := tester.syncTake("peer1", hashes[0]) if err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -395,3 +401,31 @@ func TestNonExistingBlockAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Tests that if a malicious peer is returning hashes in a weird order, that the +// sync throttler doesn't choke on them waiting for the valid blocks. +func TestInvalidHashOrderAttack(t *testing.T) { + // Create a valid long chain, but reverse some hashes within + hashes := createHashes(0, 4*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + + reverse := make([]common.Hash, len(hashes)) + copy(reverse, hashes) + + for i := len(hashes) / 4; i < 2*len(hashes)/4; i++ { + reverse[i], reverse[len(hashes)-i-1] = reverse[len(hashes)-i-1], reverse[i] + } + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, reverse, blocks) + tester.newPeer("attack", big.NewInt(10000), reverse[0]) + if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index fdea1f63f..aa48c521a 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -298,18 +298,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // Iterate over the downloaded blocks and add each of them errs := make([]error, 0) for _, block := range blocks { - // Skip any blocks that fall outside the cache range - index := int(block.NumberU64()) - q.blockOffset - if index >= len(q.blockCache) || index < 0 { - //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache)) - continue - } // Skip any blocks that were not requested hash := block.Hash() if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested block %v", hash)) continue } + // If a requested block falls out of the range, the hash chain is invalid + index := int(block.NumberU64()) - q.blockOffset + if index >= len(q.blockCache) || index < 0 { + return ErrInvalidChain + } // Otherwise merge the block and mark the hash block q.blockCache[index] = block |