aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
authorJeffrey Wilcke <geffobscura@gmail.com>2015-11-24 20:48:47 +0800
committerJeffrey Wilcke <geffobscura@gmail.com>2015-11-24 20:48:47 +0800
commit5490437942967638bcc6198035315f6811febaa8 (patch)
treeec4fbee454bacbf2b80b5a7ff402fb48dd2c10cf /eth/downloader
parente5532154a50114d5ffb1ffd850b746cab00cb899 (diff)
parentb0fb48c389460193d9fc0a5118d79ff6dec48ce0 (diff)
downloadgo-tangerine-5490437942967638bcc6198035315f6811febaa8.tar
go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.gz
go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.bz2
go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.lz
go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.xz
go-tangerine-5490437942967638bcc6198035315f6811febaa8.tar.zst
go-tangerine-5490437942967638bcc6198035315f6811febaa8.zip
Merge branch 'develop' into release/1.3.2v1.3.2
Conflicts: VERSION cmd/geth/main.go
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go488
-rw-r--r--eth/downloader/downloader_test.go135
-rw-r--r--eth/downloader/peer.go240
-rw-r--r--eth/downloader/queue.go301
4 files changed, 620 insertions, 544 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 153427ee4..c272d05af 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -45,16 +45,17 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
- hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
- blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
- blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
- headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
- bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
- bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
+ hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
+ blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
+ blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
+
+ headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
+ bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
+ bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
+ receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
+ receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
+ stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -74,7 +75,6 @@ var (
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer")
@@ -90,6 +90,7 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelProcessing = errors.New("processing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
)
@@ -129,7 +130,6 @@ type Downloader struct {
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
- processing int32
notified int32
// Channels
@@ -215,7 +215,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
- return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
+ return atomic.LoadInt32(&d.synchronising) > 0
}
// RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +263,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id)
- case errPendingQueue:
- glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
@@ -290,10 +287,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
}
- // Abort if the queue still contains some leftover data
- if d.queue.GetHeadResult() != nil {
- return errPendingQueue
- }
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
@@ -335,7 +328,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer func() {
// reset on error
if err != nil {
- d.cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -365,23 +357,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent hash and block retrieval algorithm
+ // Initiate the sync using a concurrent hash and block retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode, 0)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, d.mode, 0)
-
- errc := make(chan error, 2)
- go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
- go func() { errc <- d.fetchBlocks61(origin + 1) }()
-
- // If any fetcher fails, cancel the other
- if err := <-errc; err != nil {
- d.cancel()
- <-errc
- return err
- }
- return <-errc
+ return d.spawnSync(
+ func() error { return d.fetchHashes61(p, td, origin+1) },
+ func() error { return d.fetchBlocks61(origin + 1) },
+ )
case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +389,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch d.mode {
case LightSync:
pivot = latest
-
case FastSync:
// Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@@ -426,34 +409,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
}
d.queue.Prepare(origin+1, d.mode, pivot)
-
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 4)
- go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
- go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
-
- // If any fetcher fails, cancel the others
- var fail error
- for i := 0; i < cap(errc); i++ {
- if err := <-errc; err != nil {
- if fail == nil {
- fail = err
- d.cancel()
- }
- }
- }
- return fail
+ return d.spawnSync(
+ func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
+ )
default:
// Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer
}
- return nil
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers ...func() error) error {
+ var wg sync.WaitGroup
+ errc := make(chan error, len(fetchers)+1)
+ wg.Add(len(fetchers) + 1)
+ go func() { defer wg.Done(); errc <- d.process() }()
+ for _, fn := range fetchers {
+ fn := fn
+ go func() { defer wg.Done(); errc <- fn() }()
+ }
+ // Wait for the first error, then terminate the others.
+ var err error
+ for i := 0; i < len(fetchers)+1; i++ {
+ if i == len(fetchers) {
+ // Close the queue when all fetchers have exited.
+ // This will cause the block processor to end when
+ // it has processed the queue.
+ d.queue.Close()
+ }
+ if err = <-errc; err != nil {
+ break
+ }
+ }
+ d.queue.Close()
+ d.cancel()
+ wg.Wait()
+ return err
}
// cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +470,10 @@ func (d *Downloader) cancel() {
}
}
d.cancelLock.Unlock()
-
- // Reset the queue
- d.queue.Reset()
}
// Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1)
d.cancel()
@@ -489,21 +487,12 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
// Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head})
- timeout := time.After(blockSoftTTL)
+ timeout := time.After(hashTTL)
for {
select {
case <-d.cancelCh:
return 0, errCancelBlockFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-d.hashCh:
- // Out of bounds hashes received, ignore them
-
case packet := <-d.blockCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
@@ -521,6 +510,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout
+
+ case <-d.hashCh:
+ // Out of bounds hashes received, ignore them
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -571,18 +570,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
}
}
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head hash timeout", p)
+ return 0, errTimeout
+
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head hash timeout", p)
- return 0, errTimeout
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
@@ -631,18 +631,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
}
start = check
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: search hash timeout", p)
+ return 0, errTimeout
+
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: search hash timeout", p)
- return 0, errTimeout
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -676,12 +677,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
return errCancelHashFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
if packet.PeerId() != p.id {
@@ -750,6 +745,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: hash request timed out", p)
hashTimeoutMeter.Mark(1)
return errTimeout
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -774,59 +776,31 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.cancelCh:
return errCancelBlockFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of blocks, and demote in case of errors
blocks := packet.(*blockPack).blocks
- err := d.queue.DeliverBlocks(peer.id, blocks)
- switch err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above)
- if len(blocks) == 0 {
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
- go d.process()
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of blocks and check chain validity
+ accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: stale delivery", peer)
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ peer.SetBlocksIdle(accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && len(blocks) == 0:
+ glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
- go d.process()
+ glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
}
}
// Blocks arrived, try to update the progress
@@ -859,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
+ for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ peer.SetBlocksIdle(0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
@@ -909,6 +888,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable
}
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -941,18 +927,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
}
return headers[0].Number.Uint64(), nil
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1008,18 +995,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
}
}
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
@@ -1068,18 +1056,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
}
start = check
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: search header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: search header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1141,12 +1130,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
return errCancelHeaderFetch
- case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
- case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers
if packet.PeerId() != p.id {
@@ -1268,6 +1251,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
return nil
+
+ case <-d.hashCh:
+ case <-d.blockCh:
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1279,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- setIdle = func(p *peer) { p.SetBodiesIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1303,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() }
- setIdle = func(p *peer) { p.SetReceiptsIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1327,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Debug).Infof("Downloading node state data")
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
if err != nil {
@@ -1336,10 +1324,8 @@ func (d *Downloader) fetchNodeData() error {
d.cancel()
return
}
- // Processing succeeded, notify state fetcher and processor of continuation
- if d.queue.PendingNodeData() == 0 {
- go d.process()
- } else {
+ // Processing succeeded, notify state fetcher of continuation
+ if d.queue.PendingNodeData() > 0 {
select {
case d.stateWakeCh <- true:
default:
@@ -1348,19 +1334,18 @@ func (d *Downloader) fetchNodeData() error {
// Log a message to the user and return
d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
-
d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
}
- expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() }
- setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1373,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
- expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
- idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+ idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1391,57 +1376,29 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
case <-d.cancelCh:
return errCancel
- case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
- case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of data, and demote in case of errors
- switch err := deliver(packet); err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Items() == 0 {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
- go d.process()
-
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of data and check chain validity
+ accepted, err := deliver(packet)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ setIdle(peer, accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && packet.Items() == 0:
+ glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
- go d.process()
+ glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
}
}
// Blocks assembled, try to update the progress
@@ -1474,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
return errNoPeers
}
// Check for fetch request timeouts and demote the responsible peers
- for _, pid := range expire() {
+ for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ setIdle(peer, 0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
@@ -1508,7 +1469,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
if progress {
progressed = true
- go d.process()
}
if request == nil {
continue
@@ -1540,51 +1500,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable
}
+
+ case <-d.hashCh:
+ case <-d.blockCh:
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents:
-// -
-//
-// The algorithmic flow is as follows:
-// - The `processing` flag is swapped to 1 to ensure singleton access
-// - The current `cancel` channel is retrieved to detect sync abortions
-// - Blocks are iteratively taken from the cache and inserted into the chain
-// - When the cache becomes empty, insertion stops
-// - The `processing` flag is swapped back to 0
-// - A post-exit check is made whether new blocks became available
-// - This step is important: it handles a potential race condition between
-// checking for no more work, and releasing the processing "mutex". In
-// between these state changes, a block may have arrived, but a processing
-// attempt denied, so we need to re-enter to ensure the block isn't left
-// to idle in the cache.
-func (d *Downloader) process() {
- // Make sure only one goroutine is ever allowed to process blocks at once
- if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
- return
- }
- // If the processor just exited, but there are freshly pending items, try to
- // reenter. This is needed because the goroutine spinned up for processing
- // the fresh results might have been rejected entry to to this present thread
- // not yet releasing the `processing` state.
- defer func() {
- if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
- d.process()
- }
- }()
- // Release the lock upon exit (note, before checking for reentry!)
- // the import statistics to zero.
- defer atomic.StoreInt32(&d.processing, 0)
-
- // Repeat the processing as long as there are results to process
+// chain. The type of import operation will depend on the result contents.
+func (d *Downloader) process() error {
+ pivot := d.queue.FastSyncPivot()
for {
- // Fetch the next batch of results
- pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
- results := d.queue.TakeResults()
+ results := d.queue.WaitResults()
if len(results) == 0 {
- return
+ return nil // queue empty
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
@@ -1597,7 +1529,7 @@ func (d *Downloader) process() {
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
- return
+ return errCancelProcessing
}
// Retrieve the a batch of results to import
var (
@@ -1633,8 +1565,7 @@ func (d *Downloader) process() {
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
- d.cancel()
- return
+ return err
}
// Shift the results to the next batch
results = results[items:]
@@ -1685,19 +1616,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
dropMeter.Mark(int64(packet.Items()))
}
}()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
-
+ if cancel == nil {
+ return errNoSyncActive
+ }
select {
case destCh <- packet:
return nil
-
case <-cancel:
return errNoSyncActive
}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ef6f74a6b..cfcc8a2ef 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
}
}
dl.lock.RUnlock()
-
- err := dl.downloader.synchronise(id, hash, td, mode)
- for {
- // If the queue is empty and processing stopped, break
- if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
- break
- }
- // Otherwise sleep a bit and retry
- time.Sleep(time.Millisecond)
- }
- return err
+ return dl.downloader.synchronise(id, hash, td, mode)
}
// hasHeader checks if a header is present in the testers canonical chain.
@@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis
func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
for start := time.Now(); time.Since(start) < time.Second; {
time.Sleep(25 * time.Millisecond)
- tester.lock.RLock()
- tester.downloader.queue.lock.RLock()
+ tester.lock.Lock()
+ tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
}
frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks)
- tester.downloader.queue.lock.RUnlock()
- tester.lock.RUnlock()
+ tester.downloader.queue.lock.Unlock()
+ tester.lock.Unlock()
if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
break
@@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(
func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither hashes nor blocks are accepted
@@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers and
// bodies.
func TestInactiveDownloader62(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers,
// bodies and receipts.
func TestInactiveDownloader63(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch {
@@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t,
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
@@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t,
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
tester := newTester()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
@@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
- {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
- {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()
@@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64,
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64,
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small block chain
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
@@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
}
}
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) }
+func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) }
+func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) }
+func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
+func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
+func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+ hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
+ fakeHeads := []*types.Header{{}, {}, {}, {}}
+ for i := 0; i < 200; i++ {
+ tester := newTester()
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+ // Whenever the downloader requests headers, flood it with
+ // a lot of unrequested header deliveries.
+ tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
+ deliveriesDone := make(chan struct{}, 500)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ peer := fmt.Sprintf("fake-peer%d", i)
+ go func() {
+ tester.downloader.DeliverHeaders(peer, fakeHeads)
+ deliveriesDone <- struct{}{}
+ }()
+ }
+ // Deliver the actual requested headers.
+ impl := tester.peerGetAbsHeadersFn("peer", 0)
+ go impl(from, count, skip, reverse)
+ // None of the extra deliveries should block.
+ timeout := time.After(5 * time.Second)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ select {
+ case <-deliveriesDone:
+ case <-timeout:
+ panic("blocked")
+ }
+ }
+ return nil
+ }
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Errorf("sync failed: %v", err)
+ }
+ }
+}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 1f457cb15..80f08b68f 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -28,7 +28,11 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "gopkg.in/fatih/set.v0"
+)
+
+const (
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
+ throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
)
// Hash and block fetchers belonging to eth/61 and below
@@ -57,17 +61,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation
- blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
- receiptCapacity int32 // Number of receipts allowed to fetch per request
- stateCapacity int32 // Number of node data pieces allowed to fetch per request
+ blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
+ receiptThroughput float64 // Number of receipts measured to be retrievable per second
+ stateThroughput float64 // Number of node data pieces measured to be retrievable per second
blockStarted time.Time // Time instance when the last block (body)fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
- ignored *set.Set // Set of hashes not to request (didn't have previously)
+ lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -81,6 +84,7 @@ type peer struct {
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
+ lock sync.RWMutex
}
// newPeer create a new downloader peer, with specific hash and block retrieval
@@ -90,12 +94,9 @@ func newPeer(id string, version int, head common.Hash,
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{
- id: id,
- head: head,
- blockCapacity: 1,
- receiptCapacity: 1,
- stateCapacity: 1,
- ignored: set.New(),
+ id: id,
+ head: head,
+ lacking: make(map[common.Hash]struct{}),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
@@ -114,12 +115,18 @@ func newPeer(id string, version int, head common.Hash,
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
- atomic.StoreInt32(&p.blockCapacity, 1)
- atomic.StoreInt32(&p.receiptCapacity, 1)
- atomic.StoreInt32(&p.stateCapacity, 1)
- p.ignored.Clear()
+ atomic.StoreInt32(&p.stateIdle, 0)
+
+ p.blockThroughput = 0
+ p.receiptThroughput = 0
+ p.stateThroughput = 0
+
+ p.lacking = make(map[common.Hash]struct{})
}
// Fetch61 sends a block retrieval request to the remote peer.
@@ -210,108 +217,116 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil
}
-// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBlocksIdle() {
- p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
+// requests. Its estimated block retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBlocksIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
-// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block body retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBodiesIdle() {
- p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle)
+// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
+// requests. Its estimated body retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBodiesIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
-// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its receipt retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetReceiptsIdle() {
- p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
+// retrieval requests. Its estimated receipt retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetReceiptsIdle(delivered int) {
+ p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
}
-// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
-// requests. Its node data retrieval allowance will also be updated either up- or
-// downwards, depending on whether the previous fetch completed in time.
-func (p *peer) SetNodeDataIdle() {
- p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
+// data retrieval requests. Its estimated state retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetNodeDataIdle(delivered int) {
+ p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
}
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its data retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
- // Update the peer's download allowance based on previous performance
- scale := 2.0
- if time.Since(started) > softTTL {
- scale = 0.5
- if time.Since(started) > hardTTL {
- scale = 1 / float64(maxFetch) // reduces capacity to 1
- }
+// Its estimated retrieval throughput is updated with that measured just now.
+func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+ // Irrelevant of the scaling, make sure the peer ends up idle
+ defer atomic.StoreInt32(idle, 0)
+
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
+ if delivered == 0 {
+ *throughput = 0
+ return
}
- for {
- // Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(capacity)
- next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
-
- // Try to update the old value
- if atomic.CompareAndSwapInt32(capacity, prev, next) {
- // If we're having problems at 1 capacity, try to find better peers
- if next == 1 {
- p.Demote()
- }
- break
- }
- }
- // Set the peer to idle to allow further fetch requests
- atomic.StoreInt32(idle, 0)
+ // Otherwise update the throughput with a new measurement
+ measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
+ *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
}
// BlockCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
+// previously discovered throughput.
func (p *peer) BlockCapacity() int {
- return int(atomic.LoadInt32(&p.blockCapacity))
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
}
-// ReceiptCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered throughput.
func (p *peer) ReceiptCapacity() int {
- return int(atomic.LoadInt32(&p.receiptCapacity))
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
}
-// NodeDataCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
+// NodeDataCapacity retrieves the peers state download allowance based on its
+// previously discovered throughput.
func (p *peer) NodeDataCapacity() int {
- return int(atomic.LoadInt32(&p.stateCapacity))
-}
+ p.lock.RLock()
+ defer p.lock.RUnlock()
-// Promote increases the peer's reputation.
-func (p *peer) Promote() {
- atomic.AddInt32(&p.rep, 1)
+ return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
}
-// Demote decreases the peer's reputation or leaves it at 0.
-func (p *peer) Demote() {
- for {
- // Calculate the new reputation value
- prev := atomic.LoadInt32(&p.rep)
- next := prev / 2
+// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
+// that a peer is known not to have (i.e. have been requested before). If the
+// set reaches its maximum allowed capacity, items are randomly dropped off.
+func (p *peer) MarkLacking(hash common.Hash) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
- // Try to update the old value
- if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
- return
+ for len(p.lacking) >= maxLackingHashes {
+ for drop, _ := range p.lacking {
+ delete(p.lacking, drop)
+ break
}
}
+ p.lacking[hash] = struct{}{}
+}
+
+// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
+// list (i.e. whether we know that the peer does not have it).
+func (p *peer) Lacks(hash common.Hash) bool {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ _, ok := p.lacking[hash]
+ return ok
}
// String implements fmt.Stringer.
func (p *peer) String() string {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
- fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
- fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
- fmt.Sprintf("ignored %4d", p.ignored.Size()),
+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
+ fmt.Sprintf("lacking %4d", len(p.lacking)),
)
}
@@ -342,6 +357,10 @@ func (ps *peerSet) Reset() {
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
+//
+// The method also sets the starting throughput values of the new peer to the
+// average of all existing peers, to give it a realistic change of being used
+// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -349,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error {
if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered
}
+ if len(ps.peers) > 0 {
+ p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+
+ for _, peer := range ps.peers {
+ peer.lock.RLock()
+ p.blockThroughput += peer.blockThroughput
+ p.receiptThroughput += peer.receiptThroughput
+ p.stateThroughput += peer.stateThroughput
+ peer.lock.RUnlock()
+ }
+ p.blockThroughput /= float64(len(ps.peers))
+ p.receiptThroughput /= float64(len(ps.peers))
+ p.stateThroughput /= float64(len(ps.peers))
+ }
ps.peers[p.id] = p
return nil
}
@@ -400,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- return ps.idlePeers(61, 61, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(61, 61, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@@ -409,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- return ps.idlePeers(62, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@@ -418,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
- return ps.idlePeers(63, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.receiptThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@@ -427,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
- return ps.idlePeers(63, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.stateThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
-func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
+// The resulting set of peers are sorted by their measure throughput.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
@@ -447,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
}
for i := 0; i < len(idle); i++ {
for j := i + 1; j < len(idle); j++ {
- if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ if throughput(idle[i]) < throughput(idle[j]) {
idle[i], idle[j] = idle[j], idle[i]
}
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 56b46e285..1e55560db 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
- lock sync.RWMutex
+ lock *sync.Mutex
+ active *sync.Cond
+ closed bool
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
+ lock := new(sync.Mutex)
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
+ active: sync.NewCond(lock),
+ lock: lock,
}
}
@@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock()
+ q.closed = false
q.mode = FullSync
q.fastSyncPivot = 0
@@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0
}
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+ q.lock.Lock()
+ q.closed = true
+ q.lock.Unlock()
+ q.active.Broadcast()
+}
+
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.receiptTaskQueue.Size()
}
@@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.blockPendPool) > 0
}
@@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.receiptPendPool) > 0
}
@@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}
@@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
func (q *queue) Idle() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.fastSyncPivot
}
@@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests
pending := 0
@@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests
pending := 0
@@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
-// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadResult() *fetchResult {
- q.lock.RLock()
- defer q.lock.RUnlock()
+// WaitResults retrieves and permanently removes a batch of fetch
+// results from the cache. the result slice will be empty if the queue
+// has been closed.
+func (q *queue) WaitResults() []*fetchResult {
+ q.lock.Lock()
+ defer q.lock.Unlock()
- // If there are no results pending, return nil
- if len(q.resultCache) == 0 || q.resultCache[0] == nil {
- return nil
- }
- // If the next result is still incomplete, return nil
- if q.resultCache[0].Pending > 0 {
- return nil
+ nproc := q.countProcessableItems()
+ for nproc == 0 && !q.closed {
+ q.active.Wait()
+ nproc = q.countProcessableItems()
}
- // If the next result is the fast sync pivot...
- if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
- // If the pivot state trie is still being pulled, return nil
- if len(q.stateTaskPool) > 0 {
- return nil
+ results := make([]*fetchResult, nproc)
+ copy(results, q.resultCache[:nproc])
+ if len(results) > 0 {
+ // Mark results as done before dropping them from the cache.
+ for _, result := range results {
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
}
- if q.PendingNodeData() > 0 {
- return nil
- }
- // If the state is done, but not enough post-pivot headers were verified, stall...
- for i := 0; i < fsHeaderForceVerify; i++ {
- if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
- return nil
- }
+ // Delete the results from the cache and clear the tail.
+ copy(q.resultCache, q.resultCache[nproc:])
+ for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+ q.resultCache[i] = nil
}
+ // Advance the expected block number of the first cache entry.
+ q.resultOffset += uint64(nproc)
}
- return q.resultCache[0]
+ return results
}
-// TakeResults retrieves and permanently removes a batch of fetch results from
-// the cache.
-func (q *queue) TakeResults() []*fetchResult {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Accumulate all available results
- results := []*fetchResult{}
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Stop if no more results are ready
+ // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
- break
+ return i
}
- // The fast sync pivot block may only be processed after state fetch completes
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- if len(q.stateTaskPool) > 0 {
- break
- }
- if q.PendingNodeData() > 0 {
- break
- }
- // Even is state fetch is done, ensure post-pivot headers passed verifications
- safe := true
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- safe = false
+ // Special handling for the fast-sync pivot block:
+ if q.mode == FastSync {
+ bnum := result.Header.Number.Uint64()
+ if bnum == q.fastSyncPivot {
+ // If the state of the pivot block is not
+ // available yet, we cannot proceed and return 0.
+ //
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+ return i
+ }
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
+ return i
+ }
}
}
- if !safe {
- break
+ // If we're just the fast sync pivot, stop as well
+ // because the following batch needs different insertion.
+ // This simplifies handling the switchover in d.process.
+ if bnum == q.fastSyncPivot+1 && i > 0 {
+ return i
}
}
- // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
- break
- }
- results = append(results, result)
-
- hash := result.Header.Hash()
- delete(q.blockDonePool, hash)
- delete(q.receiptDonePool, hash)
- }
- // Delete the results from the slice and let them be garbage collected
- // without this slice trick the results would stay in memory until nil
- // would be assigned to them.
- copy(q.resultCache, q.resultCache[len(results):])
- for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
- q.resultCache[k] = nil
}
- q.resultOffset += uint64(len(results))
-
- return results
+ return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
@@ -501,7 +499,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe
for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
hash, priority := taskQueue.Pop()
- if p.ignored.Has(hash) {
+ if p.Lacks(hash.(common.Hash)) {
skip[hash.(common.Hash)] = int(priority)
} else {
send[hash.(common.Hash)] = int(priority)
@@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
+ common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
@@ -607,7 +606,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
continue
}
// Otherwise unless the peer is known not to have the data, add to the retrieve list
- if p.ignored.Has(header.Hash()) {
+ if p.Lacks(header.Hash()) {
skip = append(skip, header)
} else {
send = append(send, header)
@@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if progress {
+ // Wake WaitResults, resultCache was modified
+ q.active.Signal()
+ }
// Assemble and return the block download request
if len(send) == 0 {
return nil, progress, nil
@@ -700,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -709,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -718,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -727,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -737,12 +740,12 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
-// Note, this method expects the queue lock to be already held for writing. The
+// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
- peers := []string{}
+ expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
@@ -755,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
- peers = append(peers, id)
+ // Add the peer to the expiry report along the the number of failed requests
+ expirations := len(request.Hashes)
+ if expirations < len(request.Headers) {
+ expirations = len(request.Headers)
+ }
+ expiries[id] = expirations
}
}
// Remove the expired requests from the pending pool
- for _, id := range peers {
+ for id, _ := range expiries {
delete(pendPool, id)
}
- return peers
+ return expiries
}
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
request := q.blockPendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id)
@@ -781,11 +791,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
for hash, _ := range request.Hashes {
- request.Peer.ignored.Add(hash)
+ request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded blocks and add each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
for _, block := range blocks {
// Skip any blocks that were not requested
hash := block.Hash()
@@ -808,29 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
delete(request.Hashes, hash)
delete(q.hashPool, hash)
+ accepted++
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
-
+ return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
- return errs[0]
-
+ return accepted, errs[0]
case len(errs) == len(blocks):
- return errStaleDelivery
-
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -846,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -865,26 +881,29 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
- donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+ results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
// If no data items were retrieved, mark them as unavailable for the origin peer
if results == 0 {
- for hash, _ := range request.Headers {
- request.Peer.ignored.Add(hash)
+ for _, header := range request.Headers {
+ request.Peer.MarkLacking(header.Hash())
}
}
// Assemble each of the results with their headers and retrieved data parts
var (
- failure error
- useful bool
+ accepted int
+ failure error
+ useful bool
)
for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
@@ -904,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
donePool[header.Hash()] = struct{}{}
q.resultCache[index].Pending--
useful = true
+ accepted++
// Clean up a successful fetch
request.Headers[i] = nil
@@ -915,28 +935,32 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
- return failure
-
+ return accepted, failure
case useful:
- return fmt.Errorf("partial failure: %v", failure)
-
+ return accepted, fmt.Errorf("partial failure: %v", failure)
default:
- return errStaleDelivery
+ return accepted, errStaleDelivery
}
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
@@ -944,14 +968,14 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If no data was retrieved, mark their hashes as unavailable for the origin peer
if len(data) == 0 {
for hash, _ := range request.Hashes {
- request.Peer.ignored.Add(hash)
+ request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
- // Skip any blocks that were not requested
+ // Skip any state trie entires that were not requested
hash := common.BytesToHash(crypto.Sha3(blob))
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -959,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob})
+ accepted++
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
@@ -976,19 +1001,21 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
-
+ return accepted, nil
case len(errs) == len(request.Hashes):
- return errStaleDelivery
-
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+ // Wake up WaitResults after the state has been written because it
+ // might be waiting for the pivot block state to get completed.
+ defer q.active.Signal()
+
// Process results one by one to permit task fetches in between
for i, result := range results {
q.stateSchedLock.Lock()