From 9d188f73b58ee1fe4bda00a9536bda4056755f2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 7 May 2015 21:07:20 +0300 Subject: eth, eth/downloader: make synchronize thread safe --- eth/downloader/downloader.go | 72 ++++++++------------------------------- eth/downloader/downloader_test.go | 2 +- eth/downloader/queue.go | 10 ------ 3 files changed, 16 insertions(+), 68 deletions(-) (limited to 'eth/downloader') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 25b251112..ef2a193ff 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -68,8 +68,7 @@ type Downloader struct { getBlock getBlockFn // Status - fetchingHashes int32 - downloadingBlocks int32 + synchronizing int32 // Channels newPeerCh chan *peer @@ -120,43 +119,26 @@ func (d *Downloader) UnregisterPeer(id string) { delete(d.peers, id) } -// SynchroniseWithPeer will select the peer and use it for synchronizing. If an empty string is given +// Synchronize will select the peer and use it for synchronizing. If an empty string is given // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) Synchronise(id string, hash common.Hash) error { - // Make sure it's doing neither. Once done we can restart the - // downloading process if the TD is higher. For now just get on - // with whatever is going on. This prevents unnecessary switching. - if d.isBusy() { - return errBusy +func (d *Downloader) Synchronize(id string, hash common.Hash) error { + // Make sure only one goroutine is ever allowed past this point at once + if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) { + return nil } + defer atomic.StoreInt32(&d.synchronizing, 0) - // When a synchronization attempt is made while the queue still - // contains items we abort the sync attempt - if done, pend := d.queue.Size(); done+pend > 0 { + // Abort if the queue still contains some leftover data + if _, cached := d.queue.Size(); cached > 0 { return errPendingQueue } - - // Fetch the peer using the id or throw an error if the peer couldn't be found + // Retrieve the origin peer and initiate the downloading process p := d.peers[id] if p == nil { return errUnknownPeer } - - // Get the hash from the peer and initiate the downloading progress. - err := d.getFromPeer(p, hash, false) - if err != nil { - return err - } - - return nil -} - -// Done lets the downloader know that whatever previous hashes were taken -// are processed. If the block count reaches zero and done is called -// we reset the queue for the next batch of incoming hashes and blocks. -func (d *Downloader) Done() { - d.queue.Done() + return d.getFromPeer(p, hash, false) } // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler @@ -176,6 +158,7 @@ func (d *Downloader) Has(hash common.Hash) bool { } func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { + d.activePeer = p.id defer func() { // reset on error @@ -184,7 +167,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) } }() - glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) + glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. @@ -200,20 +183,13 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) return err } - glog.V(logger.Detail).Infoln("Sync completed") + glog.V(logger.Debug).Infoln("Synchronization completed") return nil } // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { - atomic.StoreInt32(&d.fetchingHashes, 1) - defer atomic.StoreInt32(&d.fetchingHashes, 0) - - if d.queue.Has(h) { // TODO: Is this possible? Shouldn't queue be empty for startFetchingHashes to be even called? - return errAlreadyInPool - } - glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) start := time.Now() @@ -312,10 +288,8 @@ out: } func (d *Downloader) startFetchingBlocks(p *peer) error { - glog.V(logger.Detail).Infoln("Downloading", d.queue.Pending(), "block(s)") + glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") - atomic.StoreInt32(&d.downloadingBlocks, 1) - defer atomic.StoreInt32(&d.downloadingBlocks, 0) // Defer the peer reset. This will empty the peer requested set // and makes sure there are no lingering peers with an incorrect // state @@ -439,19 +413,3 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { return nil } - -func (d *Downloader) isFetchingHashes() bool { - return atomic.LoadInt32(&d.fetchingHashes) == 1 -} - -func (d *Downloader) isDownloadingBlocks() bool { - return atomic.LoadInt32(&d.downloadingBlocks) == 1 -} - -func (d *Downloader) isBusy() bool { - return d.isFetchingHashes() || d.isDownloadingBlocks() -} - -func (d *Downloader) IsBusy() bool { - return d.isBusy() -} diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index bd439d96a..f3402794b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types func (dl *downloadTester) sync(peerId string, hash common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, hash) + return dl.downloader.Synchronize(peerId, hash) } func (dl *downloadTester) hasBlock(hash common.Hash) bool { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index d849d4d68..515440bca 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -63,16 +63,6 @@ func (q *queue) Reset() { q.blockCache = nil } -// Done checks if all the downloads have been retrieved, wiping the queue. -func (q *queue) Done() { - q.lock.Lock() - defer q.lock.Unlock() - - if len(q.blockCache) == 0 { - q.Reset() - } -} - // Size retrieves the number of hashes in the queue, returning separately for // pending and already downloaded. func (q *queue) Size() (int, int) { -- cgit v1.2.3