From ee0c8923035f44a44cfc9120255b807e90baada9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 13:47:21 +0300 Subject: eth/downloader: fix deliveries to check for sync cancels --- eth/downloader/downloader.go | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c6eecfe2f..04e9c3a21 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -70,7 +70,9 @@ type Downloader struct { newPeerCh chan *peer hashCh chan hashPack blockCh chan blockPack - cancelCh chan struct{} + + cancelCh chan struct{} // Channel to cancel mid-flight syncs + cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers } func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { @@ -83,6 +85,9 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), } + // Set the initial downloader state as canceled (sanity check) + downloader.cancelCh = make(chan struct{}) + close(downloader.cancelCh) return downloader } @@ -123,8 +128,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) - // Create cancel channel for aborting midflight + // Create cancel channel for aborting mid-flight + d.cancelLock.Lock() d.cancelCh = make(chan struct{}) + d.cancelLock.Unlock() // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { @@ -421,9 +428,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - d.blockCh <- blockPack{id, blocks} + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() - return nil + select { + case d.blockCh <- blockPack{id, blocks}: + return nil + + case <-cancel: + return errNoSyncActive + } } // DeliverHashes injects a new batch of hashes received from a remote node into @@ -434,11 +450,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - if glog.V(logger.Debug) && len(hashes) != 0 { - from, to := hashes[0], hashes[len(hashes)-1] - glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) - } - d.hashCh <- hashPack{id, hashes} + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() - return nil + select { + case d.hashCh <- hashPack{id, hashes}: + return nil + + case <-cancel: + return errNoSyncActive + } } -- cgit v1.2.3 From ec57aa64cda2e525687641971a54df15a04362d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 14:01:08 +0300 Subject: eth/downloader: sync the cancel channel during cancel too --- eth/downloader/downloader.go | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 04e9c3a21..0cbf42d30 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -190,32 +190,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. func (d *Downloader) Cancel() bool { - hs, bs := d.queue.Size() // If we're not syncing just return. + hs, bs := d.queue.Size() if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { return false } - + // Close the current cancel channel + d.cancelLock.RLock() close(d.cancelCh) - - // clean up -hashDone: - for { - select { - case <-d.hashCh: - default: - break hashDone - } - } - -blockDone: - for { - select { - case <-d.blockCh: - default: - break blockDone - } - } + d.cancelLock.RUnlock() // reset the queue d.queue.Reset() -- cgit v1.2.3 From 48ee0777a5acbf59aab691866eae5e9adf172f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 16:03:05 +0300 Subject: eth/downloader: add a user sync notificaton --- eth/downloader/downloader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0cbf42d30..ba380eca3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -65,6 +65,7 @@ type Downloader struct { // Status synchronising int32 + notified int32 // Channels newPeerCh chan *peer @@ -128,6 +129,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) + // Post a user notification of the sync (only once per session) + if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { + glog.V(logger.Info).Infoln("Block synchronisation started") + } // Create cancel channel for aborting mid-flight d.cancelLock.Lock() d.cancelCh = make(chan struct{}) -- cgit v1.2.3 From de3a71cafdbf20ad5887033aa991131bfb6e0ed2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 16:56:01 +0300 Subject: eth/downloader: remove a redundant sync progress check --- eth/downloader/downloader.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ba380eca3..5850ea13a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -86,10 +86,6 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), } - // Set the initial downloader state as canceled (sanity check) - downloader.cancelCh = make(chan struct{}) - close(downloader.cancelCh) - return downloader } -- cgit v1.2.3