From 6f415b96b3b8581e810a8f40f596d2d213681e54 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Thu, 4 Jun 2015 18:46:07 +0300
Subject: eth: implement the NewBlockHashes protocol proposal

---
 eth/handler.go  | 180 +++++++++++++++++++++++++++++++++++++-------------------
 eth/peer.go     |   4 ++
 eth/protocol.go |   2 +-
 3 files changed, 123 insertions(+), 63 deletions(-)

diff --git a/eth/handler.go b/eth/handler.go
index aea33452c..63ebc4bdd 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -2,6 +2,7 @@ package eth
 
 import (
 	"fmt"
+	"math"
 	"math/big"
 	"sync"
 	"time"
@@ -20,6 +21,7 @@ import (
 const (
 	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
 	blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
+	blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
 	blockProcAmount     = 256
 )
@@ -186,7 +188,6 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 	defer msg.Discard()
 
 	switch msg.Code {
-	case GetTxMsg: // ignore
 	case StatusMsg:
 		return errResp(ErrExtraStatusMsg, "uncontrolled status message")
 
@@ -227,6 +228,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 
 		// returns either requested hashes or nothing (i.e. not found)
 		return p.sendBlockHashes(hashes)
+
 	case BlockHashesMsg:
 		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
 
@@ -266,6 +268,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			}
 		}
 		return p.sendBlocks(blocks)
+
 	case BlocksMsg:
 		var blocks []*types.Block
 
@@ -274,7 +277,57 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			glog.V(logger.Detail).Infoln("Decode error", err)
 			blocks = nil
 		}
-		self.downloader.DeliverBlocks(p.id, blocks)
+
+		// Either deliver to the downloader or the importer
+		if self.downloader.Synchronising() {
+			self.downloader.DeliverBlocks(p.id, blocks)
+		} else {
+			for _, block := range blocks {
+				if err := self.importBlock(p, block, nil); err != nil {
+					return err
+				}
+			}
+		}
+
+	case NewBlockHashesMsg:
+		// Retrieve and deseralize the remote new block hashes notification
+		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+
+		var hashes []common.Hash
+		if err := msgStream.Decode(&hashes); err != nil {
+			break
+		}
+		// Mark the hashes as present at the remote node
+		for _, hash := range hashes {
+			p.blockHashes.Add(hash)
+			p.recentHash = hash
+		}
+		// Wait a bit for potentially receiving the blocks, fetch if not
+		go func() {
+			time.Sleep(blockArrivalTimeout)
+
+			// Drop all the hashes that are already known
+			unknown := make([]common.Hash, 0, len(hashes))
+			for _, hash := range hashes {
+				if !self.chainman.HasBlock(hash) {
+					unknown = append(unknown, hash)
+				}
+			}
+			if len(unknown) == 0 {
+				return
+			}
+			// Retrieve all the unknown hashes
+			if err := p.requestBlocks(unknown); err != nil {
+				glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err)
+			}
+			if glog.V(logger.Detail) {
+				hashes := make([]string, len(unknown))
+				for i, hash := range unknown {
+					hashes[i] = fmt.Sprintf("%x", hash[:4])
+				}
+				glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes)
+			}
+		}()
 
 	case NewBlockMsg:
 		var request newBlockMsgData
@@ -286,83 +339,86 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 		}
 		request.Block.ReceivedAt = msg.ReceivedAt
 
-		hash := request.Block.Hash()
-		// Add the block hash as a known hash to the peer. This will later be used to determine
-		// who should receive this.
-		p.blockHashes.Add(hash)
-		// update the peer info
-		p.recentHash = hash
-		p.td = request.TD
-
-		_, chainHead, _ := self.chainman.Status()
-
-		jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
-			BlockHash:     hash.Hex(),
-			BlockNumber:   request.Block.Number(), // this surely must be zero
-			ChainHeadHash: chainHead.Hex(),
-			BlockPrevHash: request.Block.ParentHash().Hex(),
-			RemoteId:      p.ID().String(),
-		})
-
-		// Make sure the block isn't already known. If this is the case simply drop
-		// the message and move on. If the TD is < currentTd; drop it as well. If this
-		// chain at some point becomes canonical, the downloader will fetch it.
-		if self.chainman.HasBlock(hash) {
-			break
-		}
-		if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 {
-			glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD)
-			break
+		if err := self.importBlock(p, request.Block, request.TD); err != nil {
+			return err
 		}
 
-		// Attempt to insert the newly received by checking if the parent exists.
-		// if the parent exists we process the block and propagate to our peers
-		// otherwise synchronize with the peer
-		if self.chainman.HasBlock(request.Block.ParentHash()) {
-			if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
-				glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
-
-				self.removePeer(p.id)
-
-				return nil
-			}
-
-			if err := self.verifyTd(p, request); err != nil {
-				glog.V(logger.Error).Infoln(err)
-				// XXX for now return nil so it won't disconnect (we should in the future)
-				return nil
-			}
-			self.BroadcastBlock(hash, request.Block)
-		} else {
-			go self.synchronise(p)
-		}
 	default:
 		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
 	}
 	return nil
 }
 
-func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
-	if request.Block.Td.Cmp(request.TD) != 0 {
-		glog.V(logger.Detail).Infoln(peer)
+// importBlocks injects a new block retrieved from the given peer into the chain
+// manager.
+func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) error {
+	hash := block.Hash()
 
-		return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD)
+	// Mark the block as present at the remote node (don't duplicate already held data)
+	p.blockHashes.Add(hash)
+	p.recentHash = hash
+	if td != nil {
+		p.td = td
+	}
+	// Log the block's arrival
+	_, chainHead, _ := pm.chainman.Status()
+	jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
+		BlockHash:     hash.Hex(),
+		BlockNumber:   block.Number(), // this surely must be zero
+		ChainHeadHash: chainHead.Hex(),
+		BlockPrevHash: block.ParentHash().Hex(),
+		RemoteId:      p.ID().String(),
+	})
+	// If the block's already known or its difficulty is lower than ours, drop
+	if pm.chainman.HasBlock(hash) {
+		p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value
+		return nil
+	}
+	if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 {
+		glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, block.Number(), td)
+		return nil
+	}
+	// Attempt to insert the newly received block and propagate to our peers
+	if pm.chainman.HasBlock(block.ParentHash()) {
+		if _, err := pm.chainman.InsertChain(types.Blocks{block}); err != nil {
+			glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error", err)
+			return err
+		}
+		if td != nil && block.Td.Cmp(td) != 0 {
+			err := fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", block.Number(), p.id, block.Td, td)
+			glog.V(logger.Error).Infoln(err)
+			return err
+		}
+		pm.BroadcastBlock(hash, block)
+		return nil
+	}
+	// Parent of the block is unknown, try to sync with this peer if it seems to be good
+	if td != nil {
+		go pm.synchronise(p)
 	}
-
 	return nil
 }
 
-// BroadcastBlock will propagate the block to its connected peers. It will sort
-// out which peers do not contain the block in their block set and will do a
-// sqrt(peers) to determine the amount of peers we broadcast to.
+// BroadcastBlock will propagate the block to a subset of its connected peers,
+// only notifying the rest of the block's appearance.
 func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
-	// Broadcast block to a batch of peers not knowing about it
+	// Retrieve all the target peers and split between full broadcast or only notification
 	peers := pm.peers.PeersWithoutBlock(hash)
-	//peers = peers[:int(math.Sqrt(float64(len(peers))))]
-	for _, peer := range peers {
+	split := int(math.Sqrt(float64(len(peers))))
+
+	transfer := peers[:split]
+	nofity := peers[split:]
+
+	// Send out the data transfers and the notifications
+	for _, peer := range nofity {
+		peer.sendNewBlockHashes([]common.Hash{hash})
+	}
+	glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.")
+
+	for _, peer := range transfer {
 		peer.sendNewBlock(block)
 	}
-	glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt))
+	glog.V(logger.Detail).Infoln("broadcast block to", len(transfer), "peers. Total processing time:", time.Since(block.ReceivedAt))
 }
 
 // BroadcastTx will propagate the block to its connected peers. It will sort
diff --git a/eth/peer.go b/eth/peer.go
index bb6a20349..1146ebde3 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -88,6 +88,10 @@ func (p *peer) sendBlocks(blocks []*types.Block) error {
 	return p2p.Send(p.rw, BlocksMsg, blocks)
 }
 
+func (p *peer) sendNewBlockHashes(hashes []common.Hash) error {
+	return p2p.Send(p.rw, NewBlockHashesMsg, hashes)
+}
+
 func (p *peer) sendNewBlock(block *types.Block) error {
 	p.blockHashes.Add(block.Hash())
 
diff --git a/eth/protocol.go b/eth/protocol.go
index 948051ed1..9ccf2cb60 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -17,7 +17,7 @@ const (
 // eth protocol message codes
 const (
 	StatusMsg = iota
-	GetTxMsg  // unused
+	NewBlockHashesMsg
 	TxMsg
 	GetBlockHashesMsg
 	BlockHashesMsg
-- 
cgit v1.2.3