diff options
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 261 |
1 files changed, 191 insertions, 70 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index c1d20ac61..1f457cb15 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -36,10 +36,12 @@ type relativeHashFetcherFn func(common.Hash) error type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error -// Block header and body fethers belonging to eth/62 and above +// Block header and body fetchers belonging to eth/62 and above type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type blockBodyFetcherFn func([]common.Hash) error +type receiptFetcherFn func([]common.Hash) error +type stateFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") @@ -52,11 +54,18 @@ type peer struct { id string // Unique identifier of the peer head common.Hash // Hash of the peers latest known block - idle int32 // Current activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation + blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) + receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) + rep int32 // Simple peer reputation - capacity int32 // Number of blocks allowed to fetch per request - started time.Time // Time instance when the last fetch was started + blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request + receiptCapacity int32 // Number of receipts allowed to fetch per request + stateCapacity int32 // Number of node data pieces allowed to fetch per request + + blockStarted time.Time // Time instance when the last block (body)fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started + stateStarted time.Time // Time instance when the last node data fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) @@ -68,6 +77,9 @@ type peer struct { getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies + getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts + getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data + version int // Eth protocol version number to switch strategies } @@ -75,12 +87,15 @@ type peer struct { // mechanisms. func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, - ignored: set.New(), + id: id, + head: head, + blockCapacity: 1, + receiptCapacity: 1, + stateCapacity: 1, + ignored: set.New(), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -90,24 +105,34 @@ func newPeer(id string, version int, head common.Hash, getAbsHeaders: getAbsHeaders, getBlockBodies: getBlockBodies, + getReceipts: getReceipts, + getNodeData: getNodeData, + version: version, } } // Reset clears the internal state of a peer entity. func (p *peer) Reset() { - atomic.StoreInt32(&p.idle, 0) - atomic.StoreInt32(&p.capacity, 1) + atomic.StoreInt32(&p.blockIdle, 0) + atomic.StoreInt32(&p.receiptIdle, 0) + atomic.StoreInt32(&p.blockCapacity, 1) + atomic.StoreInt32(&p.receiptCapacity, 1) + atomic.StoreInt32(&p.stateCapacity, 1) p.ignored.Clear() } // Fetch61 sends a block retrieval request to the remote peer. func (p *peer) Fetch61(request *fetchRequest) error { + // Sanity check the protocol version + if p.version != 61 { + panic(fmt.Sprintf("block fetch [eth/61] requested on eth/%d", p.version)) + } // Short circuit if the peer is already fetching - if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { return errAlreadyFetching } - p.started = time.Now() + p.blockStarted = time.Now() // Convert the hash set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Hashes)) @@ -119,13 +144,17 @@ func (p *peer) Fetch61(request *fetchRequest) error { return nil } -// Fetch sends a block body retrieval request to the remote peer. -func (p *peer) Fetch(request *fetchRequest) error { +// FetchBodies sends a block body retrieval request to the remote peer. +func (p *peer) FetchBodies(request *fetchRequest) error { + // Sanity check the protocol version + if p.version < 62 { + panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version)) + } // Short circuit if the peer is already fetching - if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) { return errAlreadyFetching } - p.started = time.Now() + p.blockStarted = time.Now() // Convert the header set to a retrievable slice hashes := make([]common.Hash, 0, len(request.Headers)) @@ -137,55 +166,97 @@ func (p *peer) Fetch(request *fetchRequest) error { return nil } -// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests. -// Its block retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle61() { - // Update the peer's download allowance based on previous performance - scale := 2.0 - if time.Since(p.started) > blockSoftTTL { - scale = 0.5 - if time.Since(p.started) > blockHardTTL { - scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1 - } +// FetchReceipts sends a receipt retrieval request to the remote peer. +func (p *peer) FetchReceipts(request *fetchRequest) error { + // Sanity check the protocol version + if p.version < 63 { + panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version)) } - for { - // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale))) + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) { + return errAlreadyFetching + } + p.receiptStarted = time.Now() - // Try to update the old value - if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { - // If we're having problems at 1 capacity, try to find better peers - if next == 1 { - p.Demote() - } - break - } + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) } - // Set the peer to idle to allow further block requests - atomic.StoreInt32(&p.idle, 0) + go p.getReceipts(hashes) + + return nil } -// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +// FetchNodeData sends a node state data retrieval request to the remote peer. +func (p *peer) FetchNodeData(request *fetchRequest) error { + // Sanity check the protocol version + if p.version < 63 { + panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) + } + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) { + return errAlreadyFetching + } + p.stateStarted = time.Now() + + // Convert the hash set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Hashes)) + for hash, _ := range request.Hashes { + hashes = append(hashes, hash) + } + go p.getNodeData(hashes) + + return nil +} + +// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its block retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time. +func (p *peer) SetBlocksIdle() { + p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) +} + +// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block body retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time or not. -func (p *peer) SetIdle() { +// depending on whether the previous fetch completed in time. +func (p *peer) SetBodiesIdle() { + p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle) +} + +// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its receipt retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time. +func (p *peer) SetReceiptsIdle() { + p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) +} + +// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval +// requests. Its node data retrieval allowance will also be updated either up- or +// downwards, depending on whether the previous fetch completed in time. +func (p *peer) SetNodeDataIdle() { + p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +} + +// setIdle sets the peer to idle, allowing it to execute new retrieval requests. +// Its data retrieval allowance will also be updated either up- or downwards, +// depending on whether the previous fetch completed in time. +func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { // Update the peer's download allowance based on previous performance scale := 2.0 - if time.Since(p.started) > bodySoftTTL { + if time.Since(started) > softTTL { scale = 0.5 - if time.Since(p.started) > bodyHardTTL { - scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1 + if time.Since(started) > hardTTL { + scale = 1 / float64(maxFetch) // reduces capacity to 1 } } for { // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(&p.capacity) - next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale))) + prev := atomic.LoadInt32(capacity) + next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale))) // Try to update the old value - if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { + if atomic.CompareAndSwapInt32(capacity, prev, next) { // If we're having problems at 1 capacity, try to find better peers if next == 1 { p.Demote() @@ -193,14 +264,26 @@ func (p *peer) SetIdle() { break } } - // Set the peer to idle to allow further block requests - atomic.StoreInt32(&p.idle, 0) + // Set the peer to idle to allow further fetch requests + atomic.StoreInt32(idle, 0) +} + +// BlockCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) BlockCapacity() int { + return int(atomic.LoadInt32(&p.blockCapacity)) +} + +// ReceiptCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) ReceiptCapacity() int { + return int(atomic.LoadInt32(&p.receiptCapacity)) } -// Capacity retrieves the peers block download allowance based on its previously -// discovered bandwidth capacity. -func (p *peer) Capacity() int { - return int(atomic.LoadInt32(&p.capacity)) +// NodeDataCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) NodeDataCapacity() int { + return int(atomic.LoadInt32(&p.stateCapacity)) } // Promote increases the peer's reputation. @@ -226,7 +309,8 @@ func (p *peer) Demote() { func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ - fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+ + fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ + fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ fmt.Sprintf("ignored %4d", p.ignored.Size()), ) } @@ -310,26 +394,63 @@ func (ps *peerSet) AllPeers() []*peer { return list } -// IdlePeers retrieves a flat list of all the currently idle peers within the +// BlockIdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) IdlePeers(version int) []*peer { +func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 + } + return ps.idlePeers(61, 61, idle) +} + +// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within +// the active peer set, ordered by their reputation. +func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 + } + return ps.idlePeers(62, 64, idle) +} + +// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers +// within the active peer set, ordered by their reputation. +func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.receiptIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle +// peers within the active peer set, ordered by their reputation. +func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.stateIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// idlePeers retrieves a flat list of all currently idle peers satisfying the +// protocol version constraints, using the provided function to check idleness. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*peer, 0, len(ps.peers)) + idle, total := make([]*peer, 0, len(ps.peers)), 0 for _, p := range ps.peers { - if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) { - if atomic.LoadInt32(&p.idle) == 0 { - list = append(list, p) + if p.version >= minProtocol && p.version <= maxProtocol { + if idleCheck(p) { + idle = append(idle, p) } + total++ } } - for i := 0; i < len(list); i++ { - for j := i + 1; j < len(list); j++ { - if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) { - list[i], list[j] = list[j], list[i] + for i := 0; i < len(idle); i++ { + for j := i + 1; j < len(idle); j++ { + if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + idle[i], idle[j] = idle[j], idle[i] } } } - return list + return idle, total } |