From 5422fe51256e45c42939d1bfbcf13e07d2660f8e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Mon, 18 May 2015 21:33:37 +0300
Subject: eth: make the peer set thread safe

---
 eth/handler.go | 65 ++++++++++++++++++++++------------------------------------
 1 file changed, 24 insertions(+), 41 deletions(-)

(limited to 'eth/handler.go')

diff --git a/eth/handler.go b/eth/handler.go
index b2d741295..835097d84 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -47,9 +47,7 @@ type ProtocolManager struct {
 	txpool         txPool
 	chainman       *core.ChainManager
 	downloader     *downloader.Downloader
-
-	pmu   sync.Mutex
-	peers map[string]*peer
+	peers          *peerSet
 
 	SubProtocol p2p.Protocol
 
@@ -73,7 +71,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 		txpool:     txpool,
 		chainman:   chainman,
 		downloader: downloader,
-		peers:      make(map[string]*peer),
+		peers:      newPeerSet(),
 		newPeerCh:  make(chan *peer, 1),
 		quitSync:   make(chan struct{}),
 	}
@@ -95,10 +93,14 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 }
 
 func (pm *ProtocolManager) removePeer(peer *peer) {
-	pm.pmu.Lock()
-	defer pm.pmu.Unlock()
+	// Unregister the peer from the downloader
 	pm.downloader.UnregisterPeer(peer.id)
-	delete(pm.peers, peer.id)
+
+	// Remove the peer from the Ethereum peer set too
+	glog.V(logger.Detail).Infoln("Removing peer", peer.id)
+	if err := pm.peers.Unregister(peer.id); err != nil {
+		glog.V(logger.Error).Infoln("Removal failed:", err)
+	}
 }
 
 func (pm *ProtocolManager) Start() {
@@ -136,31 +138,32 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
 }
 
 func (pm *ProtocolManager) handle(p *peer) error {
+	// Execute the Ethereum handshake, short circuit if fails
 	if err := p.handleStatus(); err != nil {
 		return err
 	}
-	pm.pmu.Lock()
-	pm.peers[p.id] = p
-	pm.pmu.Unlock()
-
-	pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks)
-	defer func() {
-		pm.removePeer(p)
-	}()
+	// Register the peer locally and in the downloader too
+	glog.V(logger.Detail).Infoln("Adding peer", p.id)
+	if err := pm.peers.Register(p); err != nil {
+		glog.V(logger.Error).Infoln("Addition failed:", err)
+		return err
+	}
+	defer pm.removePeer(p)
 
+	if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil {
+		return err
+	}
 	// propagate existing transactions. new transactions appearing
 	// after this will be sent via broadcasts.
 	if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
 		return err
 	}
-
 	// main loop. handle incoming messages.
 	for {
 		if err := pm.handleMsg(p); err != nil {
 			return err
 		}
 	}
-
 	return nil
 }
 
@@ -346,18 +349,8 @@ func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
 // out which peers do not contain the block in their block set and will do a
 // sqrt(peers) to determine the amount of peers we broadcast to.
 func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
-	pm.pmu.Lock()
-	defer pm.pmu.Unlock()
-
-	// Find peers who don't know anything about the given hash. Peers that
-	// don't know about the hash will be a candidate for the broadcast loop
-	var peers []*peer
-	for _, peer := range pm.peers {
-		if !peer.blockHashes.Has(hash) {
-			peers = append(peers, peer)
-		}
-	}
-	// Broadcast block to peer set
+	// Broadcast block to a batch of peers not knowing about it
+	peers := pm.peers.BlockLackingPeers(hash)
 	peers = peers[:int(math.Sqrt(float64(len(peers))))]
 	for _, peer := range peers {
 		peer.sendNewBlock(block)
@@ -369,18 +362,8 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
 // out which peers do not contain the block in their block set and will do a
 // sqrt(peers) to determine the amount of peers we broadcast to.
 func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
-	pm.pmu.Lock()
-	defer pm.pmu.Unlock()
-
-	// Find peers who don't know anything about the given hash. Peers that
-	// don't know about the hash will be a candidate for the broadcast loop
-	var peers []*peer
-	for _, peer := range pm.peers {
-		if !peer.txHashes.Has(hash) {
-			peers = append(peers, peer)
-		}
-	}
-	// Broadcast block to peer set
+	// Broadcast transaction to a batch of peers not knowing about it
+	peers := pm.peers.TxLackingPeers(hash)
 	//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
 	for _, peer := range peers {
 		peer.sendTransaction(tx)
-- 
cgit v1.2.3