diff options
author | Péter Szilágyi <peterke@gmail.com> | 2018-02-06 00:40:32 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2018-02-06 00:40:32 +0800 |
commit | 55599ee95d4151a2502465e0afc7c47bd1acba77 (patch) | |
tree | 4165e73ae852db4f025a5ed57f0bc499e87cb8b9 /eth/downloader/queue.go | |
parent | 59336283c0dbeb1d0a74ff7a8b717b2b3bb0cf40 (diff) | |
download | go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.tar go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.gz go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.bz2 go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.lz go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.xz go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.zst go-tangerine-55599ee95d4151a2502465e0afc7c47bd1acba77.zip |
core, trie: intermediate mempool between trie and database (#15857)
This commit reduces database I/O by not writing every state trie to disk.
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 169 |
1 files changed, 91 insertions, 78 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 6926f1d8c..a1a70e46e 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -32,7 +32,11 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) -var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download +var ( + blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download + blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching + blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones +) var ( errNoFetchesPending = errors.New("no fetches pending") @@ -41,17 +45,17 @@ var ( // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { - Peer *peerConnection // Peer to which the request was sent - From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) - Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) - Headers []*types.Header // [eth/62] Requested headers, sorted by request order - Time time.Time // Time when the request was made + Peer *peerConnection // Peer to which the request was sent + From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) + Headers []*types.Header // [eth/62] Requested headers, sorted by request order + Time time.Time // Time when the request was made } // fetchResult is a struct collecting partial results from data fetchers until // all outstanding pieces complete and the result as a whole can be processed. type fetchResult struct { - Pending int // Number of data fetches still pending + Pending int // Number of data fetches still pending + Hash common.Hash // Hash of the header to prevent recalculating Header *types.Header Uncles []*types.Header @@ -61,12 +65,10 @@ type fetchResult struct { // queue represents hashes that are either need fetching or are being fetched type queue struct { - mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching - fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode - - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching // Headers are "special", they download in batches, supported by a skeleton chain + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable @@ -87,8 +89,9 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - resultCache []*fetchResult // Downloaded but not yet delivered fetch results - resultOffset uint64 // Offset of the first cached fetch result in the block chain + resultCache []*fetchResult // Downloaded but not yet delivered fetch results + resultOffset uint64 // Offset of the first cached fetch result in the block chain + resultSize common.StorageSize // Approximate size of a block (exponential moving average) lock *sync.Mutex active *sync.Cond @@ -109,7 +112,7 @@ func newQueue() *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), - resultCache: make([]*fetchResult, blockCacheLimit), + resultCache: make([]*fetchResult, blockCacheItems), active: sync.NewCond(lock), lock: lock, } @@ -122,10 +125,8 @@ func (q *queue) Reset() { q.closed = false q.mode = FullSync - q.fastSyncPivot = 0 q.headerHead = common.Hash{} - q.headerPendPool = make(map[string]*fetchRequest) q.blockTaskPool = make(map[common.Hash]*types.Header) @@ -138,7 +139,7 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) - q.resultCache = make([]*fetchResult, blockCacheLimit) + q.resultCache = make([]*fetchResult, blockCacheItems) q.resultOffset = 0 } @@ -214,27 +215,13 @@ func (q *queue) Idle() bool { return (queued + pending + cached) == 0 } -// FastSyncPivot retrieves the currently used fast sync pivot point. -func (q *queue) FastSyncPivot() uint64 { - q.lock.Lock() - defer q.lock.Unlock() - - return q.fastSyncPivot -} - // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { q.lock.Lock() defer q.lock.Unlock() - // Calculate the currently in-flight block (body) requests - pending := 0 - for _, request := range q.blockPendPool { - pending += len(request.Hashes) + len(request.Headers) - } - // Throttle if more blocks (bodies) are in-flight than free space in the cache - return pending >= len(q.resultCache)-len(q.blockDonePool) + return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0 } // ShouldThrottleReceipts checks if the download should be throttled (active receipt @@ -243,13 +230,39 @@ func (q *queue) ShouldThrottleReceipts() bool { q.lock.Lock() defer q.lock.Unlock() - // Calculate the currently in-flight receipt requests + return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0 +} + +// resultSlots calculates the number of results slots available for requests +// whilst adhering to both the item and the memory limit too of the results +// cache. +func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int { + // Calculate the maximum length capped by the memory limit + limit := len(q.resultCache) + if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) { + limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) + } + // Calculate the number of slots already finished + finished := 0 + for _, result := range q.resultCache[:limit] { + if result == nil { + break + } + if _, ok := donePool[result.Hash]; ok { + finished++ + } + } + // Calculate the number of slots currently downloading pending := 0 - for _, request := range q.receiptPendPool { - pending += len(request.Headers) + for _, request := range pendPool { + for _, header := range request.Headers { + if header.Number.Uint64() < q.resultOffset+uint64(limit) { + pending++ + } + } } - // Throttle if more receipts are in-flight than free space in the cache - return pending >= len(q.resultCache)-len(q.receiptDonePool) + // Return the free slots to distribute + return limit - finished - pending } // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill @@ -323,8 +336,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) - if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { - // Fast phase of the fast sync, retrieve receipts too + if q.mode == FastSync { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } @@ -335,18 +347,25 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// WaitResults retrieves and permanently removes a batch of fetch -// results from the cache. the result slice will be empty if the queue -// has been closed. -func (q *queue) WaitResults() []*fetchResult { +// Results retrieves and permanently removes a batch of fetch results from +// the cache. the result slice will be empty if the queue has been closed. +func (q *queue) Results(block bool) []*fetchResult { q.lock.Lock() defer q.lock.Unlock() + // Count the number of items available for processing nproc := q.countProcessableItems() for nproc == 0 && !q.closed { + if !block { + return nil + } q.active.Wait() nproc = q.countProcessableItems() } + // Since we have a batch limit, don't pull more into "dangling" memory + if nproc > maxResultsProcess { + nproc = maxResultsProcess + } results := make([]*fetchResult, nproc) copy(results, q.resultCache[:nproc]) if len(results) > 0 { @@ -363,6 +382,21 @@ func (q *queue) WaitResults() []*fetchResult { } // Advance the expected block number of the first cache entry. q.resultOffset += uint64(nproc) + + // Recalculate the result item weights to prevent memory exhaustion + for _, result := range results { + size := result.Header.Size() + for _, uncle := range result.Uncles { + size += uncle.Size() + } + for _, receipt := range result.Receipts { + size += receipt.Size() + } + for _, tx := range result.Transactions { + size += tx.Size() + } + q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize + } } return results } @@ -370,21 +404,9 @@ func (q *queue) WaitResults() []*fetchResult { // countProcessableItems counts the processable items. func (q *queue) countProcessableItems() int { for i, result := range q.resultCache { - // Don't process incomplete or unavailable items. if result == nil || result.Pending > 0 { return i } - // Stop before processing the pivot block to ensure that - // resultCache has space for fsHeaderForceVerify items. Not - // doing this could leave us unable to download the required - // amount of headers. - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - return i - } - } - } } return len(q.resultCache) } @@ -473,10 +495,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common return nil, false, nil } // Calculate an upper limit on the items we might fetch (i.e. throttling) - space := len(q.resultCache) - len(donePool) - for _, request := range pendPool { - space -= len(request.Headers) - } + space := q.resultSlots(pendPool, donePool) + // Retrieve a batch of tasks, skipping previously failed ones send := make([]*types.Header, 0, count) skip := make([]*types.Header, 0) @@ -484,6 +504,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common progress := false for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { header := taskQueue.PopItem().(*types.Header) + hash := header.Hash() // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) @@ -493,18 +514,19 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common } if q.resultCache[index] == nil { components := 1 - if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + if q.mode == FastSync { components = 2 } q.resultCache[index] = &fetchResult{ Pending: components, + Hash: hash, Header: header, } } // If this fetch task is a noop, skip this fetch operation if isNoop(header) { - donePool[header.Hash()] = struct{}{} - delete(taskPool, header.Hash()) + donePool[hash] = struct{}{} + delete(taskPool, hash) space, proc = space-1, proc-1 q.resultCache[index].Pending-- @@ -512,7 +534,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common continue } // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.Lacks(header.Hash()) { + if p.Lacks(hash) { skip = append(skip, header) } else { send = append(send, header) @@ -565,9 +587,6 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m if request.From > 0 { taskQueue.Push(request.From, -float32(request.From)) } - for hash, index := range request.Hashes { - taskQueue.Push(hash, float32(index)) - } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } @@ -640,18 +659,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, if request.From > 0 { taskQueue.Push(request.From, -float32(request.From)) } - for hash, index := range request.Hashes { - taskQueue.Push(hash, float32(index)) - } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } // Add the peer to the expiry report along the the number of failed requests - expirations := len(request.Hashes) - if expirations < len(request.Headers) { - expirations = len(request.Headers) - } - expiries[id] = expirations + expiries[id] = len(request.Headers) } } // Remove the expired requests from the pending pool @@ -828,14 +840,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ failure = err break } - donePool[header.Hash()] = struct{}{} + hash := header.Hash() + + donePool[hash] = struct{}{} q.resultCache[index].Pending-- useful = true accepted++ // Clean up a successful fetch request.Headers[i] = nil - delete(taskPool, header.Hash()) + delete(taskPool, hash) } // Return all failed or missing fetches to the queue for _, header := range request.Headers { @@ -860,7 +874,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) { +func (q *queue) Prepare(offset uint64, mode SyncMode) { q.lock.Lock() defer q.lock.Unlock() @@ -868,6 +882,5 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types. if q.resultOffset < offset { q.resultOffset = offset } - q.fastSyncPivot = pivot q.mode = mode } |