From f459a3f0ae43eac29b597427be6602970f10334c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 23 Sep 2015 12:39:17 +0300 Subject: eth/downloader: always send termination wakes, clean leftover --- eth/downloader/downloader.go | 56 ++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 20 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f038e24e4..d1a716c5f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -154,7 +154,7 @@ type Downloader struct { blockCh chan blockPack // [eth/61] Channel receiving inbound blocks headerCh chan headerPack // [eth/62] Channel receiving inbound block headers bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies - processCh chan bool // Channel to signal the block fetcher of new or finished work + wakeCh chan bool // Channel to signal the block/body fetcher of new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -188,7 +188,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he blockCh: make(chan blockPack, 1), headerCh: make(chan headerPack, 1), bodyCh: make(chan bodyPack, 1), - processCh: make(chan bool, 1), + wakeCh: make(chan bool, 1), } } @@ -282,6 +282,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error d.queue.Reset() d.peers.Reset() + select { + case <-d.wakeCh: + default: + } // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) @@ -633,7 +637,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: no available hashes", p) select { - case d.processCh <- false: + case d.wakeCh <- false: case <-d.cancelCh: } // If no hashes were retrieved at all, the peer violated it's TD promise that it had a @@ -664,12 +668,18 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { return errBadPeer } // Notify the block fetcher of new hashes, but stop if queue is full - cont := d.queue.Pending() < maxQueuedHashes - select { - case d.processCh <- cont: - default: - } - if !cont { + if d.queue.Pending() < maxQueuedHashes { + // We still have hashes to fetch, send continuation wake signal (potential) + select { + case d.wakeCh <- true: + default: + } + } else { + // Hash limit reached, send a termination wake signal (enforced) + select { + case d.wakeCh <- false: + case <-d.cancelCh: + } return nil } // Queue not yet full, fetch the next batch @@ -766,7 +776,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { default: } - case cont := <-d.processCh: + case cont := <-d.wakeCh: // The hash fetcher sent a continuation flag, check if it's done if !cont { finished = true @@ -1053,7 +1063,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: no available headers", p) select { - case d.processCh <- false: + case d.wakeCh <- false: case <-d.cancelCh: } // If no headers were retrieved at all, the peer violated it's TD promise that it had a @@ -1084,12 +1094,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { return errBadPeer } // Notify the block fetcher of new headers, but stop if queue is full - cont := d.queue.Pending() < maxQueuedHeaders - select { - case d.processCh <- cont: - default: - } - if !cont { + if d.queue.Pending() < maxQueuedHeaders { + // We still have headers to fetch, send continuation wake signal (potential) + select { + case d.wakeCh <- true: + default: + } + } else { + // Header limit reached, send a termination wake signal (enforced) + select { + case d.wakeCh <- false: + case <-d.cancelCh: + } return nil } // Queue not yet full, fetch the next batch @@ -1104,8 +1120,8 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // Finish the sync gracefully instead of dumping the gathered data though select { - case d.processCh <- false: - default: + case d.wakeCh <- false: + case <-d.cancelCh: } return nil } @@ -1199,7 +1215,7 @@ func (d *Downloader) fetchBodies(from uint64) error { default: } - case cont := <-d.processCh: + case cont := <-d.wakeCh: // The header fetcher sent a continuation flag, check if it's done if !cont { finished = true -- cgit v1.2.3