diff options
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 45 |
1 files changed, 40 insertions, 5 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 4abae8d5e..df54eecbd 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -5,10 +5,14 @@ package downloader import ( "errors" + "math" "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) @@ -27,14 +31,15 @@ type peer struct { head common.Hash // Hash of the peers latest known block idle int32 // Current activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation (not used currently) + rep int32 // Simple peer reputation - mu sync.RWMutex + capacity int32 // Number of blocks allowed to fetch per request + started time.Time // Time instance when the last fetch was started - ignored *set.Set + ignored *set.Set // Set of hashes not to request (didn't have previously) - getHashes hashFetcherFn - getBlocks blockFetcherFn + getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) + getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) } // newPeer create a new downloader peer, with specific hash and block retrieval @@ -43,6 +48,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo return &peer{ id: id, head: head, + capacity: 1, getHashes: getHashes, getBlocks: getBlocks, ignored: set.New(), @@ -52,6 +58,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo // Reset clears the internal state of a peer entity. func (p *peer) Reset() { atomic.StoreInt32(&p.idle, 0) + atomic.StoreInt32(&p.capacity, 1) p.ignored.Clear() } @@ -61,6 +68,8 @@ func (p *peer) Fetch(request *fetchRequest) error { if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { return errAlreadyFetching } + p.started = time.Now() + // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) for hash, _ := range request.Hashes { @@ -72,10 +81,36 @@ func (p *peer) Fetch(request *fetchRequest) error { } // SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. func (p *peer) SetIdle() { + // Update the peer's download allowance based on previous performance + scale := 2.0 + if time.Since(p.started) > blockSoftTTL { + scale = 0.5 + } + for { + // Calculate the new download bandwidth allowance + prev := atomic.LoadInt32(&p.capacity) + next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale))) + if scale < 1 { + glog.V(logger.Detail).Infof("%s: reducing block allowance from %d to %d", p.id, prev, next) + } + // Try to update the old value + if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + break + } + } + // Set the peer to idle to allow further block requests atomic.StoreInt32(&p.idle, 0) } +// Capacity retrieves the peers block download allowance based on its previously +// discovered bandwidth capacity. +func (p *peer) Capacity() int { + return int(atomic.LoadInt32(&p.capacity)) +} + // Promote increases the peer's reputation. func (p *peer) Promote() { atomic.AddInt32(&p.rep, 1) |