Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 488 |
1 file changed, 208 insertions, 280 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 153427ee4..c272d05af 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -45,16 +45,17 @@ var (
 	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
 	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
 
-	hashTTL        = 5 * time.Second  // [eth/61] Time it takes for a hash request to time out
-	blockSoftTTL   = 3 * time.Second  // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
-	blockHardTTL   = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
-	headerTTL      = 5 * time.Second    // [eth/62] Time it takes for a header request to time out
-	bodySoftTTL    = 3 * time.Second    // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
-	bodyHardTTL    = 3 * bodySoftTTL    // [eth/62] Maximum time allowance before a block body request is considered expired
-	receiptSoftTTL = 3 * time.Second    // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
-	receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
-	stateSoftTTL   = 2 * time.Second    // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
-	stateHardTTL   = 3 * stateSoftTTL   // [eth/63] Maximum time allowance before a node data request is considered expired
+	hashTTL        = 3 * time.Second     // [eth/61] Time it takes for a hash request to time out
+	blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
+	blockTTL       = 3 * blockTargetRTT  // [eth/61] Maximum time allowance before a block request is considered expired
+
+	headerTTL        = 3 * time.Second      // [eth/62] Time it takes for a header request to time out
+	bodyTargetRTT    = 3 * time.Second / 2  // [eth/62] Target time for completing a block body retrieval request
+	bodyTTL          = 3 * bodyTargetRTT    // [eth/62] Maximum time allowance before a block body request is considered expired
+	receiptTargetRTT = 3 * time.Second / 2  // [eth/63] Target time for completing a receipt retrieval request
+	receiptTTL       = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
+	stateTargetRTT   = 2 * time.Second / 2  // [eth/63] Target time for completing a state trie retrieval request
+	stateTTL         = 3 * stateTargetRTT   // [eth/63] Maximum time allowance before a node data request is considered expired
 
 	maxQueuedHashes  = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
 	maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -74,7 +75,6 @@ var (
 	errBadPeer          = errors.New("action from bad peer ignored")
 	errStallingPeer     = errors.New("peer is stalling")
 	errNoPeers          = errors.New("no peers to keep download active")
-	errPendingQueue     = errors.New("pending items in queue")
 	errTimeout          = errors.New("timeout")
 	errEmptyHashSet     = errors.New("empty hash set by peer")
 	errEmptyHeaderSet   = errors.New("empty header set by peer")
@@ -90,6 +90,7 @@ var (
 	errCancelBodyFetch    = errors.New("block body download canceled (requested)")
 	errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
 	errCancelStateFetch   = errors.New("state data download canceled (requested)")
+	errCancelProcessing   = errors.New("processing canceled (requested)")
 	errNoSyncActive       = errors.New("no sync active")
 )
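The constants above replace the old soft/hard TTL pairs with an explicit target round-trip time per request type, from which the hard expiry is derived as three times the target. A minimal, self-contained Go sketch of that relationship; the classify helper and its thresholds are illustrative only and are not part of the patch:

package main

import (
	"fmt"
	"time"
)

// Hypothetical mirror of the patch's scheme: a request is expected to finish
// within targetRTT and is declared expired once 3*targetRTT has passed.
const (
	bodyTargetRTT = 3 * time.Second / 2 // aim to complete a body request in ~1.5s
	bodyTTL       = 3 * bodyTargetRTT   // give up on the request after ~4.5s
)

func classify(elapsed time.Duration) string {
	switch {
	case elapsed <= bodyTargetRTT:
		return "on target"
	case elapsed <= bodyTTL:
		return "slow, but still acceptable"
	default:
		return "expired"
	}
}

func main() {
	for _, d := range []time.Duration{time.Second, 3 * time.Second, 5 * time.Second} {
		fmt.Printf("%v: %s\n", d, classify(d))
	}
}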
@@ -129,7 +130,6 @@ type Downloader struct {
 	// Status
 	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
 	synchronising   int32
-	processing      int32
 	notified        int32
 
 	// Channels
@@ -215,7 +215,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
 
 // Synchronising returns whether the downloader is currently retrieving blocks.
 func (d *Downloader) Synchronising() bool {
-	return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
+	return atomic.LoadInt32(&d.synchronising) > 0
 }
 
 // RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +263,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
 		d.dropPeer(id)
 
-	case errPendingQueue:
-		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
 	default:
 		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
 	}
@@ -290,10 +287,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
 		glog.V(logger.Info).Infoln("Block synchronisation started")
 	}
-	// Abort if the queue still contains some leftover data
-	if d.queue.GetHeadResult() != nil {
-		return errPendingQueue
-	}
 	// Reset the queue, peer set and wake channels to clean any internal leftover state
 	d.queue.Reset()
 	d.peers.Reset()
@@ -335,7 +328,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 	defer func() {
 		// reset on error
 		if err != nil {
-			d.cancel()
 			d.mux.Post(FailedEvent{err})
 		} else {
 			d.mux.Post(DoneEvent{})
@@ -365,23 +357,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		d.syncStatsChainHeight = latest
 		d.syncStatsLock.Unlock()
 
-		// Initiate the sync using a concurrent hash and block retrieval algorithm
+		// Initiate the sync using a concurrent hash and block retrieval algorithm
+		d.queue.Prepare(origin+1, d.mode, 0)
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
-		d.queue.Prepare(origin+1, d.mode, 0)
-
-		errc := make(chan error, 2)
-		go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
-		go func() { errc <- d.fetchBlocks61(origin + 1) }()
-
-		// If any fetcher fails, cancel the other
-		if err := <-errc; err != nil {
-			d.cancel()
-			<-errc
-			return err
-		}
-		return <-errc
+		return d.spawnSync(
+			func() error { return d.fetchHashes61(p, td, origin+1) },
+			func() error { return d.fetchBlocks61(origin + 1) },
+		)
 
 	case p.version >= 62:
 		// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +389,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		switch d.mode {
 		case LightSync:
 			pivot = latest
-
 		case FastSync:
 			// Calculate the new fast/slow sync pivot point
 			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
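For context on the FastSync branch above: the pivot is picked a random number of blocks below the remote head, using the fsPivotInterval constant referenced in the hunk. The helper below is a hedged, standalone illustration of that idea; pickPivot and its fallback behaviour are invented for the example and are not the downloader's actual code:

package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// fsPivotInterval mirrors the constant referenced above; the value the
// downloader actually uses is defined elsewhere in the package.
const fsPivotInterval = 512

// pickPivot (illustrative only): choose a fast-sync pivot a random number of
// blocks below the remote head, so different nodes do not all pivot at the
// same height.
func pickPivot(latest uint64) (uint64, error) {
	offset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
	if err != nil {
		return 0, err
	}
	if latest <= uint64(offset.Int64()) {
		return 0, nil // chain too short, fall back to syncing everything fully
	}
	return latest - uint64(offset.Int64()), nil
}

func main() {
	pivot, err := pickPivot(1200000)
	if err != nil {
		panic(err)
	}
	fmt.Println("pivot block:", pivot)
}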
@@ -426,34 +409,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 			glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
 		}
 		d.queue.Prepare(origin+1, d.mode, pivot)
-
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
-		errc := make(chan error, 4)
-		go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
-		go func() { errc <- d.fetchBodies(origin + 1) }()       // Bodies are retrieved during normal and fast sync
-		go func() { errc <- d.fetchReceipts(origin + 1) }()     // Receipts are retrieved during fast sync
-		go func() { errc <- d.fetchNodeData() }()               // Node state data is retrieved during fast sync
-
-		// If any fetcher fails, cancel the others
-		var fail error
-		for i := 0; i < cap(errc); i++ {
-			if err := <-errc; err != nil {
-				if fail == nil {
-					fail = err
-					d.cancel()
-				}
-			}
-		}
-		return fail
+		return d.spawnSync(
+			func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
+			func() error { return d.fetchBodies(origin + 1) },       // Bodies are retrieved during normal and fast sync
+			func() error { return d.fetchReceipts(origin + 1) },     // Receipts are retrieved during fast sync
+			func() error { return d.fetchNodeData() },               // Node state data is retrieved during fast sync
+		)
 
 	default:
 		// Something very wrong, stop right here
 		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
 		return errBadPeer
 	}
-	return nil
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers ...func() error) error {
+	var wg sync.WaitGroup
+	errc := make(chan error, len(fetchers)+1)
+	wg.Add(len(fetchers) + 1)
+	go func() { defer wg.Done(); errc <- d.process() }()
+	for _, fn := range fetchers {
+		fn := fn
+		go func() { defer wg.Done(); errc <- fn() }()
+	}
+	// Wait for the first error, then terminate the others.
+	var err error
+	for i := 0; i < len(fetchers)+1; i++ {
+		if i == len(fetchers) {
+			// Close the queue when all fetchers have exited.
+			// This will cause the block processor to end when
+			// it has processed the queue.
+			d.queue.Close()
+		}
+		if err = <-errc; err != nil {
+			break
		}
+	}
+	d.queue.Close()
+	d.cancel()
+	wg.Wait()
+	return err
 }
 
 // cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +470,10 @@ func (d *Downloader) cancel() {
 		}
 	}
 	d.cancelLock.Unlock()
-
-	// Reset the queue
-	d.queue.Reset()
 }
 
 // Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
 func (d *Downloader) Terminate() {
 	atomic.StoreInt32(&d.interrupt, 1)
 	d.cancel()
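The new spawnSync helper above boils down to a common Go pattern: run every fetcher (plus the processor) in its own goroutine, collect exactly one result per goroutine on a buffered channel, remember the first error, and trigger cancellation so the rest can unwind. A reduced standalone sketch of that pattern, with invented names and without the queue-closing details of the real helper:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// runAll is an illustrative stand-in for the spawnSync pattern: run every
// task in its own goroutine, wait for one result per task, and report the
// first error while still letting every goroutine finish.
func runAll(cancel func(), tasks ...func() error) error {
	var wg sync.WaitGroup
	errc := make(chan error, len(tasks)) // buffered so late finishers never block
	wg.Add(len(tasks))
	for _, task := range tasks {
		task := task // capture the loop variable (pre-Go 1.22 idiom, as in the patch)
		go func() { defer wg.Done(); errc <- task() }()
	}
	var first error
	for i := 0; i < len(tasks); i++ {
		if err := <-errc; err != nil && first == nil {
			first = err
			cancel() // tell the remaining tasks to wind down
		}
	}
	wg.Wait()
	return first
}

func main() {
	quit := make(chan struct{})
	cancel := func() {
		select {
		case <-quit:
		default:
			close(quit)
		}
	}
	err := runAll(cancel,
		func() error { time.Sleep(50 * time.Millisecond); return nil },
		func() error { return errors.New("peer stalled") },
		func() error { <-quit; return nil }, // exits once cancel fires
	)
	fmt.Println("sync finished with:", err)
}

The buffered error channel is the important detail: every goroutine can deposit its result even if the collector has already stopped caring, so nothing leaks.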
@@ -489,21 +487,12 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
 	// Request the advertised remote head block and wait for the response
 	go p.getBlocks([]common.Hash{p.head})
 
-	timeout := time.After(blockSoftTTL)
+	timeout := time.After(hashTTL)
 	for {
 		select {
 		case <-d.cancelCh:
 			return 0, errCancelBlockFetch
 
-		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
-		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
-		case <-d.hashCh:
-			// Out of bounds hashes received, ignore them
-
 		case packet := <-d.blockCh:
 			// Discard anything not from the origin peer
 			if packet.PeerId() != p.id {
@@ -521,6 +510,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
 		case <-timeout:
 			glog.V(logger.Debug).Infof("%v: head block timeout", p)
 			return 0, errTimeout
+
+		case <-d.hashCh:
+			// Out of bounds hashes received, ignore them
+
+		case <-d.headerCh:
+		case <-d.bodyCh:
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -571,18 +570,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
 			}
 		}
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
+			return 0, errTimeout
+
 		case <-d.blockCh:
 			// Out of bounds blocks received, ignore them
 
 		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
 		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
-			return 0, errTimeout
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 	// If the head fetch already found an ancestor, return
@@ -631,18 +631,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
 			}
 			start = check
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: search hash timeout", p)
+			return 0, errTimeout
+
 		case <-d.blockCh:
 			// Out of bounds blocks received, ignore them
 
 		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
 		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: search hash timeout", p)
-			return 0, errTimeout
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -676,12 +677,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 		case <-d.cancelCh:
 			return errCancelHashFetch
 
-		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
-		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
 		case packet := <-d.hashCh:
 			// Make sure the active peer is giving us the hashes
 			if packet.PeerId() != p.id {
@@ -750,6 +745,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
 			hashTimeoutMeter.Mark(1)
 			return errTimeout
+
+		case <-d.headerCh:
+		case <-d.bodyCh:
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
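Several hunks above and below replace the per-channel "out of bounds" comments with stacked empty select cases that simply drain late packets from the other protocol family. The toy loop below shows why this is safe: an empty case body reads and discards the value, so a late delivery from a previous sync can never block the sender. All names here are illustrative:

package main

import (
	"fmt"
	"time"
)

// listen handles deliveries for the protocol currently in use and reads and
// drops late packets arriving on channels for other protocol versions.
func listen(blockCh, headerCh, bodyCh chan string, timeout time.Duration) {
	deadline := time.After(timeout)
	for {
		select {
		case blk := <-blockCh:
			fmt.Println("handling eth/61 block packet:", blk)

		case <-headerCh:
		case <-bodyCh:
			// Empty cases simply discard late eth/62 deliveries from a
			// previous sync; nothing waits on them, nothing blocks.

		case <-deadline:
			fmt.Println("request timed out")
			return
		}
	}
}

func main() {
	blockCh := make(chan string, 1)
	headerCh := make(chan string, 1)
	blockCh <- "#1000"
	headerCh <- "stale header packet"
	listen(blockCh, headerCh, make(chan string), 300*time.Millisecond)
}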
@@ -774,59 +776,31 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 		case <-d.cancelCh:
 			return errCancelBlockFetch
 
-		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
-		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
 		case packet := <-d.blockCh:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
-				// Deliver the received chunk of blocks, and demote in case of errors
 				blocks := packet.(*blockPack).blocks
-				err := d.queue.DeliverBlocks(peer.id, blocks)
-				switch err {
-				case nil:
-					// If no blocks were delivered, demote the peer (need the delivery above)
-					if len(blocks) == 0 {
-						peer.Demote()
-						peer.SetBlocksIdle()
-						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
-						break
-					}
-					// All was successful, promote the peer and potentially start processing
-					peer.Promote()
-					peer.SetBlocksIdle()
-					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
-					go d.process()
-				case errInvalidChain:
-					// The hash chain is invalid (blocks are not ordered properly), abort
+
+				// Deliver the received chunk of blocks and check chain validity
+				accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
+				if err == errInvalidChain {
 					return err
-
-				case errNoFetchesPending:
-					// Peer probably timed out with its delivery but came through
-					// in the end, demote, but allow to to pull from this peer.
-					peer.Demote()
-					peer.SetBlocksIdle()
-					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
-
-				case errStaleDelivery:
-					// Delivered something completely else than requested, usually
-					// caused by a timeout and delivery during a new sync cycle.
-					// Don't set it to idle as the original request should still be
-					// in flight.
-					peer.Demote()
-					glog.V(logger.Detail).Infof("%s: stale delivery", peer)
-
+				}
+				// Unless a peer delivered something completely else than requested (usually
+				// caused by a timed out request which came through in the end), set it to
+				// idle. If the delivery's stale, the peer should have already been idled.
+				if err != errStaleDelivery {
+					peer.SetBlocksIdle(accepted)
+				}
+				// Issue a log to the user to see what's going on
+				switch {
+				case err == nil && len(blocks) == 0:
+					glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+				case err == nil:
+					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
 				default:
-					// Peer did something semi-useful, demote but keep it around
-					peer.Demote()
-					peer.SetBlocksIdle()
-					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
-					go d.process()
+					glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
 				}
 			}
 			// Blocks arrived, try to update the progress
@@ -859,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 				return errNoPeers
 			}
 			// Check for block request timeouts and demote the responsible peers
-			for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
+			for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
 				if peer := d.peers.Peer(pid); peer != nil {
-					peer.Demote()
-					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+					if fails > 1 {
+						glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+						peer.SetBlocksIdle(0)
+					} else {
+						glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
+						d.dropPeer(pid)
+					}
 				}
 			}
 			// If there's nothing more to fetch, wait or terminate
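ExpireBlocks now returns a map from peer id to the number of queued items in the timed-out request, and the branch above reacts differently depending on that count: a large expired request only resets the peer to idle with zero accepted items, while a peer that failed to answer even a tiny request is dropped as stalling. A rough sketch of that branch with invented names; the interpretation of the map value is inferred from the call sites, not stated in the diff:

package main

import "fmt"

// expiredPeer is an invented stand-in: the peer's id plus how many queued
// items its timed-out request was carrying (the map value in the hunk above).
type expiredPeer struct {
	id    string
	items int
}

func main() {
	expired := []expiredPeer{
		{id: "peer-a", items: 8}, // a big request timed out: maybe we over-asked
		{id: "peer-b", items: 1}, // could not return even one block: stalling
	}
	for _, e := range expired {
		if e.items > 1 {
			fmt.Printf("%s: block delivery timeout, idling with zero throughput\n", e.id)
			// corresponds to peer.SetBlocksIdle(0): future requests start small again
		} else {
			fmt.Printf("%s: stalling block delivery, dropping\n", e.id)
			// corresponds to d.dropPeer(pid): disconnect the stalling peer
		}
	}
}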
@@ -909,6 +888,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 			if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
 				return errPeersUnavailable
 			}
+
+		case <-d.headerCh:
+		case <-d.bodyCh:
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -941,18 +927,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
 			}
 			return headers[0].Number.Uint64(), nil
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head header timeout", p)
+			return 0, errTimeout
+
 		case <-d.bodyCh:
-			// Out of bounds block bodies received, ignore them
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
 
 		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
 		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head header timeout", p)
-			return 0, errTimeout
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1008,18 +995,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
 			}
 		}
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head header timeout", p)
+			return 0, errTimeout
+
 		case <-d.bodyCh:
-			// Out of bounds block bodies received, ignore them
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
 
 		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
 		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head header timeout", p)
-			return 0, errTimeout
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 	// If the head fetch already found an ancestor, return
@@ -1068,18 +1056,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
 			}
 			start = check
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: search header timeout", p)
+			return 0, errTimeout
+
 		case <-d.bodyCh:
-			// Out of bounds block bodies received, ignore them
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
 
 		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
 		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: search header timeout", p)
-			return 0, errTimeout
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1141,12 +1130,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 		case <-d.cancelCh:
 			return errCancelHeaderFetch
 
-		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
-		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
 		case packet := <-d.headerCh:
 			// Make sure the active peer is giving us the headers
 			if packet.PeerId() != p.id {
@@ -1268,6 +1251,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 				}
 			}
 			return nil
+
+		case <-d.hashCh:
+		case <-d.blockCh:
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1279,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
 	glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
 
 	var (
-		deliver = func(packet dataPack) error {
+		deliver = func(packet dataPack) (int, error) {
 			pack := packet.(*bodyPack)
 			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
 		}
-		expire   = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
+		expire   = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
 		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
 		capacity = func(p *peer) int { return p.BlockCapacity() }
-		setIdle  = func(p *peer) { p.SetBodiesIdle() }
+		setIdle  = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
 		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1303,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 	glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
 
 	var (
-		deliver = func(packet dataPack) error {
+		deliver = func(packet dataPack) (int, error) {
 			pack := packet.(*receiptPack)
 			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
 		}
-		expire   = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
+		expire   = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
 		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
 		capacity = func(p *peer) int { return p.ReceiptCapacity() }
-		setIdle  = func(p *peer) { p.SetReceiptsIdle() }
+		setIdle  = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
 		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1327,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
 	glog.V(logger.Debug).Infof("Downloading node state data")
 
 	var (
-		deliver = func(packet dataPack) error {
+		deliver = func(packet dataPack) (int, error) {
 			start := time.Now()
 			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
 				if err != nil {
@@ -1336,10 +1324,8 @@ func (d *Downloader) fetchNodeData() error {
 					d.cancel()
 					return
 				}
-				// Processing succeeded, notify state fetcher and processor of continuation
-				if d.queue.PendingNodeData() == 0 {
-					go d.process()
-				} else {
+				// Processing succeeded, notify state fetcher of continuation
+				if d.queue.PendingNodeData() > 0 {
 					select {
 					case d.stateWakeCh <- true:
 					default:
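The SetBodiesIdle/SetReceiptsIdle/SetNodeDataIdle calls in the hunks above and just below now carry the number of items actually accepted from the last delivery. One plausible use, sketched below with invented names, is to fold that count into a per-peer throughput estimate that sizes the next request; the real peer implementation in this package may differ:

package main

import (
	"fmt"
	"time"
)

// peerThroughput is an invented model of the idea behind SetBodiesIdle(accepted):
// remember how fast a peer really delivered and size the next request from that.
type peerThroughput struct {
	perRTT float64 // estimated deliverable items per target round trip
}

// setIdle folds the latest measurement into the estimate. This only
// illustrates why the accepted count is now threaded through.
func (p *peerThroughput) setIdle(accepted int, elapsed, targetRTT time.Duration) {
	measured := float64(accepted) * float64(targetRTT) / float64(elapsed)
	p.perRTT = 0.8*p.perRTT + 0.2*measured // smooth out bursts
}

func (p *peerThroughput) capacity() int {
	if p.perRTT < 1 {
		return 1 // always ask for at least one item
	}
	return int(p.perRTT)
}

func main() {
	p := &peerThroughput{perRTT: 4}
	p.setIdle(16, 1200*time.Millisecond, 1500*time.Millisecond)
	fmt.Println("next request size:", p.capacity())
}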
@@ -1348,19 +1334,18 @@ func (d *Downloader) fetchNodeData() error {
 				// Log a message to the user and return
 				d.syncStatsLock.Lock()
 				defer d.syncStatsLock.Unlock()
-
 				d.syncStatsStateDone += uint64(delivered)
 				glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
 			})
 		}
-		expire   = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+		expire   = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
 		throttle = func() bool { return false }
 		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
 			return d.queue.ReserveNodeData(p, count), false, nil
 		}
 		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
 		capacity = func(p *peer) int { return p.NodeDataCapacity() }
-		setIdle  = func(p *peer) { p.SetNodeDataIdle() }
+		setIdle  = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
 		d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1373,10 +1358,10 @@
 // fetchParts iteratively downloads scheduled block parts, taking any available
 // peers, reserving a chunk of fetch requests for each, waiting for delivery and
 // also periodically checking for timeouts.
-func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
-	expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
 	fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
-	idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+	idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
 
 	// Create a ticker to detect expired retrieval tasks
 	ticker := time.NewTicker(100 * time.Millisecond)
@@ -1391,57 +1376,29 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 		case <-d.cancelCh:
 			return errCancel
 
-		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
-		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
 		case packet := <-deliveryCh:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
-				// Deliver the received chunk of data, and demote in case of errors
-				switch err := deliver(packet); err {
-				case nil:
-					// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
-					if packet.Items() == 0 {
-						peer.Demote()
-						setIdle(peer)
-						glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
-						break
-					}
-					// All was successful, promote the peer and potentially start processing
-					peer.Promote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
-					go d.process()
-
-				case errInvalidChain:
-					// The hash chain is invalid (blocks are not ordered properly), abort
+				// Deliver the received chunk of data and check chain validity
+				accepted, err := deliver(packet)
+				if err == errInvalidChain {
 					return err
-
-				case errNoFetchesPending:
-					// Peer probably timed out with its delivery but came through
-					// in the end, demote, but allow to to pull from this peer.
-					peer.Demote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
-
-				case errStaleDelivery:
-					// Delivered something completely else than requested, usually
-					// caused by a timeout and delivery during a new sync cycle.
-					// Don't set it to idle as the original request should still be
-					// in flight.
-					peer.Demote()
-					glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
-
+				}
+				// Unless a peer delivered something completely else than requested (usually
+				// caused by a timed out request which came through in the end), set it to
+				// idle. If the delivery's stale, the peer should have already been idled.
+				if err != errStaleDelivery {
+					setIdle(peer, accepted)
+				}
+				// Issue a log to the user to see what's going on
+				switch {
+				case err == nil && packet.Items() == 0:
+					glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
+				case err == nil:
+					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
 				default:
-					// Peer did something semi-useful, demote but keep it around
-					peer.Demote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
-					go d.process()
+					glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
 				}
 			}
 			// Blocks assembled, try to update the progress
@@ -1474,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 				return errNoPeers
 			}
 			// Check for fetch request timeouts and demote the responsible peers
-			for _, pid := range expire() {
+			for pid, fails := range expire() {
 				if peer := d.peers.Peer(pid); peer != nil {
-					peer.Demote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+					if fails > 1 {
+						glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+						setIdle(peer, 0)
+					} else {
+						glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
+						d.dropPeer(pid)
+					}
 				}
 			}
 			// If there's nothing more to fetch, wait or terminate
@@ -1508,7 +1469,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 				}
 				if progress {
 					progressed = true
-					go d.process()
 				}
 				if request == nil {
 					continue
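fetchBodies, fetchReceipts and fetchNodeData above all specialise the single generic fetchParts loop purely through the closures they pass in (deliver, expire, fetch, capacity, setIdle). The heavily reduced sketch below shows the shape of that design; fetchLoop and its callbacks are invented for illustration and are far simpler than the real loop:

package main

import "fmt"

// fetchLoop is an invented, heavily reduced analogue of fetchParts: the loop
// itself knows nothing about bodies, receipts or node data, it only calls the
// closures it is handed.
func fetchLoop(kind string, pending func() int, fetch func(n int) (int, error), setIdle func(accepted int)) error {
	for pending() > 0 {
		accepted, err := fetch(pending())
		if err != nil {
			return fmt.Errorf("%s fetch failed: %w", kind, err)
		}
		setIdle(accepted)
	}
	return nil
}

func main() {
	remaining := 5
	idleLog := []int{}
	err := fetchLoop("body",
		func() int { return remaining },
		func(n int) (int, error) { // pretend the peer returns at most 2 items per request
			got := n
			if got > 2 {
				got = 2
			}
			remaining -= got
			return got, nil
		},
		func(accepted int) { idleLog = append(idleLog, accepted) },
	)
	fmt.Println("err:", err, "deliveries:", idleLog)
}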
@@ -1540,51 +1500,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 			}
 			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
 				return errPeersUnavailable
 			}
+
+		case <-d.hashCh:
+		case <-d.blockCh:
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
 
 // process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents:
-//  -
-//
-// The algorithmic flow is as follows:
-//  - The `processing` flag is swapped to 1 to ensure singleton access
-//  - The current `cancel` channel is retrieved to detect sync abortions
-//  - Blocks are iteratively taken from the cache and inserted into the chain
-//  - When the cache becomes empty, insertion stops
-//  - The `processing` flag is swapped back to 0
-//  - A post-exit check is made whether new blocks became available
-//    - This step is important: it handles a potential race condition between
-//      checking for no more work, and releasing the processing "mutex". In
-//      between these state changes, a block may have arrived, but a processing
-//      attempt denied, so we need to re-enter to ensure the block isn't left
-//      to idle in the cache.
-func (d *Downloader) process() {
-	// Make sure only one goroutine is ever allowed to process blocks at once
-	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
-		return
-	}
-	// If the processor just exited, but there are freshly pending items, try to
-	// reenter. This is needed because the goroutine spinned up for processing
-	// the fresh results might have been rejected entry to to this present thread
-	// not yet releasing the `processing` state.
-	defer func() {
-		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
-			d.process()
-		}
-	}()
-	// Release the lock upon exit (note, before checking for reentry!)
-	// the import statistics to zero.
-	defer atomic.StoreInt32(&d.processing, 0)
-
-	// Repeat the processing as long as there are results to process
+// chain. The type of import operation will depend on the result contents.
+func (d *Downloader) process() error {
+	pivot := d.queue.FastSyncPivot()
 	for {
-		// Fetch the next batch of results
-		pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
-		results := d.queue.TakeResults()
+		results := d.queue.WaitResults()
 		if len(results) == 0 {
-			return
+			return nil // queue empty
 		}
 		if d.chainInsertHook != nil {
 			d.chainInsertHook(results)
 		}
@@ -1597,7 +1529,7 @@ func (d *Downloader) process() {
 		for len(results) != 0 {
 			// Check for any termination requests
 			if atomic.LoadInt32(&d.interrupt) == 1 {
-				return
+				return errCancelProcessing
 			}
 			// Retrieve the a batch of results to import
 			var (
@@ -1633,8 +1565,7 @@ func (d *Downloader) process() {
 			}
 			if err != nil {
 				glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
-				d.cancel()
-				return
+				return err
 			}
 			// Shift the results to the next batch
 			results = results[items:]
@@ -1685,19 +1616,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
 			dropMeter.Mark(int64(packet.Items()))
 		}
 	}()
-	// Make sure the downloader is active
-	if atomic.LoadInt32(&d.synchronising) == 0 {
-		return errNoSyncActive
-	}
 	// Deliver or abort if the sync is canceled while queuing
 	d.cancelLock.RLock()
 	cancel := d.cancelCh
 	d.cancelLock.RUnlock()
-
+	if cancel == nil {
+		return errNoSyncActive
+	}
 	select {
 	case destCh <- packet:
 		return nil
-
 	case <-cancel:
 		return errNoSyncActive
 	}
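The rewritten process above now blocks on queue.WaitResults, returns nil once the closed queue drains, and surfaces errCancelProcessing when the interrupt flag is set, instead of juggling a processing flag and re-entry. A miniature, self-contained model of that control flow; resultQueue and its channel are stand-ins, not the real queue:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var errCancelProcessing = errors.New("processing canceled (requested)")

// resultQueue is an invented miniature of the queue's WaitResults behaviour:
// block until results are available, and hand back an empty batch once the
// queue has been closed and drained.
type resultQueue struct {
	ch chan []int
}

func (q *resultQueue) WaitResults() []int { return <-q.ch }

// process mirrors the shape of the rewritten processor: loop over batches,
// bail out with an error when interrupted, and return nil when the queue
// signals that it is empty.
func process(q *resultQueue, interrupt *int32) error {
	for {
		results := q.WaitResults()
		if len(results) == 0 {
			return nil // queue closed and empty
		}
		for _, r := range results {
			if atomic.LoadInt32(interrupt) == 1 {
				return errCancelProcessing
			}
			fmt.Println("imported result", r)
		}
	}
}

func main() {
	q := &resultQueue{ch: make(chan []int, 2)}
	q.ch <- []int{1, 2, 3}
	q.ch <- nil // a closed queue delivers an empty batch
	var interrupt int32
	go func() { time.Sleep(time.Second); atomic.StoreInt32(&interrupt, 1) }()
	fmt.Println("processor exited with:", process(q, &interrupt))
}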