aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go44
1 files changed, 32 insertions, 12 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 903f043eb..a758410a5 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -1,3 +1,19 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered, and throttled way.
@@ -40,9 +56,9 @@ type queue struct {
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
- blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
- blockCache []*Block // Downloaded but not yet delivered blocks
- blockOffset int // Offset of the first cached block in the block-chain
+ blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes
+ blockCache []*Block // Downloaded but not yet delivered blocks
+ blockOffset uint64 // Offset of the first cached block in the block-chain
lock sync.RWMutex
}
@@ -53,7 +69,7 @@ func newQueue() *queue {
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
pendPool: make(map[string]*fetchRequest),
- blockPool: make(map[common.Hash]int),
+ blockPool: make(map[common.Hash]uint64),
blockCache: make([]*Block, blockCacheLimit),
}
}
@@ -69,7 +85,7 @@ func (q *queue) Reset() {
q.pendPool = make(map[string]*fetchRequest)
- q.blockPool = make(map[common.Hash]int)
+ q.blockPool = make(map[common.Hash]uint64)
q.blockOffset = 0
q.blockCache = make([]*Block, blockCacheLimit)
}
@@ -130,7 +146,7 @@ func (q *queue) Has(hash common.Hash) bool {
// Insert adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
-func (q *queue) Insert(hashes []common.Hash) []common.Hash {
+func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
@@ -147,7 +163,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash {
inserts = append(inserts, hash)
q.hashPool[hash] = q.hashCounter
- q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
+ if fifo {
+ q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first
+ } else {
+ q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
+ }
}
return inserts
}
@@ -175,7 +195,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block {
return nil
}
// Return the block if it's still available in the cache
- if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) {
+ if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) {
return q.blockCache[index-q.blockOffset]
}
return nil
@@ -202,7 +222,7 @@ func (q *queue) TakeBlocks() []*Block {
for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
q.blockCache[k] = nil
}
- q.blockOffset += len(blocks)
+ q.blockOffset += uint64(len(blocks))
return blocks
}
@@ -318,7 +338,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
continue
}
// If a requested block falls out of the range, the hash chain is invalid
- index := int(block.NumberU64()) - q.blockOffset
+ index := int(int64(block.NumberU64()) - int64(q.blockOffset))
if index >= len(q.blockCache) || index < 0 {
return errInvalidChain
}
@@ -329,7 +349,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
delete(request.Hashes, hash)
delete(q.hashPool, hash)
- q.blockPool[hash] = int(block.NumberU64())
+ q.blockPool[hash] = block.NumberU64()
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
@@ -346,7 +366,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
// Prepare configures the block cache offset to allow accepting inbound blocks.
-func (q *queue) Prepare(offset int) {
+func (q *queue) Prepare(offset uint64) {
q.lock.Lock()
defer q.lock.Unlock()