diff options
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 192 |
1 files changed, 123 insertions, 69 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c1d20ac61..5fc0db587 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -36,10 +36,11 @@ type relativeHashFetcherFn func(common.Hash) error type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error -// Block header and body fethers belonging to eth/62 and above +// Block header and body fetchers belonging to eth/62 and above type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type blockBodyFetcherFn func([]common.Hash) error +type receiptFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") @@ -52,11 +53,14 @@ type peer struct { id string // Unique identifier of the peer head common.Hash // Hash of the peers latest known block - idle int32 // Current activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation + blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) + receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + rep int32 // Simple peer reputation - capacity int32 // Number of blocks allowed to fetch per request - started time.Time // Time instance when the last fetch was started + blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request + receiptCapacity int32 // Number of receipts allowed to fetch per request + blockStarted time.Time // Time instance when the last block (body)fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) @@ -68,6 +72,8 @@ type peer struct { getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies + getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts + version int // Eth protocol version number to switch strategies } @@ -75,12 +81,14 @@ type peer struct { // mechanisms. func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, + getReceipts receiptFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, - ignored: set.New(), + id: id, + head: head, + blockCapacity: 1, + receiptCapacity: 1, + ignored: set.New(), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -90,24 +98,28 @@ func newPeer(id string, version int, head common.Hash, getAbsHeaders: getAbsHeaders, getBlockBodies: getBlockBodies, + getReceipts: getReceipts, + version: version, } } // Reset clears the internal state of a peer entity. func (p *peer) Reset() { - atomic.StoreInt32(&p.idle, 0) - atomic.StoreInt32(&p.capacity, 1) + atomic.StoreInt32(&p.blockIdle, 0) + atomic.StoreInt32(&p.receiptIdle, 0) + atomic.StoreInt32(&p.blockCapacity, 1) + atomic.StoreInt32(&p.receiptCapacity, 1) p.ignored.Clear() } // Fetch61 sends a block retrieval request to the remote peer. func (p *peer) Fetch61(request *fetchRequest) error { // Short circuit if the peer is already fetching - if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { return errAlreadyFetching } - p.started = time.Now() + p.blockStarted = time.Now() // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) @@ -119,13 +131,13 @@ func (p *peer) Fetch61(request *fetchRequest) error { return nil } -// Fetch sends a block body retrieval request to the remote peer. -func (p *peer) Fetch(request *fetchRequest) error { +// FetchBodies sends a block body retrieval request to the remote peer. +func (p *peer) FetchBodies(request *fetchRequest) error { // Short circuit if the peer is already fetching - if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { return errAlreadyFetching } - p.started = time.Now() + p.blockStarted = time.Now() // Convert the header set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Headers)) @@ -137,55 +149,64 @@ func (p *peer) Fetch(request *fetchRequest) error { return nil } -// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests. -// Its block retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle61() { - // Update the peer's download allowance based on previous performance - scale := 2.0 - if time.Since(p.started) > blockSoftTTL { - scale = 0.5 - if time.Since(p.started) > blockHardTTL { - scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1 - } +// FetchReceipts sends a receipt retrieval request to the remote peer. +func (p *peer) FetchReceipts(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) { + return errAlreadyFetching } - for { - // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale))) + p.receiptStarted = time.Now() - // Try to update the old value - if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { - // If we're having problems at 1 capacity, try to find better peers - if next == 1 { - p.Demote() - } - break - } + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) } - // Set the peer to idle to allow further block requests - atomic.StoreInt32(&p.idle, 0) + go p.getReceipts(hashes) + + return nil +} + +// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) SetBlocksIdle() { + p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) } -// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block body retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle() { +func (p *peer) SetBodiesIdle() { + p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) +} + +// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its receipt retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) SetReceiptsIdle() { + p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) +} + +// setIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its data retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time or not. +func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { // Update the peer's download allowance based on previous performance scale := 2.0 - if time.Since(p.started) > bodySoftTTL { + if time.Since(started) > softTTL { scale = 0.5 - if time.Since(p.started) > bodyHardTTL { - scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1 + if time.Since(started) > hardTTL { + scale = 1 / float64(maxFetch) // reduces capacity to 1 } } for { // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale))) + prev := atomic.LoadInt32(capacity) + next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale))) // Try to update the old value - if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + if atomic.CompareAndSwapInt32(capacity, prev, next) { // If we're having problems at 1 capacity, try to find better peers if next == 1 { p.Demote() @@ -193,14 +214,20 @@ func (p *peer) SetIdle() { break } } - // Set the peer to idle to allow further block requests - atomic.StoreInt32(&p.idle, 0) + // Set the peer to idle to allow further fetch requests + atomic.StoreInt32(idle, 0) +} + +// BlockCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) BlockCapacity() int { + return int(atomic.LoadInt32(&p.blockCapacity)) } -// Capacity retrieves the peers block download allowance based on its previously -// discovered bandwidth capacity. -func (p *peer) Capacity() int { - return int(atomic.LoadInt32(&p.capacity)) +// ReceiptCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) ReceiptCapacity() int { + return int(atomic.LoadInt32(&p.receiptCapacity)) } // Promote increases the peer's reputation. @@ -226,7 +253,8 @@ func (p *peer) Demote() { func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ - fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+ + fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ + fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ fmt.Sprintf("ignored %4d", p.ignored.Size()), ) } @@ -310,26 +338,52 @@ func (ps *peerSet) AllPeers() []*peer { return list } -// IdlePeers retrieves a flat list of all the currently idle peers within the +// BlockIdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) IdlePeers(version int) []*peer { +func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) + idle, total := make([]*peer, 0, len(ps.peers)), 0 for _, p := range ps.peers { - if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) { - if atomic.LoadInt32(&p.idle) == 0 { - list = append(list, p) + if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) { + if atomic.LoadInt32(&p.blockIdle) == 0 { + idle = append(idle, p) } + total++ } } - for i := 0; i < len(list); i++ { - for j := i + 1; j < len(list); j++ { - if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) { - list[i], list[j] = list[j], list[i] + for i := 0; i < len(idle); i++ { + for j := i + 1; j < len(idle); j++ { + if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + idle[i], idle[j] = idle[j], idle[i] } } } - return list + return idle, total +} + +// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the +// active peer set, ordered by their reputation. +func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { + ps.lock.RLock() + defer ps.lock.RUnlock() + + idle, total := make([]*peer, 0, len(ps.peers)), 0 + for _, p := range ps.peers { + if p.version >= 63 { + if atomic.LoadInt32(&p.receiptIdle) == 0 { + idle = append(idle, p) + } + total++ + } + } + for i := 0; i < len(idle); i++ { + for j := i + 1; j < len(idle); j++ { + if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + idle[i], idle[j] = idle[j], idle[i] + } + } + } + return idle, total } |