From b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 29 Oct 2015 18:37:26 +0200 Subject: eth/downloader: fetch data proportionally to peer capacity --- eth/downloader/downloader.go | 178 ++++++++++++++------------------- eth/downloader/peer.go | 231 +++++++++++++++++++++++-------------------- eth/downloader/queue.go | 95 +++++++++++------- 3 files changed, 258 insertions(+), 246 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5fa18a2e3..c272d05af 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -45,16 +45,17 @@ var ( MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request - hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out - blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired - headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out - bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth - bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired - receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired - stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired + hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out + blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request + blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired + + headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out + bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request + bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired + receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request + receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired + stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request + stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) @@ -486,7 +487,7 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { // Request the advertised remote head block and wait for the response go p.getBlocks([]common.Hash{p.head}) - timeout := time.After(blockSoftTTL) + timeout := time.After(hashTTL) for { select { case <-d.cancelCh: @@ -779,47 +780,27 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { - // Deliver the received chunk of blocks, and demote in case of errors blocks := packet.(*blockPack).blocks - err := d.queue.DeliverBlocks(peer.id, blocks) - switch err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above) - if len(blocks) == 0 { - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort + // Deliver the received chunk of blocks and check chain validity + accepted, err := d.queue.DeliverBlocks(peer.id, blocks) + if err == errInvalidChain { return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: stale delivery", peer) - + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + peer.SetBlocksIdle(accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && len(blocks) == 0: + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + case err == nil: + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) + glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err) } } // Blocks arrived, try to update the progress @@ -852,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { + for pid, fails := range d.queue.ExpireBlocks(blockTTL) { if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + if fails > 1 { + glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + peer.SetBlocksIdle(0) + } else { + glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer) + d.dropPeer(pid) + } } } // If there's nothing more to fetch, wait or terminate @@ -1281,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { pack := packet.(*bodyPack) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } + expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peer) int { return p.BlockCapacity() } - setIdle = func(p *peer) { p.SetBodiesIdle() } + setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, @@ -1305,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error { glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } - expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) } + expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } capacity = func(p *peer) int { return p.ReceiptCapacity() } - setIdle = func(p *peer) { p.SetReceiptsIdle() } + setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, @@ -1329,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error { glog.V(logger.Debug).Infof("Downloading node state data") var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { start := time.Now() return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { if err != nil { @@ -1352,14 +1338,14 @@ func (d *Downloader) fetchNodeData() error { glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) }) } - expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } + expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveNodeData(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } capacity = func(p *peer) int { return p.NodeDataCapacity() } - setIdle = func(p *peer) { p.SetNodeDataIdle() } + setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } ) err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, @@ -1372,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error { // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. -func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, - expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, + expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, - idle func() ([]*peer, int), setIdle func(*peer), kind string) error { + idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error { // Create a ticker to detect expired retrieval tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1394,45 +1380,25 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { - // Deliver the received chunk of data, and demote in case of errors - switch err := deliver(packet); err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) - if packet.Items() == 0 { - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) - - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort + // Deliver the received chunk of data and check chain validity + accepted, err := deliver(packet) + if err == errInvalidChain { return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind)) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind)) - + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + setIdle(peer, accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && packet.Items() == 0: + glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) + case err == nil: + glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) + glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err) } } // Blocks assembled, try to update the progress @@ -1465,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv return errNoPeers } // Check for fetch request timeouts and demote the responsible peers - for _, pid := range expire() { + for pid, fails := range expire() { if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) + if fails > 1 { + glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) + setIdle(peer, 0) + } else { + glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind)) + d.dropPeer(pid) + } } } // If there's nothing more to fetch, wait or terminate diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 9ba6dabbd..80f08b68f 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -30,8 +30,10 @@ import ( "github.com/ethereum/go-ethereum/common" ) -// Maximum number of entries allowed on the list or lacking items. -const maxLackingHashes = 4096 +const ( + maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items + throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. +) // Hash and block fetchers belonging to eth/61 and below type relativeHashFetcherFn func(common.Hash) error @@ -59,18 +61,16 @@ type peer struct { blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation - blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request - receiptCapacity int32 // Number of receipts allowed to fetch per request - stateCapacity int32 // Number of node data pieces allowed to fetch per request + blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second + receiptThroughput float64 // Number of receipts measured to be retrievable per second + stateThroughput float64 // Number of node data pieces measured to be retrievable per second blockStarted time.Time // Time instance when the last block (body)fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started - lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) - lackingLock sync.RWMutex // Lock protecting the lacking hashes list + lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position @@ -84,6 +84,7 @@ type peer struct { getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies + lock sync.RWMutex } // newPeer create a new downloader peer, with specific hash and block retrieval @@ -93,12 +94,9 @@ func newPeer(id string, version int, head common.Hash, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ - id: id, - head: head, - blockCapacity: 1, - receiptCapacity: 1, - stateCapacity: 1, - lacking: make(map[common.Hash]struct{}), + id: id, + head: head, + lacking: make(map[common.Hash]struct{}), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -117,15 +115,18 @@ func newPeer(id string, version int, head common.Hash, // Reset clears the internal state of a peer entity. func (p *peer) Reset() { + p.lock.Lock() + defer p.lock.Unlock() + atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0) - atomic.StoreInt32(&p.blockCapacity, 1) - atomic.StoreInt32(&p.receiptCapacity, 1) - atomic.StoreInt32(&p.stateCapacity, 1) + atomic.StoreInt32(&p.stateIdle, 0) + + p.blockThroughput = 0 + p.receiptThroughput = 0 + p.stateThroughput = 0 - p.lackingLock.Lock() p.lacking = make(map[common.Hash]struct{}) - p.lackingLock.Unlock() } // Fetch61 sends a block retrieval request to the remote peer. @@ -216,107 +217,86 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return nil } -// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its block retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetBlocksIdle() { - p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) +// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval +// requests. Its estimated block retrieval throughput is updated with that measured +// just now. +func (p *peer) SetBlocksIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } -// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its block body retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetBodiesIdle() { - p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle) +// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval +// requests. Its estimated body retrieval throughput is updated with that measured +// just now. +func (p *peer) SetBodiesIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } -// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its receipt retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetReceiptsIdle() { - p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) +// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt +// retrieval requests. Its estimated receipt retrieval throughput is updated +// with that measured just now. +func (p *peer) SetReceiptsIdle(delivered int) { + p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) } -// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval -// requests. Its node data retrieval allowance will also be updated either up- or -// downwards, depending on whether the previous fetch completed in time. -func (p *peer) SetNodeDataIdle() { - p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie +// data retrieval requests. Its estimated state retrieval throughput is updated +// with that measured just now. +func (p *peer) SetNodeDataIdle(delivered int) { + p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) } // setIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its data retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { - // Update the peer's download allowance based on previous performance - scale := 2.0 - if time.Since(started) > softTTL { - scale = 0.5 - if time.Since(started) > hardTTL { - scale = 1 / float64(maxFetch) // reduces capacity to 1 - } - } - for { - // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(capacity) - next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale))) - - // Try to update the old value - if atomic.CompareAndSwapInt32(capacity, prev, next) { - // If we're having problems at 1 capacity, try to find better peers - if next == 1 { - p.Demote() - } - break - } +// Its estimated retrieval throughput is updated with that measured just now. +func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { + // Irrelevant of the scaling, make sure the peer ends up idle + defer atomic.StoreInt32(idle, 0) + + p.lock.RLock() + defer p.lock.RUnlock() + + // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum + if delivered == 0 { + *throughput = 0 + return } - // Set the peer to idle to allow further fetch requests - atomic.StoreInt32(idle, 0) + // Otherwise update the throughput with a new measurement + measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor + *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured } // BlockCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// previously discovered throughput. func (p *peer) BlockCapacity() int { - return int(atomic.LoadInt32(&p.blockCapacity)) -} + p.lock.RLock() + defer p.lock.RUnlock() -// ReceiptCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. -func (p *peer) ReceiptCapacity() int { - return int(atomic.LoadInt32(&p.receiptCapacity)) + return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch)))) } -// NodeDataCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. -func (p *peer) NodeDataCapacity() int { - return int(atomic.LoadInt32(&p.stateCapacity)) -} +// ReceiptCapacity retrieves the peers receipt download allowance based on its +// previously discovered throughput. +func (p *peer) ReceiptCapacity() int { + p.lock.RLock() + defer p.lock.RUnlock() -// Promote increases the peer's reputation. -func (p *peer) Promote() { - atomic.AddInt32(&p.rep, 1) + return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch)))) } -// Demote decreases the peer's reputation or leaves it at 0. -func (p *peer) Demote() { - for { - // Calculate the new reputation value - prev := atomic.LoadInt32(&p.rep) - next := prev / 2 +// NodeDataCapacity retrieves the peers state download allowance based on its +// previously discovered throughput. +func (p *peer) NodeDataCapacity() int { + p.lock.RLock() + defer p.lock.RUnlock() - // Try to update the old value - if atomic.CompareAndSwapInt32(&p.rep, prev, next) { - return - } - } + return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch)))) } // MarkLacking appends a new entity to the set of items (blocks, receipts, states) // that a peer is known not to have (i.e. have been requested before). If the // set reaches its maximum allowed capacity, items are randomly dropped off. func (p *peer) MarkLacking(hash common.Hash) { - p.lackingLock.Lock() - defer p.lackingLock.Unlock() + p.lock.Lock() + defer p.lock.Unlock() for len(p.lacking) >= maxLackingHashes { for drop, _ := range p.lacking { @@ -330,8 +310,8 @@ func (p *peer) MarkLacking(hash common.Hash) { // Lacks retrieves whether the hash of a blockchain item is on the peers lacking // list (i.e. whether we know that the peer does not have it). func (p *peer) Lacks(hash common.Hash) bool { - p.lackingLock.RLock() - defer p.lackingLock.RUnlock() + p.lock.RLock() + defer p.lock.RUnlock() _, ok := p.lacking[hash] return ok @@ -339,13 +319,13 @@ func (p *peer) Lacks(hash common.Hash) bool { // String implements fmt.Stringer. func (p *peer) String() string { - p.lackingLock.RLock() - defer p.lackingLock.RUnlock() + p.lock.RLock() + defer p.lock.RUnlock() return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ - fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ - fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ + fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ + fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ + fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ fmt.Sprintf("lacking %4d", len(p.lacking)), ) } @@ -377,6 +357,10 @@ func (ps *peerSet) Reset() { // Register injects a new peer into the working set, or returns an error if the // peer is already known. +// +// The method also sets the starting throughput values of the new peer to the +// average of all existing peers, to give it a realistic change of being used +// for data retrievals. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -384,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error { if _, ok := ps.peers[p.id]; ok { return errAlreadyRegistered } + if len(ps.peers) > 0 { + p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0 + + for _, peer := range ps.peers { + peer.lock.RLock() + p.blockThroughput += peer.blockThroughput + p.receiptThroughput += peer.receiptThroughput + p.stateThroughput += peer.stateThroughput + peer.lock.RUnlock() + } + p.blockThroughput /= float64(len(ps.peers)) + p.receiptThroughput /= float64(len(ps.peers)) + p.stateThroughput /= float64(len(ps.peers)) + } ps.peers[p.id] = p return nil } @@ -435,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - return ps.idlePeers(61, 61, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(61, 61, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -444,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - return ps.idlePeers(62, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(62, 64, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -453,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.receiptIdle) == 0 } - return ps.idlePeers(63, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.receiptThroughput + } + return ps.idlePeers(63, 64, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -462,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.stateIdle) == 0 } - return ps.idlePeers(63, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.stateThroughput + } + return ps.idlePeers(63, 64, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the // protocol version constraints, using the provided function to check idleness. -func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { +// The resulting set of peers are sorted by their measure throughput. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() @@ -482,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) } for i := 0; i < len(idle); i++ { for j := i + 1; j < len(idle); j++ { - if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + if throughput(idle[i]) < throughput(idle[j]) { idle[i], idle[j] = idle[j], idle[i] } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 584797d7b..1e55560db 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) { // ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { +func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string { // ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBodies(timeout time.Duration) []string { +func (q *queue) ExpireBodies(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string { // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireReceipts(timeout time.Duration) []string { +func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string { // ExpireNodeData checks for in flight node data requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) []string { +func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { // Iterate over the expired requests and return each to the queue - peers := []string{} + expiries := make(map[string]int) for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout @@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } - peers = append(peers, id) + // Add the peer to the expiry report along the the number of failed requests + expirations := len(request.Hashes) + if expirations < len(request.Headers) { + expirations = len(request.Headers) + } + expiries[id] = expirations } } // Remove the expired requests from the pending pool - for _, id := range peers { + for id, _ := range expiries { delete(pendPool, id) } - return peers + return expiries } -// DeliverBlocks injects a block retrieval response into the download queue. -func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { +// DeliverBlocks injects a block retrieval response into the download queue. The +// method returns the number of blocks accepted from the delivery and also wakes +// any threads waiting for data delivery. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the blocks were never requested request := q.blockPendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } blockReqTimer.UpdateSince(request.Time) delete(q.blockPendPool, id) @@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { } } // Iterate over the downloaded blocks and add each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) for _, block := range blocks { // Skip any blocks that were not requested hash := block.Hash() @@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { delete(request.Hashes, hash) delete(q.hashPool, hash) + accepted++ } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } // Wake up WaitResults - q.active.Signal() + if accepted > 0 { + q.active.Signal() + } // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: - return nil + return accepted, nil case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): - return errs[0] + return accepted, errs[0] case len(errs) == len(blocks): - return errStaleDelivery + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } // DeliverBodies injects a block body retrieval response into the results queue. -func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// The method returns the number of blocks bodies accepted from the delivery and +// also wakes any threads waiting for data delivery. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi } // DeliverReceipts injects a receipt retrieval response into the results queue. -func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { +// The method returns the number of transaction receipts accepted from the delivery +// and also wakes any threads waiting for data delivery. +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, - donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, + results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { + // Short circuit if the data was never requested request := pendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } reqTimer.UpdateSince(request.Time) delete(pendPool, id) @@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } // Assemble each of the results with their headers and retrieved data parts var ( - failure error - useful bool + accepted int + failure error + useful bool ) for i, header := range request.Headers { // Short circuit assembly if no more fetch results are found @@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ donePool[header.Hash()] = struct{}{} q.resultCache[index].Pending-- useful = true + accepted++ // Clean up a successful fetch request.Headers[i] = nil @@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } // Wake up WaitResults - q.active.Signal() + if accepted > 0 { + q.active.Signal() + } // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: - return failure + return accepted, failure case useful: - return fmt.Errorf("partial failure: %v", failure) + return accepted, fmt.Errorf("partial failure: %v", failure) default: - return errStaleDelivery + return accepted, errStaleDelivery } } // DeliverNodeData injects a node state data retrieval response into the queue. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error { +// The method returns the number of node state entries originally requested, and +// the number of them actually accepted from the delivery. +func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the data was never requested request := q.statePendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } stateReqTimer.UpdateSince(request.Time) delete(q.statePendPool, id) @@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i } } // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) process := []trie.SyncResult{} for _, blob := range data { - // Skip any blocks that were not requested + // Skip any state trie entires that were not requested hash := common.BytesToHash(crypto.Sha3(blob)) if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) @@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i } // Inject the next state trie item into the processing queue process = append(process, trie.SyncResult{hash, blob}) + accepted++ delete(request.Hashes, hash) delete(q.stateTaskPool, hash) @@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If none of the data items were good, it's a stale delivery switch { case len(errs) == 0: - return nil + return accepted, nil case len(errs) == len(request.Hashes): - return errStaleDelivery + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } -- cgit v1.2.3