aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2016-02-26 00:36:42 +0800
committerPéter Szilágyi <peterke@gmail.com>2016-05-17 15:03:34 +0800
commitb40dc8a1daf4bd1f293cf322274b470ad91517fb (patch)
tree0edef02f02f05b91b1c4969e79bab62c45721944 /eth/downloader/downloader.go
parentfe532a98f9f32bb81ef0d8d013cf44327830d11e (diff)
downloadgo-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.tar
go-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.tar.gz
go-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.tar.bz2
go-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.tar.lz
go-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.tar.xz
go-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.tar.zst
go-tangerine-b40dc8a1daf4bd1f293cf322274b470ad91517fb.zip
eth/downloader: implement concurrent header downloads
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go480
1 files changed, 299 insertions, 181 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 0f76357cb..2b2de1b5f 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -42,6 +42,7 @@ var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
+ MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
@@ -52,7 +53,8 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
- headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
+ headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
+ headerTTL = 2 * time.Second // [eth/62] Time it takes for a header request to time out
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
@@ -60,9 +62,10 @@ var (
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
- maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
- maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
- maxResultsProcess = 256 // Number of download results to import at once into the chain
+ maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
+ maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
+ maxResultsProcess = 4096 // Number of content download results to import at once into the chain
fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
@@ -72,29 +75,30 @@ var (
)
var (
- errBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errStallingPeer = errors.New("peer is stalling")
- errNoPeers = errors.New("no peers to keep download active")
- errTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errEmptyHeaderSet = errors.New("empty header set by peer")
- errPeersUnavailable = errors.New("no peers available or all tried for download")
- errAlreadyInPool = errors.New("hash already in pool")
- errInvalidAncestor = errors.New("retrieved ancestor is invalid")
- errInvalidChain = errors.New("retrieved hash chain is invalid")
- errInvalidBlock = errors.New("retrieved block is invalid")
- errInvalidBody = errors.New("retrieved block body is invalid")
- errInvalidReceipt = errors.New("retrieved receipt is invalid")
- errCancelHashFetch = errors.New("hash download canceled (requested)")
- errCancelBlockFetch = errors.New("block download canceled (requested)")
- errCancelHeaderFetch = errors.New("block header download canceled (requested)")
- errCancelBodyFetch = errors.New("block body download canceled (requested)")
- errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
- errCancelStateFetch = errors.New("state data download canceled (requested)")
- errCancelProcessing = errors.New("processing canceled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errNoPeers = errors.New("no peers to keep download active")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errEmptyHeaderSet = errors.New("empty header set by peer")
+ errPeersUnavailable = errors.New("no peers available or all tried for download")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidAncestor = errors.New("retrieved ancestor is invalid")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errInvalidBlock = errors.New("retrieved block is invalid")
+ errInvalidBody = errors.New("retrieved block body is invalid")
+ errInvalidReceipt = errors.New("retrieved receipt is invalid")
+ errCancelHashFetch = errors.New("hash download canceled (requested)")
+ errCancelBlockFetch = errors.New("block download canceled (requested)")
+ errCancelHeaderFetch = errors.New("block header download canceled (requested)")
+ errCancelBodyFetch = errors.New("block body download canceled (requested)")
+ errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
+ errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
+ errCancelContentProcessing = errors.New("content processing canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
type Downloader struct {
@@ -137,16 +141,17 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
- hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
- blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
- headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
- blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
- stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
+ hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
+ blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+ blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
+ headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@@ -194,6 +199,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
+ headerProcCh: make(chan []*types.Header, 1),
}
}
@@ -308,6 +314,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
default:
}
}
+ for empty := false; !empty; {
+ select {
+ case <-d.headerProcCh:
+ default:
+ empty = true
+ }
+ }
// Reset any ephemeral sync statistics
d.syncStatsLock.Lock()
d.syncStatsStateTotal = 0
@@ -373,7 +386,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- return d.spawnSync(
+ return d.spawnSync(origin+1,
func() error { return d.fetchHashes61(p, td, origin+1) },
func() error { return d.fetchBlocks61(origin + 1) },
)
@@ -423,11 +436,12 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- return d.spawnSync(
- func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
- func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
+ return d.spawnSync(origin+1,
+ func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
+ func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
)
default:
@@ -439,11 +453,11 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(fetchers ...func() error) error {
+func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
var wg sync.WaitGroup
errc := make(chan error, len(fetchers)+1)
wg.Add(len(fetchers) + 1)
- go func() { defer wg.Done(); errc <- d.process() }()
+ go func() { defer wg.Done(); errc <- d.processContent() }()
for _, fn := range fetchers {
fn := fn
go func() { defer wg.Done(); errc <- fn() }()
@@ -1149,55 +1163,38 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
return start, nil
}
-// fetchHeaders keeps retrieving headers from the requested number, until no more
-// are returned, potentially throttling on the way.
-//
-// The queue parameter can be used to switch between queuing headers for block
-// body download too, or directly import as pure header chains.
-func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
- glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
+// fetchHeaders keeps retrieving headers concurrently from the number
+// requested, until no more are returned, potentially throttling on the way. To
+// facilitate concurrency but still protect against malicious nodes sending bad
+// headers, we construct a header chain skeleton using the "origin" peer we are
+// syncing with, and fill in the missing headers using anyone else. Headers from
+// other peers are only accepted if they map cleanly to the skeleton. If noone
+// can fill in the skeleton - not even the origin peer - it's assumed invalid and
+// the origin is dropped.
+func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
+ glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
- // Calculate the pivoting point for switching from fast to slow sync
- pivot := d.queue.FastSyncPivot()
-
- // Keep a count of uncertain headers to roll back
- rollback := []*types.Header{}
- defer func() {
- if len(rollback) > 0 {
- // Flatten the headers and roll them back
- hashes := make([]common.Hash, len(rollback))
- for i, header := range rollback {
- hashes[i] = header.Hash()
- }
- lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
- d.rollback(hashes)
- glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
- len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
-
- // If we're already past the pivot point, this could be an attack, disable fast sync
- if rollback[len(rollback)-1].Number.Uint64() > pivot {
- d.noFast = true
- }
- }
- }()
-
- // Create a timeout timer, and the associated hash fetcher
- request := time.Now() // time of the last fetch request
+ // Create a timeout timer, and the associated header fetcher
+ skeleton := true // Skeleton assembly phase or finishing up
+ request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
getHeaders := func(from uint64) {
- glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)
-
- go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+ if skeleton {
+ glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
+ go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+ } else {
+ glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
+ go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+ }
request = time.Now()
timeout.Reset(headerTTL)
}
- // Start pulling headers, until all are exhausted
+ // Start pulling the header chain skeleton until all is done
getHeaders(from)
- gotHeaders := false
for {
select {
@@ -1205,115 +1202,44 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return errCancelHeaderFetch
case packet := <-d.headerCh:
- // Make sure the active peer is giving us the headers
+ // Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
+ glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
+ // If the skeleton's finished, pull any remaining head headers directly from the origin
+ if packet.Items() == 0 && skeleton {
+ skeleton = false
+ getHeaders(from)
+ continue
+ }
// If no more headers are inbound, notify the content fetchers and return
if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
-
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
- select {
- case ch <- false:
- case <-d.cancelCh:
- }
- }
- // If no headers were retrieved at all, the peer violated it's TD promise that it had a
- // better chain compared to ours. The only exception is if it's promised blocks were
- // already imported by other means (e.g. fetcher):
- //
- // R <remote peer>, L <local node>: Both at block 10
- // R: Mine block 11, and propagate it to L
- // L: Queue block 11 for import
- // L: Notice that R's head and TD increased compared to ours, start sync
- // L: Import of block 11 finishes
- // L: Sync begins, and finds common ancestor at 11
- // L: Request new headers up from 11 (R's TD was higher, it must have something)
- // R: Nothing to give
- if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
- return errStallingPeer
- }
- // If fast or light syncing, ensure promised headers are indeed delivered. This is
- // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
- // of delivering the post-pivot blocks that would flag the invalid content.
- //
- // This check cannot be executed "as is" for full imports, since blocks may still be
- // queued for processing when the header download completes. However, as long as the
- // peer gave us something useful, we're already happy/progressed (above check).
- if d.mode == FastSync || d.mode == LightSync {
- if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
- return errStallingPeer
- }
- }
- rollback = nil
+ d.headerProcCh <- nil
return nil
}
- gotHeaders = true
headers := packet.(*headerPack).headers
- // Otherwise insert all the new headers, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
-
- if d.mode == FastSync || d.mode == LightSync {
- // Collect the yet unknown headers to mark them as uncertain
- unknown := make([]*types.Header, 0, len(headers))
- for _, header := range headers {
- if !d.hasHeader(header.Hash()) {
- unknown = append(unknown, header)
- }
- }
- // If we're importing pure headers, verify based on their recentness
- frequency := fsHeaderCheckFrequency
- if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
- frequency = 1
- }
- if n, err := d.insertHeaders(headers, frequency); err != nil {
- // If some headers were inserted, add them too to the rollback list
- if n > 0 {
- rollback = append(rollback, headers[:n]...)
- }
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
+ // If we received a skeleton batch, resolve internals concurrently
+ if skeleton {
+ filled, err := d.fillHeaderSkeleton(from, headers)
+ if err != nil {
+ glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
return errInvalidChain
}
- // All verifications passed, store newly found uncertain headers
- rollback = append(rollback, unknown...)
- if len(rollback) > fsHeaderSafetyNet {
- rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
- }
- }
- if d.mode == FullSync || d.mode == FastSync {
- inserts := d.queue.Schedule(headers, from)
- if len(inserts) != len(headers) {
- glog.V(logger.Debug).Infof("%v: stale headers", p)
- return errBadPeer
- }
- }
- // Notify the content fetchers of new headers, but stop if queue is full
- cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
- if cont {
- // We still have headers to fetch, send continuation wake signal (potential)
- select {
- case ch <- true:
- default:
- }
- } else {
- // Header limit reached, send a termination wake signal (enforced)
- select {
- case ch <- false:
- case <-d.cancelCh:
- }
- }
+ headers = filled
}
- if !cont {
- return nil
+ // Insert all the new headers and fetch the next batch
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
+ select {
+ case d.headerProcCh <- headers:
+ case <-d.cancelCh:
+ return errCancelHeaderFetch
}
- // Queue not yet full, fetch the next batch
from += uint64(len(headers))
getHeaders(from)
@@ -1330,7 +1256,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
}
}
- return nil
+ select {
+ case d.headerProcCh <- nil:
+ case <-d.cancelCh:
+ }
+ return errBadPeer
case <-d.hashCh:
case <-d.blockCh:
@@ -1340,6 +1270,34 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
+// fillHeaderSkeleton concurrently retrieves headers from all our available peers
+// and maps them to the provided skeleton header chain.
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, error) {
+ glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
+ d.queue.ScheduleSkeleton(from, skeleton)
+
+ var (
+ deliver = func(packet dataPack) (int, error) {
+ pack := packet.(*headerPack)
+ return d.queue.DeliverHeaders(pack.peerId, pack.headers)
+ }
+ expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
+ throttle = func() bool { return false }
+ reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ return d.queue.ReserveHeaders(p, count), false, nil
+ }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+ capacity = func(p *peer) int { return p.HeaderCapacity() }
+ setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
+ )
+ err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
+ d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
+ nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
+
+ glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
+ return d.queue.RetrieveHeaders(), err
+}
+
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
@@ -1398,6 +1356,11 @@ func (d *Downloader) fetchNodeData() error {
deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
+ // If the peer gave us nothing, stalling fast sync, drop
+ if delivered == 0 {
+ glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
+ d.dropPeer(packet.PeerId())
+ }
if err != nil {
// If the node data processing failed, the root hash is very wrong, abort
glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
@@ -1554,7 +1517,9 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
continue
}
if glog.V(logger.Detail) {
- if len(request.Headers) > 0 {
+ if request.From > 0 {
+ glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
+ } else if len(request.Headers) > 0 {
glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
} else {
glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
@@ -1588,9 +1553,162 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
}
-// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents.
-func (d *Downloader) process() error {
+// processHeaders takes batches of retrieved headers from an input channel and
+// keeps processing and scheduling them into the header chain and downloader's
+// queue until the stream ends or a failure occurs.
+func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
+ // Calculate the pivoting point for switching from fast to slow sync
+ pivot := d.queue.FastSyncPivot()
+
+ // Keep a count of uncertain headers to roll back
+ rollback := []*types.Header{}
+ defer func() {
+ if len(rollback) > 0 {
+ // Flatten the headers and roll them back
+ hashes := make([]common.Hash, len(rollback))
+ for i, header := range rollback {
+ hashes[i] = header.Hash()
+ }
+ lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
+ d.rollback(hashes)
+ glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
+ len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
+
+ // If we're already past the pivot point, this could be an attack, disable fast sync
+ if rollback[len(rollback)-1].Number.Uint64() > pivot {
+ d.noFast = true
+ }
+ }
+ }()
+
+ // Wait for batches of headers to process
+ gotHeaders := false
+
+ for {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+
+ case headers := <-d.headerProcCh:
+ // Terminate header processing if we synced up
+ if len(headers) == 0 {
+ // Notify everyone that headers are fully processed
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ select {
+ case ch <- false:
+ case <-d.cancelCh:
+ }
+ }
+ // If no headers were retrieved at all, the peer violated it's TD promise that it had a
+ // better chain compared to ours. The only exception is if it's promised blocks were
+ // already imported by other means (e.g. fecher):
+ //
+ // R <remote peer>, L <local node>: Both at block 10
+ // R: Mine block 11, and propagate it to L
+ // L: Queue block 11 for import
+ // L: Notice that R's head and TD increased compared to ours, start sync
+ // L: Import of block 11 finishes
+ // L: Sync begins, and finds common ancestor at 11
+ // L: Request new headers up from 11 (R's TD was higher, it must have something)
+ // R: Nothing to give
+ if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
+ return errStallingPeer
+ }
+ // If fast or light syncing, ensure promised headers are indeed delivered. This is
+ // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
+ // of delivering the post-pivot blocks that would flag the invalid content.
+ //
+ // This check cannot be executed "as is" for full imports, since blocks may still be
+ // queued for processing when the header download completes. However, as long as the
+ // peer gave us something useful, we're already happy/progressed (above check).
+ if d.mode == FastSync || d.mode == LightSync {
+ if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
+ return errStallingPeer
+ }
+ }
+ // Disable any rollback and return
+ rollback = nil
+ return nil
+ }
+ // Otherwise split the chunk of headers into batches and process them
+ gotHeaders = true
+
+ for len(headers) > 0 {
+ // Terminate if something failed in between processing chunks
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+ default:
+ }
+ // Select the next chunk of headers to import
+ limit := maxHeadersProcess
+ if limit > len(headers) {
+ limit = len(headers)
+ }
+ chunk := headers[:limit]
+
+ // In case of header only syncing, validate the chunk immediately
+ if d.mode == FastSync || d.mode == LightSync {
+ // Collect the yet unknown headers to mark them as uncertain
+ unknown := make([]*types.Header, 0, len(headers))
+ for _, header := range chunk {
+ if !d.hasHeader(header.Hash()) {
+ unknown = append(unknown, header)
+ }
+ }
+ // If we're importing pure headers, verify based on their recentness
+ frequency := fsHeaderCheckFrequency
+ if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
+ frequency = 1
+ }
+ if n, err := d.insertHeaders(chunk, frequency); err != nil {
+ // If some headers were inserted, add them too to the rollback list
+ if n > 0 {
+ rollback = append(rollback, chunk[:n]...)
+ }
+ glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
+ return errInvalidChain
+ }
+ // All verifications passed, store newly found uncertain headers
+ rollback = append(rollback, unknown...)
+ if len(rollback) > fsHeaderSafetyNet {
+ rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
+ }
+ }
+ // Unless we're doing light chains, schedule the headers for associated content retrieval
+ if d.mode == FullSync || d.mode == FastSync {
+ // If we've reached the allowed number of pending headers, stall a bit
+ for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+ select {
+ case <-d.cancelCh:
+ return errCancelHeaderProcessing
+ case <-time.After(time.Second):
+ }
+ }
+ // Otherwise insert the headers for content retrieval
+ inserts := d.queue.Schedule(chunk, origin)
+ if len(inserts) != len(chunk) {
+ glog.V(logger.Debug).Infof("stale headers")
+ return errBadPeer
+ }
+ }
+ headers = headers[limit:]
+ origin += uint64(limit)
+ }
+ // Signal the content downloaders of the availablility of new tasks
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+ select {
+ case ch <- true:
+ default:
+ }
+ }
+ }
+ }
+}
+
+// processContent takes fetch results from the queue and tries to import them
+// into the chain. The type of import operation will depend on the result contents.
+func (d *Downloader) processContent() error {
pivot := d.queue.FastSyncPivot()
for {
results := d.queue.WaitResults()
@@ -1608,7 +1726,7 @@ func (d *Downloader) process() error {
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
- return errCancelProcessing
+ return errCancelContentProcessing
}
// Retrieve the a batch of results to import
var (