aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go36
-rw-r--r--eth/downloader/downloader_test.go90
-rw-r--r--eth/downloader/queue.go11
3 files changed, 88 insertions, 49 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 30be0dde5..426da9beb 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -23,20 +23,20 @@ var (
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
- errLowTd = errors.New("peer's TD is too low")
- ErrBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer's unknown or unhealthy")
- ErrBadPeer = errors.New("action from bad peer ignored")
- errNoPeers = errors.New("no peers to keep download active")
- ErrPendingQueue = errors.New("pending items in queue")
- ErrTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errBlockNumberOverflow = errors.New("received block which overflows")
- errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
- errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errLowTd = errors.New("peer's TD is too low")
+ ErrBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ ErrBadPeer = errors.New("action from bad peer ignored")
+ errNoPeers = errors.New("no peers to keep download active")
+ ErrPendingQueue = errors.New("pending items in queue")
+ ErrTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ ErrInvalidChain = errors.New("retrieved hash chain is invalid")
+ errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
+ errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
type hashCheckFn func(common.Hash) bool
@@ -334,8 +334,14 @@ out:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
- // Deliver the received chunk of blocks, but drop the peer if invalid
+ // Deliver the received chunk of blocks
if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
+ if err == ErrInvalidChain {
+ // The hash chain is invalid (blocks are not ordered properly), abort
+ d.queue.Reset()
+ return err
+ }
+ // Peer did deliver, but some blocks were off, penalize
glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
peer.Demote()
break
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 7888bd1e0..4b8ee93d2 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -75,9 +75,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
return tester
}
-func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
+// sync is a simple wrapper around the downloader to start synchronisation and
+// block until it returns
+func (dl *downloadTester) sync(peerId string, head common.Hash) error {
dl.activePeerId = peerId
- return dl.downloader.Synchronise(peerId, hash)
+ return dl.downloader.Synchronise(peerId, head)
+}
+
+// syncTake is starts synchronising with a remote peer, but concurrently it also
+// starts fetching blocks that the downloader retrieved. IT blocks until both go
+// routines terminate.
+func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) {
+ // Start a block collector to take blocks as they become available
+ done := make(chan struct{})
+ took := []*types.Block{}
+ go func() {
+ for running := true; running; {
+ select {
+ case <-done:
+ running = false
+ default:
+ time.Sleep(time.Millisecond)
+ }
+ // Take a batch of blocks and accumulate
+ took = append(took, dl.downloader.TakeBlocks()...)
+ }
+ done <- struct{}{}
+ }()
+ // Start the downloading, sync the taker and return
+ err := dl.sync(peerId, head)
+
+ done <- struct{}{}
+ <-done
+
+ return took, err
}
func (dl *downloadTester) insertBlocks(blocks types.Blocks) {
@@ -264,32 +295,7 @@ func TestThrottling(t *testing.T) {
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
// Concurrently download and take the blocks
- errc := make(chan error, 1)
- go func() {
- errc <- tester.sync("peer1", hashes[0])
- }()
-
- done := make(chan struct{})
- took := []*types.Block{}
- go func() {
- for running := true; running; {
- select {
- case <-done:
- running = false
- default:
- time.Sleep(time.Millisecond)
- }
- // Take a batch of blocks and accumulate
- took = append(took, tester.downloader.TakeBlocks()...)
- }
- done <- struct{}{}
- }()
-
- // Synchronise the two threads and verify
- err := <-errc
- done <- struct{}{}
- <-done
-
+ took, err := tester.syncTake("peer1", hashes[0])
if err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
@@ -395,3 +401,31 @@ func TestNonExistingBlockAttack(t *testing.T) {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
+
+// Tests that if a malicious peer is returning hashes in a weird order, that the
+// sync throttler doesn't choke on them waiting for the valid blocks.
+func TestInvalidHashOrderAttack(t *testing.T) {
+ // Create a valid long chain, but reverse some hashes within
+ hashes := createHashes(0, 4*blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+
+ reverse := make([]common.Hash, len(hashes))
+ copy(reverse, hashes)
+
+ for i := len(hashes) / 4; i < 2*len(hashes)/4; i++ {
+ reverse[i], reverse[len(hashes)-i-1] = reverse[len(hashes)-i-1], reverse[i]
+ }
+
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, reverse, blocks)
+ tester.newPeer("attack", big.NewInt(10000), reverse[0])
+ if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
+ }
+ // Ensure that a valid chain can still pass sync
+ tester.hashes = hashes
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index fdea1f63f..aa48c521a 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -298,18 +298,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
// Iterate over the downloaded blocks and add each of them
errs := make([]error, 0)
for _, block := range blocks {
- // Skip any blocks that fall outside the cache range
- index := int(block.NumberU64()) - q.blockOffset
- if index >= len(q.blockCache) || index < 0 {
- //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache))
- continue
- }
// Skip any blocks that were not requested
hash := block.Hash()
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested block %v", hash))
continue
}
+ // If a requested block falls out of the range, the hash chain is invalid
+ index := int(block.NumberU64()) - q.blockOffset
+ if index >= len(q.blockCache) || index < 0 {
+ return ErrInvalidChain
+ }
// Otherwise merge the block and mark the hash block
q.blockCache[index] = block