From d2d5dbc6fbdb613a0c5b1967ee82a74cd94739a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 13 May 2015 13:13:43 +0300 Subject: eth/downloader: fix active peer shadowing, polish func names --- eth/downloader/downloader.go | 28 ++++++++++------------------ eth/downloader/downloader_test.go | 8 ++++---- eth/handler.go | 4 ++-- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 577152a21..c6eecfe2f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -55,10 +55,9 @@ type hashPack struct { } type Downloader struct { - mu sync.RWMutex - queue *queue - peers *peerSet - activePeer string + mu sync.RWMutex + queue *queue + peers *peerSet // Callbacks hasBlock hashCheckFn @@ -162,7 +161,6 @@ func (d *Downloader) Has(hash common.Hash) bool { // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { - d.activePeer = p.id defer func() { // reset on error if err != nil { @@ -416,32 +414,26 @@ out: return nil } -// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by -// the protocol handler. -func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error { +// DeliverBlocks injects a new batch of blocks received from a remote node. +// This is usually invoked through the BlocksMsg by the protocol handler. +func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - d.blockCh <- blockPack{id, blocks} return nil } -func (d *Downloader) AddHashes(id string, hashes []common.Hash) error { +// DeliverHashes injects a new batch of hashes received from a remote node into +// the download schedule. This is usually invoked through the BlockHashesMsg by +// the protocol handler. +func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error { // Make sure the downloader is active if atomic.LoadInt32(&d.synchronising) == 0 { return errNoSyncActive } - - // make sure that the hashes that are being added are actually from the peer - // that's the current active peer. hashes that have been received from other - // peers are dropped and ignored. - if d.activePeer != id { - return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer) - } - if glog.V(logger.Debug) && len(hashes) != 0 { from, to := hashes[0], hashes[len(hashes)-1] glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 385ad2909..78eff011a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -76,7 +76,7 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { } func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.AddHashes(dl.activePeerId, dl.hashes) + dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes) return nil } @@ -87,7 +87,7 @@ func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { blocks[i] = dl.blocks[hash] } - go dl.downloader.DeliverChunk(id, blocks) + go dl.downloader.DeliverBlocks(id, blocks) return nil } @@ -188,12 +188,12 @@ func TestInactiveDownloader(t *testing.T) { blocks := createBlocksFromHashSet(createHashSet(hashes)) tester := newTester(t, hashes, nil) - err := tester.downloader.AddHashes("bad peer 001", hashes) + err := tester.downloader.DeliverHashes("bad peer 001", hashes) if err != errNoSyncActive { t.Error("expected no sync error, got", err) } - err = tester.downloader.DeliverChunk("bad peer 001", blocks) + err = tester.downloader.DeliverBlocks("bad peer 001", blocks) if err != errNoSyncActive { t.Error("expected no sync error, got", err) } diff --git a/eth/handler.go b/eth/handler.go index 88394543e..b2d741295 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -224,7 +224,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } - err := self.downloader.AddHashes(p.id, hashes) + err := self.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } @@ -264,7 +264,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - self.downloader.DeliverChunk(p.id, blocks) + self.downloader.DeliverBlocks(p.id, blocks) case NewBlockMsg: var request newBlockMsgData -- cgit v1.2.3