aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2016-04-19 17:27:37 +0800
committerPéter Szilágyi <peterke@gmail.com>2016-05-17 15:03:34 +0800
commite86619e75d1bd1209818ab4df2fac52e3c43b5e1 (patch)
tree9c0cd23e40ec9b8fde9d189ece3ed5e393cc753c /eth/downloader/downloader.go
parentb40dc8a1daf4bd1f293cf322274b470ad91517fb (diff)
downloadgo-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar
go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.gz
go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.bz2
go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.lz
go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.xz
go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.zst
go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.zip
eth/downloader: stream partial skeleton filling to processor
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go38
1 files changed, 25 insertions, 13 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 2b2de1b5f..2f79c2dfd 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -54,7 +54,7 @@ var (
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
- headerTTL = 2 * time.Second // [eth/62] Time it takes for a header request to time out
+ headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
@@ -1064,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
continue
}
// Otherwise check if we already know the header or not
- if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) {
+ if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
break
}
@@ -1226,21 +1226,24 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
// If we received a skeleton batch, resolve internals concurrently
if skeleton {
- filled, err := d.fillHeaderSkeleton(from, headers)
+ filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
return errInvalidChain
}
- headers = filled
+ headers = filled[proced:]
+ from += uint64(proced)
}
// Insert all the new headers and fetch the next batch
- glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
- select {
- case d.headerProcCh <- headers:
- case <-d.cancelCh:
- return errCancelHeaderFetch
+ if len(headers) > 0 {
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
+ select {
+ case d.headerProcCh <- headers:
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
+ }
+ from += uint64(len(headers))
}
- from += uint64(len(headers))
getHeaders(from)
case <-timeout.C:
@@ -1272,14 +1275,21 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
-func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, error) {
+//
+// Any partial results from the beginning of the skeleton is (if possible) forwarded
+// immediately to the header processor to keep the rest of the pipeline full even
+// in the case of header stalls.
+//
+// The method returs the entire filled skeleton and also the number of headers
+// already forwarded for processing.
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
d.queue.ScheduleSkeleton(from, skeleton)
var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*headerPack)
- return d.queue.DeliverHeaders(pack.peerId, pack.headers)
+ return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
}
expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
throttle = func() bool { return false }
@@ -1295,7 +1305,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
- return d.queue.RetrieveHeaders(), err
+
+ filled, proced := d.queue.RetrieveHeaders()
+ return filled, proced, err
}
// fetchBodies iteratively downloads the scheduled block bodies, taking any