aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go96
-rw-r--r--eth/downloader/downloader_test.go45
-rw-r--r--eth/downloader/queue.go10
3 files changed, 109 insertions, 42 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 426da9beb..95dd37fd7 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -2,6 +2,7 @@ package downloader
import (
"errors"
+ "math/rand"
"sync"
"sync/atomic"
"time"
@@ -14,15 +15,19 @@ import (
)
const (
- maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk
+ maxHashFetch = 512 // Amount of hashes to be fetched per chunk
+ maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
+ hashTTL = 5 * time.Second // Time it takes for a hash request to time out
)
var (
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
- blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
+ blockTTL = 5 * time.Second // Time it takes for a block request to time out
+ crossCheckCycle = time.Second // Period after which to check for expired cross checks
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+)
+var (
errLowTd = errors.New("peer's TD is too low")
ErrBusy = errors.New("busy")
errUnknownPeer = errors.New("peer's unknown or unhealthy")
@@ -34,6 +39,7 @@ var (
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool")
ErrInvalidChain = errors.New("retrieved hash chain is invalid")
+ ErrCrossCheckFailed = errors.New("block cross-check failed")
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
errNoSyncActive = errors.New("no sync active")
@@ -220,46 +226,47 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
start := time.Now()
- // Add the hash to the queue first
+ // Add the hash to the queue first, and start hash retrieval
d.queue.Insert([]common.Hash{h})
-
- // Get the first batch of hashes
p.getHashes(h)
var (
- failureResponseTimer = time.NewTimer(hashTtl)
- attemptedPeers = make(map[string]bool) // attempted peers will help with retries
- activePeer = p // active peer will help determine the current active peer
- hash common.Hash // common and last hash
+ active = p // active peer will help determine the current active peer
+ head = common.Hash{} // common and last hash
+
+ timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
+ attempted = make(map[string]bool) // attempted peers will help with retries
+ crossChecks = make(map[common.Hash]time.Time) // running cross checks and their deadline
+ crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
)
- attemptedPeers[p.id] = true
+ defer crossTicker.Stop()
-out:
- for {
+ attempted[p.id] = true
+ for finished := false; !finished; {
select {
case <-d.cancelCh:
return errCancelHashFetch
+
case hashPack := <-d.hashCh:
// Make sure the active peer is giving us the hashes
- if hashPack.peerId != activePeer.id {
+ if hashPack.peerId != active.id {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
break
}
-
- failureResponseTimer.Reset(hashTtl)
+ timeout.Reset(hashTTL)
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
- glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id)
+ glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
d.queue.Reset()
return errEmptyHashSet
}
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
done, index := false, 0
- for index, hash = range hashPack.hashes {
- if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil {
- glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
+ for index, head = range hashPack.hashes {
+ if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
+ glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
hashPack.hashes = hashPack.hashes[:index]
done = true
break
@@ -267,25 +274,50 @@ out:
}
// Insert all the new hashes, but only continue if got something useful
inserts := d.queue.Insert(hashPack.hashes)
- if inserts == 0 && !done {
- glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", activePeer.id)
+ if len(inserts) == 0 && !done {
+ glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
d.queue.Reset()
return ErrBadPeer
}
if !done {
- activePeer.getHashes(hash)
+ // Try and fetch a random block to verify the hash batch
+ cross := inserts[rand.Intn(len(inserts))]
+ glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross)
+
+ crossChecks[cross] = time.Now().Add(blockTTL)
+ active.getBlocks([]common.Hash{cross})
+
+ // Also fetch a fresh
+ active.getHashes(head)
continue
}
// We're done, allocate the download cache and proceed pulling the blocks
offset := 0
- if block := d.getBlock(hash); block != nil {
+ if block := d.getBlock(head); block != nil {
offset = int(block.NumberU64() + 1)
}
d.queue.Alloc(offset)
- break out
+ finished = true
+
+ case blockPack := <-d.blockCh:
+ // Cross check the block with the random verifications
+ if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
+ continue
+ }
+ hash := blockPack.blocks[0].Hash()
+ delete(crossChecks, hash)
+
+ case <-crossTicker.C:
+ // Iterate over all the cross checks and fail the hash chain if they're not verified
+ for hash, deadline := range crossChecks {
+ if time.Now().After(deadline) {
+ glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
+ return ErrCrossCheckFailed
+ }
+ }
- case <-failureResponseTimer.C:
+ case <-timeout.C:
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
var p *peer // p will be set if a peer can be found
@@ -293,21 +325,21 @@ out:
// already fetched hash list. This can't guarantee 100% correctness but does
// a fair job. This is always either correct or false incorrect.
for _, peer := range d.peers.AllPeers() {
- if d.queue.Has(peer.head) && !attemptedPeers[peer.id] {
+ if d.queue.Has(peer.head) && !attempted[peer.id] {
p = peer
break
}
}
// if all peers have been tried, abort the process entirely or if the hash is
// the zero hash.
- if p == nil || (hash == common.Hash{}) {
+ if p == nil || (head == common.Hash{}) {
d.queue.Reset()
return ErrTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
- activePeer = p
- p.getHashes(hash)
+ active = p
+ p.getHashes(head)
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
}
}
@@ -359,7 +391,7 @@ out:
// that badly or poorly behave are removed from the peer set (not banned).
// Bad peers are excluded from the available peer set and therefor won't be
// reused. XXX We could re-introduce peers after X time.
- badPeers := d.queue.Expire(blockTtl)
+ badPeers := d.queue.Expire(blockTTL)
for _, pid := range badPeers {
// XXX We could make use of a reputation system here ranking peers
// in their performance
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 4b8ee93d2..60dcc06cd 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -130,8 +130,23 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
return dl.blocks[knownHash]
}
-func (dl *downloadTester) getHashes(hash common.Hash) error {
- dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes)
+// getHashes retrieves a batch of hashes for reconstructing the chain.
+func (dl *downloadTester) getHashes(head common.Hash) error {
+ // Gather the next batch of hashes
+ hashes := make([]common.Hash, 0, maxHashFetch)
+ for i, hash := range dl.hashes {
+ if hash == head {
+ for len(hashes) < cap(hashes) && i < len(dl.hashes) {
+ hashes = append(hashes, dl.hashes[i])
+ i++
+ }
+ break
+ }
+ }
+ // Delay delivery a bit to allow attacks to unfold
+ time.Sleep(time.Millisecond)
+
+ dl.downloader.DeliverHashes(dl.activePeerId, hashes)
return nil
}
@@ -166,7 +181,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash
func TestDownload(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
@@ -215,7 +230,7 @@ func TestMissing(t *testing.T) {
func TestTaking(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
@@ -256,7 +271,7 @@ func TestInactiveDownloader(t *testing.T) {
func TestCancel(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
@@ -282,7 +297,7 @@ func TestCancel(t *testing.T) {
func TestThrottling(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 16 * blockCacheLimit
hashes := createHashes(0, targetBlocks)
@@ -429,3 +444,21 @@ func TestInvalidHashOrderAttack(t *testing.T) {
t.Fatalf("failed to synchronise blocks: %v", err)
}
}
+
+// Tests that if a malicious peer makes up a random hash chain and tries to push
+// indefinitely, it actually gets caught with it.
+func TestMadeupHashChainAttack(t *testing.T) {
+ blockTTL = 100 * time.Millisecond
+ crossCheckCycle = 25 * time.Millisecond
+
+ // Create a long chain of hashes without backing blocks
+ hashes := createHashes(0, 1024*blockCacheLimit)
+ hashes = hashes[:len(hashes)-1]
+
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, hashes, nil)
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ }
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index aa48c521a..13ec9a520 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -123,13 +123,13 @@ func (q *queue) Has(hash common.Hash) bool {
}
// Insert adds a set of hashes for the download queue for scheduling, returning
-// the number of new hashes encountered.
-func (q *queue) Insert(hashes []common.Hash) int {
+// the new hashes encountered.
+func (q *queue) Insert(hashes []common.Hash) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the hashes prioritized in the arrival order
- inserts := 0
+ inserts := make([]common.Hash, 0, len(hashes))
for _, hash := range hashes {
// Skip anything we already have
if old, ok := q.hashPool[hash]; ok {
@@ -137,7 +137,9 @@ func (q *queue) Insert(hashes []common.Hash) int {
continue
}
// Update the counters and insert the hash
- q.hashCounter, inserts = q.hashCounter+1, inserts+1
+ q.hashCounter = q.hashCounter + 1
+ inserts = append(inserts, hash)
+
q.hashPool[hash] = q.hashCounter
q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
}