aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go169
1 files changed, 91 insertions, 78 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 6926f1d8c..a1a70e46e 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -32,7 +32,11 @@ import (
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
-var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download
+var (
+ blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download
+ blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
+ blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
+)
var (
errNoFetchesPending = errors.New("no fetches pending")
@@ -41,17 +45,17 @@ var (
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
- Peer *peerConnection // Peer to which the request was sent
- From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
- Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
- Headers []*types.Header // [eth/62] Requested headers, sorted by request order
- Time time.Time // Time when the request was made
+ Peer *peerConnection // Peer to which the request was sent
+ From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
+ Headers []*types.Header // [eth/62] Requested headers, sorted by request order
+ Time time.Time // Time when the request was made
}
// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
- Pending int // Number of data fetches still pending
+ Pending int // Number of data fetches still pending
+ Hash common.Hash // Hash of the header to prevent recalculating
Header *types.Header
Uncles []*types.Header
@@ -61,12 +65,10 @@ type fetchResult struct {
// queue represents hashes that are either need fetching or are being fetched
type queue struct {
- mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
- fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode
-
- headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
+ mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
+ headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
@@ -87,8 +89,9 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
- resultCache []*fetchResult // Downloaded but not yet delivered fetch results
- resultOffset uint64 // Offset of the first cached fetch result in the block chain
+ resultCache []*fetchResult // Downloaded but not yet delivered fetch results
+ resultOffset uint64 // Offset of the first cached fetch result in the block chain
+ resultSize common.StorageSize // Approximate size of a block (exponential moving average)
lock *sync.Mutex
active *sync.Cond
@@ -109,7 +112,7 @@ func newQueue() *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
- resultCache: make([]*fetchResult, blockCacheLimit),
+ resultCache: make([]*fetchResult, blockCacheItems),
active: sync.NewCond(lock),
lock: lock,
}
@@ -122,10 +125,8 @@ func (q *queue) Reset() {
q.closed = false
q.mode = FullSync
- q.fastSyncPivot = 0
q.headerHead = common.Hash{}
-
q.headerPendPool = make(map[string]*fetchRequest)
q.blockTaskPool = make(map[common.Hash]*types.Header)
@@ -138,7 +139,7 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
- q.resultCache = make([]*fetchResult, blockCacheLimit)
+ q.resultCache = make([]*fetchResult, blockCacheItems)
q.resultOffset = 0
}
@@ -214,27 +215,13 @@ func (q *queue) Idle() bool {
return (queued + pending + cached) == 0
}
-// FastSyncPivot retrieves the currently used fast sync pivot point.
-func (q *queue) FastSyncPivot() uint64 {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.fastSyncPivot
-}
-
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
q.lock.Lock()
defer q.lock.Unlock()
- // Calculate the currently in-flight block (body) requests
- pending := 0
- for _, request := range q.blockPendPool {
- pending += len(request.Hashes) + len(request.Headers)
- }
- // Throttle if more blocks (bodies) are in-flight than free space in the cache
- return pending >= len(q.resultCache)-len(q.blockDonePool)
+ return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0
}
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
@@ -243,13 +230,39 @@ func (q *queue) ShouldThrottleReceipts() bool {
q.lock.Lock()
defer q.lock.Unlock()
- // Calculate the currently in-flight receipt requests
+ return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0
+}
+
+// resultSlots calculates the number of results slots available for requests
+// whilst adhering to both the item and the memory limit too of the results
+// cache.
+func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
+ // Calculate the maximum length capped by the memory limit
+ limit := len(q.resultCache)
+ if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) {
+ limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
+ }
+ // Calculate the number of slots already finished
+ finished := 0
+ for _, result := range q.resultCache[:limit] {
+ if result == nil {
+ break
+ }
+ if _, ok := donePool[result.Hash]; ok {
+ finished++
+ }
+ }
+ // Calculate the number of slots currently downloading
pending := 0
- for _, request := range q.receiptPendPool {
- pending += len(request.Headers)
+ for _, request := range pendPool {
+ for _, header := range request.Headers {
+ if header.Number.Uint64() < q.resultOffset+uint64(limit) {
+ pending++
+ }
+ }
}
- // Throttle if more receipts are in-flight than free space in the cache
- return pending >= len(q.resultCache)-len(q.receiptDonePool)
+ // Return the free slots to distribute
+ return limit - finished - pending
}
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
@@ -323,8 +336,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
- if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
- // Fast phase of the fast sync, retrieve receipts too
+ if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
@@ -335,18 +347,25 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// WaitResults retrieves and permanently removes a batch of fetch
-// results from the cache. the result slice will be empty if the queue
-// has been closed.
-func (q *queue) WaitResults() []*fetchResult {
+// Results retrieves and permanently removes a batch of fetch results from
+// the cache. the result slice will be empty if the queue has been closed.
+func (q *queue) Results(block bool) []*fetchResult {
q.lock.Lock()
defer q.lock.Unlock()
+ // Count the number of items available for processing
nproc := q.countProcessableItems()
for nproc == 0 && !q.closed {
+ if !block {
+ return nil
+ }
q.active.Wait()
nproc = q.countProcessableItems()
}
+ // Since we have a batch limit, don't pull more into "dangling" memory
+ if nproc > maxResultsProcess {
+ nproc = maxResultsProcess
+ }
results := make([]*fetchResult, nproc)
copy(results, q.resultCache[:nproc])
if len(results) > 0 {
@@ -363,6 +382,21 @@ func (q *queue) WaitResults() []*fetchResult {
}
// Advance the expected block number of the first cache entry.
q.resultOffset += uint64(nproc)
+
+ // Recalculate the result item weights to prevent memory exhaustion
+ for _, result := range results {
+ size := result.Header.Size()
+ for _, uncle := range result.Uncles {
+ size += uncle.Size()
+ }
+ for _, receipt := range result.Receipts {
+ size += receipt.Size()
+ }
+ for _, tx := range result.Transactions {
+ size += tx.Size()
+ }
+ q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
+ }
}
return results
}
@@ -370,21 +404,9 @@ func (q *queue) WaitResults() []*fetchResult {
// countProcessableItems counts the processable items.
func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
return i
}
- // Stop before processing the pivot block to ensure that
- // resultCache has space for fsHeaderForceVerify items. Not
- // doing this could leave us unable to download the required
- // amount of headers.
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- return i
- }
- }
- }
}
return len(q.resultCache)
}
@@ -473,10 +495,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
return nil, false, nil
}
// Calculate an upper limit on the items we might fetch (i.e. throttling)
- space := len(q.resultCache) - len(donePool)
- for _, request := range pendPool {
- space -= len(request.Headers)
- }
+ space := q.resultSlots(pendPool, donePool)
+
// Retrieve a batch of tasks, skipping previously failed ones
send := make([]*types.Header, 0, count)
skip := make([]*types.Header, 0)
@@ -484,6 +504,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
progress := false
for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
header := taskQueue.PopItem().(*types.Header)
+ hash := header.Hash()
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
@@ -493,18 +514,19 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
}
if q.resultCache[index] == nil {
components := 1
- if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
+ if q.mode == FastSync {
components = 2
}
q.resultCache[index] = &fetchResult{
Pending: components,
+ Hash: hash,
Header: header,
}
}
// If this fetch task is a noop, skip this fetch operation
if isNoop(header) {
- donePool[header.Hash()] = struct{}{}
- delete(taskPool, header.Hash())
+ donePool[hash] = struct{}{}
+ delete(taskPool, hash)
space, proc = space-1, proc-1
q.resultCache[index].Pending--
@@ -512,7 +534,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
continue
}
// Otherwise unless the peer is known not to have the data, add to the retrieve list
- if p.Lacks(header.Hash()) {
+ if p.Lacks(hash) {
skip = append(skip, header)
} else {
send = append(send, header)
@@ -565,9 +587,6 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
}
- for hash, index := range request.Hashes {
- taskQueue.Push(hash, float32(index))
- }
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
@@ -640,18 +659,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
}
- for hash, index := range request.Hashes {
- taskQueue.Push(hash, float32(index))
- }
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
// Add the peer to the expiry report along the the number of failed requests
- expirations := len(request.Hashes)
- if expirations < len(request.Headers) {
- expirations = len(request.Headers)
- }
- expiries[id] = expirations
+ expiries[id] = len(request.Headers)
}
}
// Remove the expired requests from the pending pool
@@ -828,14 +840,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
failure = err
break
}
- donePool[header.Hash()] = struct{}{}
+ hash := header.Hash()
+
+ donePool[hash] = struct{}{}
q.resultCache[index].Pending--
useful = true
accepted++
// Clean up a successful fetch
request.Headers[i] = nil
- delete(taskPool, header.Hash())
+ delete(taskPool, hash)
}
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
@@ -860,7 +874,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
-func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
+func (q *queue) Prepare(offset uint64, mode SyncMode) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -868,6 +882,5 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.
if q.resultOffset < offset {
q.resultOffset = offset
}
- q.fastSyncPivot = pivot
q.mode = mode
}