aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-11-14 00:08:15 +0800
committerFelix Lange <fjl@twurst.com>2015-11-19 21:18:34 +0800
commit900da3d800ad299c22ecb5d14f477600931d70b6 (patch)
treeba63e738bd44c77a1c161cf906ddab374beb8f43 /eth/downloader/downloader.go
parent9422eec55460aaca300cabd52124ed0cbd8dedd3 (diff)
downloaddexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.gz
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.bz2
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.lz
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.xz
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.tar.zst
dexon-900da3d800ad299c22ecb5d14f477600931d70b6.zip
eth/downloader: don't hang for spurious deliveries
Unexpected deliveries could block indefinitely if they arrived at the right time. The fix is to ensure that the cancellation channel is always closed when the sync ends, unblocking any deliveries. Also remove the atomic check for whether a sync is currently running because it doesn't help and can be misleading. Cancelling always seems to break the tests though. The downloader spawned d.process whenever new data arrived, making it somewhat hard to track when block processing was actually done. Fix this by running d.process in a dedicated goroutine that is tied to the lifecycle of the sync. d.process gets notified of new work by the queue instead of being invoked all the time. This removes a ton of weird workaround code, including a hairy use of atomic CAS.
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go162
1 files changed, 57 insertions, 105 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 153427ee4..ac324176d 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -74,7 +74,6 @@ var (
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer")
@@ -90,6 +89,7 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelProcessing = errors.New("processing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
)
@@ -129,7 +129,6 @@ type Downloader struct {
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
- processing int32
notified int32
// Channels
@@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
- return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
+ return atomic.LoadInt32(&d.synchronising) > 0
}
// RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id)
- case errPendingQueue:
- glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
@@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
}
- // Abort if the queue still contains some leftover data
- if d.queue.GetHeadResult() != nil {
- return errPendingQueue
- }
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
@@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer func() {
// reset on error
if err != nil {
- d.cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -365,23 +356,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent hash and block retrieval algorithm
+ // Initiate the sync using a concurrent hash and block retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode, 0)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, d.mode, 0)
-
- errc := make(chan error, 2)
- go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
- go func() { errc <- d.fetchBlocks61(origin + 1) }()
-
- // If any fetcher fails, cancel the other
- if err := <-errc; err != nil {
- d.cancel()
- <-errc
- return err
- }
- return <-errc
+ return d.spawnSync(
+ func() error { return d.fetchHashes61(p, td, origin+1) },
+ func() error { return d.fetchBlocks61(origin + 1) },
+ )
case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch d.mode {
case LightSync:
pivot = latest
-
case FastSync:
// Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
}
d.queue.Prepare(origin+1, d.mode, pivot)
-
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 4)
- go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
- go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
-
- // If any fetcher fails, cancel the others
- var fail error
- for i := 0; i < cap(errc); i++ {
- if err := <-errc; err != nil {
- if fail == nil {
- fail = err
- d.cancel()
- }
- }
- }
- return fail
+ return d.spawnSync(
+ func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
+ )
default:
// Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer
}
- return nil
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers ...func() error) error {
+ var wg sync.WaitGroup
+ errc := make(chan error, len(fetchers)+1)
+ wg.Add(len(fetchers) + 1)
+ go func() { defer wg.Done(); errc <- d.process() }()
+ for _, fn := range fetchers {
+ fn := fn
+ go func() { defer wg.Done(); errc <- fn() }()
+ }
+ // Wait for the first error, then terminate the others.
+ var err error
+ for i := 0; i < len(fetchers)+1; i++ {
+ if i == len(fetchers) {
+ // Close the queue when all fetchers have exited.
+ // This will cause the block processor to end when
+ // it has processed the queue.
+ d.queue.Close()
+ }
+ if err = <-errc; err != nil {
+ break
+ }
+ }
+ d.queue.Close()
+ d.cancel()
+ wg.Wait()
+ return err
}
// cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +469,10 @@ func (d *Downloader) cancel() {
}
}
d.cancelLock.Unlock()
-
- // Reset the queue
- d.queue.Reset()
}
// Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1)
d.cancel()
@@ -800,7 +797,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Promote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
- go d.process()
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -826,7 +822,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
- go d.process()
}
}
// Blocks arrived, try to update the progress
@@ -1336,10 +1331,8 @@ func (d *Downloader) fetchNodeData() error {
d.cancel()
return
}
- // Processing succeeded, notify state fetcher and processor of continuation
- if d.queue.PendingNodeData() == 0 {
- go d.process()
- } else {
+ // Processing succeeded, notify state fetcher of continuation
+ if d.queue.PendingNodeData() > 0 {
select {
case d.stateWakeCh <- true:
default:
@@ -1348,7 +1341,6 @@ func (d *Downloader) fetchNodeData() error {
// Log a message to the user and return
d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
-
d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
@@ -1415,7 +1407,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Promote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
- go d.process()
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -1441,7 +1432,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
- go d.process()
}
}
// Blocks assembled, try to update the progress
@@ -1508,7 +1498,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
if progress {
progressed = true
- go d.process()
}
if request == nil {
continue
@@ -1545,46 +1534,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents:
-// -
-//
-// The algorithmic flow is as follows:
-// - The `processing` flag is swapped to 1 to ensure singleton access
-// - The current `cancel` channel is retrieved to detect sync abortions
-// - Blocks are iteratively taken from the cache and inserted into the chain
-// - When the cache becomes empty, insertion stops
-// - The `processing` flag is swapped back to 0
-// - A post-exit check is made whether new blocks became available
-// - This step is important: it handles a potential race condition between
-// checking for no more work, and releasing the processing "mutex". In
-// between these state changes, a block may have arrived, but a processing
-// attempt denied, so we need to re-enter to ensure the block isn't left
-// to idle in the cache.
-func (d *Downloader) process() {
- // Make sure only one goroutine is ever allowed to process blocks at once
- if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
- return
- }
- // If the processor just exited, but there are freshly pending items, try to
- // reenter. This is needed because the goroutine spinned up for processing
- // the fresh results might have been rejected entry to to this present thread
- // not yet releasing the `processing` state.
- defer func() {
- if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
- d.process()
- }
- }()
- // Release the lock upon exit (note, before checking for reentry!)
- // the import statistics to zero.
- defer atomic.StoreInt32(&d.processing, 0)
-
- // Repeat the processing as long as there are results to process
+// chain. The type of import operation will depend on the result contents.
+func (d *Downloader) process() error {
+ pivot := d.queue.FastSyncPivot()
for {
- // Fetch the next batch of results
- pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
- results := d.queue.TakeResults()
+ results := d.queue.WaitResults()
if len(results) == 0 {
- return
+ return nil // queue empty
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
@@ -1597,7 +1553,7 @@ func (d *Downloader) process() {
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
- return
+ return errCancelProcessing
}
// Retrieve the a batch of results to import
var (
@@ -1633,8 +1589,7 @@ func (d *Downloader) process() {
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
- d.cancel()
- return
+ return err
}
// Shift the results to the next batch
results = results[items:]
@@ -1685,19 +1640,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
dropMeter.Mark(int64(packet.Items()))
}
}()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
-
+ if cancel == nil {
+ return errNoSyncActive
+ }
select {
case destCh <- packet:
return nil
-
case <-cancel:
return errNoSyncActive
}