aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go51
-rw-r--r--eth/downloader/downloader_test.go63
-rw-r--r--eth/downloader/queue.go2
3 files changed, 94 insertions, 22 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index d817b223c..fd588d2f3 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -15,8 +15,10 @@ import (
)
const (
- maxHashFetch = 512 // Amount of hashes to be fetched per chunk
- maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
+ MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling
+ MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request
+ MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
+
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
hashTTL = 5 * time.Second // Time it takes for a hash request to time out
)
@@ -28,10 +30,11 @@ var (
)
var (
- errLowTd = errors.New("peer's TD is too low")
+ errLowTd = errors.New("peers TD is too low")
ErrBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
ErrBadPeer = errors.New("action from bad peer ignored")
+ ErrStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active")
ErrPendingQueue = errors.New("pending items in queue")
ErrTimeout = errors.New("timeout")
@@ -60,13 +63,18 @@ type hashPack struct {
hashes []common.Hash
}
+type crossCheck struct {
+ expire time.Time
+ parent common.Hash
+}
+
type Downloader struct {
mux *event.TypeMux
mu sync.RWMutex
- queue *queue // Scheduler for selecting the hashes to download
- peers *peerSet // Set of active peers from which download can proceed
- checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
// Callbacks
hasBlock hashCheckFn
@@ -157,7 +165,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
- d.checks = make(map[common.Hash]time.Time)
+ d.checks = make(map[common.Hash]*crossCheck)
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
@@ -283,15 +291,22 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
return ErrBadPeer
}
if !done {
+ // Check that the peer is not stalling the sync
+ if len(inserts) < MinHashFetch {
+ return ErrStallingPeer
+ }
// Try and fetch a random block to verify the hash batch
// Skip the last hash as the cross check races with the next hash fetch
- if len(inserts) > 1 {
- cross := inserts[rand.Intn(len(inserts)-1)]
- glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross)
+ cross := rand.Intn(len(inserts) - 1)
+ origin, parent := inserts[cross], inserts[cross+1]
+ glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)
- d.checks[cross] = time.Now().Add(blockTTL)
- active.getBlocks([]common.Hash{cross})
+ d.checks[origin] = &crossCheck{
+ expire: time.Now().Add(blockTTL),
+ parent: parent,
}
+ active.getBlocks([]common.Hash{origin})
+
// Also fetch a fresh
active.getHashes(head)
continue
@@ -310,8 +325,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
continue
}
block := blockPack.blocks[0]
- if _, ok := d.checks[block.Hash()]; ok {
- if !d.queue.Has(block.ParentHash()) {
+ if check, ok := d.checks[block.Hash()]; ok {
+ if block.ParentHash() != check.parent {
return ErrCrossCheckFailed
}
delete(d.checks, block.Hash())
@@ -319,8 +334,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
case <-crossTicker.C:
// Iterate over all the cross checks and fail the hash chain if they're not verified
- for hash, deadline := range d.checks {
- if time.Now().After(deadline) {
+ for hash, check := range d.checks {
+ if time.Now().After(check.expire) {
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
return ErrCrossCheckFailed
}
@@ -438,7 +453,7 @@ out:
}
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
- request := d.queue.Reserve(peer, maxBlockFetch)
+ request := d.queue.Reserve(peer, MaxBlockFetch)
if request == nil {
continue
}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 19d64ac67..8b541d8b7 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -53,6 +53,8 @@ type downloadTester struct {
blocks map[common.Hash]*types.Block // Blocks associated with the hashes
chain []common.Hash // Block-chain being constructed
+ maxHashFetch int // Overrides the maximum number of retrieved hashes
+
t *testing.T
pcount int
done chan bool
@@ -133,8 +135,12 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
// getHashes retrieves a batch of hashes for reconstructing the chain.
func (dl *downloadTester) getHashes(head common.Hash) error {
+ limit := MaxHashFetch
+ if dl.maxHashFetch > 0 {
+ limit = dl.maxHashFetch
+ }
// Gather the next batch of hashes
- hashes := make([]common.Hash, 0, maxHashFetch)
+ hashes := make([]common.Hash, 0, limit)
for i, hash := range dl.hashes {
if hash == head {
i++
@@ -382,7 +388,7 @@ func TestRepeatingHashAttack(t *testing.T) {
// Make sure that syncing returns and does so with a failure
select {
- case <-time.After(100 * time.Millisecond):
+ case <-time.After(time.Second):
t.Fatalf("synchronisation blocked")
case err := <-errc:
if err == nil {
@@ -469,6 +475,23 @@ func TestMadeupHashChainAttack(t *testing.T) {
}
}
+// Tests that if a malicious peer makes up a random hash chain, and tries to push
+// indefinitely, one hash at a time, it actually gets caught with it. The reason
+// this is separate from the classical made up chain attack is that sending hashes
+// one by one prevents reliable block/parent verification.
+func TestMadeupHashChainDrippingAttack(t *testing.T) {
+ // Create a random chain of hashes to drip
+ hashes := createHashes(0, 16*blockCacheLimit)
+ tester := newTester(t, hashes, nil)
+
+ // Try and sync with the attacker, one hash at a time
+ tester.maxHashFetch = 1
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer)
+ }
+}
+
// Tests that if a malicious peer makes up a random block chain, and tried to
// push indefinitely, it actually gets caught with it.
func TestMadeupBlockChainAttack(t *testing.T) {
@@ -479,7 +502,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
crossCheckCycle = 25 * time.Millisecond
// Create a long chain of blocks and simulate an invalid chain by dropping every second
- hashes := createHashes(0, 32*blockCacheLimit)
+ hashes := createHashes(0, 16*blockCacheLimit)
blocks := createBlocksFromHashes(hashes)
gapped := make([]common.Hash, len(hashes)/2)
@@ -502,3 +525,37 @@ func TestMadeupBlockChainAttack(t *testing.T) {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
+
+// Advanced form of the above forged blockchain attack, where not only does the
+// attacker make up a valid hashes for random blocks, but also forges the block
+// parents to point to existing hashes.
+func TestMadeupParentBlockChainAttack(t *testing.T) {
+ defaultBlockTTL := blockTTL
+ defaultCrossCheckCycle := crossCheckCycle
+
+ blockTTL = 100 * time.Millisecond
+ crossCheckCycle = 25 * time.Millisecond
+
+ // Create a long chain of blocks and simulate an invalid chain by dropping every second
+ hashes := createHashes(0, 16*blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+ forges := createBlocksFromHashes(hashes)
+ for hash, block := range forges {
+ block.ParentHeaderHash = hash // Simulate pointing to already known hash
+ }
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, hashes, forges)
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ }
+ // Ensure that a valid chain can still pass sync
+ blockTTL = defaultBlockTTL
+ crossCheckCycle = defaultCrossCheckCycle
+
+ tester.blocks = blocks
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 13ec9a520..591a37773 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -17,7 +17,7 @@ import (
)
const (
- blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
+ blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
)
// fetchRequest is a currently running block retrieval operation.