diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-18 05:04:57 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-06-18 05:04:57 +0800 |
commit | 2f4cbe22f5207b830f2685caae175cce70bcd231 (patch) | |
tree | eac133c9dee26ec44418afc0fda6a1adf6e76555 /eth/downloader/downloader.go | |
parent | ae36beb38f356a08370e95559d04243140105c32 (diff) | |
download | go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.tar go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.tar.gz go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.tar.bz2 go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.tar.lz go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.tar.xz go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.tar.zst go-tangerine-2f4cbe22f5207b830f2685caae175cce70bcd231.zip |
eth, eth/downloader: fix processing interrupt caused by temp cancel
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index c7a05eb35..a79eabb3c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -87,6 +87,8 @@ type Downloader struct { checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain banned *set.Set // Set of hashes we've received and banned + interrupt int32 // Atomic boolean to signal termination + // Statistics importStart time.Time // Instance when the last blocks were taken from the cache importQueue []*Block // Previously taken blocks to check import progress @@ -245,12 +247,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } - - // Create cancel channel for aborting mid-flight - d.cancelLock.Lock() - d.cancelCh = make(chan struct{}) - d.cancelLock.Unlock() - // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { return errPendingQueue @@ -260,12 +256,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { d.peers.Reset() d.checks = make(map[common.Hash]*crossCheck) + // Create cancel channel for aborting mid-flight + d.cancelLock.Lock() + d.cancelCh = make(chan struct{}) + d.cancelLock.Unlock() + // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) if p == nil { return errUnknownPeer } - return d.syncWithPeer(p, hash) } @@ -282,7 +282,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { defer func() { // reset on error if err != nil { - d.Cancel() + d.cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -301,9 +301,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { return nil } -// Cancel cancels all of the operations and resets the queue. It returns true +// cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. -func (d *Downloader) Cancel() { +func (d *Downloader) cancel() { // Close the current cancel channel d.cancelLock.Lock() if d.cancelCh != nil { @@ -320,6 +320,12 @@ func (d *Downloader) Cancel() { d.queue.Reset() } +// Terminate interrupts the downloader, canceling all pending operations. +func (d *Downloader) Terminate() { + atomic.StoreInt32(&d.interrupt, 1) + d.cancel() +} + // fetchHahes starts retrieving hashes backwards from a specific peer and hash, // up until it finds a common ancestor. If the source peer times out, alternative // ones are tried for continuation. @@ -737,12 +743,6 @@ func (d *Downloader) process() (err error) { atomic.StoreInt32(&d.processing, 0) }() - - // Fetch the current cancel channel to allow termination - d.cancelLock.RLock() - cancel := d.cancelCh - d.cancelLock.RUnlock() - // Repeat the processing as long as there are blocks to import for { // Fetch the next batch of blocks @@ -759,12 +759,10 @@ func (d *Downloader) process() (err error) { // Actually import the blocks glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) - for len(blocks) != 0 { // TODO: quit + for len(blocks) != 0 { // Check for any termination requests - select { - case <-cancel: + if atomic.LoadInt32(&d.interrupt) == 1 { return errCancelChainImport - default: } // Retrieve the first batch of blocks to insert max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) @@ -777,7 +775,7 @@ func (d *Downloader) process() (err error) { if err != nil { glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) d.dropPeer(blocks[index].OriginPeer) - d.Cancel() + d.cancel() return errCancelChainImport } blocks = blocks[max:] |