diff options
author | Péter Szilágyi <peterke@gmail.com> | 2016-04-19 17:27:37 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2016-05-17 15:03:34 +0800 |
commit | e86619e75d1bd1209818ab4df2fac52e3c43b5e1 (patch) | |
tree | 9c0cd23e40ec9b8fde9d189ece3ed5e393cc753c /eth/downloader/downloader.go | |
parent | b40dc8a1daf4bd1f293cf322274b470ad91517fb (diff) | |
download | go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.gz go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.bz2 go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.lz go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.xz go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.tar.zst go-tangerine-e86619e75d1bd1209818ab4df2fac52e3c43b5e1.zip |
eth/downloader: stream partial skeleton filling to processor
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 2b2de1b5f..2f79c2dfd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,7 +54,7 @@ var ( blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) - headerTTL = 2 * time.Second // [eth/62] Time it takes for a header request to time out + headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request @@ -1064,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { continue } // Otherwise check if we already know the header or not - if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) { + if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -1226,21 +1226,24 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // If we received a skeleton batch, resolve internals concurrently if skeleton { - filled, err := d.fillHeaderSkeleton(from, headers) + filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err) return errInvalidChain } - headers = filled + headers = filled[proced:] + from += uint64(proced) } // Insert all the new headers and fetch the next batch - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) - select { - case d.headerProcCh <- headers: - case <-d.cancelCh: - return errCancelHeaderFetch + if len(headers) > 0 { + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCancelHeaderFetch + } + from += uint64(len(headers)) } - from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1272,14 +1275,21 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // fillHeaderSkeleton concurrently retrieves headers from all our available peers // and maps them to the provided skeleton header chain. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, error) { +// +// Any partial results from the beginning of the skeleton is (if possible) forwarded +// immediately to the header processor to keep the rest of the pipeline full even +// in the case of header stalls. +// +// The method returs the entire filled skeleton and also the number of headers +// already forwarded for processing. +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from) d.queue.ScheduleSkeleton(from, skeleton) var ( deliver = func(packet dataPack) (int, error) { pack := packet.(*headerPack) - return d.queue.DeliverHeaders(pack.peerId, pack.headers) + return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) } expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } throttle = func() bool { return false } @@ -1295,7 +1305,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header") glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err) - return d.queue.RetrieveHeaders(), err + + filled, proced := d.queue.RetrieveHeaders() + return filled, proced, err } // fetchBodies iteratively downloads the scheduled block bodies, taking any |