aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-30 00:37:26 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-11-19 23:01:39 +0800
commitb6f5523bdcded47c4f92b4cb5e6e23287bd6b60d (patch)
tree7e9a13377f52658d398f4f3dc11883f515bdbb3d /eth/downloader/downloader.go
parent4c2933ad825aa11ce118abddfe6eeafc0422b2b6 (diff)
downloaddexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar
dexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.gz
dexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.bz2
dexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.lz
dexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.xz
dexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.tar.zst
dexon-b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d.zip
eth/downloader: fetch data proportionally to peer capacity
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go178
1 files changed, 74 insertions, 104 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 5fa18a2e3..c272d05af 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -45,16 +45,17 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
- hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
- blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
- blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
- headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
- bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
- bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
+ hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
+ blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
+ blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
+
+ headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
+ bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
+ bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
+ receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
+ receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
+ stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -486,7 +487,7 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
// Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head})
- timeout := time.After(blockSoftTTL)
+ timeout := time.After(hashTTL)
for {
select {
case <-d.cancelCh:
@@ -779,47 +780,27 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of blocks, and demote in case of errors
blocks := packet.(*blockPack).blocks
- err := d.queue.DeliverBlocks(peer.id, blocks)
- switch err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above)
- if len(blocks) == 0 {
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of blocks and check chain validity
+ accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: stale delivery", peer)
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ peer.SetBlocksIdle(accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && len(blocks) == 0:
+ glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
+ glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
}
}
// Blocks arrived, try to update the progress
@@ -852,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
+ for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ peer.SetBlocksIdle(0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
@@ -1281,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- setIdle = func(p *peer) { p.SetBodiesIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1305,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() }
- setIdle = func(p *peer) { p.SetReceiptsIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1329,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Debug).Infof("Downloading node state data")
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
if err != nil {
@@ -1352,14 +1338,14 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
}
- expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() }
- setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1372,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
- expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
- idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+ idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1394,45 +1380,25 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of data, and demote in case of errors
- switch err := deliver(packet); err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Items() == 0 {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
-
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of data and check chain validity
+ accepted, err := deliver(packet)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ setIdle(peer, accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && packet.Items() == 0:
+ glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
+ glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
}
}
// Blocks assembled, try to update the progress
@@ -1465,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
return errNoPeers
}
// Check for fetch request timeouts and demote the responsible peers
- for _, pid := range expire() {
+ for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ setIdle(peer, 0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate