aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go28
1 files changed, 16 insertions, 12 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 903f043eb..b24ce42e8 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -40,9 +40,9 @@ type queue struct {
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
- blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
- blockCache []*Block // Downloaded but not yet delivered blocks
- blockOffset int // Offset of the first cached block in the block-chain
+ blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes
+ blockCache []*Block // Downloaded but not yet delivered blocks
+ blockOffset uint64 // Offset of the first cached block in the block-chain
lock sync.RWMutex
}
@@ -53,7 +53,7 @@ func newQueue() *queue {
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
pendPool: make(map[string]*fetchRequest),
- blockPool: make(map[common.Hash]int),
+ blockPool: make(map[common.Hash]uint64),
blockCache: make([]*Block, blockCacheLimit),
}
}
@@ -69,7 +69,7 @@ func (q *queue) Reset() {
q.pendPool = make(map[string]*fetchRequest)
- q.blockPool = make(map[common.Hash]int)
+ q.blockPool = make(map[common.Hash]uint64)
q.blockOffset = 0
q.blockCache = make([]*Block, blockCacheLimit)
}
@@ -130,7 +130,7 @@ func (q *queue) Has(hash common.Hash) bool {
// Insert adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
-func (q *queue) Insert(hashes []common.Hash) []common.Hash {
+func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
@@ -147,7 +147,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash {
inserts = append(inserts, hash)
q.hashPool[hash] = q.hashCounter
- q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
+ if fifo {
+ q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first
+ } else {
+ q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
+ }
}
return inserts
}
@@ -175,7 +179,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block {
return nil
}
// Return the block if it's still available in the cache
- if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) {
+ if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) {
return q.blockCache[index-q.blockOffset]
}
return nil
@@ -202,7 +206,7 @@ func (q *queue) TakeBlocks() []*Block {
for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
q.blockCache[k] = nil
}
- q.blockOffset += len(blocks)
+ q.blockOffset += uint64(len(blocks))
return blocks
}
@@ -318,7 +322,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
continue
}
// If a requested block falls out of the range, the hash chain is invalid
- index := int(block.NumberU64()) - q.blockOffset
+ index := int(int64(block.NumberU64()) - int64(q.blockOffset))
if index >= len(q.blockCache) || index < 0 {
return errInvalidChain
}
@@ -329,7 +333,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
delete(request.Hashes, hash)
delete(q.hashPool, hash)
- q.blockPool[hash] = int(block.NumberU64())
+ q.blockPool[hash] = block.NumberU64()
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
@@ -346,7 +350,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
// Prepare configures the block cache offset to allow accepting inbound blocks.
-func (q *queue) Prepare(offset int) {
+func (q *queue) Prepare(offset uint64) {
q.lock.Lock()
defer q.lock.Unlock()