diff options
author | Péter Szilágyi <peterke@gmail.com> | 2016-06-01 23:07:25 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2016-06-06 19:20:57 +0800 |
commit | 88f174a014c1f2f99fa6d6a8054ada28a0b43504 (patch) | |
tree | 47604210e1423970a04fe5dd0bb41840898f4e08 /eth/downloader/peer.go | |
parent | 780bdb3e8069c38a779639ee92cc0eccba8a6735 (diff) | |
download | dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.gz dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.bz2 dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.lz dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.xz dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.tar.zst dexon-88f174a014c1f2f99fa6d6a8054ada28a0b43504.zip |
eth/downloader: adaptive quality of service tuning
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 81 |
1 files changed, 62 insertions, 19 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 6aab907d7..94d44fca4 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -23,6 +23,8 @@ import ( "errors" "fmt" "math" + "sort" + "strings" "sync" "sync/atomic" "time" @@ -31,8 +33,8 @@ import ( ) const ( - maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items - throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. + maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items + measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. ) // Hash and block fetchers belonging to eth/61 and below @@ -68,6 +70,8 @@ type peer struct { receiptThroughput float64 // Number of receipts measured to be retrievable per second stateThroughput float64 // Number of node data pieces measured to be retrievable per second + rtt time.Duration // Request round trip time to track responsiveness (QoS) + headerStarted time.Time // Time instance when the last header fetch was started blockStarted time.Time // Time instance when the last block (body) fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started @@ -290,44 +294,47 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id return } // Otherwise update the throughput with a new measurement - measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor - *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured + elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor + measured := float64(delivered) / (float64(elapsed) / float64(time.Second)) + + *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured + p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) } // HeaderCapacity retrieves the peers header download allowance based on its // previously discovered throughput. -func (p *peer) HeaderCapacity() int { +func (p *peer) HeaderCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch)))) + return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch))) } // BlockCapacity retrieves the peers block download allowance based on its // previously discovered throughput. -func (p *peer) BlockCapacity() int { +func (p *peer) BlockCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch)))) + return int(math.Min(1+math.Max(1, p.blockThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxBlockFetch))) } // ReceiptCapacity retrieves the peers receipt download allowance based on its // previously discovered throughput. -func (p *peer) ReceiptCapacity() int { +func (p *peer) ReceiptCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch)))) + return int(math.Min(1+math.Max(1, p.receiptThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxReceiptFetch))) } // NodeDataCapacity retrieves the peers state download allowance based on its // previously discovered throughput. -func (p *peer) NodeDataCapacity() int { +func (p *peer) NodeDataCapacity(targetRTT time.Duration) int { p.lock.RLock() defer p.lock.RUnlock() - return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch)))) + return int(math.Min(1+math.Max(1, p.stateThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxStateFetch))) } // MarkLacking appends a new entity to the set of items (blocks, receipts, states) @@ -361,13 +368,14 @@ func (p *peer) String() string { p.lock.RLock() defer p.lock.RUnlock() - return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+ - fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ - fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ - fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ - fmt.Sprintf("lacking %4d", len(p.lacking)), - ) + return fmt.Sprintf("Peer %s [%s]", p.id, strings.Join([]string{ + fmt.Sprintf("hs %3.2f/s", p.headerThroughput), + fmt.Sprintf("bs %3.2f/s", p.blockThroughput), + fmt.Sprintf("rs %3.2f/s", p.receiptThroughput), + fmt.Sprintf("ss %3.2f/s", p.stateThroughput), + fmt.Sprintf("miss %4d", len(p.lacking)), + fmt.Sprintf("rtt %v", p.rtt), + }, ", ")) } // peerSet represents the collection of active peer participating in the chain @@ -402,6 +410,10 @@ func (ps *peerSet) Reset() { // average of all existing peers, to give it a realistic chance of being used // for data retrievals. func (ps *peerSet) Register(p *peer) error { + // Retrieve the current median RTT as a sane default + p.rtt = ps.medianRTT() + + // Register the new peer with some meaningful defaults ps.lock.Lock() defer ps.lock.Unlock() @@ -564,3 +576,34 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) } return idle, total } + +// medianRTT returns the median RTT of te peerset, considering only the tuning +// peers if there are more peers available. +func (ps *peerSet) medianRTT() time.Duration { + // Gather all the currnetly measured round trip times + ps.lock.RLock() + defer ps.lock.RUnlock() + + rtts := make([]float64, 0, len(ps.peers)) + for _, p := range ps.peers { + p.lock.RLock() + rtts = append(rtts, float64(p.rtt)) + p.lock.RUnlock() + } + sort.Float64s(rtts) + + median := rttMaxEstimate + if qosTuningPeers <= len(rtts) { + median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers + } else if len(rtts) > 0 { + median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos) + } + // Restrict the RTT into some QoS defaults, irrelevant of true RTT + if median < rttMinEstimate { + median = rttMinEstimate + } + if median > rttMaxEstimate { + median = rttMaxEstimate + } + return median +} |