aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-30 00:37:26 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-11-19 23:01:39 +0800
commitb6f5523bdcded47c4f92b4cb5e6e23287bd6b60d (patch)
tree7e9a13377f52658d398f4f3dc11883f515bdbb3d /eth
parent4c2933ad825aa11ce118abddfe6eeafc0422b2b6 (diff)
downloadgo-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.gz
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.bz2
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.lz
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.xz
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.zst
go-tangerine-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.zip
eth/downloader: fetch data proportionally to peer capacity
Diffstat (limited to 'eth')
-rw-r--r--eth/downloader/downloader.go178
-rw-r--r--eth/downloader/peer.go231
-rw-r--r--eth/downloader/queue.go95
3 files changed, 258 insertions, 246 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 5fa18a2e3..c272d05af 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -45,16 +45,17 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
- hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
- blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
- blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
- headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
- bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
- bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
+ hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
+ blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
+ blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
+
+ headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
+ bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
+ bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
+ receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
+ receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
+ stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -486,7 +487,7 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
// Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head})
- timeout := time.After(blockSoftTTL)
+ timeout := time.After(hashTTL)
for {
select {
case <-d.cancelCh:
@@ -779,47 +780,27 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of blocks, and demote in case of errors
blocks := packet.(*blockPack).blocks
- err := d.queue.DeliverBlocks(peer.id, blocks)
- switch err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above)
- if len(blocks) == 0 {
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of blocks and check chain validity
+ accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: stale delivery", peer)
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ peer.SetBlocksIdle(accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && len(blocks) == 0:
+ glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
+ glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
}
}
// Blocks arrived, try to update the progress
@@ -852,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
+ for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ peer.SetBlocksIdle(0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
@@ -1281,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- setIdle = func(p *peer) { p.SetBodiesIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1305,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() }
- setIdle = func(p *peer) { p.SetReceiptsIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1329,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Debug).Infof("Downloading node state data")
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
if err != nil {
@@ -1352,14 +1338,14 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
}
- expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() }
- setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1372,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
- expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
- idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+ idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1394,45 +1380,25 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of data, and demote in case of errors
- switch err := deliver(packet); err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Items() == 0 {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
-
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of data and check chain validity
+ accepted, err := deliver(packet)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ setIdle(peer, accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && packet.Items() == 0:
+ glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
+ glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
}
}
// Blocks assembled, try to update the progress
@@ -1465,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
return errNoPeers
}
// Check for fetch request timeouts and demote the responsible peers
- for _, pid := range expire() {
+ for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ setIdle(peer, 0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 9ba6dabbd..80f08b68f 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -30,8 +30,10 @@ import (
"github.com/ethereum/go-ethereum/common"
)
-// Maximum number of entries allowed on the list or lacking items.
-const maxLackingHashes = 4096
+const (
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
+ throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
+)
// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
@@ -59,18 +61,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation
- blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
- receiptCapacity int32 // Number of receipts allowed to fetch per request
- stateCapacity int32 // Number of node data pieces allowed to fetch per request
+ blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
+ receiptThroughput float64 // Number of receipts measured to be retrievable per second
+ stateThroughput float64 // Number of node data pieces measured to be retrievable per second
blockStarted time.Time // Time instance when the last block (body)fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
- lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
- lackingLock sync.RWMutex // Lock protecting the lacking hashes list
+ lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -84,6 +84,7 @@ type peer struct {
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
+ lock sync.RWMutex
}
// newPeer create a new downloader peer, with specific hash and block retrieval
@@ -93,12 +94,9 @@ func newPeer(id string, version int, head common.Hash,
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{
- id: id,
- head: head,
- blockCapacity: 1,
- receiptCapacity: 1,
- stateCapacity: 1,
- lacking: make(map[common.Hash]struct{}),
+ id: id,
+ head: head,
+ lacking: make(map[common.Hash]struct{}),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
@@ -117,15 +115,18 @@ func newPeer(id string, version int, head common.Hash,
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
- atomic.StoreInt32(&p.blockCapacity, 1)
- atomic.StoreInt32(&p.receiptCapacity, 1)
- atomic.StoreInt32(&p.stateCapacity, 1)
+ atomic.StoreInt32(&p.stateIdle, 0)
+
+ p.blockThroughput = 0
+ p.receiptThroughput = 0
+ p.stateThroughput = 0
- p.lackingLock.Lock()
p.lacking = make(map[common.Hash]struct{})
- p.lackingLock.Unlock()
}
// Fetch61 sends a block retrieval request to the remote peer.
@@ -216,107 +217,86 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil
}
-// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBlocksIdle() {
- p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
+// requests. Its estimated block retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBlocksIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
-// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block body retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBodiesIdle() {
- p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle)
+// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
+// requests. Its estimated body retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBodiesIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
-// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its receipt retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetReceiptsIdle() {
- p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
+// retrieval requests. Its estimated receipt retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetReceiptsIdle(delivered int) {
+ p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
}
-// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
-// requests. Its node data retrieval allowance will also be updated either up- or
-// downwards, depending on whether the previous fetch completed in time.
-func (p *peer) SetNodeDataIdle() {
- p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
+// data retrieval requests. Its estimated state retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetNodeDataIdle(delivered int) {
+ p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
}
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its data retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
- // Update the peer's download allowance based on previous performance
- scale := 2.0
- if time.Since(started) > softTTL {
- scale = 0.5
- if time.Since(started) > hardTTL {
- scale = 1 / float64(maxFetch) // reduces capacity to 1
- }
- }
- for {
- // Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(capacity)
- next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
-
- // Try to update the old value
- if atomic.CompareAndSwapInt32(capacity, prev, next) {
- // If we're having problems at 1 capacity, try to find better peers
- if next == 1 {
- p.Demote()
- }
- break
- }
+// Its estimated retrieval throughput is updated with that measured just now.
+func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+ // Irrelevant of the scaling, make sure the peer ends up idle
+ defer atomic.StoreInt32(idle, 0)
+
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
+ if delivered == 0 {
+ *throughput = 0
+ return
}
- // Set the peer to idle to allow further fetch requests
- atomic.StoreInt32(idle, 0)
+ // Otherwise update the throughput with a new measurement
+ measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
+ *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
}
// BlockCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
+// previously discovered throughput.
func (p *peer) BlockCapacity() int {
- return int(atomic.LoadInt32(&p.blockCapacity))
-}
+ p.lock.RLock()
+ defer p.lock.RUnlock()
-// ReceiptCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
-func (p *peer) ReceiptCapacity() int {
- return int(atomic.LoadInt32(&p.receiptCapacity))
+ return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
}
-// NodeDataCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
-func (p *peer) NodeDataCapacity() int {
- return int(atomic.LoadInt32(&p.stateCapacity))
-}
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered throughput.
+func (p *peer) ReceiptCapacity() int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
-// Promote increases the peer's reputation.
-func (p *peer) Promote() {
- atomic.AddInt32(&p.rep, 1)
+ return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
}
-// Demote decreases the peer's reputation or leaves it at 0.
-func (p *peer) Demote() {
- for {
- // Calculate the new reputation value
- prev := atomic.LoadInt32(&p.rep)
- next := prev / 2
+// NodeDataCapacity retrieves the peers state download allowance based on its
+// previously discovered throughput.
+func (p *peer) NodeDataCapacity() int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
- // Try to update the old value
- if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
- return
- }
- }
+ return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off.
func (p *peer) MarkLacking(hash common.Hash) {
- p.lackingLock.Lock()
- defer p.lackingLock.Unlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
for len(p.lacking) >= maxLackingHashes {
for drop, _ := range p.lacking {
@@ -330,8 +310,8 @@ func (p *peer) MarkLacking(hash common.Hash) {
// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
// list (i.e. whether we know that the peer does not have it).
func (p *peer) Lacks(hash common.Hash) bool {
- p.lackingLock.RLock()
- defer p.lackingLock.RUnlock()
+ p.lock.RLock()
+ defer p.lock.RUnlock()
_, ok := p.lacking[hash]
return ok
@@ -339,13 +319,13 @@ func (p *peer) Lacks(hash common.Hash) bool {
// String implements fmt.Stringer.
func (p *peer) String() string {
- p.lackingLock.RLock()
- defer p.lackingLock.RUnlock()
+ p.lock.RLock()
+ defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
- fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
- fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
fmt.Sprintf("lacking %4d", len(p.lacking)),
)
}
@@ -377,6 +357,10 @@ func (ps *peerSet) Reset() {
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
+//
+// The method also sets the starting throughput values of the new peer to the
+// average of all existing peers, to give it a realistic change of being used
+// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -384,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error {
if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered
}
+ if len(ps.peers) > 0 {
+ p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+
+ for _, peer := range ps.peers {
+ peer.lock.RLock()
+ p.blockThroughput += peer.blockThroughput
+ p.receiptThroughput += peer.receiptThroughput
+ p.stateThroughput += peer.stateThroughput
+ peer.lock.RUnlock()
+ }
+ p.blockThroughput /= float64(len(ps.peers))
+ p.receiptThroughput /= float64(len(ps.peers))
+ p.stateThroughput /= float64(len(ps.peers))
+ }
ps.peers[p.id] = p
return nil
}
@@ -435,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- return ps.idlePeers(61, 61, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(61, 61, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@@ -444,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- return ps.idlePeers(62, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@@ -453,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
- return ps.idlePeers(63, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.receiptThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@@ -462,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
- return ps.idlePeers(63, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.stateThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
-func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
+// The resulting set of peers are sorted by their measure throughput.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
@@ -482,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
}
for i := 0; i < len(idle); i++ {
for j := i + 1; j < len(idle); j++ {
- if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ if throughput(idle[i]) < throughput(idle[j]) {
idle[i], idle[j] = idle[j], idle[i]
}
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 584797d7b..1e55560db 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
- peers := []string{}
+ expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
@@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
- peers = append(peers, id)
+ // Add the peer to the expiry report along the the number of failed requests
+ expirations := len(request.Hashes)
+ if expirations < len(request.Headers) {
+ expirations = len(request.Headers)
+ }
+ expiries[id] = expirations
}
}
// Remove the expired requests from the pending pool
- for _, id := range peers {
+ for id, _ := range expiries {
delete(pendPool, id)
}
- return peers
+ return expiries
}
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
request := q.blockPendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id)
@@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
}
}
// Iterate over the downloaded blocks and add each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
for _, block := range blocks {
// Skip any blocks that were not requested
hash := block.Hash()
@@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
delete(request.Hashes, hash)
delete(q.hashPool, hash)
+ accepted++
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
// Wake up WaitResults
- q.active.Signal()
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
+ return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
- return errs[0]
+ return accepted, errs[0]
case len(errs) == len(blocks):
- return errStaleDelivery
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
- donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+ results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
@@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
// Assemble each of the results with their headers and retrieved data parts
var (
- failure error
- useful bool
+ accepted int
+ failure error
+ useful bool
)
for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
@@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
donePool[header.Hash()] = struct{}{}
q.resultCache[index].Pending--
useful = true
+ accepted++
// Clean up a successful fetch
request.Headers[i] = nil
@@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
// Wake up WaitResults
- q.active.Signal()
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
- return failure
+ return accepted, failure
case useful:
- return fmt.Errorf("partial failure: %v", failure)
+ return accepted, fmt.Errorf("partial failure: %v", failure)
default:
- return errStaleDelivery
+ return accepted, errStaleDelivery
}
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
@@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
}
// Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
- // Skip any blocks that were not requested
+ // Skip any state trie entires that were not requested
hash := common.BytesToHash(crypto.Sha3(blob))
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob})
+ accepted++
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
@@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
+ return accepted, nil
case len(errs) == len(request.Hashes):
- return errStaleDelivery
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}