diff options
author | Daniel A. Nagy <nagy.da@gmail.com> | 2015-05-11 18:47:14 +0800 |
---|---|---|
committer | Daniel A. Nagy <nagy.da@gmail.com> | 2015-05-11 18:47:14 +0800 |
commit | a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5 (patch) | |
tree | 6c16d3e2b216fdf0027a477a8975c9052930e34a /eth/downloader/downloader.go | |
parent | 1fe70a66ba2ef0f148affa7a72b4e65023474859 (diff) | |
parent | 5176fbc6faaa5e7f0305ad7f2b896c092781deaa (diff) | |
download | go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.gz go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.bz2 go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.lz go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.xz go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.zst go-tangerine-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.zip |
Merge branch 'develop' of github.com:ethereum/go-ethereum into develop
Conflicts:
rpc/jeth.go
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 62 |
1 files changed, 60 insertions, 2 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 18f8d2ba8..14ca2cd3d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -34,6 +34,9 @@ var ( errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") errBlockNumberOverflow = errors.New("received block which overflows") + errCancelHashFetch = errors.New("hash fetching cancelled (requested)") + errCancelBlockFetch = errors.New("block downloading cancelled (requested)") + errNoSyncActive = errors.New("no sync active") ) type hashCheckFn func(common.Hash) bool @@ -74,6 +77,7 @@ type Downloader struct { newPeerCh chan *peer hashCh chan hashPack blockCh chan blockPack + cancelCh chan struct{} } func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { @@ -129,6 +133,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { } defer atomic.StoreInt32(&d.synchronising, 0) + // Create cancel channel for aborting midflight + d.cancelCh = make(chan struct{}) + // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { return errPendingQueue @@ -161,7 +168,6 @@ func (d *Downloader) Has(hash common.Hash) bool { } func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { - d.activePeer = p.id defer func() { // reset on error @@ -191,6 +197,42 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) return nil } +// Cancel cancels all of the operations and resets the queue. It returns true +// if the cancel operation was completed. +func (d *Downloader) Cancel() bool { + hs, bs := d.queue.Size() + // If we're not syncing just return. + if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { + return false + } + + close(d.cancelCh) + + // clean up +hashDone: + for { + select { + case <-d.hashCh: + default: + break hashDone + } + } + +blockDone: + for { + select { + case <-d.blockCh: + default: + break blockDone + } + } + + // reset the queue + d.queue.Reset() + + return true +} + // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) @@ -217,6 +259,8 @@ func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial b out: for { select { + case <-d.cancelCh: + return errCancelHashFetch case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes if hashPack.peerId != activePeer.id { @@ -305,6 +349,8 @@ func (d *Downloader) startFetchingBlocks(p *peer) error { out: for { select { + case <-d.cancelCh: + return errCancelBlockFetch case blockPack := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. @@ -394,11 +440,23 @@ out: // Deliver a chunk to the downloader. This is usually done through the BlocksMsg by // the protocol handler. -func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) { +func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error { + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + d.blockCh <- blockPack{id, blocks} + + return nil } func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + // make sure that the hashes that are being added are actually from the peer // that's the current active peer. hashes that have been received from other // peers are dropped and ignored. |