aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-30 00:37:26 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-11-19 23:01:39 +0800
commitb6f5523bdcded47c4f92b4cb5e6e23287bd6b60d (patch)
tree7e9a13377f52658d398f4f3dc11883f515bdbb3d /eth/downloader/queue.go
parent4c2933ad825aa11ce118abddfe6eeafc0422b2b6 (diff)
downloadgo-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.gz
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.bz2
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.lz
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.xz
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.zst
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.zip
eth/downloader: fetch data proportionally to peer capacity
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go95
1 files changed, 59 insertions, 36 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 584797d7b..1e55560db 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
- peers := []string{}
+ expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
@@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
- peers = append(peers, id)
+ // Add the peer to the expiry report along the the number of failed requests
+ expirations := len(request.Hashes)
+ if expirations < len(request.Headers) {
+ expirations = len(request.Headers)
+ }
+ expiries[id] = expirations
}
}
// Remove the expired requests from the pending pool
- for _, id := range peers {
+ for id, _ := range expiries {
delete(pendPool, id)
}
- return peers
+ return expiries
}
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
request := q.blockPendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id)
@@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
}
}
// Iterate over the downloaded blocks and add each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
for _, block := range blocks {
// Skip any blocks that were not requested
hash := block.Hash()
@@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
delete(request.Hashes, hash)
delete(q.hashPool, hash)
+ accepted++
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
// Wake up WaitResults
- q.active.Signal()
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
+ return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
- return errs[0]
+ return accepted, errs[0]
case len(errs) == len(blocks):
- return errStaleDelivery
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
- donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+ results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
@@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
// Assemble each of the results with their headers and retrieved data parts
var (
- failure error
- useful bool
+ accepted int
+ failure error
+ useful bool
)
for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
@@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
donePool[header.Hash()] = struct{}{}
q.resultCache[index].Pending--
useful = true
+ accepted++
// Clean up a successful fetch
request.Headers[i] = nil
@@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
// Wake up WaitResults
- q.active.Signal()
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
- return failure
+ return accepted, failure
case useful:
- return fmt.Errorf("partial failure: %v", failure)
+ return accepted, fmt.Errorf("partial failure: %v", failure)
default:
- return errStaleDelivery
+ return accepted, errStaleDelivery
}
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
@@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
}
// Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
- // Skip any blocks that were not requested
+ // Skip any state trie entires that were not requested
hash := common.BytesToHash(crypto.Sha3(blob))
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob})
+ accepted++
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
@@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
+ return accepted, nil
case len(errs) == len(request.Hashes):
- return errStaleDelivery
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}