diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 517 |
1 files changed, 335 insertions, 182 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0f76357cb..74bff2b66 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -42,6 +42,7 @@ var ( MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request @@ -52,6 +53,7 @@ var ( blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired + headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now) headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired @@ -60,9 +62,10 @@ var ( stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired - maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) - maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxResultsProcess = 256 // Number of download results to import at once into the chain + maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) + maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxHeadersProcess = 2048 // Number of header download results to import at once into the chain + maxResultsProcess = 2048 // Number of content download results to import at once into the chain fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected @@ -72,29 +75,30 @@ var ( ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errNoPeers = errors.New("no peers to keep download active") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errEmptyHeaderSet = errors.New("empty header set by peer") - errPeersUnavailable = errors.New("no peers available or all tried for download") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidAncestor = errors.New("retrieved ancestor is invalid") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errInvalidBlock = errors.New("retrieved block is invalid") - errInvalidBody = errors.New("retrieved block body is invalid") - errInvalidReceipt = errors.New("retrieved receipt is invalid") - errCancelHashFetch = errors.New("hash download canceled (requested)") - errCancelBlockFetch = errors.New("block download canceled (requested)") - errCancelHeaderFetch = errors.New("block header download canceled (requested)") - errCancelBodyFetch = errors.New("block body download canceled (requested)") - errCancelReceiptFetch = errors.New("receipt download canceled (requested)") - errCancelStateFetch = errors.New("state data download canceled (requested)") - errCancelProcessing = errors.New("processing canceled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errNoPeers = errors.New("no peers to keep download active") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errEmptyHeaderSet = errors.New("empty header set by peer") + errPeersUnavailable = errors.New("no peers available or all tried for download") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidAncestor = errors.New("retrieved ancestor is invalid") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errInvalidBlock = errors.New("retrieved block is invalid") + errInvalidBody = errors.New("retrieved block body is invalid") + errInvalidReceipt = errors.New("retrieved receipt is invalid") + errCancelHashFetch = errors.New("hash download canceled (requested)") + errCancelBlockFetch = errors.New("block download canceled (requested)") + errCancelHeaderFetch = errors.New("block header download canceled (requested)") + errCancelBodyFetch = errors.New("block body download canceled (requested)") + errCancelReceiptFetch = errors.New("receipt download canceled (requested)") + errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelHeaderProcessing = errors.New("header processing canceled (requested)") + errCancelContentProcessing = errors.New("content processing canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) type Downloader struct { @@ -137,16 +141,17 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan dataPack // [eth/61] Channel receiving inbound hashes - blockCh chan dataPack // [eth/61] Channel receiving inbound blocks - headerCh chan dataPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - stateCh chan dataPack // [eth/63] Channel receiving inbound node state data - blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks + hashCh chan dataPack // [eth/61] Channel receiving inbound hashes + blockCh chan dataPack // [eth/61] Channel receiving inbound blocks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + stateCh chan dataPack // [eth/63] Channel receiving inbound node state data + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks + headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -194,6 +199,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), } } @@ -308,6 +314,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode default: } } + for empty := false; !empty; { + select { + case <-d.headerProcCh: + default: + empty = true + } + } // Reset any ephemeral sync statistics d.syncStatsLock.Lock() d.syncStatsStateTotal = 0 @@ -373,7 +386,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - return d.spawnSync( + return d.spawnSync(origin+1, func() error { return d.fetchHashes61(p, td, origin+1) }, func() error { return d.fetchBlocks61(origin + 1) }, ) @@ -423,11 +436,12 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - return d.spawnSync( - func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync + return d.spawnSync(origin+1, + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync ) default: @@ -439,11 +453,11 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(fetchers ...func() error) error { +func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { var wg sync.WaitGroup errc := make(chan error, len(fetchers)+1) wg.Add(len(fetchers) + 1) - go func() { defer wg.Done(); errc <- d.process() }() + go func() { defer wg.Done(); errc <- d.processContent() }() for _, fn := range fetchers { fn := fn go func() { defer wg.Done(); errc <- fn() }() @@ -702,9 +716,9 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { getHashes := func(from uint64) { glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) - go p.getAbsHashes(from, MaxHashFetch) request = time.Now() timeout.Reset(hashTTL) + go p.getAbsHashes(from, MaxHashFetch) } // Start pulling hashes, until all are exhausted getHashes(from) @@ -1050,7 +1064,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { continue } // Otherwise check if we already know the header or not - if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) { + if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -1149,55 +1163,39 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { return start, nil } -// fetchHeaders keeps retrieving headers from the requested number, until no more -// are returned, potentially throttling on the way. -// -// The queue parameter can be used to switch between queuing headers for block -// body download too, or directly import as pure header chains. -func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { - glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) +// fetchHeaders keeps retrieving headers concurrently from the number +// requested, until no more are returned, potentially throttling on the way. To +// facilitate concurrency but still protect against malicious nodes sending bad +// headers, we construct a header chain skeleton using the "origin" peer we are +// syncing with, and fill in the missing headers using anyone else. Headers from +// other peers are only accepted if they map cleanly to the skeleton. If no one +// can fill in the skeleton - not even the origin peer - it's assumed invalid and +// the origin is dropped. +func (d *Downloader) fetchHeaders(p *peer, from uint64) error { + glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from) defer glog.V(logger.Debug).Infof("%v: header download terminated", p) - // Calculate the pivoting point for switching from fast to slow sync - pivot := d.queue.FastSyncPivot() - - // Keep a count of uncertain headers to roll back - rollback := []*types.Header{} - defer func() { - if len(rollback) > 0 { - // Flatten the headers and roll them back - hashes := make([]common.Hash, len(rollback)) - for i, header := range rollback { - hashes[i] = header.Hash() - } - lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() - d.rollback(hashes) - glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", - len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number()) - - // If we're already past the pivot point, this could be an attack, disable fast sync - if rollback[len(rollback)-1].Number.Uint64() > pivot { - d.noFast = true - } - } - }() - - // Create a timeout timer, and the associated hash fetcher - request := time.Now() // time of the last fetch request + // Create a timeout timer, and the associated header fetcher + skeleton := true // Skeleton assembly phase or finishing up + request := time.Now() // time of the last skeleton fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer <-timeout.C // timeout channel should be initially empty defer timeout.Stop() getHeaders := func(from uint64) { - glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from) - - go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) request = time.Now() timeout.Reset(headerTTL) + + if skeleton { + glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from) + go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + } else { + glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from) + go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) + } } - // Start pulling headers, until all are exhausted + // Start pulling the header chain skeleton until all is done getHeaders(from) - gotHeaders := false for { select { @@ -1205,116 +1203,48 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { return errCancelHeaderFetch case packet := <-d.headerCh: - // Make sure the active peer is giving us the headers + // Make sure the active peer is giving us the skeleton headers if packet.PeerId() != p.id { - glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId()) + glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId()) break } headerReqTimer.UpdateSince(request) timeout.Stop() + // If the skeleton's finished, pull any remaining head headers directly from the origin + if packet.Items() == 0 && skeleton { + skeleton = false + getHeaders(from) + continue + } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { - select { - case ch <- false: - case <-d.cancelCh: - } - } - // If no headers were retrieved at all, the peer violated it's TD promise that it had a - // better chain compared to ours. The only exception is if it's promised blocks were - // already imported by other means (e.g. fetcher): - // - // R <remote peer>, L <local node>: Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { - return errStallingPeer - } - // If fast or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if d.mode == FastSync || d.mode == LightSync { - if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 { - return errStallingPeer - } - } - rollback = nil + d.headerProcCh <- nil return nil } - gotHeaders = true headers := packet.(*headerPack).headers - // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) - - if d.mode == FastSync || d.mode == LightSync { - // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) - for _, header := range headers { - if !d.hasHeader(header.Hash()) { - unknown = append(unknown, header) - } - } - // If we're importing pure headers, verify based on their recentness - frequency := fsHeaderCheckFrequency - if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { - frequency = 1 - } - if n, err := d.insertHeaders(headers, frequency); err != nil { - // If some headers were inserted, add them too to the rollback list - if n > 0 { - rollback = append(rollback, headers[:n]...) - } - glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err) + // If we received a skeleton batch, resolve internals concurrently + if skeleton { + filled, proced, err := d.fillHeaderSkeleton(from, headers) + if err != nil { + glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err) return errInvalidChain } - // All verifications passed, store newly found uncertain headers - rollback = append(rollback, unknown...) - if len(rollback) > fsHeaderSafetyNet { - rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) - } - } - if d.mode == FullSync || d.mode == FastSync { - inserts := d.queue.Schedule(headers, from) - if len(inserts) != len(headers) { - glog.V(logger.Debug).Infof("%v: stale headers", p) - return errBadPeer - } + headers = filled[proced:] + from += uint64(proced) } - // Notify the content fetchers of new headers, but stop if queue is full - cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders - for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { - if cont { - // We still have headers to fetch, send continuation wake signal (potential) - select { - case ch <- true: - default: - } - } else { - // Header limit reached, send a termination wake signal (enforced) - select { - case ch <- false: - case <-d.cancelCh: - } + // Insert all the new headers and fetch the next batch + if len(headers) > 0 { + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from) + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCancelHeaderFetch } + from += uint64(len(headers)) } - if !cont { - return nil - } - // Queue not yet full, fetch the next batch - from += uint64(len(headers)) getHeaders(from) case <-timeout.C: @@ -1330,7 +1260,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: } } - return nil + select { + case d.headerProcCh <- nil: + case <-d.cancelCh: + } + return errBadPeer case <-d.hashCh: case <-d.blockCh: @@ -1340,6 +1274,43 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } +// fillHeaderSkeleton concurrently retrieves headers from all our available peers +// and maps them to the provided skeleton header chain. +// +// Any partial results from the beginning of the skeleton is (if possible) forwarded +// immediately to the header processor to keep the rest of the pipeline full even +// in the case of header stalls. +// +// The method returs the entire filled skeleton and also the number of headers +// already forwarded for processing. +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { + glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from) + d.queue.ScheduleSkeleton(from, skeleton) + + var ( + deliver = func(packet dataPack) (int, error) { + pack := packet.(*headerPack) + return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh) + } + expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) } + throttle = func() bool { return false } + reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + return d.queue.ReserveHeaders(p, count), false, nil + } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } + capacity = func(p *peer) int { return p.HeaderCapacity() } + setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } + ) + err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, + d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, + nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header") + + glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err) + + filled, proced := d.queue.RetrieveHeaders() + return filled, proced, err +} + // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. @@ -1398,6 +1369,11 @@ func (d *Downloader) fetchNodeData() error { deliver = func(packet dataPack) (int, error) { start := time.Now() return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { + // If the peer gave us nothing, stalling fast sync, drop + if delivered == 0 { + glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId()) + d.dropPeer(packet.PeerId()) + } if err != nil { // If the node data processing failed, the root hash is very wrong, abort glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err) @@ -1438,6 +1414,28 @@ func (d *Downloader) fetchNodeData() error { // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. +// +// As the scheduling/timeout logic mostly is the same for all downloaded data +// types, this method is used by each for data gathering and is instrumented with +// various callbacks to handle the slight differences between processing them. +// +// The instrumentation parameters: +// - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer) +// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers) +// - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`) +// - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed) +// - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping) +// - pending: task callback for the number of requests still needing download (detect completion/non-completability) +// - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish) +// - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use) +// - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions) +// - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic) +// - fetch: network callback to actually send a particular download request to a physical remote peer +// - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer) +// - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping) +// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks +// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) +// - kind: textual label of the type being downloaded to display in log mesages func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, @@ -1554,7 +1552,9 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv continue } if glog.V(logger.Detail) { - if len(request.Headers) > 0 { + if request.From > 0 { + glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From) + } else if len(request.Headers) > 0 { glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) } else { glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) @@ -1588,9 +1588,162 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } } -// process takes fetch results from the queue and tries to import them into the -// chain. The type of import operation will depend on the result contents. -func (d *Downloader) process() error { +// processHeaders takes batches of retrieved headers from an input channel and +// keeps processing and scheduling them into the header chain and downloader's +// queue until the stream ends or a failure occurs. +func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { + // Calculate the pivoting point for switching from fast to slow sync + pivot := d.queue.FastSyncPivot() + + // Keep a count of uncertain headers to roll back + rollback := []*types.Header{} + defer func() { + if len(rollback) > 0 { + // Flatten the headers and roll them back + hashes := make([]common.Hash, len(rollback)) + for i, header := range rollback { + hashes[i] = header.Hash() + } + lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() + d.rollback(hashes) + glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", + len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) + + // If we're already past the pivot point, this could be an attack, disable fast sync + if rollback[len(rollback)-1].Number.Uint64() > pivot { + d.noFast = true + } + } + }() + + // Wait for batches of headers to process + gotHeaders := false + + for { + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + + case headers := <-d.headerProcCh: + // Terminate header processing if we synced up + if len(headers) == 0 { + // Notify everyone that headers are fully processed + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } + } + // If no headers were retrieved at all, the peer violated it's TD promise that it had a + // better chain compared to ours. The only exception is if it's promised blocks were + // already imported by other means (e.g. fecher): + // + // R <remote peer>, L <local node>: Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { + return errStallingPeer + } + // If fast or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if d.mode == FastSync || d.mode == LightSync { + if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 { + return errStallingPeer + } + } + // Disable any rollback and return + rollback = nil + return nil + } + // Otherwise split the chunk of headers into batches and process them + gotHeaders = true + + for len(headers) > 0 { + // Terminate if something failed in between processing chunks + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + default: + } + // Select the next chunk of headers to import + limit := maxHeadersProcess + if limit > len(headers) { + limit = len(headers) + } + chunk := headers[:limit] + + // In case of header only syncing, validate the chunk immediately + if d.mode == FastSync || d.mode == LightSync { + // Collect the yet unknown headers to mark them as uncertain + unknown := make([]*types.Header, 0, len(headers)) + for _, header := range chunk { + if !d.hasHeader(header.Hash()) { + unknown = append(unknown, header) + } + } + // If we're importing pure headers, verify based on their recentness + frequency := fsHeaderCheckFrequency + if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { + frequency = 1 + } + if n, err := d.insertHeaders(chunk, frequency); err != nil { + // If some headers were inserted, add them too to the rollback list + if n > 0 { + rollback = append(rollback, chunk[:n]...) + } + glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err) + return errInvalidChain + } + // All verifications passed, store newly found uncertain headers + rollback = append(rollback, unknown...) + if len(rollback) > fsHeaderSafetyNet { + rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) + } + } + // Unless we're doing light chains, schedule the headers for associated content retrieval + if d.mode == FullSync || d.mode == FastSync { + // If we've reached the allowed number of pending headers, stall a bit + for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + select { + case <-d.cancelCh: + return errCancelHeaderProcessing + case <-time.After(time.Second): + } + } + // Otherwise insert the headers for content retrieval + inserts := d.queue.Schedule(chunk, origin) + if len(inserts) != len(chunk) { + glog.V(logger.Debug).Infof("stale headers") + return errBadPeer + } + } + headers = headers[limit:] + origin += uint64(limit) + } + // Signal the content downloaders of the availablility of new tasks + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { + select { + case ch <- true: + default: + } + } + } + } +} + +// processContent takes fetch results from the queue and tries to import them +// into the chain. The type of import operation will depend on the result contents. +func (d *Downloader) processContent() error { pivot := d.queue.FastSyncPivot() for { results := d.queue.WaitResults() @@ -1608,7 +1761,7 @@ func (d *Downloader) process() error { for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return errCancelProcessing + return errCancelContentProcessing } // Retrieve the a batch of results to import var ( |