aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-09-23 17:39:17 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-09-23 17:39:17 +0800
commitf459a3f0ae43eac29b597427be6602970f10334c (patch)
tree58f45be69e5479a91a26d31092b874dded973f0f /eth/downloader
parente456f27795d3d306d4bb52ef0101b9cdad7a27cd (diff)
downloadgo-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.tar
go-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.tar.gz
go-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.tar.bz2
go-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.tar.lz
go-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.tar.xz
go-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.tar.zst
go-tangerine-f459a3f0ae43eac29b597427be6602970f10334c.zip
eth/downloader: always send termination wakes, clean leftover
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go56
1 files changed, 36 insertions, 20 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f038e24e4..d1a716c5f 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -154,7 +154,7 @@ type Downloader struct {
blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies
- processCh chan bool // Channel to signal the block fetcher of new or finished work
+ wakeCh chan bool // Channel to signal the block/body fetcher of new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@@ -188,7 +188,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
blockCh: make(chan blockPack, 1),
headerCh: make(chan headerPack, 1),
bodyCh: make(chan bodyPack, 1),
- processCh: make(chan bool, 1),
+ wakeCh: make(chan bool, 1),
}
}
@@ -282,6 +282,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
d.queue.Reset()
d.peers.Reset()
+ select {
+ case <-d.wakeCh:
+ default:
+ }
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
@@ -633,7 +637,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available hashes", p)
select {
- case d.processCh <- false:
+ case d.wakeCh <- false:
case <-d.cancelCh:
}
// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
@@ -664,12 +668,18 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return errBadPeer
}
// Notify the block fetcher of new hashes, but stop if queue is full
- cont := d.queue.Pending() < maxQueuedHashes
- select {
- case d.processCh <- cont:
- default:
- }
- if !cont {
+ if d.queue.Pending() < maxQueuedHashes {
+ // We still have hashes to fetch, send continuation wake signal (potential)
+ select {
+ case d.wakeCh <- true:
+ default:
+ }
+ } else {
+ // Hash limit reached, send a termination wake signal (enforced)
+ select {
+ case d.wakeCh <- false:
+ case <-d.cancelCh:
+ }
return nil
}
// Queue not yet full, fetch the next batch
@@ -766,7 +776,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
default:
}
- case cont := <-d.processCh:
+ case cont := <-d.wakeCh:
// The hash fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
@@ -1053,7 +1063,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available headers", p)
select {
- case d.processCh <- false:
+ case d.wakeCh <- false:
case <-d.cancelCh:
}
// If no headers were retrieved at all, the peer violated it's TD promise that it had a
@@ -1084,12 +1094,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return errBadPeer
}
// Notify the block fetcher of new headers, but stop if queue is full
- cont := d.queue.Pending() < maxQueuedHeaders
- select {
- case d.processCh <- cont:
- default:
- }
- if !cont {
+ if d.queue.Pending() < maxQueuedHeaders {
+ // We still have headers to fetch, send continuation wake signal (potential)
+ select {
+ case d.wakeCh <- true:
+ default:
+ }
+ } else {
+ // Header limit reached, send a termination wake signal (enforced)
+ select {
+ case d.wakeCh <- false:
+ case <-d.cancelCh:
+ }
return nil
}
// Queue not yet full, fetch the next batch
@@ -1104,8 +1120,8 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
// Finish the sync gracefully instead of dumping the gathered data though
select {
- case d.processCh <- false:
- default:
+ case d.wakeCh <- false:
+ case <-d.cancelCh:
}
return nil
}
@@ -1199,7 +1215,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
default:
}
- case cont := <-d.processCh:
+ case cont := <-d.wakeCh:
// The header fetcher sent a continuation flag, check if it's done
if !cont {
finished = true