From 60613b57d1956275bb475a53b5085c4ead4ceb2c Mon Sep 17 00:00:00 2001
From: obscuren <geffobscura@gmail.com>
Date: Sat, 18 Apr 2015 17:35:03 +0200
Subject: downloader: make sure that hashes are only accepted from the active
 peer

---
 eth/downloader/downloader.go | 63 +++++++++++++++++++++++++++++++-------------
 eth/handler.go               | 11 ++++++--
 2 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index c71cfa684..41484e927 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -2,6 +2,7 @@ package downloader
 
 import (
 	"errors"
+	"fmt"
 	"math"
 	"math/big"
 	"sync"
@@ -18,8 +19,10 @@ import (
 
 const (
 	maxBlockFetch       = 256              // Amount of max blocks to be fetched per chunk
-	minDesiredPeerCount = 3                // Amount of peers desired to start syncing
-	blockTtl            = 15 * time.Second // The amount of time it takes for a request to time out
+	minDesiredPeerCount = 5                // Amount of peers desired to start syncing
+	peerCountTimeout    = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
+	blockTtl            = 15 * time.Second // The amount of time it takes for a block request to time out
+	hashTtl             = 20 * time.Second // The amount of time it takes for a hash request to time out
 )
 
 var (
@@ -34,9 +37,10 @@ type hashIterFn func() (common.Hash, error)
 type currentTdFn func() *big.Int
 
 type Downloader struct {
-	mu    sync.RWMutex
-	queue *queue
-	peers peers
+	mu         sync.RWMutex
+	queue      *queue
+	peers      peers
+	activePeer string
 
 	// Callbacks
 	hasBlock    hashCheckFn
@@ -51,7 +55,7 @@ type Downloader struct {
 	// Channels
 	newPeerCh chan *peer
 	syncCh    chan syncPack
-	HashCh    chan []common.Hash
+	hashCh    chan []common.Hash
 	blockCh   chan blockPack
 	quit      chan struct{}
 }
@@ -76,7 +80,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn)
 		currentTd:   currentTd,
 		newPeerCh:   make(chan *peer, 1),
 		syncCh:      make(chan syncPack, 1),
-		HashCh:      make(chan []common.Hash, 1),
+		hashCh:      make(chan []common.Hash, 1),
 		blockCh:     make(chan blockPack, 1),
 		quit:        make(chan struct{}),
 	}
@@ -181,8 +185,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
 
 func (d *Downloader) peerHandler() {
 	// itimer is used to determine when to start ignoring `minDesiredPeerCount`
-	//itimer := time.NewTicker(5 * time.Second)
-	itimer := time.NewTimer(5 * time.Second)
+	itimer := time.NewTimer(peerCountTimeout)
 out:
 	for {
 		select {
@@ -233,12 +236,16 @@ out:
 	for {
 		select {
 		case sync := <-d.syncCh:
-			selectedPeer := sync.peer
-			glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id)
+			start := time.Now()
+
+			var peer *peer = sync.peer
+
+			d.activePeer = peer.id
+			glog.V(logger.Detail).Infoln("Synchronising with the network using:", peer.id)
 			// Start the fetcher. This will block the update entirely
 			// interupts need to be send to the appropriate channels
 			// respectively.
-			if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
+			if err := d.startFetchingHashes(peer, sync.hash, sync.ignoreInitial); err != nil {
 				// handle error
 				glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
 				// XXX Reset
@@ -249,13 +256,13 @@ out:
 			// take any available peers, seserve a chunk for each peer available,
 			// let the peer deliver the chunkn and periodically check if a peer
 			// has timedout. When done downloading, process blocks.
-			if err := d.startFetchingBlocks(selectedPeer); err != nil {
+			if err := d.startFetchingBlocks(peer); err != nil {
 				glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
 				// XXX reset
 				break
 			}
 
-			glog.V(logger.Detail).Infoln("Sync completed")
+			glog.V(logger.Detail).Infoln("Network sync completed in", time.Since(start))
 
 			d.process()
 		case <-d.quit:
@@ -282,10 +289,12 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
 	// Get the first batch of hashes
 	p.getHashes(hash)
 
+	failureResponse := time.NewTimer(hashTtl)
+
 out:
 	for {
 		select {
-		case hashes := <-d.HashCh:
+		case hashes := <-d.hashCh:
 			var done bool // determines whether we're done fetching hashes (i.e. common hash found)
 			hashSet := set.New()
 			for _, hash := range hashes {
@@ -313,15 +322,20 @@ out:
 			} else { // we're done
 				break out
 			}
+		case <-failureResponse.C:
+			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
+			d.queue.reset()
+
+			break out
 		}
 	}
-	glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
+	glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
 
 	return nil
 }
 
 func (d *Downloader) startFetchingBlocks(p *peer) error {
-	glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks")
+	glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)")
 	atomic.StoreInt32(&d.downloadingBlocks, 1)
 	defer atomic.StoreInt32(&d.downloadingBlocks, 0)
 
@@ -407,7 +421,20 @@ out:
 		}
 	}
 
-	glog.V(logger.Detail).Infoln("Download blocks: done. Took", time.Since(start))
+	glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
+
+	return nil
+}
+
+func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+	// make sure that the hashes that are being added are actually from the peer
+	// that's the current active peer. hashes that have been received from other
+	// peers are dropped and ignored.
+	if d.activePeer != id {
+		return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
+	}
+
+	d.hashCh <- hashes
 
 	return nil
 }
diff --git a/eth/handler.go b/eth/handler.go
index f3fad68b7..3aa9815f1 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -194,7 +194,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 		if err := msgStream.Decode(&hashes); err != nil {
 			break
 		}
-		self.downloader.HashCh <- hashes
+		err := self.downloader.AddHashes(p.id, hashes)
+		if err != nil {
+			glog.V(logger.Debug).Infoln(err)
+		}
 
 	case GetBlocksMsg:
 		msgStream := rlp.NewStream(msg.Payload)
@@ -259,7 +262,11 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 		// Make sure the block isn't already known. If this is the case simply drop
 		// the message and move on. If the TD is < currentTd; drop it as well. If this
 		// chain at some point becomes canonical, the downloader will fetch it.
-		if self.chainman.HasBlock(hash) && self.chainman.Td().Cmp(request.TD) > 0 {
+		if self.chainman.HasBlock(hash) {
+			break
+		}
+		if self.chainman.Td().Cmp(request.TD) > 0 {
+			glog.V(logger.Debug).Infoln("dropped block", request.Block.Number(), "due to low TD", request.TD)
 			break
 		}
 
-- 
cgit v1.2.3