From 9d188f73b58ee1fe4bda00a9536bda4056755f2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 7 May 2015 21:07:20 +0300 Subject: eth, eth/downloader: make synchronize thread safe --- eth/downloader/downloader.go | 72 ++++++++------------------------------- eth/downloader/downloader_test.go | 2 +- eth/downloader/queue.go | 10 ------ eth/handler.go | 4 +-- eth/sync.go | 16 +++------ 5 files changed, 22 insertions(+), 82 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 25b251112..ef2a193ff 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -68,8 +68,7 @@ type Downloader struct { getBlock getBlockFn // Status - fetchingHashes int32 - downloadingBlocks int32 + synchronizing int32 // Channels newPeerCh chan *peer @@ -120,43 +119,26 @@ func (d *Downloader) UnregisterPeer(id string) { delete(d.peers, id) } -// SynchroniseWithPeer will select the peer and use it for synchronizing. If an empty string is given +// Synchronize will select the peer and use it for synchronizing. If an empty string is given // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) Synchronise(id string, hash common.Hash) error { - // Make sure it's doing neither. Once done we can restart the - // downloading process if the TD is higher. For now just get on - // with whatever is going on. This prevents unnecessary switching. - if d.isBusy() { - return errBusy +func (d *Downloader) Synchronize(id string, hash common.Hash) error { + // Make sure only one goroutine is ever allowed past this point at once + if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) { + return nil } + defer atomic.StoreInt32(&d.synchronizing, 0) - // When a synchronization attempt is made while the queue still - // contains items we abort the sync attempt - if done, pend := d.queue.Size(); done+pend > 0 { + // Abort if the queue still contains some leftover data + if _, cached := d.queue.Size(); cached > 0 { return errPendingQueue } - - // Fetch the peer using the id or throw an error if the peer couldn't be found + // Retrieve the origin peer and initiate the downloading process p := d.peers[id] if p == nil { return errUnknownPeer } - - // Get the hash from the peer and initiate the downloading progress. - err := d.getFromPeer(p, hash, false) - if err != nil { - return err - } - - return nil -} - -// Done lets the downloader know that whatever previous hashes were taken -// are processed. If the block count reaches zero and done is called -// we reset the queue for the next batch of incoming hashes and blocks. -func (d *Downloader) Done() { - d.queue.Done() + return d.getFromPeer(p, hash, false) } // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler @@ -176,6 +158,7 @@ func (d *Downloader) Has(hash common.Hash) bool { } func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { + d.activePeer = p.id defer func() { // reset on error @@ -184,7 +167,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) } }() - glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id) + glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. @@ -200,20 +183,13 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) return err } - glog.V(logger.Detail).Infoln("Sync completed") + glog.V(logger.Debug).Infoln("Synchronization completed") return nil } // XXX Make synchronous func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { - atomic.StoreInt32(&d.fetchingHashes, 1) - defer atomic.StoreInt32(&d.fetchingHashes, 0) - - if d.queue.Has(h) { // TODO: Is this possible? Shouldn't queue be empty for startFetchingHashes to be even called? - return errAlreadyInPool - } - glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) start := time.Now() @@ -312,10 +288,8 @@ out: } func (d *Downloader) startFetchingBlocks(p *peer) error { - glog.V(logger.Detail).Infoln("Downloading", d.queue.Pending(), "block(s)") + glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") - atomic.StoreInt32(&d.downloadingBlocks, 1) - defer atomic.StoreInt32(&d.downloadingBlocks, 0) // Defer the peer reset. This will empty the peer requested set // and makes sure there are no lingering peers with an incorrect // state @@ -439,19 +413,3 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { return nil } - -func (d *Downloader) isFetchingHashes() bool { - return atomic.LoadInt32(&d.fetchingHashes) == 1 -} - -func (d *Downloader) isDownloadingBlocks() bool { - return atomic.LoadInt32(&d.downloadingBlocks) == 1 -} - -func (d *Downloader) isBusy() bool { - return d.isFetchingHashes() || d.isDownloadingBlocks() -} - -func (d *Downloader) IsBusy() bool { - return d.isBusy() -} diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index bd439d96a..f3402794b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types func (dl *downloadTester) sync(peerId string, hash common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, hash) + return dl.downloader.Synchronize(peerId, hash) } func (dl *downloadTester) hasBlock(hash common.Hash) bool { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index d849d4d68..515440bca 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -63,16 +63,6 @@ func (q *queue) Reset() { q.blockCache = nil } -// Done checks if all the downloads have been retrieved, wiping the queue. -func (q *queue) Done() { - q.lock.Lock() - defer q.lock.Unlock() - - if len(q.blockCache) == 0 { - q.Reset() - } -} - // Size retrieves the number of hashes in the queue, returning separately for // pending and already downloaded. func (q *queue) Size() (int, int) { diff --git a/eth/handler.go b/eth/handler.go index 1e0663816..b2018f336 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -307,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // Attempt to insert the newly received by checking if the parent exists. // if the parent exists we process the block and propagate to our peers - // otherwise synchronise with the peer + // otherwise synchronize with the peer if self.chainman.HasBlock(request.Block.ParentHash()) { if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") @@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } self.BroadcastBlock(hash, request.Block) } else { - go self.synchronise(p) + go self.synchronize(p) } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) diff --git a/eth/sync.go b/eth/sync.go index 9e8b21a7c..b259c1d47 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -32,14 +32,14 @@ func (pm *ProtocolManager) update() { } itimer.Stop() - go pm.synchronise(peer) + go pm.synchronize(peer) case <-itimer.C: // The timer will make sure that the downloader keeps an active state // in which it attempts to always check the network for highest td peers // Either select the peer or restart the timer if no peers could // be selected. if peer := getBestPeer(pm.peers); peer != nil { - go pm.synchronise(peer) + go pm.synchronize(peer) } else { itimer.Reset(5 * time.Second) } @@ -63,7 +63,6 @@ func (pm *ProtocolManager) processBlocks() error { if len(blocks) == 0 { return nil } - defer pm.downloader.Done() glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) @@ -78,26 +77,19 @@ func (pm *ProtocolManager) processBlocks() error { return nil } -func (pm *ProtocolManager) synchronise(peer *peer) { +func (pm *ProtocolManager) synchronize(peer *peer) { // Make sure the peer's TD is higher than our own. If not drop. if peer.td.Cmp(pm.chainman.Td()) <= 0 { return } - // Check downloader if it's busy so it doesn't show the sync message - // for every attempty - if pm.downloader.IsBusy() { - return - } - // FIXME if we have the hash in our chain and the TD of the peer is // much higher than ours, something is wrong with us or the peer. // Check if the hash is on our own chain if pm.chainman.HasBlock(peer.recentHash) { return } - // Get the hashes from the peer (synchronously) - err := pm.downloader.Synchronise(peer.id, peer.recentHash) + err := pm.downloader.Synchronize(peer.id, peer.recentHash) if err != nil && err == downloader.ErrBadPeer { glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action") pm.removePeer(peer) -- cgit v1.2.3