aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go192
1 files changed, 123 insertions, 69 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index c1d20ac61..5fc0db587 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -36,10 +36,11 @@ type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error
-// Block header and body fethers belonging to eth/62 and above
+// Block header and body fetchers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
+type receiptFetcherFn func([]common.Hash) error
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
@@ -52,11 +53,14 @@ type peer struct {
id string // Unique identifier of the peer
head common.Hash // Hash of the peers latest known block
- idle int32 // Current activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation
+ blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
+ receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+ rep int32 // Simple peer reputation
- capacity int32 // Number of blocks allowed to fetch per request
- started time.Time // Time instance when the last fetch was started
+ blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
+ receiptCapacity int32 // Number of receipts allowed to fetch per request
+ blockStarted time.Time // Time instance when the last block (body)fetch was started
+ receiptStarted time.Time // Time instance when the last receipt fetch was started
ignored *set.Set // Set of hashes not to request (didn't have previously)
@@ -68,6 +72,8 @@ type peer struct {
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
+ getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
+
version int // Eth protocol version number to switch strategies
}
@@ -75,12 +81,14 @@ type peer struct {
// mechanisms.
func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer {
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
+ getReceipts receiptFetcherFn) *peer {
return &peer{
- id: id,
- head: head,
- capacity: 1,
- ignored: set.New(),
+ id: id,
+ head: head,
+ blockCapacity: 1,
+ receiptCapacity: 1,
+ ignored: set.New(),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
@@ -90,24 +98,28 @@ func newPeer(id string, version int, head common.Hash,
getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies,
+ getReceipts: getReceipts,
+
version: version,
}
}
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
- atomic.StoreInt32(&p.idle, 0)
- atomic.StoreInt32(&p.capacity, 1)
+ atomic.StoreInt32(&p.blockIdle, 0)
+ atomic.StoreInt32(&p.receiptIdle, 0)
+ atomic.StoreInt32(&p.blockCapacity, 1)
+ atomic.StoreInt32(&p.receiptCapacity, 1)
p.ignored.Clear()
}
// Fetch61 sends a block retrieval request to the remote peer.
func (p *peer) Fetch61(request *fetchRequest) error {
// Short circuit if the peer is already fetching
- if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+ if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
return errAlreadyFetching
}
- p.started = time.Now()
+ p.blockStarted = time.Now()
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
@@ -119,13 +131,13 @@ func (p *peer) Fetch61(request *fetchRequest) error {
return nil
}
-// Fetch sends a block body retrieval request to the remote peer.
-func (p *peer) Fetch(request *fetchRequest) error {
+// FetchBodies sends a block body retrieval request to the remote peer.
+func (p *peer) FetchBodies(request *fetchRequest) error {
// Short circuit if the peer is already fetching
- if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+ if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
return errAlreadyFetching
}
- p.started = time.Now()
+ p.blockStarted = time.Now()
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
@@ -137,55 +149,64 @@ func (p *peer) Fetch(request *fetchRequest) error {
return nil
}
-// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle61() {
- // Update the peer's download allowance based on previous performance
- scale := 2.0
- if time.Since(p.started) > blockSoftTTL {
- scale = 0.5
- if time.Since(p.started) > blockHardTTL {
- scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1
- }
+// FetchReceipts sends a receipt retrieval request to the remote peer.
+func (p *peer) FetchReceipts(request *fetchRequest) error {
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
+ return errAlreadyFetching
}
- for {
- // Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(&p.capacity)
- next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale)))
+ p.receiptStarted = time.Now()
- // Try to update the old value
- if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
- // If we're having problems at 1 capacity, try to find better peers
- if next == 1 {
- p.Demote()
- }
- break
- }
+ // Convert the header set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Headers))
+ for _, header := range request.Headers {
+ hashes = append(hashes, header.Hash())
}
- // Set the peer to idle to allow further block requests
- atomic.StoreInt32(&p.idle, 0)
+ go p.getReceipts(hashes)
+
+ return nil
+}
+
+// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its block retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) SetBlocksIdle() {
+ p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
}
-// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block body retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle() {
+func (p *peer) SetBodiesIdle() {
+ p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+}
+
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its receipt retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) SetReceiptsIdle() {
+ p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+}
+
+// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its data retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
// Update the peer's download allowance based on previous performance
scale := 2.0
- if time.Since(p.started) > bodySoftTTL {
+ if time.Since(started) > softTTL {
scale = 0.5
- if time.Since(p.started) > bodyHardTTL {
- scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1
+ if time.Since(started) > hardTTL {
+ scale = 1 / float64(maxFetch) // reduces capacity to 1
}
}
for {
// Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(&p.capacity)
- next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale)))
+ prev := atomic.LoadInt32(capacity)
+ next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
// Try to update the old value
- if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
+ if atomic.CompareAndSwapInt32(capacity, prev, next) {
// If we're having problems at 1 capacity, try to find better peers
if next == 1 {
p.Demote()
@@ -193,14 +214,20 @@ func (p *peer) SetIdle() {
break
}
}
- // Set the peer to idle to allow further block requests
- atomic.StoreInt32(&p.idle, 0)
+ // Set the peer to idle to allow further fetch requests
+ atomic.StoreInt32(idle, 0)
+}
+
+// BlockCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) BlockCapacity() int {
+ return int(atomic.LoadInt32(&p.blockCapacity))
}
-// Capacity retrieves the peers block download allowance based on its previously
-// discovered bandwidth capacity.
-func (p *peer) Capacity() int {
- return int(atomic.LoadInt32(&p.capacity))
+// ReceiptCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) ReceiptCapacity() int {
+ return int(atomic.LoadInt32(&p.receiptCapacity))
}
// Promote increases the peer's reputation.
@@ -226,7 +253,8 @@ func (p *peer) Demote() {
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
- fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+
+ fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
+ fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
fmt.Sprintf("ignored %4d", p.ignored.Size()),
)
}
@@ -310,26 +338,52 @@ func (ps *peerSet) AllPeers() []*peer {
return list
}
-// IdlePeers retrieves a flat list of all the currently idle peers within the
+// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
-func (ps *peerSet) IdlePeers(version int) []*peer {
+func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
+ idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers {
- if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) {
- if atomic.LoadInt32(&p.idle) == 0 {
- list = append(list, p)
+ if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
+ if atomic.LoadInt32(&p.blockIdle) == 0 {
+ idle = append(idle, p)
}
+ total++
}
}
- for i := 0; i < len(list); i++ {
- for j := i + 1; j < len(list); j++ {
- if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
- list[i], list[j] = list[j], list[i]
+ for i := 0; i < len(idle); i++ {
+ for j := i + 1; j < len(idle); j++ {
+ if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ idle[i], idle[j] = idle[j], idle[i]
}
}
}
- return list
+ return idle, total
+}
+
+// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
+// active peer set, ordered by their reputation.
+func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ idle, total := make([]*peer, 0, len(ps.peers)), 0
+ for _, p := range ps.peers {
+ if p.version >= 63 {
+ if atomic.LoadInt32(&p.receiptIdle) == 0 {
+ idle = append(idle, p)
+ }
+ total++
+ }
+ }
+ for i := 0; i < len(idle); i++ {
+ for j := i + 1; j < len(idle); j++ {
+ if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ idle[i], idle[j] = idle[j], idle[i]
+ }
+ }
+ }
+ return idle, total
}