Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--  eth/downloader/queue.go | 43
1 file changed, 20 insertions(+), 23 deletions(-)
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 40749698c..13ec9a520 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -122,24 +122,28 @@ func (q *queue) Has(hash common.Hash) bool {
return false
}
-// Insert adds a set of hashes for the download queue for scheduling.
-func (q *queue) Insert(hashes []common.Hash) {
+// Insert adds a set of hashes for the download queue for scheduling, returning
+// the new hashes encountered.
+func (q *queue) Insert(hashes []common.Hash) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the hashes prioritized in the arrival order
- for i, hash := range hashes {
- index := q.hashCounter + i
-
+ inserts := make([]common.Hash, 0, len(hashes))
+ for _, hash := range hashes {
+ // Skip anything we already have
if old, ok := q.hashPool[hash]; ok {
glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
continue
}
- q.hashPool[hash] = index
- q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
+ // Update the counters and insert the hash
+ q.hashCounter = q.hashCounter + 1
+ inserts = append(inserts, hash)
+
+ q.hashPool[hash] = q.hashCounter
+ q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets scheduled first
}
- // Update the hash counter for the next batch of inserts
- q.hashCounter += len(hashes)
+ return inserts
}
// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
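Note: the reworked Insert deduplicates against the hash pool and hands back only the hashes it actually scheduled, bumping the counter per accepted hash instead of per batch. Below is a minimal, self-contained Go sketch of that pattern; it is illustrative only, not part of this commit, and all names (insertNew, pool, counter) are made up.

package main

import "fmt"

// insertNew mirrors the new Insert: skip hashes already in the pool,
// bump the counter for each accepted hash, and return only the new ones.
func insertNew(pool map[string]int, counter *int, hashes []string) []string {
	inserts := make([]string, 0, len(hashes))
	for _, hash := range hashes {
		if _, ok := pool[hash]; ok {
			continue // already scheduled, skip it
		}
		*counter++
		pool[hash] = *counter
		inserts = append(inserts, hash)
	}
	return inserts
}

func main() {
	pool, counter := map[string]int{}, 0
	fmt.Println(insertNew(pool, &counter, []string{"a", "b"})) // [a b]
	fmt.Println(insertNew(pool, &counter, []string{"b", "c"})) // [c]
}

With the returned slice the caller can tell how much genuinely new work was queued, which the old per-batch counter bump could not express.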
@@ -172,17 +176,11 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block {
}
// TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
-// The head parameter is required to prevent a race condition where concurrent
-// takes may fail parent verifications.
-func (q *queue) TakeBlocks(head *types.Block) types.Blocks {
+func (q *queue) TakeBlocks() types.Blocks {
q.lock.Lock()
defer q.lock.Unlock()
- // Short circuit if the head block's different
- if len(q.blockCache) == 0 || q.blockCache[0] != head {
- return nil
- }
- // Otherwise accumulate all available blocks
+ // Accumulate all available blocks
var blocks types.Blocks
for _, block := range q.blockCache {
if block == nil {
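Note: TakeBlocks no longer needs a head block to guard against racing callers; it simply drains whatever contiguous run of blocks sits at the front of the cache. A rough, self-contained Go sketch of that drain loop follows; the rest of the loop body is not shown in this hunk, so the break-at-gap behaviour is an assumption and the names are invented.

package main

import "fmt"

// takeContiguous drains entries from the front of the cache until the
// first gap (nil), returning the run that was taken.
func takeContiguous(cache []*int) []*int {
	var taken []*int
	for _, block := range cache {
		if block == nil {
			break // a gap means later blocks have not been delivered yet
		}
		taken = append(taken, block)
	}
	return taken
}

func main() {
	one, two := 1, 2
	cache := []*int{&one, &two, nil, nil}
	fmt.Println(len(takeContiguous(cache))) // 2: stops at the first gap
}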
@@ -302,18 +300,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
// Iterate over the downloaded blocks and add each of them
errs := make([]error, 0)
for _, block := range blocks {
- // Skip any blocks that fall outside the cache range
- index := int(block.NumberU64()) - q.blockOffset
- if index >= len(q.blockCache) || index < 0 {
- //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache))
- continue
- }
// Skip any blocks that were not requested
hash := block.Hash()
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested block %v", hash))
continue
}
+ // If a requested block falls out of the range, the hash chain is invalid
+ index := int(block.NumberU64()) - q.blockOffset
+ if index >= len(q.blockCache) || index < 0 {
+ return ErrInvalidChain
+ }
// Otherwise merge the block and mark the hash block
q.blockCache[index] = block
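Note: the bounds check now runs after the request check, and a requested block that cannot fit the cache window is treated as a fatal ErrInvalidChain instead of being silently dropped. A self-contained Go sketch of that control flow follows; it is illustrative only, the function and variable names are invented, and the real Deliver works on *types.Block rather than plain ints.

package main

import (
	"errors"
	"fmt"
)

var errInvalidChain = errors.New("retrieved hash chain is invalid")

// deliver mirrors the reordered checks: non-requested blocks are recorded
// and skipped, but a requested block outside the cache window aborts the
// whole delivery, because the chain being synced cannot be valid.
func deliver(requested map[int]bool, cache []int, offset int, numbers []int) error {
	var errs []error
	for _, n := range numbers {
		if !requested[n] {
			errs = append(errs, fmt.Errorf("non-requested block %d", n))
			continue // tolerated: note it and move on
		}
		index := n - offset
		if index < 0 || index >= len(cache) {
			return errInvalidChain // fatal: requested block cannot fit the window
		}
		cache[index] = n
	}
	if len(errs) > 0 {
		return fmt.Errorf("%d delivery errors", len(errs))
	}
	return nil
}

func main() {
	cache := make([]int, 4)
	requested := map[int]bool{10: true, 99: true}
	fmt.Println(deliver(requested, cache, 10, []int{10, 11})) // 1 delivery errors
	fmt.Println(deliver(requested, cache, 10, []int{99}))     // retrieved hash chain is invalid
}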