aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-08-15 02:25:41 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-08-25 22:48:47 +0800
commit47a7fe5d22fe2a6be783f6576070814fe951eaaf (patch)
tree61f2f691c6775fa5ae3547b8d769a709b7b3f04c /eth/downloader/peer.go
parentca88e18f59af84f34ad67da21fd27a6407eea87c (diff)
downloadgo-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar
go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.gz
go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.bz2
go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.lz
go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.xz
go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.zst
go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.zip
eth: port the synchronisation algo to eth/62
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go92
1 files changed, 79 insertions, 13 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 4273b9168..8fd1f9a99 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -31,10 +31,16 @@ import (
"gopkg.in/fatih/set.v0"
)
+// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error
+// Block header and body fethers belonging to eth/62 and above
+type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
+type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
+type blockBodyFetcherFn func([]common.Hash) error
+
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered")
@@ -54,25 +60,37 @@ type peer struct {
ignored *set.Set // Set of hashes not to request (didn't have previously)
- getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash
- getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position
- getBlocks blockFetcherFn // Method to retrieve a batch of blocks
+ getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
+ getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
+ getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks
+
+ getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
+ getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
+ getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
version int // Eth protocol version number to switch strategies
}
// newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms.
-func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer {
+func newPeer(id string, version int, head common.Hash,
+ getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer {
return &peer{
- id: id,
- head: head,
- capacity: 1,
+ id: id,
+ head: head,
+ capacity: 1,
+ ignored: set.New(),
+
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
getBlocks: getBlocks,
- ignored: set.New(),
- version: version,
+
+ getRelHeaders: getRelHeaders,
+ getAbsHeaders: getAbsHeaders,
+ getBlockBodies: getBlockBodies,
+
+ version: version,
}
}
@@ -83,8 +101,8 @@ func (p *peer) Reset() {
p.ignored.Clear()
}
-// Fetch sends a block retrieval request to the remote peer.
-func (p *peer) Fetch(request *fetchRequest) error {
+// Fetch61 sends a block retrieval request to the remote peer.
+func (p *peer) Fetch61(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching
@@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error {
return nil
}
-// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Fetch sends a block body retrieval request to the remote peer.
+func (p *peer) Fetch(request *fetchRequest) error {
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.started = time.Now()
+
+ // Convert the header set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Headers))
+ for _, header := range request.Headers {
+ hashes = append(hashes, header.Hash())
+ }
+ go p.getBlockBodies(hashes)
+
+ return nil
+}
+
+// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle() {
+func (p *peer) SetIdle61() {
// Update the peer's download allowance based on previous performance
scale := 2.0
if time.Since(p.started) > blockSoftTTL {
@@ -131,6 +167,36 @@ func (p *peer) SetIdle() {
atomic.StoreInt32(&p.idle, 0)
}
+// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its block body retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) SetIdle() {
+ // Update the peer's download allowance based on previous performance
+ scale := 2.0
+ if time.Since(p.started) > bodySoftTTL {
+ scale = 0.5
+ if time.Since(p.started) > bodyHardTTL {
+ scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1
+ }
+ }
+ for {
+ // Calculate the new download bandwidth allowance
+ prev := atomic.LoadInt32(&p.capacity)
+ next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale)))
+
+ // Try to update the old value
+ if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
+ // If we're having problems at 1 capacity, try to find better peers
+ if next == 1 {
+ p.Demote()
+ }
+ break
+ }
+ }
+ // Set the peer to idle to allow further block requests
+ atomic.StoreInt32(&p.idle, 0)
+}
+
// Capacity retrieves the peers block download allowance based on its previously
// discovered bandwidth capacity.
func (p *peer) Capacity() int {