aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/downloader/downloader.go40
-rw-r--r--eth/downloader/queue.go26
2 files changed, 32 insertions, 34 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 608acf499..25b251112 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -346,13 +346,28 @@ out:
d.peers.setState(blockPack.peerId, idleState)
}
case <-ticker.C:
- // after removing bad peers make sure we actually have sufficient peer left to keep downloading
+ // Check for bad peers. Bad peers may indicate a peer not responding
+ // to a `getBlocks` message. A timeout of 5 seconds is set. Peers
+ // that badly or poorly behave are removed from the peer set (not banned).
+ // Bad peers are excluded from the available peer set and therefor won't be
+ // reused. XXX We could re-introduce peers after X time.
+ badPeers := d.queue.Expire(blockTtl)
+ for _, pid := range badPeers {
+ // XXX We could make use of a reputation system here ranking peers
+ // in their performance
+ // 1) Time for them to respond;
+ // 2) Measure their speed;
+ // 3) Amount and availability.
+ if peer := d.peers[pid]; peer != nil {
+ peer.demote()
+ peer.reset()
+ }
+ }
+ // After removing bad peers make sure we actually have sufficient peer left to keep downloading
if len(d.peers) == 0 {
d.queue.Reset()
-
return errNoPeers
}
-
// If there are unrequested hashes left start fetching
// from the available peers.
if d.queue.Pending() > 0 {
@@ -392,25 +407,6 @@ out:
// safely assume we're done. Another part of the process will check
// for parent errors and will re-request anything that's missing
break out
- } else {
- // Check for bad peers. Bad peers may indicate a peer not responding
- // to a `getBlocks` message. A timeout of 5 seconds is set. Peers
- // that badly or poorly behave are removed from the peer set (not banned).
- // Bad peers are excluded from the available peer set and therefor won't be
- // reused. XXX We could re-introduce peers after X time.
- badPeers := d.queue.Expire(blockTtl)
- for _, pid := range badPeers {
- // XXX We could make use of a reputation system here ranking peers
- // in their performance
- // 1) Time for them to respond;
- // 2) Measure their speed;
- // 3) Amount and availability.
- if peer := d.peers[pid]; peer != nil {
- peer.demote()
- peer.reset()
- }
- }
-
}
}
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index eae567052..d849d4d68 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -12,7 +12,7 @@ import (
)
const (
- blockCacheLimit = 4096 // Maximum number of blocks to cache before throttling the download
+ blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
)
// fetchRequest is a currently running block retrieval operation.
@@ -28,8 +28,7 @@ type queue struct {
hashQueue *prque.Prque // Priority queue of the block hashes to fetch
hashCounter int // Counter indexing the added hashes to ensure retrieval order
- pendPool map[string]*fetchRequest // Currently pending block retrieval operations
- pendCount int // Number of pending block fetches (to throttle the download)
+ pendPool map[string]*fetchRequest // Currently pending block retrieval operations
blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
blockCache []*types.Block // Downloaded but not yet delivered blocks
@@ -58,7 +57,6 @@ func (q *queue) Reset() {
q.hashCounter = 0
q.pendPool = make(map[string]*fetchRequest)
- q.pendCount = 0
q.blockPool = make(map[common.Hash]int)
q.blockOffset = 0
@@ -106,7 +104,13 @@ func (q *queue) Throttle() bool {
q.lock.RLock()
defer q.lock.RUnlock()
- return q.pendCount >= len(q.blockCache)-len(q.blockPool)
+ // Calculate the currently in-flight block requests
+ pending := 0
+ for _, request := range q.pendPool {
+ pending += len(request.Hashes)
+ }
+ // Throttle if more blocks are in-flight than free space in the cache
+ return pending >= len(q.blockCache)-len(q.blockPool)
}
// Has checks if a hash is within the download queue or not.
@@ -206,10 +210,14 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
- // Short circuit if the pool has been depleted
+ // Short circuit if the pool has been depleted, or if the peer's already
+ // downloading something (sanity check not to corrupt state)
if q.hashQueue.Empty() {
return nil
}
+ if _, ok := q.pendPool[p.id]; ok {
+ return nil
+ }
// Retrieve a batch of hashes, skipping previously failed ones
send := make(map[common.Hash]int)
skip := make(map[common.Hash]int)
@@ -236,7 +244,6 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
Time: time.Now(),
}
q.pendPool[p.id] = request
- q.pendCount += len(request.Hashes)
return request
}
@@ -250,7 +257,6 @@ func (q *queue) Cancel(request *fetchRequest) {
q.hashQueue.Push(hash, float32(index))
}
delete(q.pendPool, request.Peer.id)
- q.pendCount -= len(request.Hashes)
}
// Expire checks for in flight requests that exceeded a timeout allowance,
@@ -266,7 +272,6 @@ func (q *queue) Expire(timeout time.Duration) []string {
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
- q.pendCount -= len(request.Hashes)
peers = append(peers, id)
}
}
@@ -289,9 +294,6 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
delete(q.pendPool, id)
- // Mark all the hashes in the request as non-pending
- q.pendCount -= len(request.Hashes)
-
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
for hash, _ := range request.Hashes {