aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go45
1 files changed, 40 insertions, 5 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 4abae8d5e..df54eecbd 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -5,10 +5,14 @@ package downloader
import (
"errors"
+ "math"
"sync"
"sync/atomic"
+ "time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
"gopkg.in/fatih/set.v0"
)
@@ -27,14 +31,15 @@ type peer struct {
head common.Hash // Hash of the peers latest known block
idle int32 // Current activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation (not used currently)
+ rep int32 // Simple peer reputation
- mu sync.RWMutex
+ capacity int32 // Number of blocks allowed to fetch per request
+ started time.Time // Time instance when the last fetch was started
- ignored *set.Set
+ ignored *set.Set // Set of hashes not to request (didn't have previously)
- getHashes hashFetcherFn
- getBlocks blockFetcherFn
+ getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing)
+ getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing)
}
// newPeer create a new downloader peer, with specific hash and block retrieval
@@ -43,6 +48,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
return &peer{
id: id,
head: head,
+ capacity: 1,
getHashes: getHashes,
getBlocks: getBlocks,
ignored: set.New(),
@@ -52,6 +58,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
atomic.StoreInt32(&p.idle, 0)
+ atomic.StoreInt32(&p.capacity, 1)
p.ignored.Clear()
}
@@ -61,6 +68,8 @@ func (p *peer) Fetch(request *fetchRequest) error {
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching
}
+ p.started = time.Now()
+
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
for hash, _ := range request.Hashes {
@@ -72,10 +81,36 @@ func (p *peer) Fetch(request *fetchRequest) error {
}
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its block retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
func (p *peer) SetIdle() {
+ // Update the peer's download allowance based on previous performance
+ scale := 2.0
+ if time.Since(p.started) > blockSoftTTL {
+ scale = 0.5
+ }
+ for {
+ // Calculate the new download bandwidth allowance
+ prev := atomic.LoadInt32(&p.capacity)
+ next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale)))
+ if scale < 1 {
+ glog.V(logger.Detail).Infof("%s: reducing block allowance from %d to %d", p.id, prev, next)
+ }
+ // Try to update the old value
+ if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
+ break
+ }
+ }
+ // Set the peer to idle to allow further block requests
atomic.StoreInt32(&p.idle, 0)
}
+// Capacity retrieves the peers block download allowance based on its previously
+// discovered bandwidth capacity.
+func (p *peer) Capacity() int {
+ return int(atomic.LoadInt32(&p.capacity))
+}
+
// Promote increases the peer's reputation.
func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1)