diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/downloader/downloader.go | 69 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 6 | ||||
-rw-r--r-- | eth/downloader/statesync.go | 12 |
3 files changed, 45 insertions, 42 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 321c4473b..874e6e07f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -83,16 +83,11 @@ var ( errPeersUnavailable = errors.New("no peers available or all tried for download") errInvalidAncestor = errors.New("retrieved ancestor is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid") - errInvalidBlock = errors.New("retrieved block is invalid") errInvalidBody = errors.New("retrieved block body is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid") - errCancelBlockFetch = errors.New("block download canceled (requested)") - errCancelHeaderFetch = errors.New("block header download canceled (requested)") - errCancelBodyFetch = errors.New("block body download canceled (requested)") - errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)") - errCancelHeaderProcessing = errors.New("header processing canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)") + errCanceled = errors.New("syncing canceled (requested)") errNoSyncActive = errors.New("no sync active") errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)") ) @@ -319,14 +314,6 @@ func (d *Downloader) UnregisterPeer(id string) error { } d.queue.Revoke(id) - // If this peer was the master peer, abort sync immediately - d.cancelLock.RLock() - master := id == d.cancelPeer - d.cancelLock.RUnlock() - - if master { - d.cancel() - } return nil } @@ -336,7 +323,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode err := d.synchronise(id, head, td, mode) switch err { case nil: - case errBusy: + case errBusy, errCanceled: case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer, errEmptyHeaderSet, errPeersUnavailable, errTooOld, @@ -555,7 +542,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error { // it has processed the queue. d.queue.Close() } - if err = <-errc; err != nil { + if err = <-errc; err != nil && err != errCanceled { break } } @@ -621,7 +608,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { for { select { case <-d.cancelCh: - return nil, errCancelBlockFetch + return nil, errCanceled case packet := <-d.headerCh: // Discard anything not from the origin peer @@ -767,7 +754,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) for finished := false; !finished; { select { case <-d.cancelCh: - return 0, errCancelHeaderFetch + return 0, errCanceled case packet := <-d.headerCh: // Discard anything not from the origin peer @@ -853,7 +840,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) for arrived := false; !arrived; { select { case <-d.cancelCh: - return 0, errCancelHeaderFetch + return 0, errCanceled case packer := <-d.headerCh: // Discard anything not from the origin peer @@ -954,7 +941,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) for { select { case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled case packet := <-d.headerCh: // Make sure the active peer is giving us the skeleton headers @@ -981,7 +968,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) getHeaders(from) continue case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } } // Pivot done (or not in fast sync) and no more headers, terminate the process @@ -990,7 +977,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) case d.headerProcCh <- nil: return nil case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } } headers := packet.(*headerPack).headers @@ -1041,7 +1028,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) select { case d.headerProcCh <- headers: case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } from += uint64(len(headers)) getHeaders(from) @@ -1053,7 +1040,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) getHeaders(from) continue case <-d.cancelCh: - return errCancelHeaderFetch + return errCanceled } } @@ -1112,7 +1099,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } ) - err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, + err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers") @@ -1138,7 +1125,7 @@ func (d *Downloader) fetchBodies(from uint64) error { capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } ) - err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies") @@ -1162,7 +1149,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } ) - err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts") @@ -1195,7 +1182,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - kind: textual label of the type being downloaded to display in log mesages -func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, +func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { @@ -1211,7 +1198,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv for { select { case <-d.cancelCh: - return errCancel + return errCanceled case packet := <-deliveryCh: // If the peer was previously banned and failed to deliver its pack @@ -1282,12 +1269,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv setIdle(peer, 0) } else { peer.log.Debug("Stalling delivery, dropping", "type", kind) + if d.dropPeer == nil { // The dropPeer method is nil when `--copydb` is used for a local copy. // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid) } else { d.dropPeer(pid) + + // If this peer was the master peer, abort sync immediately + d.cancelLock.RLock() + master := pid == d.cancelPeer + d.cancelLock.RUnlock() + + if master { + d.cancel() + return errTimeout + } } } } @@ -1392,7 +1390,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er for { select { case <-d.cancelCh: - return errCancelHeaderProcessing + return errCanceled case headers := <-d.headerProcCh: // Terminate header processing if we synced up @@ -1445,7 +1443,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // Terminate if something failed in between processing chunks select { case <-d.cancelCh: - return errCancelHeaderProcessing + return errCanceled default: } // Select the next chunk of headers to import @@ -1488,7 +1486,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { select { case <-d.cancelCh: - return errCancelHeaderProcessing + return errCanceled case <-time.After(time.Second): } } @@ -1579,7 +1577,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { stateSync := d.syncState(latest.Root) defer stateSync.Cancel() go func() { - if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled { d.queue.Close() // wake up Results } }() @@ -1607,7 +1605,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { // If sync failed, stop select { case <-d.cancelCh: - return stateSync.Cancel() + stateSync.Cancel() + return errCanceled default: } } @@ -1637,7 +1636,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { stateSync = d.syncState(P.Header.Root) defer stateSync.Cancel() go func() { - if err := stateSync.Wait(); err != nil && err != errCancelStateFetch { + if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled { d.queue.Close() // wake up Results } }() diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5b56ff161..e1f0e03fc 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -1114,14 +1114,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin {errInvalidBody, false}, // A bad peer was detected, but not the sync origin {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 7e156f46e..1ccdf4def 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -302,7 +302,7 @@ func (s *stateSync) loop() (err error) { return errCancelStateFetch case <-s.d.cancelCh: - return errCancelStateFetch + return errCanceled case req := <-s.deliver: // Response, disconnect or timeout triggered, drop the peer if stalling @@ -317,6 +317,16 @@ func (s *stateSync) loop() (err error) { req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id) } else { s.d.dropPeer(req.peer.id) + + // If this peer was the master peer, abort sync immediately + s.d.cancelLock.RLock() + master := req.peer.id == s.d.cancelPeer + s.d.cancelLock.RUnlock() + + if master { + s.d.cancel() + return errTimeout + } } } // Process all the received blobs and check for stale delivery |