diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-05-15 21:38:12 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-05-15 21:38:12 +0800 |
commit | 7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5 (patch) | |
tree | 6be0a8967229d27b6262cea90bb7d54d76670eb9 /eth/downloader | |
parent | c1f0d40e34a80f4453a9a54f90e2d4551c3bdb05 (diff) | |
parent | 5c1a7b965ca7901d3b185d75205419b87163a4fa (diff) | |
download | go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.tar go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.tar.gz go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.tar.bz2 go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.tar.lz go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.tar.xz go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.tar.zst go-tangerine-7d71a75d7715f97a86e0bd2e5aa9ac149d0ee4b5.zip |
Merge pull request #988 from karalabe/fix-downloader-vulnerabilities
Fix downloader vulnerabilities
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 169 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 250 | ||||
-rw-r--r-- | eth/downloader/queue.go | 33 |
3 files changed, 332 insertions, 120 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 616971f73..f9bd5a635 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -2,7 +2,7 @@ package downloader import ( "errors" - "fmt" + "math/rand" "sync" "sync/atomic" "time" @@ -15,29 +15,34 @@ import ( ) const ( - maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk + maxHashFetch = 512 // Amount of hashes to be fetched per chunk + maxBlockFetch = 128 // Amount of blocks to be fetched per chunk peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out + hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) var ( - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out - - errLowTd = errors.New("peer's TD is too low") - ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer's unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errNoPeers = errors.New("no peers to keep download active") - ErrPendingQueue = errors.New("pending items in queue") - ErrTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errBlockNumberOverflow = errors.New("received block which overflows") - errCancelHashFetch = errors.New("hash fetching cancelled (requested)") - errCancelBlockFetch = errors.New("block downloading cancelled (requested)") - errNoSyncActive = errors.New("no sync active") + blockTTL = 5 * time.Second // Time it takes for a block request to time out + crossCheckCycle = time.Second // Period after which to check for expired cross checks + minDesiredPeerCount = 5 // Amount of peers desired to start syncing +) + +var ( + errLowTd = errors.New("peer's TD is too low") + ErrBusy = errors.New("busy") + errUnknownPeer = errors.New("peer's unknown or unhealthy") + ErrBadPeer = errors.New("action from bad peer ignored") + errNoPeers = errors.New("no peers to keep download active") + ErrPendingQueue = errors.New("pending items in queue") + ErrTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + ErrInvalidChain = errors.New("retrieved hash chain is invalid") + ErrCrossCheckFailed = errors.New("block cross-check failed") + errCancelHashFetch = errors.New("hash fetching cancelled (requested)") + errCancelBlockFetch = errors.New("block downloading cancelled (requested)") + errNoSyncActive = errors.New("no sync active") ) type hashCheckFn func(common.Hash) bool @@ -58,9 +63,10 @@ type hashPack struct { type Downloader struct { mux *event.TypeMux - mu sync.RWMutex - queue *queue - peers *peerSet + mu sync.RWMutex + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain // Callbacks hasBlock hashCheckFn @@ -153,6 +159,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() d.peers.Reset() + d.checks = make(map[common.Hash]time.Time) // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) @@ -177,7 +184,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { defer func() { // reset on error if err != nil { - d.queue.Reset() + d.Cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -221,66 +228,98 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { start := time.Now() - // Add the hash to the queue first + // Add the hash to the queue first, and start hash retrieval d.queue.Insert([]common.Hash{h}) - - // Get the first batch of hashes p.getHashes(h) var ( - failureResponseTimer = time.NewTimer(hashTtl) - attemptedPeers = make(map[string]bool) // attempted peers will help with retries - activePeer = p // active peer will help determine the current active peer - hash common.Hash // common and last hash + active = p // active peer will help determine the current active peer + head = common.Hash{} // common and last hash + + timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer + attempted = make(map[string]bool) // attempted peers will help with retries + crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks ) - attemptedPeers[p.id] = true + defer crossTicker.Stop() -out: - for { + attempted[p.id] = true + for finished := false; !finished; { select { case <-d.cancelCh: return errCancelHashFetch + case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes - if hashPack.peerId != activePeer.id { + if hashPack.peerId != active.id { glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) break } - - failureResponseTimer.Reset(hashTtl) + timeout.Reset(hashTTL) // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id) - d.queue.Reset() - + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) return errEmptyHashSet } // Determine if we're done fetching hashes (queue up all pending), and continue if not done done, index := false, 0 - for index, hash = range hashPack.hashes { - if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil { - glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) + for index, head = range hashPack.hashes { + if d.hasBlock(head) || d.queue.GetBlock(head) != nil { + glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) hashPack.hashes = hashPack.hashes[:index] done = true break } } - d.queue.Insert(hashPack.hashes) - + // Insert all the new hashes, but only continue if got something useful + inserts := d.queue.Insert(hashPack.hashes) + if len(inserts) == 0 && !done { + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) + return ErrBadPeer + } if !done { - activePeer.getHashes(hash) + // Try and fetch a random block to verify the hash batch + cross := inserts[rand.Intn(len(inserts))] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + + d.checks[cross] = time.Now().Add(blockTTL) + active.getBlocks([]common.Hash{cross}) + + // Also fetch a fresh + active.getHashes(head) continue } // We're done, allocate the download cache and proceed pulling the blocks offset := 0 - if block := d.getBlock(hash); block != nil { + if block := d.getBlock(head); block != nil { offset = int(block.NumberU64() + 1) } d.queue.Alloc(offset) - break out + finished = true - case <-failureResponseTimer.C: + case blockPack := <-d.blockCh: + // Cross check the block with the random verifications + if blockPack.peerId != active.id || len(blockPack.blocks) != 1 { + continue + } + block := blockPack.blocks[0] + if _, ok := d.checks[block.Hash()]; ok { + if !d.queue.Has(block.ParentHash()) { + return ErrCrossCheckFailed + } + delete(d.checks, block.Hash()) + } + + case <-crossTicker.C: + // Iterate over all the cross checks and fail the hash chain if they're not verified + for hash, deadline := range d.checks { + if time.Now().After(deadline) { + glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) + return ErrCrossCheckFailed + } + } + + case <-timeout.C: glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) var p *peer // p will be set if a peer can be found @@ -288,21 +327,20 @@ out: // already fetched hash list. This can't guarantee 100% correctness but does // a fair job. This is always either correct or false incorrect. for _, peer := range d.peers.AllPeers() { - if d.queue.Has(peer.head) && !attemptedPeers[peer.id] { + if d.queue.Has(peer.head) && !attempted[peer.id] { p = peer break } } // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. - if p == nil || (hash == common.Hash{}) { - d.queue.Reset() + if p == nil || (head == common.Hash{}) { return ErrTimeout } // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. - activePeer = p - p.getHashes(hash) + active = p + p.getHashes(head) glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) } } @@ -325,12 +363,26 @@ out: select { case <-d.cancelCh: return errCancelBlockFetch + case blockPack := <-d.blockCh: + // Short circuit if it's a stale cross check + if len(blockPack.blocks) == 1 { + block := blockPack.blocks[0] + if _, ok := d.checks[block.Hash()]; ok { + delete(d.checks, block.Hash()) + continue + } + } // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks, but drop the peer if invalid + // Deliver the received chunk of blocks if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { + if err == ErrInvalidChain { + // The hash chain is invalid (blocks are not ordered properly), abort + return err + } + // Peer did deliver, but some blocks were off, penalize glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) peer.Demote() break @@ -348,7 +400,7 @@ out: // that badly or poorly behave are removed from the peer set (not banned). // Bad peers are excluded from the available peer set and therefor won't be // reused. XXX We could re-introduce peers after X time. - badPeers := d.queue.Expire(blockTtl) + badPeers := d.queue.Expire(blockTTL) for _, pid := range badPeers { // XXX We could make use of a reputation system here ranking peers // in their performance @@ -361,7 +413,6 @@ out: } // After removing bad peers make sure we actually have sufficient peer left to keep downloading if d.peers.Len() == 0 { - d.queue.Reset() return errNoPeers } // If there are unrequested hashes left start fetching @@ -395,9 +446,7 @@ out: // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error if d.queue.InFlight() == 0 { - d.queue.Reset() - - return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending()) + return errPeersUnavailable } } else if d.queue.InFlight() == 0 { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 50fe00d42..d55664314 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -23,25 +23,26 @@ func createHashes(start, amount int) (hashes []common.Hash) { for i := range hashes[:len(hashes)-1] { binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2)) } - return } -func createBlock(i int, prevHash, hash common.Hash) *types.Block { +func createBlock(i int, parent, hash common.Hash) *types.Block { header := &types.Header{Number: big.NewInt(int64(i))} block := types.NewBlockWithHeader(header) block.HeaderHash = hash - block.ParentHeaderHash = prevHash + block.ParentHeaderHash = parent return block } func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { blocks := make(map[common.Hash]*types.Block) - - for i, hash := range hashes { - blocks[hash] = createBlock(len(hashes)-i, knownHash, hash) + for i := 0; i < len(hashes); i++ { + parent := knownHash + if i < len(hashes)-1 { + parent = hashes[i+1] + } + blocks[hashes[i]] = createBlock(len(hashes)-i, parent, hashes[i]) } - return blocks } @@ -75,9 +76,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types return tester } -func (dl *downloadTester) sync(peerId string, hash common.Hash) error { +// sync is a simple wrapper around the downloader to start synchronisation and +// block until it returns +func (dl *downloadTester) sync(peerId string, head common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, hash) + return dl.downloader.Synchronise(peerId, head) +} + +// syncTake is starts synchronising with a remote peer, but concurrently it also +// starts fetching blocks that the downloader retrieved. IT blocks until both go +// routines terminate. +func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) { + // Start a block collector to take blocks as they become available + done := make(chan struct{}) + took := []*types.Block{} + go func() { + for running := true; running; { + select { + case <-done: + running = false + default: + time.Sleep(time.Millisecond) + } + // Take a batch of blocks and accumulate + took = append(took, dl.downloader.TakeBlocks()...) + } + done <- struct{}{} + }() + // Start the downloading, sync the taker and return + err := dl.sync(peerId, head) + + done <- struct{}{} + <-done + + return took, err } func (dl *downloadTester) insertBlocks(blocks types.Blocks) { @@ -99,18 +131,37 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.blocks[knownHash] } -func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes) +// getHashes retrieves a batch of hashes for reconstructing the chain. +func (dl *downloadTester) getHashes(head common.Hash) error { + // Gather the next batch of hashes + hashes := make([]common.Hash, 0, maxHashFetch) + for i, hash := range dl.hashes { + if hash == head { + i++ + for len(hashes) < cap(hashes) && i < len(dl.hashes) { + hashes = append(hashes, dl.hashes[i]) + i++ + } + break + } + } + // Delay delivery a bit to allow attacks to unfold + id := dl.activePeerId + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, hashes) + }() return nil } func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { return func(hashes []common.Hash) error { - blocks := make([]*types.Block, len(hashes)) - for i, hash := range hashes { - blocks[i] = dl.blocks[hash] + blocks := make([]*types.Block, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := dl.blocks[hash]; ok { + blocks = append(blocks, block) + } } - go dl.downloader.DeliverBlocks(id, blocks) return nil @@ -134,7 +185,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash func TestDownload(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -183,7 +234,7 @@ func TestMissing(t *testing.T) { func TestTaking(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -224,7 +275,7 @@ func TestInactiveDownloader(t *testing.T) { func TestCancel(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 1000 hashes := createHashes(0, targetBlocks) @@ -250,7 +301,7 @@ func TestCancel(t *testing.T) { func TestThrottling(t *testing.T) { minDesiredPeerCount = 4 - blockTtl = 1 * time.Second + blockTTL = 1 * time.Second targetBlocks := 16 * blockCacheLimit hashes := createHashes(0, targetBlocks) @@ -263,32 +314,7 @@ func TestThrottling(t *testing.T) { tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{}) // Concurrently download and take the blocks - errc := make(chan error, 1) - go func() { - errc <- tester.sync("peer1", hashes[0]) - }() - - done := make(chan struct{}) - took := []*types.Block{} - go func() { - for running := true; running; { - select { - case <-done: - running = false - default: - time.Sleep(time.Millisecond) - } - // Take a batch of blocks and accumulate - took = append(took, tester.downloader.TakeBlocks()...) - } - done <- struct{}{} - }() - - // Synchronise the two threads and verify - err := <-errc - done <- struct{}{} - <-done - + took, err := tester.syncTake("peer1", hashes[0]) if err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -336,3 +362,137 @@ func TestNonExistingParentAttack(t *testing.T) { t.Fatalf("tester doesn't know about the origin hash") } } + +// Tests that if a malicious peers keeps sending us repeating hashes, we don't +// loop indefinitely. +func TestRepeatingHashAttack(t *testing.T) { + // Create a valid chain, but drop the last link + hashes := createHashes(0, blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + forged := hashes[:len(hashes)-1] + + // Try and sync with the malicious node + tester := newTester(t, forged, blocks) + tester.newPeer("attack", big.NewInt(10000), forged[0]) + + errc := make(chan error) + go func() { + errc <- tester.sync("attack", hashes[0]) + }() + + // Make sure that syncing returns and does so with a failure + select { + case <-time.After(100 * time.Millisecond): + t.Fatalf("synchronisation blocked") + case err := <-errc: + if err == nil { + t.Fatalf("synchronisation succeeded") + } + } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} + +// Tests that if a malicious peers returns a non-existent block hash, it should +// eventually time out and the sync reattempted. +func TestNonExistingBlockAttack(t *testing.T) { + // Create a valid chain, but forge the last link + hashes := createHashes(0, blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + origin := hashes[len(hashes)/2] + + hashes[len(hashes)/2] = unknownHash + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, blocks) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) + } + // Ensure that a valid chain can still pass sync + hashes[len(hashes)/2] = origin + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} + +// Tests that if a malicious peer is returning hashes in a weird order, that the +// sync throttler doesn't choke on them waiting for the valid blocks. +func TestInvalidHashOrderAttack(t *testing.T) { + // Create a valid long chain, but reverse some hashes within + hashes := createHashes(0, 4*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + + chunk1 := make([]common.Hash, blockCacheLimit) + chunk2 := make([]common.Hash, blockCacheLimit) + copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit]) + copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit]) + + reverse := make([]common.Hash, len(hashes)) + copy(reverse, hashes) + copy(reverse[2*blockCacheLimit:], chunk1) + copy(reverse[blockCacheLimit:], chunk2) + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, reverse, blocks) + tester.newPeer("attack", big.NewInt(10000), reverse[0]) + if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} + +// Tests that if a malicious peer makes up a random hash chain and tries to push +// indefinitely, it actually gets caught with it. +func TestMadeupHashChainAttack(t *testing.T) { + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of hashes without backing blocks + hashes := createHashes(0, 1024*blockCacheLimit) + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, nil) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } +} + +// Tests that if a malicious peer makes up a random block chain, and tried to +// push indefinitely, it actually gets caught with it. +func TestMadeupBlockChainAttack(t *testing.T) { + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of blocks and simulate an invalid chain by dropping every second + hashes := createHashes(0, 32*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + + gapped := make([]common.Hash, len(hashes)/2) + for i := 0; i < len(gapped); i++ { + gapped[i] = hashes[2*i] + } + // Try and sync with the malicious node and check that it fails + tester := newTester(t, gapped, blocks) + tester.newPeer("attack", big.NewInt(10000), gapped[0]) + if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } + // Ensure that a valid chain can still pass sync + tester.hashes = hashes + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 6ad915757..13ec9a520 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -122,24 +122,28 @@ func (q *queue) Has(hash common.Hash) bool { return false } -// Insert adds a set of hashes for the download queue for scheduling. -func (q *queue) Insert(hashes []common.Hash) { +// Insert adds a set of hashes for the download queue for scheduling, returning +// the new hashes encountered. +func (q *queue) Insert(hashes []common.Hash) []common.Hash { q.lock.Lock() defer q.lock.Unlock() // Insert all the hashes prioritized in the arrival order - for i, hash := range hashes { - index := q.hashCounter + i - + inserts := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + // Skip anything we already have if old, ok := q.hashPool[hash]; ok { glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old) continue } - q.hashPool[hash] = index - q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first + // Update the counters and insert the hash + q.hashCounter = q.hashCounter + 1 + inserts = append(inserts, hash) + + q.hashPool[hash] = q.hashCounter + q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first } - // Update the hash counter for the next batch of inserts - q.hashCounter += len(hashes) + return inserts } // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't @@ -296,18 +300,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // Iterate over the downloaded blocks and add each of them errs := make([]error, 0) for _, block := range blocks { - // Skip any blocks that fall outside the cache range - index := int(block.NumberU64()) - q.blockOffset - if index >= len(q.blockCache) || index < 0 { - //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache)) - continue - } // Skip any blocks that were not requested hash := block.Hash() if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested block %v", hash)) continue } + // If a requested block falls out of the range, the hash chain is invalid + index := int(block.NumberU64()) - q.blockOffset + if index >= len(q.blockCache) || index < 0 { + return ErrInvalidChain + } // Otherwise merge the block and mark the hash block q.blockCache[index] = block |