diff options
author | Felix Lange <fjl@twurst.com> | 2015-08-27 06:03:59 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-08-27 06:03:59 +0800 |
commit | 6ec13e7e2bab1ebdb580819a48629055bbbb5fb3 (patch) | |
tree | cd23c3deac1a41b34a5157c1f7c5361ca56b4137 /eth/downloader/peer.go | |
parent | 79b644c7a35bbc835b7e78ddf8a31c37e69b0784 (diff) | |
parent | 17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d (diff) | |
download | dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.tar dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.tar.gz dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.tar.bz2 dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.tar.lz dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.tar.xz dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.tar.zst dexon-6ec13e7e2bab1ebdb580819a48629055bbbb5fb3.zip |
Merge pull request #1701 from karalabe/eth62-sync-rebase
eth: implement eth/62 synchronization logic
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 92 |
1 files changed, 79 insertions, 13 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 4273b9168..8fd1f9a99 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -31,10 +31,16 @@ import ( "gopkg.in/fatih/set.v0" ) +// Hash and block fetchers belonging to eth/61 and below type relativeHashFetcherFn func(common.Hash) error type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error +// Block header and body fethers belonging to eth/62 and above +type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error +type absoluteHeaderFetcherFn func(uint64, int, int, bool) error +type blockBodyFetcherFn func([]common.Hash) error + var ( errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyRegistered = errors.New("peer is already registered") @@ -54,25 +60,37 @@ type peer struct { ignored *set.Set // Set of hashes not to request (didn't have previously) - getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash - getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position - getBlocks blockFetcherFn // Method to retrieve a batch of blocks + getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash + getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position + getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks + + getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash + getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position + getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies version int // Eth protocol version number to switch strategies } // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. -func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer { +func newPeer(id string, version int, head common.Hash, + getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, + id: id, + head: head, + capacity: 1, + ignored: set.New(), + getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, getBlocks: getBlocks, - ignored: set.New(), - version: version, + + getRelHeaders: getRelHeaders, + getAbsHeaders: getAbsHeaders, + getBlockBodies: getBlockBodies, + + version: version, } } @@ -83,8 +101,8 @@ func (p *peer) Reset() { p.ignored.Clear() } -// Fetch sends a block retrieval request to the remote peer. -func (p *peer) Fetch(request *fetchRequest) error { +// Fetch61 sends a block retrieval request to the remote peer. +func (p *peer) Fetch61(request *fetchRequest) error { // Short circuit if the peer is already fetching if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { return errAlreadyFetching @@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error { return nil } -// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Fetch sends a block body retrieval request to the remote peer. +func (p *peer) Fetch(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + return errAlreadyFetching + } + p.started = time.Now() + + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) + } + go p.getBlockBodies(hashes) + + return nil +} + +// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests. // Its block retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle() { +func (p *peer) SetIdle61() { // Update the peer's download allowance based on previous performance scale := 2.0 if time.Since(p.started) > blockSoftTTL { @@ -131,6 +167,36 @@ func (p *peer) SetIdle() { atomic.StoreInt32(&p.idle, 0) } +// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block body retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) SetIdle() { + // Update the peer's download allowance based on previous performance + scale := 2.0 + if time.Since(p.started) > bodySoftTTL { + scale = 0.5 + if time.Since(p.started) > bodyHardTTL { + scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1 + } + } + for { + // Calculate the new download bandwidth allowance + prev := atomic.LoadInt32(&p.capacity) + next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale))) + + // Try to update the old value + if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + // If we're having problems at 1 capacity, try to find better peers + if next == 1 { + p.Demote() + } + break + } + } + // Set the peer to idle to allow further block requests + atomic.StoreInt32(&p.idle, 0) +} + // Capacity retrieves the peers block download allowance based on its previously // discovered bandwidth capacity. func (p *peer) Capacity() int { |