diff options
author | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-11-24 20:48:47 +0800 |
---|---|---|
committer | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-11-24 20:48:47 +0800 |
commit | 5490437942967638bcc6198035315f6811febaa8 (patch) | |
tree | ec4fbee454bacbf2b80b5a7ff402fb48dd2c10cf /eth/downloader | |
parent | e5532154a50114d5ffb1ffd850b746cab00cb899 (diff) | |
parent | b0fb48c389460193d9fc0a5118d79ff6dec48ce0 (diff) | |
download | go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.gz go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.bz2 go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.lz go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.xz go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.zst go-tangerine-5490437942967638bcc6198035315f6811febaa8.zip |
Merge branch 'develop' into release/1.3.2v1.3.2
Conflicts:
VERSION
cmd/geth/main.go
Diffstat (limited to 'eth/downloader')
-rw-r--r-- | eth/downloader/downloader.go | 488 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 135 | ||||
-rw-r--r-- | eth/downloader/peer.go | 240 | ||||
-rw-r--r-- | eth/downloader/queue.go | 301 |
4 files changed, 620 insertions, 544 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 153427ee4..c272d05af 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -45,16 +45,17 @@ var ( MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request - hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out - blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired - headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out - bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth - bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired - receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired - stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired + hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out + blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request + blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired + + headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out + bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request + bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired + receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request + receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired + stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request + stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) @@ -74,7 +75,6 @@ var ( errBadPeer = errors.New("action from bad peer ignored") errStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") errTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHeaderSet = errors.New("empty header set by peer") @@ -90,6 +90,7 @@ var ( errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelProcessing = errors.New("processing canceled (requested)") errNoSyncActive = errors.New("no sync active") ) @@ -129,7 +130,6 @@ type Downloader struct { // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 - processing int32 notified int32 // Channels @@ -215,7 +215,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) { // Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { - return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0 + return atomic.LoadInt32(&d.synchronising) > 0 } // RegisterPeer injects a new download peer into the set of block source to be @@ -263,9 +263,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) d.dropPeer(id) - case errPendingQueue: - glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) - default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } @@ -290,10 +287,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } - // Abort if the queue still contains some leftover data - if d.queue.GetHeadResult() != nil { - return errPendingQueue - } // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() @@ -335,7 +328,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e defer func() { // reset on error if err != nil { - d.cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -365,23 +357,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent hash and block retrieval algorithm + // Initiate the sync using a concurrent hash and block retrieval algorithm + d.queue.Prepare(origin+1, d.mode, 0) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, d.mode, 0) - - errc := make(chan error, 2) - go func() { errc <- d.fetchHashes61(p, td, origin+1) }() - go func() { errc <- d.fetchBlocks61(origin + 1) }() - - // If any fetcher fails, cancel the other - if err := <-errc; err != nil { - d.cancel() - <-errc - return err - } - return <-errc + return d.spawnSync( + func() error { return d.fetchHashes61(p, td, origin+1) }, + func() error { return d.fetchBlocks61(origin + 1) }, + ) case p.version >= 62: // Look up the sync boundaries: the common ancestor and the target block @@ -405,7 +389,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e switch d.mode { case LightSync: pivot = latest - case FastSync: // Calculate the new fast/slow sync pivot point pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) @@ -426,34 +409,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) } d.queue.Prepare(origin+1, d.mode, pivot) - if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 4) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync - go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync - - // If any fetcher fails, cancel the others - var fail error - for i := 0; i < cap(errc); i++ { - if err := <-errc; err != nil { - if fail == nil { - fail = err - d.cancel() - } - } - } - return fail + return d.spawnSync( + func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync + ) default: // Something very wrong, stop right here glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) return errBadPeer } - return nil +} + +// spawnSync runs d.process and all given fetcher functions to completion in +// separate goroutines, returning the first error that appears. +func (d *Downloader) spawnSync(fetchers ...func() error) error { + var wg sync.WaitGroup + errc := make(chan error, len(fetchers)+1) + wg.Add(len(fetchers) + 1) + go func() { defer wg.Done(); errc <- d.process() }() + for _, fn := range fetchers { + fn := fn + go func() { defer wg.Done(); errc <- fn() }() + } + // Wait for the first error, then terminate the others. + var err error + for i := 0; i < len(fetchers)+1; i++ { + if i == len(fetchers) { + // Close the queue when all fetchers have exited. + // This will cause the block processor to end when + // it has processed the queue. + d.queue.Close() + } + if err = <-errc; err != nil { + break + } + } + d.queue.Close() + d.cancel() + wg.Wait() + return err } // cancel cancels all of the operations and resets the queue. It returns true @@ -470,12 +470,10 @@ func (d *Downloader) cancel() { } } d.cancelLock.Unlock() - - // Reset the queue - d.queue.Reset() } // Terminate interrupts the downloader, canceling all pending operations. +// The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { atomic.StoreInt32(&d.interrupt, 1) d.cancel() @@ -489,21 +487,12 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { // Request the advertised remote head block and wait for the response go p.getBlocks([]common.Hash{p.head}) - timeout := time.After(blockSoftTTL) + timeout := time.After(hashTTL) for { select { case <-d.cancelCh: return 0, errCancelBlockFetch - case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - - case <-d.hashCh: - // Out of bounds hashes received, ignore them - case packet := <-d.blockCh: // Discard anything not from the origin peer if packet.PeerId() != p.id { @@ -521,6 +510,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { case <-timeout: glog.V(logger.Debug).Infof("%v: head block timeout", p) return 0, errTimeout + + case <-d.hashCh: + // Out of bounds hashes received, ignore them + + case <-d.headerCh: + case <-d.bodyCh: + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -571,18 +570,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { } } + case <-timeout: + glog.V(logger.Debug).Infof("%v: head hash timeout", p) + return 0, errTimeout + case <-d.blockCh: // Out of bounds blocks received, ignore them case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: head hash timeout", p) - return 0, errTimeout + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } // If the head fetch already found an ancestor, return @@ -631,18 +631,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { } start = check + case <-timeout: + glog.V(logger.Debug).Infof("%v: search hash timeout", p) + return 0, errTimeout + case <-d.blockCh: // Out of bounds blocks received, ignore them case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: search hash timeout", p) - return 0, errTimeout + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -676,12 +677,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: return errCancelHashFetch - case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - case packet := <-d.hashCh: // Make sure the active peer is giving us the hashes if packet.PeerId() != p.id { @@ -750,6 +745,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: hash request timed out", p) hashTimeoutMeter.Mark(1) return errTimeout + + case <-d.headerCh: + case <-d.bodyCh: + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -774,59 +776,31 @@ func (d *Downloader) fetchBlocks61(from uint64) error { case <-d.cancelCh: return errCancelBlockFetch - case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - case packet := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { - // Deliver the received chunk of blocks, and demote in case of errors blocks := packet.(*blockPack).blocks - err := d.queue.DeliverBlocks(peer.id, blocks) - switch err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above) - if len(blocks) == 0 { - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) - go d.process() - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort + // Deliver the received chunk of blocks and check chain validity + accepted, err := d.queue.DeliverBlocks(peer.id, blocks) + if err == errInvalidChain { return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: stale delivery", peer) - + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + peer.SetBlocksIdle(accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && len(blocks) == 0: + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + case err == nil: + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) - go d.process() + glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err) } } // Blocks arrived, try to update the progress @@ -859,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { + for pid, fails := range d.queue.ExpireBlocks(blockTTL) { if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + if fails > 1 { + glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + peer.SetBlocksIdle(0) + } else { + glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer) + d.dropPeer(pid) + } } } // If there's nothing more to fetch, wait or terminate @@ -909,6 +888,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error { if !throttled && !d.queue.InFlightBlocks() && len(idles) == total { return errPeersUnavailable } + + case <-d.headerCh: + case <-d.bodyCh: + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -941,18 +927,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { } return headers[0].Number.Uint64(), nil + case <-timeout: + glog.V(logger.Debug).Infof("%v: head header timeout", p) + return 0, errTimeout + case <-d.bodyCh: - // Out of bounds block bodies received, ignore them + case <-d.stateCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: head header timeout", p) - return 0, errTimeout + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } @@ -1008,18 +995,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { } } + case <-timeout: + glog.V(logger.Debug).Infof("%v: head header timeout", p) + return 0, errTimeout + case <-d.bodyCh: - // Out of bounds block bodies received, ignore them + case <-d.stateCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: head header timeout", p) - return 0, errTimeout + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } // If the head fetch already found an ancestor, return @@ -1068,18 +1056,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { } start = check + case <-timeout: + glog.V(logger.Debug).Infof("%v: search header timeout", p) + return 0, errTimeout + case <-d.bodyCh: - // Out of bounds block bodies received, ignore them + case <-d.stateCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: search header timeout", p) - return 0, errTimeout + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } @@ -1141,12 +1130,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: return errCancelHeaderFetch - case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - case packet := <-d.headerCh: // Make sure the active peer is giving us the headers if packet.PeerId() != p.id { @@ -1268,6 +1251,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } return nil + + case <-d.hashCh: + case <-d.blockCh: + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } @@ -1279,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { pack := packet.(*bodyPack) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } + expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peer) int { return p.BlockCapacity() } - setIdle = func(p *peer) { p.SetBodiesIdle() } + setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, @@ -1303,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error { glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } - expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) } + expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } capacity = func(p *peer) int { return p.ReceiptCapacity() } - setIdle = func(p *peer) { p.SetReceiptsIdle() } + setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, @@ -1327,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error { glog.V(logger.Debug).Infof("Downloading node state data") var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { start := time.Now() return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { if err != nil { @@ -1336,10 +1324,8 @@ func (d *Downloader) fetchNodeData() error { d.cancel() return } - // Processing succeeded, notify state fetcher and processor of continuation - if d.queue.PendingNodeData() == 0 { - go d.process() - } else { + // Processing succeeded, notify state fetcher of continuation + if d.queue.PendingNodeData() > 0 { select { case d.stateWakeCh <- true: default: @@ -1348,19 +1334,18 @@ func (d *Downloader) fetchNodeData() error { // Log a message to the user and return d.syncStatsLock.Lock() defer d.syncStatsLock.Unlock() - d.syncStatsStateDone += uint64(delivered) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) }) } - expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } + expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveNodeData(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } capacity = func(p *peer) int { return p.NodeDataCapacity() } - setIdle = func(p *peer) { p.SetNodeDataIdle() } + setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } ) err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, @@ -1373,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error { // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. -func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, - expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, + expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, - idle func() ([]*peer, int), setIdle func(*peer), kind string) error { + idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error { // Create a ticker to detect expired retrieval tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1391,57 +1376,29 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv case <-d.cancelCh: return errCancel - case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - case packet := <-deliveryCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { - // Deliver the received chunk of data, and demote in case of errors - switch err := deliver(packet); err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) - if packet.Items() == 0 { - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) - go d.process() - - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort + // Deliver the received chunk of data and check chain validity + accepted, err := deliver(packet) + if err == errInvalidChain { return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind)) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind)) - + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + setIdle(peer, accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && packet.Items() == 0: + glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) + case err == nil: + glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) - go d.process() + glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err) } } // Blocks assembled, try to update the progress @@ -1474,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv return errNoPeers } // Check for fetch request timeouts and demote the responsible peers - for _, pid := range expire() { + for pid, fails := range expire() { if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) + if fails > 1 { + glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) + setIdle(peer, 0) + } else { + glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind)) + d.dropPeer(pid) + } } } // If there's nothing more to fetch, wait or terminate @@ -1508,7 +1469,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } if progress { progressed = true - go d.process() } if request == nil { continue @@ -1540,51 +1500,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { return errPeersUnavailable } + + case <-d.hashCh: + case <-d.blockCh: + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } // process takes fetch results from the queue and tries to import them into the -// chain. The type of import operation will depend on the result contents: -// - -// -// The algorithmic flow is as follows: -// - The `processing` flag is swapped to 1 to ensure singleton access -// - The current `cancel` channel is retrieved to detect sync abortions -// - Blocks are iteratively taken from the cache and inserted into the chain -// - When the cache becomes empty, insertion stops -// - The `processing` flag is swapped back to 0 -// - A post-exit check is made whether new blocks became available -// - This step is important: it handles a potential race condition between -// checking for no more work, and releasing the processing "mutex". In -// between these state changes, a block may have arrived, but a processing -// attempt denied, so we need to re-enter to ensure the block isn't left -// to idle in the cache. -func (d *Downloader) process() { - // Make sure only one goroutine is ever allowed to process blocks at once - if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { - return - } - // If the processor just exited, but there are freshly pending items, try to - // reenter. This is needed because the goroutine spinned up for processing - // the fresh results might have been rejected entry to to this present thread - // not yet releasing the `processing` state. - defer func() { - if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil { - d.process() - } - }() - // Release the lock upon exit (note, before checking for reentry!) - // the import statistics to zero. - defer atomic.StoreInt32(&d.processing, 0) - - // Repeat the processing as long as there are results to process +// chain. The type of import operation will depend on the result contents. +func (d *Downloader) process() error { + pivot := d.queue.FastSyncPivot() for { - // Fetch the next batch of results - pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race - results := d.queue.TakeResults() + results := d.queue.WaitResults() if len(results) == 0 { - return + return nil // queue empty } if d.chainInsertHook != nil { d.chainInsertHook(results) @@ -1597,7 +1529,7 @@ func (d *Downloader) process() { for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return + return errCancelProcessing } // Retrieve the a batch of results to import var ( @@ -1633,8 +1565,7 @@ func (d *Downloader) process() { } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) - d.cancel() - return + return err } // Shift the results to the next batch results = results[items:] @@ -1685,19 +1616,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i dropMeter.Mark(int64(packet.Items())) } }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } // Deliver or abort if the sync is canceled while queuing d.cancelLock.RLock() cancel := d.cancelCh d.cancelLock.RUnlock() - + if cancel == nil { + return errNoSyncActive + } select { case destCh <- packet: return nil - case <-cancel: return errNoSyncActive } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ef6f74a6b..cfcc8a2ef 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { } } dl.lock.RUnlock() - - err := dl.downloader.synchronise(id, hash, td, mode) - for { - // If the queue is empty and processing stopped, break - if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 { - break - } - // Otherwise sleep a bit and retry - time.Sleep(time.Millisecond) - } - return err + return dl.downloader.synchronise(id, hash, td, mode) } // hasHeader checks if a header is present in the testers canonical chain. @@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) } func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) } func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } func testThrottling(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a long block chain to download and the tester targetBlocks := 8 * blockCacheLimit hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { for start := time.Now(); time.Since(start) < time.Second; { time.Sleep(25 * time.Millisecond) - tester.lock.RLock() - tester.downloader.queue.lock.RLock() + tester.lock.Lock() + tester.downloader.queue.lock.Lock() cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { @@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { } frozen = int(atomic.LoadUint32(&blocked)) retrieved = len(tester.ownBlocks) - tester.downloader.queue.lock.RUnlock() - tester.lock.RUnlock() + tester.downloader.queue.lock.Unlock() + tester.lock.Unlock() if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { break @@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation( func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) } func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a long enough forked chain common, fork := MaxHashFetch, 2*MaxHashFetch hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil) @@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Tests that an inactive downloader will not accept incoming hashes and blocks. func TestInactiveDownloader61(t *testing.T) { + t.Parallel() tester := newTester() // Check that neither hashes nor blocks are accepted @@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) { // Tests that an inactive downloader will not accept incoming block headers and // bodies. func TestInactiveDownloader62(t *testing.T) { + t.Parallel() tester := newTester() // Check that neither block headers nor bodies are accepted @@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) { // Tests that an inactive downloader will not accept incoming block headers, // bodies and receipts. func TestInactiveDownloader63(t *testing.T) { + t.Parallel() tester := newTester() // Check that neither block headers nor bodies are accepted @@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) } func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) } func testCancel(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 if targetBlocks >= MaxHashFetch { @@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) } func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create various peers with various parts of the chain targetPeers := 8 targetBlocks := targetPeers*blockCacheLimit - 15 @@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) } func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a block chain to download targetBlocks := 2*blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6 func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) } func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) } func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + tester := newTester() hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil) @@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin - {errInvalidBody, false}, // A bad peer was detected, but not the sync origin - {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() @@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) } func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) } func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) } func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a forked chain to simulate origin revertal common, fork := MaxHashFetch, 2*MaxHashFetch hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil) @@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) } func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) } func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small block chain targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil) @@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks) } } + +// This test reproduces an issue where unexpected deliveries would +// block indefinitely if they arrived at the right time. +func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) } +func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) } +func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) } +func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) } +func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } +func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } + +func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil) + fakeHeads := []*types.Header{{}, {}, {}, {}} + for i := 0; i < 200; i++ { + tester := newTester() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + // Whenever the downloader requests headers, flood it with + // a lot of unrequested header deliveries. + tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone); i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + tester.downloader.DeliverHeaders(peer, fakeHeads) + deliveriesDone <- struct{}{} + }() + } + // Deliver the actual requested headers. + impl := tester.peerGetAbsHeadersFn("peer", 0) + go impl(from, count, skip, reverse) + // None of the extra deliveries should block. + timeout := time.After(5 * time.Second) + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + case <-timeout: + panic("blocked") + } + } + return nil + } + if err := tester.sync("peer", nil, mode); err != nil { + t.Errorf("sync failed: %v", err) + } + } +} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 1f457cb15..80f08b68f 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -28,7 +28,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "gopkg.in/fatih/set.v0" +) + +const ( + maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items + throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. ) // Hash and block fetchers belonging to eth/61 and below @@ -57,17 +61,16 @@ type peer struct { blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation - blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request - receiptCapacity int32 // Number of receipts allowed to fetch per request - stateCapacity int32 // Number of node data pieces allowed to fetch per request + blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second + receiptThroughput float64 // Number of receipts measured to be retrievable per second + stateThroughput float64 // Number of node data pieces measured to be retrievable per second blockStarted time.Time // Time instance when the last block (body)fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started - ignored *set.Set // Set of hashes not to request (didn't have previously) + lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position @@ -81,6 +84,7 @@ type peer struct { getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies + lock sync.RWMutex } // newPeer create a new downloader peer, with specific hash and block retrieval @@ -90,12 +94,9 @@ func newPeer(id string, version int, head common.Hash, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ - id: id, - head: head, - blockCapacity: 1, - receiptCapacity: 1, - stateCapacity: 1, - ignored: set.New(), + id: id, + head: head, + lacking: make(map[common.Hash]struct{}), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -114,12 +115,18 @@ func newPeer(id string, version int, head common.Hash, // Reset clears the internal state of a peer entity. func (p *peer) Reset() { + p.lock.Lock() + defer p.lock.Unlock() + atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0) - atomic.StoreInt32(&p.blockCapacity, 1) - atomic.StoreInt32(&p.receiptCapacity, 1) - atomic.StoreInt32(&p.stateCapacity, 1) - p.ignored.Clear() + atomic.StoreInt32(&p.stateIdle, 0) + + p.blockThroughput = 0 + p.receiptThroughput = 0 + p.stateThroughput = 0 + + p.lacking = make(map[common.Hash]struct{}) } // Fetch61 sends a block retrieval request to the remote peer. @@ -210,108 +217,116 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return nil } -// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its block retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetBlocksIdle() { - p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) +// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval +// requests. Its estimated block retrieval throughput is updated with that measured +// just now. +func (p *peer) SetBlocksIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } -// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its block body retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetBodiesIdle() { - p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle) +// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval +// requests. Its estimated body retrieval throughput is updated with that measured +// just now. +func (p *peer) SetBodiesIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } -// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its receipt retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetReceiptsIdle() { - p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) +// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt +// retrieval requests. Its estimated receipt retrieval throughput is updated +// with that measured just now. +func (p *peer) SetReceiptsIdle(delivered int) { + p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) } -// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval -// requests. Its node data retrieval allowance will also be updated either up- or -// downwards, depending on whether the previous fetch completed in time. -func (p *peer) SetNodeDataIdle() { - p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie +// data retrieval requests. Its estimated state retrieval throughput is updated +// with that measured just now. +func (p *peer) SetNodeDataIdle(delivered int) { + p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) } // setIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its data retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { - // Update the peer's download allowance based on previous performance - scale := 2.0 - if time.Since(started) > softTTL { - scale = 0.5 - if time.Since(started) > hardTTL { - scale = 1 / float64(maxFetch) // reduces capacity to 1 - } +// Its estimated retrieval throughput is updated with that measured just now. +func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { + // Irrelevant of the scaling, make sure the peer ends up idle + defer atomic.StoreInt32(idle, 0) + + p.lock.RLock() + defer p.lock.RUnlock() + + // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum + if delivered == 0 { + *throughput = 0 + return } - for { - // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(capacity) - next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale))) - - // Try to update the old value - if atomic.CompareAndSwapInt32(capacity, prev, next) { - // If we're having problems at 1 capacity, try to find better peers - if next == 1 { - p.Demote() - } - break - } - } - // Set the peer to idle to allow further fetch requests - atomic.StoreInt32(idle, 0) + // Otherwise update the throughput with a new measurement + measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor + *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured } // BlockCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// previously discovered throughput. func (p *peer) BlockCapacity() int { - return int(atomic.LoadInt32(&p.blockCapacity)) + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch)))) } -// ReceiptCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// ReceiptCapacity retrieves the peers receipt download allowance based on its +// previously discovered throughput. func (p *peer) ReceiptCapacity() int { - return int(atomic.LoadInt32(&p.receiptCapacity)) + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch)))) } -// NodeDataCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// NodeDataCapacity retrieves the peers state download allowance based on its +// previously discovered throughput. func (p *peer) NodeDataCapacity() int { - return int(atomic.LoadInt32(&p.stateCapacity)) -} + p.lock.RLock() + defer p.lock.RUnlock() -// Promote increases the peer's reputation. -func (p *peer) Promote() { - atomic.AddInt32(&p.rep, 1) + return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch)))) } -// Demote decreases the peer's reputation or leaves it at 0. -func (p *peer) Demote() { - for { - // Calculate the new reputation value - prev := atomic.LoadInt32(&p.rep) - next := prev / 2 +// MarkLacking appends a new entity to the set of items (blocks, receipts, states) +// that a peer is known not to have (i.e. have been requested before). If the +// set reaches its maximum allowed capacity, items are randomly dropped off. +func (p *peer) MarkLacking(hash common.Hash) { + p.lock.Lock() + defer p.lock.Unlock() - // Try to update the old value - if atomic.CompareAndSwapInt32(&p.rep, prev, next) { - return + for len(p.lacking) >= maxLackingHashes { + for drop, _ := range p.lacking { + delete(p.lacking, drop) + break } } + p.lacking[hash] = struct{}{} +} + +// Lacks retrieves whether the hash of a blockchain item is on the peers lacking +// list (i.e. whether we know that the peer does not have it). +func (p *peer) Lacks(hash common.Hash) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + _, ok := p.lacking[hash] + return ok } // String implements fmt.Stringer. func (p *peer) String() string { + p.lock.RLock() + defer p.lock.RUnlock() + return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ - fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ - fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ - fmt.Sprintf("ignored %4d", p.ignored.Size()), + fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ + fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ + fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ + fmt.Sprintf("lacking %4d", len(p.lacking)), ) } @@ -342,6 +357,10 @@ func (ps *peerSet) Reset() { // Register injects a new peer into the working set, or returns an error if the // peer is already known. +// +// The method also sets the starting throughput values of the new peer to the +// average of all existing peers, to give it a realistic change of being used +// for data retrievals. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -349,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error { if _, ok := ps.peers[p.id]; ok { return errAlreadyRegistered } + if len(ps.peers) > 0 { + p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0 + + for _, peer := range ps.peers { + peer.lock.RLock() + p.blockThroughput += peer.blockThroughput + p.receiptThroughput += peer.receiptThroughput + p.stateThroughput += peer.stateThroughput + peer.lock.RUnlock() + } + p.blockThroughput /= float64(len(ps.peers)) + p.receiptThroughput /= float64(len(ps.peers)) + p.stateThroughput /= float64(len(ps.peers)) + } ps.peers[p.id] = p return nil } @@ -400,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - return ps.idlePeers(61, 61, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(61, 61, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -409,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - return ps.idlePeers(62, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(62, 64, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -418,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.receiptIdle) == 0 } - return ps.idlePeers(63, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.receiptThroughput + } + return ps.idlePeers(63, 64, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -427,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.stateIdle) == 0 } - return ps.idlePeers(63, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.stateThroughput + } + return ps.idlePeers(63, 64, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the // protocol version constraints, using the provided function to check idleness. -func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { +// The resulting set of peers are sorted by their measure throughput. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() @@ -447,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) } for i := 0; i < len(idle); i++ { for j := i + 1; j < len(idle); j++ { - if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + if throughput(idle[i]) < throughput(idle[j]) { idle[i], idle[j] = idle[j], idle[i] } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 56b46e285..1e55560db 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -101,11 +101,14 @@ type queue struct { resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain - lock sync.RWMutex + lock *sync.Mutex + active *sync.Cond + closed bool } // newQueue creates a new download queue for scheduling block retrieval. func newQueue(stateDb ethdb.Database) *queue { + lock := new(sync.Mutex) return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue { statePendPool: make(map[string]*fetchRequest), stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), + active: sync.NewCond(lock), + lock: lock, } } @@ -133,6 +138,7 @@ func (q *queue) Reset() { q.stateSchedLock.Lock() defer q.stateSchedLock.Unlock() + q.closed = false q.mode = FullSync q.fastSyncPivot = 0 @@ -162,18 +168,27 @@ func (q *queue) Reset() { q.resultOffset = 0 } +// Close marks the end of the sync, unblocking WaitResults. +// It may be called even if the queue is already closed. +func (q *queue) Close() { + q.lock.Lock() + q.closed = true + q.lock.Unlock() + q.active.Broadcast() +} + // PendingBlocks retrieves the number of block (body) requests pending for retrieval. func (q *queue) PendingBlocks() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.hashQueue.Size() + q.blockTaskQueue.Size() } // PendingReceipts retrieves the number of block receipts pending for retrieval. func (q *queue) PendingReceipts() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.receiptTaskQueue.Size() } @@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int { // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.blockPendPool) > 0 } @@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool { // InFlightReceipts retrieves whether there are receipt fetch requests currently // in flight. func (q *queue) InFlightReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.receiptPendPool) > 0 } @@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool { // InFlightNodeData retrieves whether there are node data entry fetch requests // currently in flight. func (q *queue) InFlightNodeData() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 } @@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool { // Idle returns if the queue is fully idle or has some data still inside. This // method is used by the tester to detect termination events. func (q *queue) Idle() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) @@ -237,8 +252,8 @@ func (q *queue) Idle() bool { // FastSyncPivot retrieves the currently used fast sync pivot point. func (q *queue) FastSyncPivot() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.fastSyncPivot } @@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 { // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight block (body) requests pending := 0 @@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool { // ShouldThrottleReceipts checks if the download should be throttled (active receipt // fetches exceed block cache). func (q *queue) ShouldThrottleReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight receipt requests pending := 0 @@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't -// been downloaded yet (or simply non existent). -func (q *queue) GetHeadResult() *fetchResult { - q.lock.RLock() - defer q.lock.RUnlock() +// WaitResults retrieves and permanently removes a batch of fetch +// results from the cache. the result slice will be empty if the queue +// has been closed. +func (q *queue) WaitResults() []*fetchResult { + q.lock.Lock() + defer q.lock.Unlock() - // If there are no results pending, return nil - if len(q.resultCache) == 0 || q.resultCache[0] == nil { - return nil - } - // If the next result is still incomplete, return nil - if q.resultCache[0].Pending > 0 { - return nil + nproc := q.countProcessableItems() + for nproc == 0 && !q.closed { + q.active.Wait() + nproc = q.countProcessableItems() } - // If the next result is the fast sync pivot... - if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { - // If the pivot state trie is still being pulled, return nil - if len(q.stateTaskPool) > 0 { - return nil + results := make([]*fetchResult, nproc) + copy(results, q.resultCache[:nproc]) + if len(results) > 0 { + // Mark results as done before dropping them from the cache. + for _, result := range results { + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) } - if q.PendingNodeData() > 0 { - return nil - } - // If the state is done, but not enough post-pivot headers were verified, stall... - for i := 0; i < fsHeaderForceVerify; i++ { - if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil { - return nil - } + // Delete the results from the cache and clear the tail. + copy(q.resultCache, q.resultCache[nproc:]) + for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { + q.resultCache[i] = nil } + // Advance the expected block number of the first cache entry. + q.resultOffset += uint64(nproc) } - return q.resultCache[0] + return results } -// TakeResults retrieves and permanently removes a batch of fetch results from -// the cache. -func (q *queue) TakeResults() []*fetchResult { - q.lock.Lock() - defer q.lock.Unlock() - - // Accumulate all available results - results := []*fetchResult{} +// countProcessableItems counts the processable items. +func (q *queue) countProcessableItems() int { for i, result := range q.resultCache { - // Stop if no more results are ready + // Don't process incomplete or unavailable items. if result == nil || result.Pending > 0 { - break + return i } - // The fast sync pivot block may only be processed after state fetch completes - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - if len(q.stateTaskPool) > 0 { - break - } - if q.PendingNodeData() > 0 { - break - } - // Even is state fetch is done, ensure post-pivot headers passed verifications - safe := true - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - safe = false + // Special handling for the fast-sync pivot block: + if q.mode == FastSync { + bnum := result.Header.Number.Uint64() + if bnum == q.fastSyncPivot { + // If the state of the pivot block is not + // available yet, we cannot proceed and return 0. + // + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 { + return i + } + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { + return i + } } } - if !safe { - break + // If we're just the fast sync pivot, stop as well + // because the following batch needs different insertion. + // This simplifies handling the switchover in d.process. + if bnum == q.fastSyncPivot+1 && i > 0 { + return i } } - // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { - break - } - results = append(results, result) - - hash := result.Header.Hash() - delete(q.blockDonePool, hash) - delete(q.receiptDonePool, hash) - } - // Delete the results from the slice and let them be garbage collected - // without this slice trick the results would stay in memory until nil - // would be assigned to them. - copy(q.resultCache, q.resultCache[len(results):]) - for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { - q.resultCache[k] = nil } - q.resultOffset += uint64(len(results)) - - return results + return len(q.resultCache) } // ReserveBlocks reserves a set of block hashes for the given peer, skipping any @@ -501,7 +499,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { hash, priority := taskQueue.Pop() - if p.ignored.Has(hash) { + if p.Lacks(hash.(common.Hash)) { skip[hash.(common.Hash)] = int(priority) } else { send[hash.(common.Hash)] = int(priority) @@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { + common.Report("index allocation went beyond available resultCache space") return nil, false, errInvalidChain } if q.resultCache[index] == nil { @@ -607,7 +606,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ continue } // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.ignored.Has(header.Hash()) { + if p.Lacks(header.Hash()) { skip = append(skip, header) } else { send = append(send, header) @@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ for _, header := range skip { taskQueue.Push(header, -float32(header.Number.Uint64())) } + if progress { + // Wake WaitResults, resultCache was modified + q.active.Signal() + } // Assemble and return the block download request if len(send) == 0 { return nil, progress, nil @@ -700,7 +703,7 @@ func (q *queue) Revoke(peerId string) { // ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { +func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -709,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string { // ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBodies(timeout time.Duration) []string { +func (q *queue) ExpireBodies(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -718,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string { // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireReceipts(timeout time.Duration) []string { +func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -727,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string { // ExpireNodeData checks for in flight node data requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) []string { +func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -737,12 +740,12 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // -// Note, this method expects the queue lock to be already held for writing. The +// Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { // Iterate over the expired requests and return each to the queue - peers := []string{} + expiries := make(map[string]int) for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout @@ -755,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } - peers = append(peers, id) + // Add the peer to the expiry report along the the number of failed requests + expirations := len(request.Hashes) + if expirations < len(request.Headers) { + expirations = len(request.Headers) + } + expiries[id] = expirations } } // Remove the expired requests from the pending pool - for _, id := range peers { + for id, _ := range expiries { delete(pendPool, id) } - return peers + return expiries } -// DeliverBlocks injects a block retrieval response into the download queue. -func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { +// DeliverBlocks injects a block retrieval response into the download queue. The +// method returns the number of blocks accepted from the delivery and also wakes +// any threads waiting for data delivery. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the blocks were never requested request := q.blockPendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } blockReqTimer.UpdateSince(request.Time) delete(q.blockPendPool, id) @@ -781,11 +791,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { // If no blocks were retrieved, mark them as unavailable for the origin peer if len(blocks) == 0 { for hash, _ := range request.Hashes { - request.Peer.ignored.Add(hash) + request.Peer.MarkLacking(hash) } } // Iterate over the downloaded blocks and add each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) for _, block := range blocks { // Skip any blocks that were not requested hash := block.Hash() @@ -808,29 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { delete(request.Hashes, hash) delete(q.hashPool, hash) + accepted++ } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + // Wake up WaitResults + if accepted > 0 { + q.active.Signal() + } // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: - return nil - + return accepted, nil case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): - return errs[0] - + return accepted, errs[0] case len(errs) == len(blocks): - return errStaleDelivery - + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } // DeliverBodies injects a block body retrieval response into the results queue. -func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// The method returns the number of blocks bodies accepted from the delivery and +// also wakes any threads waiting for data delivery. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -846,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi } // DeliverReceipts injects a receipt retrieval response into the results queue. -func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { +// The method returns the number of transaction receipts accepted from the delivery +// and also wakes any threads waiting for data delivery. +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -865,26 +881,29 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, - donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, + results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { + // Short circuit if the data was never requested request := pendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } reqTimer.UpdateSince(request.Time) delete(pendPool, id) // If no data items were retrieved, mark them as unavailable for the origin peer if results == 0 { - for hash, _ := range request.Headers { - request.Peer.ignored.Add(hash) + for _, header := range request.Headers { + request.Peer.MarkLacking(header.Hash()) } } // Assemble each of the results with their headers and retrieved data parts var ( - failure error - useful bool + accepted int + failure error + useful bool ) for i, header := range request.Headers { // Short circuit assembly if no more fetch results are found @@ -904,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ donePool[header.Hash()] = struct{}{} q.resultCache[index].Pending-- useful = true + accepted++ // Clean up a successful fetch request.Headers[i] = nil @@ -915,28 +935,32 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -float32(header.Number.Uint64())) } } + // Wake up WaitResults + if accepted > 0 { + q.active.Signal() + } // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: - return failure - + return accepted, failure case useful: - return fmt.Errorf("partial failure: %v", failure) - + return accepted, fmt.Errorf("partial failure: %v", failure) default: - return errStaleDelivery + return accepted, errStaleDelivery } } // DeliverNodeData injects a node state data retrieval response into the queue. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error { +// The method returns the number of node state entries originally requested, and +// the number of them actually accepted from the delivery. +func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the data was never requested request := q.statePendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } stateReqTimer.UpdateSince(request.Time) delete(q.statePendPool, id) @@ -944,14 +968,14 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If no data was retrieved, mark their hashes as unavailable for the origin peer if len(data) == 0 { for hash, _ := range request.Hashes { - request.Peer.ignored.Add(hash) + request.Peer.MarkLacking(hash) } } // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) process := []trie.SyncResult{} for _, blob := range data { - // Skip any blocks that were not requested + // Skip any state trie entires that were not requested hash := common.BytesToHash(crypto.Sha3(blob)) if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) @@ -959,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i } // Inject the next state trie item into the processing queue process = append(process, trie.SyncResult{hash, blob}) + accepted++ delete(request.Hashes, hash) delete(q.stateTaskPool, hash) @@ -976,19 +1001,21 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If none of the data items were good, it's a stale delivery switch { case len(errs) == 0: - return nil - + return accepted, nil case len(errs) == len(request.Hashes): - return errStaleDelivery - + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } // deliverNodeData is the asynchronous node data processor that injects a batch // of sync results into the state scheduler. func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { + // Wake up WaitResults after the state has been written because it + // might be waiting for the pivot block state to get completed. + defer q.active.Signal() + // Process results one by one to permit task fetches in between for i, result := range results { q.stateSchedLock.Lock() |