aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2016-06-01 23:07:25 +0800
committerPéter Szilágyi <peterke@gmail.com>2016-06-06 19:20:57 +0800
commit88f174a014c1f2f99fa6d6a8054ada28a0b43504 (patch)
tree47604210e1423970a04fe5dd0bb41840898f4e08 /eth/downloader/peer.go
parent780bdb3e8069c38a779639ee92cc0eccba8a6735 (diff)
downloadgo-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar
go-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.gz
go-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.bz2
go-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.lz
go-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.xz
go-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.zst
go-tangerine-88f174a014c1f2f99fa6d6a8054ada28a0b43504.zip
eth/downloader: adaptive quality of service tuning
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go81
1 files changed, 62 insertions, 19 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 6aab907d7..94d44fca4 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -23,6 +23,8 @@ import (
"errors"
"fmt"
"math"
+ "sort"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -31,8 +33,8 @@ import (
)
const (
- maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
- throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list of lacking items
+ measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
)
// Hash and block fetchers belonging to eth/61 and below
@@ -68,6 +70,8 @@ type peer struct {
receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second
+ rtt time.Duration // Request round trip time to track responsiveness (QoS)
+
headerStarted time.Time // Time instance when the last header fetch was started
blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
@@ -290,44 +294,47 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
return
}
// Otherwise update the throughput with a new measurement
- measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
- *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
+ elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
+ measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
+
+ *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
+ p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
}
// HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput.
-func (p *peer) HeaderCapacity() int {
+func (p *peer) HeaderCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch))))
+ return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
}
// BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput.
-func (p *peer) BlockCapacity() int {
+func (p *peer) BlockCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
+ return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch)))
}
// ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered throughput.
-func (p *peer) ReceiptCapacity() int {
+func (p *peer) ReceiptCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
+ return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch)))
}
// NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered throughput.
-func (p *peer) NodeDataCapacity() int {
+func (p *peer) NodeDataCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
- return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
+ return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch)))
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
@@ -361,13 +368,14 @@ func (p *peer) String() string {
p.lock.RLock()
defer p.lock.RUnlock()
- return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+
- fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
- fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
- fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
- fmt.Sprintf("lacking %4d", len(p.lacking)),
- )
+ return fmt.Sprintf("Peer %s [%s]", p.id, strings.Join([]string{
+ fmt.Sprintf("hs %3.2f/s", p.headerThroughput),
+ fmt.Sprintf("bs %3.2f/s", p.blockThroughput),
+ fmt.Sprintf("rs %3.2f/s", p.receiptThroughput),
+ fmt.Sprintf("ss %3.2f/s", p.stateThroughput),
+ fmt.Sprintf("miss %4d", len(p.lacking)),
+ fmt.Sprintf("rtt %v", p.rtt),
+ }, ", "))
}
// peerSet represents the collection of active peer participating in the chain
@@ -402,6 +410,10 @@ func (ps *peerSet) Reset() {
// average of all existing peers, to give it a realistic chance of being used
// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
+ // Retrieve the current median RTT as a sane default
+ p.rtt = ps.medianRTT()
+
+ // Register the new peer with some meaningful defaults
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -564,3 +576,34 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
}
return idle, total
}
+
+// medianRTT returns the median RTT of the peerset, considering only the tuning
+// peers if there are more peers available.
+func (ps *peerSet) medianRTT() time.Duration {
+ // Gather all the currently measured round trip times
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ rtts := make([]float64, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ p.lock.RLock()
+ rtts = append(rtts, float64(p.rtt))
+ p.lock.RUnlock()
+ }
+ sort.Float64s(rtts)
+
+ median := rttMaxEstimate
+ if qosTuningPeers <= len(rtts) {
+ median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers
+ } else if len(rtts) > 0 {
+ median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
+ }
+ // Restrict the RTT into some QoS defaults, irrespective of the true RTT
+ if median < rttMinEstimate {
+ median = rttMinEstimate
+ }
+ if median > rttMaxEstimate {
+ median = rttMaxEstimate
+ }
+ return median
+}