aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go169
-rw-r--r--eth/downloader/downloader_test.go250
-rw-r--r--eth/downloader/queue.go33
3 files changed, 332 insertions, 120 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 616971f73..f9bd5a635 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -2,7 +2,7 @@ package downloader
import (
"errors"
- "fmt"
+ "math/rand"
"sync"
"sync/atomic"
"time"
@@ -15,29 +15,34 @@ import (
)
const (
- maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk
+ maxHashFetch = 512 // Amount of hashes to be fetched per chunk
+ maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
+ hashTTL = 5 * time.Second // Time it takes for a hash request to time out
)
var (
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
- blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
-
- errLowTd = errors.New("peer's TD is too low")
- ErrBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer's unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errNoPeers = errors.New("no peers to keep download active")
- ErrPendingQueue = errors.New("pending items in queue")
- ErrTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errBlockNumberOverflow = errors.New("received block which overflows")
- errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
- errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ blockTTL = 5 * time.Second // Time it takes for a block request to time out
+ crossCheckCycle = time.Second // Period after which to check for expired cross checks
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+)
+
+var (
+ errLowTd = errors.New("peer's TD is too low")
+ ErrBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ ErrBadPeer = errors.New("action from bad peer ignored")
+ errNoPeers = errors.New("no peers to keep download active")
+ ErrPendingQueue = errors.New("pending items in queue")
+ ErrTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ ErrInvalidChain = errors.New("retrieved hash chain is invalid")
+ ErrCrossCheckFailed = errors.New("block cross-check failed")
+ errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
+ errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
type hashCheckFn func(common.Hash) bool
@@ -58,9 +63,10 @@ type hashPack struct {
type Downloader struct {
mux *event.TypeMux
- mu sync.RWMutex
- queue *queue
- peers *peerSet
+ mu sync.RWMutex
+ queue *queue // Scheduler for selecting the hashes to download
+ peers *peerSet // Set of active peers from which download can proceed
+ checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain
// Callbacks
hasBlock hashCheckFn
@@ -153,6 +159,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
+ d.checks = make(map[common.Hash]time.Time)
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
@@ -177,7 +184,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
defer func() {
// reset on error
if err != nil {
- d.queue.Reset()
+ d.Cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -221,66 +228,98 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
start := time.Now()
- // Add the hash to the queue first
+ // Add the hash to the queue first, and start hash retrieval
d.queue.Insert([]common.Hash{h})
-
- // Get the first batch of hashes
p.getHashes(h)
var (
- failureResponseTimer = time.NewTimer(hashTtl)
- attemptedPeers = make(map[string]bool) // attempted peers will help with retries
- activePeer = p // active peer will help determine the current active peer
- hash common.Hash // common and last hash
+ active = p // active peer will help determine the current active peer
+ head = common.Hash{} // common and last hash
+
+ timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
+ attempted = make(map[string]bool) // attempted peers will help with retries
+ crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
)
- attemptedPeers[p.id] = true
+ defer crossTicker.Stop()
-out:
- for {
+ attempted[p.id] = true
+ for finished := false; !finished; {
select {
case <-d.cancelCh:
return errCancelHashFetch
+
case hashPack := <-d.hashCh:
// Make sure the active peer is giving us the hashes
- if hashPack.peerId != activePeer.id {
+ if hashPack.peerId != active.id {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
break
}
-
- failureResponseTimer.Reset(hashTtl)
+ timeout.Reset(hashTTL)
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
- glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id)
- d.queue.Reset()
-
+ glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
return errEmptyHashSet
}
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
done, index := false, 0
- for index, hash = range hashPack.hashes {
- if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil {
- glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
+ for index, head = range hashPack.hashes {
+ if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
+ glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
hashPack.hashes = hashPack.hashes[:index]
done = true
break
}
}
- d.queue.Insert(hashPack.hashes)
-
+ // Insert all the new hashes, but only continue if got something useful
+ inserts := d.queue.Insert(hashPack.hashes)
+ if len(inserts) == 0 && !done {
+ glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
+ return ErrBadPeer
+ }
if !done {
- activePeer.getHashes(hash)
+ // Try and fetch a random block to verify the hash batch
+ cross := inserts[rand.Intn(len(inserts))]
+ glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross)
+
+ d.checks[cross] = time.Now().Add(blockTTL)
+ active.getBlocks([]common.Hash{cross})
+
+ // Also fetch a fresh
+ active.getHashes(head)
continue
}
// We're done, allocate the download cache and proceed pulling the blocks
offset := 0
- if block := d.getBlock(hash); block != nil {
+ if block := d.getBlock(head); block != nil {
offset = int(block.NumberU64() + 1)
}
d.queue.Alloc(offset)
- break out
+ finished = true
- case <-failureResponseTimer.C:
+ case blockPack := <-d.blockCh:
+ // Cross check the block with the random verifications
+ if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
+ continue
+ }
+ block := blockPack.blocks[0]
+ if _, ok := d.checks[block.Hash()]; ok {
+ if !d.queue.Has(block.ParentHash()) {
+ return ErrCrossCheckFailed
+ }
+ delete(d.checks, block.Hash())
+ }
+
+ case <-crossTicker.C:
+ // Iterate over all the cross checks and fail the hash chain if they're not verified
+ for hash, deadline := range d.checks {
+ if time.Now().After(deadline) {
+ glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
+ return ErrCrossCheckFailed
+ }
+ }
+
+ case <-timeout.C:
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
var p *peer // p will be set if a peer can be found
@@ -288,21 +327,20 @@ out:
// already fetched hash list. This can't guarantee 100% correctness but does
// a fair job. This is always either correct or false incorrect.
for _, peer := range d.peers.AllPeers() {
- if d.queue.Has(peer.head) && !attemptedPeers[peer.id] {
+ if d.queue.Has(peer.head) && !attempted[peer.id] {
p = peer
break
}
}
// if all peers have been tried, abort the process entirely or if the hash is
// the zero hash.
- if p == nil || (hash == common.Hash{}) {
- d.queue.Reset()
+ if p == nil || (head == common.Hash{}) {
return ErrTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
- activePeer = p
- p.getHashes(hash)
+ active = p
+ p.getHashes(head)
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
}
}
@@ -325,12 +363,26 @@ out:
select {
case <-d.cancelCh:
return errCancelBlockFetch
+
case blockPack := <-d.blockCh:
+ // Short circuit if it's a stale cross check
+ if len(blockPack.blocks) == 1 {
+ block := blockPack.blocks[0]
+ if _, ok := d.checks[block.Hash()]; ok {
+ delete(d.checks, block.Hash())
+ continue
+ }
+ }
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
- // Deliver the received chunk of blocks, but drop the peer if invalid
+ // Deliver the received chunk of blocks
if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
+ if err == ErrInvalidChain {
+ // The hash chain is invalid (blocks are not ordered properly), abort
+ return err
+ }
+ // Peer did deliver, but some blocks were off, penalize
glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
peer.Demote()
break
@@ -348,7 +400,7 @@ out:
// that badly or poorly behave are removed from the peer set (not banned).
// Bad peers are excluded from the available peer set and therefor won't be
// reused. XXX We could re-introduce peers after X time.
- badPeers := d.queue.Expire(blockTtl)
+ badPeers := d.queue.Expire(blockTTL)
for _, pid := range badPeers {
// XXX We could make use of a reputation system here ranking peers
// in their performance
@@ -361,7 +413,6 @@ out:
}
// After removing bad peers make sure we actually have sufficient peer left to keep downloading
if d.peers.Len() == 0 {
- d.queue.Reset()
return errNoPeers
}
// If there are unrequested hashes left start fetching
@@ -395,9 +446,7 @@ out:
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if d.queue.InFlight() == 0 {
- d.queue.Reset()
-
- return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending())
+ return errPeersUnavailable
}
} else if d.queue.InFlight() == 0 {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 50fe00d42..d55664314 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -23,25 +23,26 @@ func createHashes(start, amount int) (hashes []common.Hash) {
for i := range hashes[:len(hashes)-1] {
binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2))
}
-
return
}
-func createBlock(i int, prevHash, hash common.Hash) *types.Block {
+func createBlock(i int, parent, hash common.Hash) *types.Block {
header := &types.Header{Number: big.NewInt(int64(i))}
block := types.NewBlockWithHeader(header)
block.HeaderHash = hash
- block.ParentHeaderHash = prevHash
+ block.ParentHeaderHash = parent
return block
}
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
blocks := make(map[common.Hash]*types.Block)
-
- for i, hash := range hashes {
- blocks[hash] = createBlock(len(hashes)-i, knownHash, hash)
+ for i := 0; i < len(hashes); i++ {
+ parent := knownHash
+ if i < len(hashes)-1 {
+ parent = hashes[i+1]
+ }
+ blocks[hashes[i]] = createBlock(len(hashes)-i, parent, hashes[i])
}
-
return blocks
}
@@ -75,9 +76,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
return tester
}
-func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
+// sync is a simple wrapper around the downloader to start synchronisation and
+// block until it returns
+func (dl *downloadTester) sync(peerId string, head common.Hash) error {
dl.activePeerId = peerId
- return dl.downloader.Synchronise(peerId, hash)
+ return dl.downloader.Synchronise(peerId, head)
+}
+
+// syncTake is starts synchronising with a remote peer, but concurrently it also
+// starts fetching blocks that the downloader retrieved. IT blocks until both go
+// routines terminate.
+func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) {
+ // Start a block collector to take blocks as they become available
+ done := make(chan struct{})
+ took := []*types.Block{}
+ go func() {
+ for running := true; running; {
+ select {
+ case <-done:
+ running = false
+ default:
+ time.Sleep(time.Millisecond)
+ }
+ // Take a batch of blocks and accumulate
+ took = append(took, dl.downloader.TakeBlocks()...)
+ }
+ done <- struct{}{}
+ }()
+ // Start the downloading, sync the taker and return
+ err := dl.sync(peerId, head)
+
+ done <- struct{}{}
+ <-done
+
+ return took, err
}
func (dl *downloadTester) insertBlocks(blocks types.Blocks) {
@@ -99,18 +131,37 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
return dl.blocks[knownHash]
}
-func (dl *downloadTester) getHashes(hash common.Hash) error {
- dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes)
+// getHashes retrieves a batch of hashes for reconstructing the chain.
+func (dl *downloadTester) getHashes(head common.Hash) error {
+ // Gather the next batch of hashes
+ hashes := make([]common.Hash, 0, maxHashFetch)
+ for i, hash := range dl.hashes {
+ if hash == head {
+ i++
+ for len(hashes) < cap(hashes) && i < len(dl.hashes) {
+ hashes = append(hashes, dl.hashes[i])
+ i++
+ }
+ break
+ }
+ }
+ // Delay delivery a bit to allow attacks to unfold
+ id := dl.activePeerId
+ go func() {
+ time.Sleep(time.Millisecond)
+ dl.downloader.DeliverHashes(id, hashes)
+ }()
return nil
}
func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
return func(hashes []common.Hash) error {
- blocks := make([]*types.Block, len(hashes))
- for i, hash := range hashes {
- blocks[i] = dl.blocks[hash]
+ blocks := make([]*types.Block, 0, len(hashes))
+ for _, hash := range hashes {
+ if block, ok := dl.blocks[hash]; ok {
+ blocks = append(blocks, block)
+ }
}
-
go dl.downloader.DeliverBlocks(id, blocks)
return nil
@@ -134,7 +185,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash
func TestDownload(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
@@ -183,7 +234,7 @@ func TestMissing(t *testing.T) {
func TestTaking(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
@@ -224,7 +275,7 @@ func TestInactiveDownloader(t *testing.T) {
func TestCancel(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
@@ -250,7 +301,7 @@ func TestCancel(t *testing.T) {
func TestThrottling(t *testing.T) {
minDesiredPeerCount = 4
- blockTtl = 1 * time.Second
+ blockTTL = 1 * time.Second
targetBlocks := 16 * blockCacheLimit
hashes := createHashes(0, targetBlocks)
@@ -263,32 +314,7 @@ func TestThrottling(t *testing.T) {
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
// Concurrently download and take the blocks
- errc := make(chan error, 1)
- go func() {
- errc <- tester.sync("peer1", hashes[0])
- }()
-
- done := make(chan struct{})
- took := []*types.Block{}
- go func() {
- for running := true; running; {
- select {
- case <-done:
- running = false
- default:
- time.Sleep(time.Millisecond)
- }
- // Take a batch of blocks and accumulate
- took = append(took, tester.downloader.TakeBlocks()...)
- }
- done <- struct{}{}
- }()
-
- // Synchronise the two threads and verify
- err := <-errc
- done <- struct{}{}
- <-done
-
+ took, err := tester.syncTake("peer1", hashes[0])
if err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
@@ -336,3 +362,137 @@ func TestNonExistingParentAttack(t *testing.T) {
t.Fatalf("tester doesn't know about the origin hash")
}
}
+
+// Tests that if a malicious peers keeps sending us repeating hashes, we don't
+// loop indefinitely.
+func TestRepeatingHashAttack(t *testing.T) {
+ // Create a valid chain, but drop the last link
+ hashes := createHashes(0, blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+ forged := hashes[:len(hashes)-1]
+
+ // Try and sync with the malicious node
+ tester := newTester(t, forged, blocks)
+ tester.newPeer("attack", big.NewInt(10000), forged[0])
+
+ errc := make(chan error)
+ go func() {
+ errc <- tester.sync("attack", hashes[0])
+ }()
+
+ // Make sure that syncing returns and does so with a failure
+ select {
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("synchronisation blocked")
+ case err := <-errc:
+ if err == nil {
+ t.Fatalf("synchronisation succeeded")
+ }
+ }
+ // Ensure that a valid chain can still pass sync
+ tester.hashes = hashes
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if err := tester.sync("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
+
+// Tests that if a malicious peers returns a non-existent block hash, it should
+// eventually time out and the sync reattempted.
+func TestNonExistingBlockAttack(t *testing.T) {
+ // Create a valid chain, but forge the last link
+ hashes := createHashes(0, blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+ origin := hashes[len(hashes)/2]
+
+ hashes[len(hashes)/2] = unknownHash
+
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, hashes, blocks)
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
+ }
+ // Ensure that a valid chain can still pass sync
+ hashes[len(hashes)/2] = origin
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if err := tester.sync("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
+
+// Tests that if a malicious peer is returning hashes in a weird order, that the
+// sync throttler doesn't choke on them waiting for the valid blocks.
+func TestInvalidHashOrderAttack(t *testing.T) {
+ // Create a valid long chain, but reverse some hashes within
+ hashes := createHashes(0, 4*blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+
+ chunk1 := make([]common.Hash, blockCacheLimit)
+ chunk2 := make([]common.Hash, blockCacheLimit)
+ copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit])
+ copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit])
+
+ reverse := make([]common.Hash, len(hashes))
+ copy(reverse, hashes)
+ copy(reverse[2*blockCacheLimit:], chunk1)
+ copy(reverse[blockCacheLimit:], chunk2)
+
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, reverse, blocks)
+ tester.newPeer("attack", big.NewInt(10000), reverse[0])
+ if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
+ }
+ // Ensure that a valid chain can still pass sync
+ tester.hashes = hashes
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
+
+// Tests that if a malicious peer makes up a random hash chain and tries to push
+// indefinitely, it actually gets caught with it.
+func TestMadeupHashChainAttack(t *testing.T) {
+ blockTTL = 100 * time.Millisecond
+ crossCheckCycle = 25 * time.Millisecond
+
+ // Create a long chain of hashes without backing blocks
+ hashes := createHashes(0, 1024*blockCacheLimit)
+
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, hashes, nil)
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ }
+}
+
+// Tests that if a malicious peer makes up a random block chain, and tried to
+// push indefinitely, it actually gets caught with it.
+func TestMadeupBlockChainAttack(t *testing.T) {
+ blockTTL = 100 * time.Millisecond
+ crossCheckCycle = 25 * time.Millisecond
+
+ // Create a long chain of blocks and simulate an invalid chain by dropping every second
+ hashes := createHashes(0, 32*blockCacheLimit)
+ blocks := createBlocksFromHashes(hashes)
+
+ gapped := make([]common.Hash, len(hashes)/2)
+ for i := 0; i < len(gapped); i++ {
+ gapped[i] = hashes[2*i]
+ }
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, gapped, blocks)
+ tester.newPeer("attack", big.NewInt(10000), gapped[0])
+ if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ }
+ // Ensure that a valid chain can still pass sync
+ tester.hashes = hashes
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if _, err := tester.syncTake("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 6ad915757..13ec9a520 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -122,24 +122,28 @@ func (q *queue) Has(hash common.Hash) bool {
return false
}
-// Insert adds a set of hashes for the download queue for scheduling.
-func (q *queue) Insert(hashes []common.Hash) {
+// Insert adds a set of hashes for the download queue for scheduling, returning
+// the new hashes encountered.
+func (q *queue) Insert(hashes []common.Hash) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the hashes prioritized in the arrival order
- for i, hash := range hashes {
- index := q.hashCounter + i
-
+ inserts := make([]common.Hash, 0, len(hashes))
+ for _, hash := range hashes {
+ // Skip anything we already have
if old, ok := q.hashPool[hash]; ok {
glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
continue
}
- q.hashPool[hash] = index
- q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
+ // Update the counters and insert the hash
+ q.hashCounter = q.hashCounter + 1
+ inserts = append(inserts, hash)
+
+ q.hashPool[hash] = q.hashCounter
+ q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
}
- // Update the hash counter for the next batch of inserts
- q.hashCounter += len(hashes)
+ return inserts
}
// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
@@ -296,18 +300,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
// Iterate over the downloaded blocks and add each of them
errs := make([]error, 0)
for _, block := range blocks {
- // Skip any blocks that fall outside the cache range
- index := int(block.NumberU64()) - q.blockOffset
- if index >= len(q.blockCache) || index < 0 {
- //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache))
- continue
- }
// Skip any blocks that were not requested
hash := block.Hash()
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested block %v", hash))
continue
}
+ // If a requested block falls out of the range, the hash chain is invalid
+ index := int(block.NumberU64()) - q.blockOffset
+ if index >= len(q.blockCache) || index < 0 {
+ return ErrInvalidChain
+ }
// Otherwise merge the block and mark the hash block
q.blockCache[index] = block