diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-30 00:37:26 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-11-19 23:01:39 +0800 |
commit | b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d (patch) | |
tree | 7e9a13377f52658d398f4f3dc11883f515bdbb3d /eth/downloader/queue.go | |
parent | 4c2933ad825aa11ce118abddfe6eeafc0422b2b6 (diff) | |
download | go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.gz go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.bz2 go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.lz go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.xz go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.zst go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.zip |
eth/downloader: fetch data proportionally to peer capacity
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 95 |
1 files changed, 59 insertions, 36 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 584797d7b..1e55560db 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) { // ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { +func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string { // ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBodies(timeout time.Duration) []string { +func (q *queue) ExpireBodies(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string { // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireReceipts(timeout time.Duration) []string { +func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string { // ExpireNodeData checks for in flight node data requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) []string { +func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { // Iterate over the expired requests and return each to the queue - peers := []string{} + expiries := make(map[string]int) for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout @@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } - peers = append(peers, id) + // Add the peer to the expiry report along the the number of failed requests + expirations := len(request.Hashes) + if expirations < len(request.Headers) { + expirations = len(request.Headers) + } + expiries[id] = expirations } } // Remove the expired requests from the pending pool - for _, id := range peers { + for id, _ := range expiries { delete(pendPool, id) } - return peers + return expiries } -// DeliverBlocks injects a block retrieval response into the download queue. -func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { +// DeliverBlocks injects a block retrieval response into the download queue. The +// method returns the number of blocks accepted from the delivery and also wakes +// any threads waiting for data delivery. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the blocks were never requested request := q.blockPendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } blockReqTimer.UpdateSince(request.Time) delete(q.blockPendPool, id) @@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { } } // Iterate over the downloaded blocks and add each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) for _, block := range blocks { // Skip any blocks that were not requested hash := block.Hash() @@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { delete(request.Hashes, hash) delete(q.hashPool, hash) + accepted++ } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } // Wake up WaitResults - q.active.Signal() + if accepted > 0 { + q.active.Signal() + } // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: - return nil + return accepted, nil case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): - return errs[0] + return accepted, errs[0] case len(errs) == len(blocks): - return errStaleDelivery + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } // DeliverBodies injects a block body retrieval response into the results queue. -func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// The method returns the number of blocks bodies accepted from the delivery and +// also wakes any threads waiting for data delivery. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi } // DeliverReceipts injects a receipt retrieval response into the results queue. -func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { +// The method returns the number of transaction receipts accepted from the delivery +// and also wakes any threads waiting for data delivery. +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, - donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, + results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { + // Short circuit if the data was never requested request := pendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } reqTimer.UpdateSince(request.Time) delete(pendPool, id) @@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } // Assemble each of the results with their headers and retrieved data parts var ( - failure error - useful bool + accepted int + failure error + useful bool ) for i, header := range request.Headers { // Short circuit assembly if no more fetch results are found @@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ donePool[header.Hash()] = struct{}{} q.resultCache[index].Pending-- useful = true + accepted++ // Clean up a successful fetch request.Headers[i] = nil @@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } // Wake up WaitResults - q.active.Signal() + if accepted > 0 { + q.active.Signal() + } // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: - return failure + return accepted, failure case useful: - return fmt.Errorf("partial failure: %v", failure) + return accepted, fmt.Errorf("partial failure: %v", failure) default: - return errStaleDelivery + return accepted, errStaleDelivery } } // DeliverNodeData injects a node state data retrieval response into the queue. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error { +// The method returns the number of node state entries originally requested, and +// the number of them actually accepted from the delivery. +func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the data was never requested request := q.statePendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } stateReqTimer.UpdateSince(request.Time) delete(q.statePendPool, id) @@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i } } // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) process := []trie.SyncResult{} for _, blob := range data { - // Skip any blocks that were not requested + // Skip any state trie entires that were not requested hash := common.BytesToHash(crypto.Sha3(blob)) if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) @@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i } // Inject the next state trie item into the processing queue process = append(process, trie.SyncResult{hash, blob}) + accepted++ delete(request.Hashes, hash) delete(q.stateTaskPool, hash) @@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If none of the data items were good, it's a stale delivery switch { case len(errs) == 0: - return nil + return accepted, nil case len(errs) == len(request.Hashes): - return errStaleDelivery + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } |