From 7c2af1c11722dc3175a98342c060afcfaf6a275f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Tue, 16 Jun 2015 11:58:32 +0300
Subject: eth, eth/fetcher: separate notification sync mechanism

---
 eth/downloader/downloader.go |   1 +
 eth/fetcher/fetcher.go       | 258 +++++++++++++++++++++++++++++++++++++++++++
 eth/handler.go               |  71 +++++-------
 eth/sync.go                  | 145 ++----------------------
 4 files changed, 293 insertions(+), 182 deletions(-)
 create mode 100644 eth/fetcher/fetcher.go

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 9866a5b46..18e5f50e8 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,3 +1,4 @@
+// Package downloader contains the manual full chain synchronisation.
 package downloader
 
 import (
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
new file mode 100644
index 000000000..19c53048c
--- /dev/null
+++ b/eth/fetcher/fetcher.go
@@ -0,0 +1,258 @@
+// Package fetcher contains the block announcement based synchonisation.
+package fetcher
+
+import (
+	"errors"
+	"math/rand"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+)
+
+const (
+	arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+	fetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
+)
+
+var (
+	errTerminated = errors.New("terminated")
+)
+
+// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
+type hashCheckFn func(common.Hash) bool
+
+// blockRequesterFn is a callback type for sending a block retrieval request.
+type blockRequesterFn func([]common.Hash) error
+
+// blockImporterFn is a callback type for trying to inject a block into the local chain.
+type blockImporterFn func(peer string, block *types.Block) error
+
+// announce is the hash notification of the availability of a new block in the
+// network.
+type announce struct {
+	hash common.Hash // Hash of the block being announced
+	time time.Time   // Timestamp of the announcement
+
+	origin string           // Identifier of the peer originating the notification
+	fetch  blockRequesterFn // Fetcher function to retrieve
+}
+
+// Fetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type Fetcher struct {
+	// Various event channels
+	notify chan *announce
+	filter chan chan []*types.Block
+	quit   chan struct{}
+
+	// Callbacks
+	hasBlock    hashCheckFn     // Checks if a block is present in the chain
+	importBlock blockImporterFn // Injects a block from an origin peer into the chain
+}
+
+// New creates a block fetcher to retrieve blocks based on hash announcements.
+func New(hasBlock hashCheckFn, importBlock blockImporterFn) *Fetcher {
+	return &Fetcher{
+		notify:      make(chan *announce),
+		filter:      make(chan chan []*types.Block),
+		quit:        make(chan struct{}),
+		hasBlock:    hasBlock,
+		importBlock: importBlock,
+	}
+}
+
+// Start boots up the announcement based synchoniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *Fetcher) Start() {
+	go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *Fetcher) Stop() {
+	close(f.quit)
+}
+
+// Notify announces the fetcher of the potential availability of a new block in
+// the network.
+func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher blockRequesterFn) error {
+	block := &announce{
+		hash:   hash,
+		time:   time,
+		origin: peer,
+		fetch:  fetcher,
+	}
+	select {
+	case f.notify <- block:
+		return nil
+	case <-f.quit:
+		return errTerminated
+	}
+}
+
+// Filter extracts all the blocks that were explicitly requested by the fetcher,
+// returning those that should be handled differently.
+func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
+	// Send the filter channel to the fetcher
+	filter := make(chan []*types.Block)
+
+	select {
+	case f.filter <- filter:
+	case <-f.quit:
+		return nil
+	}
+	// Request the filtering of the block list
+	select {
+	case filter <- blocks:
+	case <-f.quit:
+		return nil
+	}
+	// Retrieve the blocks remaining after filtering
+	select {
+	case blocks := <-filter:
+		return blocks
+	case <-f.quit:
+		return nil
+	}
+}
+
+// Loop is the main fetcher loop, checking and processing various notification
+// events.
+func (f *Fetcher) loop() {
+	announced := make(map[common.Hash][]*announce)
+	fetching := make(map[common.Hash]*announce)
+	fetch := time.NewTimer(0)
+	done := make(chan common.Hash)
+
+	// Iterate the block fetching until a quit is requested
+	for {
+		// Clean up any expired block fetches
+		for hash, announce := range fetching {
+			if time.Since(announce.time) > fetchTimeout {
+				delete(announced, hash)
+				delete(fetching, hash)
+			}
+		}
+		// Wait for an outside event to occur
+		select {
+		case <-f.quit:
+			// Fetcher terminating, abort all operations
+			return
+
+		case notification := <-f.notify:
+			// A block was announced, schedule if it's not yet downloading
+			glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4])
+			if _, ok := fetching[notification.hash]; ok {
+				break
+			}
+			if len(announced) == 0 {
+				fetch.Reset(arriveTimeout)
+			}
+			announced[notification.hash] = append(announced[notification.hash], notification)
+
+		case hash := <-done:
+			// A pending import finished, remove all traces of the notification
+			delete(announced, hash)
+			delete(fetching, hash)
+
+		case <-fetch.C:
+			// At least one block's timer ran out, check for needing retrieval
+			request := make(map[string][]common.Hash)
+
+			for hash, announces := range announced {
+				if time.Since(announces[0].time) > arriveTimeout {
+					announce := announces[rand.Intn(len(announces))]
+					if !f.hasBlock(hash) {
+						request[announce.origin] = append(request[announce.origin], hash)
+						fetching[hash] = announce
+					}
+					delete(announced, hash)
+				}
+			}
+			// Send out all block requests
+			for peer, hashes := range request {
+				glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes))
+				go fetching[hashes[0]].fetch(hashes)
+			}
+			// Schedule the next fetch if blocks are still pending
+			if len(announced) > 0 {
+				nearest := time.Now()
+				for _, announces := range announced {
+					if nearest.Before(announces[0].time) {
+						nearest = announces[0].time
+					}
+				}
+				fetch.Reset(arriveTimeout + time.Since(nearest))
+			}
+
+		case filter := <-f.filter:
+			// Blocks arrived, extract any explicit fetches, return all else
+			var blocks types.Blocks
+			select {
+			case blocks = <-filter:
+			case <-f.quit:
+				return
+			}
+
+			explicit, download := []*types.Block{}, []*types.Block{}
+			for _, block := range blocks {
+				hash := block.Hash()
+
+				// Filter explicitly requested blocks from hash announcements
+				if _, ok := fetching[hash]; ok {
+					// Discard if already imported by other means
+					if !f.hasBlock(hash) {
+						explicit = append(explicit, block)
+					} else {
+						delete(fetching, hash)
+					}
+				} else {
+					download = append(download, block)
+				}
+			}
+
+			select {
+			case filter <- download:
+			case <-f.quit:
+				return
+			}
+			// Create a closure with the retrieved blocks and origin peers
+			peers := make([]string, 0, len(explicit))
+			blocks = make([]*types.Block, 0, len(explicit))
+			for _, block := range explicit {
+				hash := block.Hash()
+				if announce := fetching[hash]; announce != nil {
+					// Drop the block if it surely cannot fit
+					if f.hasBlock(hash) || !f.hasBlock(block.ParentHash()) {
+						// delete(fetching, hash) // if we drop, it will re-fetch it, wait for timeout?
+						continue
+					}
+					// Otherwise accumulate for import
+					peers = append(peers, announce.origin)
+					blocks = append(blocks, block)
+				}
+			}
+			// If any explicit fetches were replied to, import them
+			if count := len(blocks); count > 0 {
+				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
+				go func() {
+					// Make sure all hashes are cleaned up
+					for _, block := range blocks {
+						hash := block.Hash()
+						defer func() { done <- hash }()
+					}
+					// Try and actually import the blocks
+					for i := 0; i < len(blocks); i++ {
+						if err := f.importBlock(peers[i], blocks[i]); err != nil {
+							glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
+							return
+						}
+					}
+				}()
+			}
+		}
+	}
+}
diff --git a/eth/handler.go b/eth/handler.go
index ec4f2d53a..99ac4ce68 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -7,6 +7,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/eth/fetcher"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -45,6 +47,7 @@ type ProtocolManager struct {
 	txpool         txPool
 	chainman       *core.ChainManager
 	downloader     *downloader.Downloader
+	fetcher        *fetcher.Fetcher
 	peers          *peerSet
 
 	SubProtocol p2p.Protocol
@@ -54,11 +57,9 @@ type ProtocolManager struct {
 	minedBlockSub event.Subscription
 
 	// channels for fetcher, syncer, txsyncLoop
-	newPeerCh  chan *peer
-	newHashCh  chan []*blockAnnounce
-	newBlockCh chan chan []*types.Block
-	txsyncCh   chan *txsync
-	quitSync   chan struct{}
+	newPeerCh chan *peer
+	txsyncCh  chan *txsync
+	quitSync  chan struct{}
 
 	// wait group is used for graceful shutdowns during downloading
 	// and processing
@@ -69,30 +70,33 @@ type ProtocolManager struct {
 // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 // with the ethereum network.
 func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
+	// Create the protocol manager and initialize peer handlers
 	manager := &ProtocolManager{
-		eventMux:   mux,
-		txpool:     txpool,
-		chainman:   chainman,
-		peers:      newPeerSet(),
-		newPeerCh:  make(chan *peer, 1),
-		newHashCh:  make(chan []*blockAnnounce, 1),
-		newBlockCh: make(chan chan []*types.Block),
-		txsyncCh:   make(chan *txsync),
-		quitSync:   make(chan struct{}),
+		eventMux:  mux,
+		txpool:    txpool,
+		chainman:  chainman,
+		peers:     newPeerSet(),
+		newPeerCh: make(chan *peer, 1),
+		txsyncCh:  make(chan *txsync),
+		quitSync:  make(chan struct{}),
 	}
-	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
 	manager.SubProtocol = p2p.Protocol{
 		Name:    "eth",
 		Version: uint(protocolVersion),
 		Length:  ProtocolLength,
 		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
 			peer := manager.newPeer(protocolVersion, networkId, p, rw)
-
 			manager.newPeerCh <- peer
-
 			return manager.handle(peer)
 		},
 	}
+	// Construct the different synchronisation mechanisms
+	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
+
+	importer := func(peer string, block *types.Block) error {
+		return manager.importBlock(manager.peers.Peer(peer), block, nil)
+	}
+	manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer)
 
 	return manager
 }
@@ -126,7 +130,6 @@ func (pm *ProtocolManager) Start() {
 
 	// start sync handlers
 	go pm.syncer()
-	go pm.fetcher()
 	go pm.txsyncLoop()
 }
 
@@ -291,20 +294,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			glog.V(logger.Detail).Infoln("Decode error", err)
 			blocks = nil
 		}
-		// Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
-		filter := make(chan []*types.Block)
-		select {
-		case <-self.quitSync:
-		case self.newBlockCh <- filter:
-			select {
-			case <-self.quitSync:
-			case filter <- blocks:
-				select {
-				case <-self.quitSync:
-				case blocks := <-filter:
-					self.downloader.DeliverBlocks(p.id, blocks)
-				}
-			}
+		// Filter out any explicitly requested blocks, deliver the rest to the downloader
+		if blocks := self.fetcher.Filter(blocks); len(blocks) > 0 {
+			self.downloader.DeliverBlocks(p.id, blocks)
 		}
 
 	case NewBlockHashesMsg:
@@ -327,19 +319,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 				unknown = append(unknown, hash)
 			}
 		}
-		announces := make([]*blockAnnounce, len(unknown))
-		for i, hash := range unknown {
-			announces[i] = &blockAnnounce{
-				hash: hash,
-				peer: p,
-				time: time.Now(),
-			}
-		}
-		if len(announces) > 0 {
-			select {
-			case self.newHashCh <- announces:
-			case <-self.quitSync:
-			}
+		for _, hash := range unknown {
+			self.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
 		}
 
 	case NewBlockMsg:
diff --git a/eth/sync.go b/eth/sync.go
index 751bc1a2a..82abb725f 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -12,11 +12,8 @@ import (
 )
 
 const (
-	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
-	notifyCheckCycle    = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
-	notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
-	notifyFetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
-	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
+	forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
+	minDesiredPeerCount = 5                // Amount of peers desired to start syncing
 
 	// This is the target size for the packs of transactions sent by txsyncLoop.
 	// A pack can get larger than this if a single transactions exceeds this size.
@@ -119,140 +116,15 @@ func (pm *ProtocolManager) txsyncLoop() {
 	}
 }
 
-// fetcher is responsible for collecting hash notifications, and periodically
-// checking all unknown ones and individually fetching them.
-func (pm *ProtocolManager) fetcher() {
-	announces := make(map[common.Hash][]*blockAnnounce)
-	request := make(map[*peer][]common.Hash)
-	pending := make(map[common.Hash]*blockAnnounce)
-	cycle := time.Tick(notifyCheckCycle)
-	done := make(chan common.Hash)
-
-	// Iterate the block fetching until a quit is requested
-	for {
-		select {
-		case notifications := <-pm.newHashCh:
-			// A batch of hashes the notified, schedule them for retrieval
-			glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id)
-			for _, announce := range notifications {
-				// Skip if it's already pending fetch
-				if _, ok := pending[announce.hash]; ok {
-					continue
-				}
-				// Otherwise queue up the peer as a potential source
-				announces[announce.hash] = append(announces[announce.hash], announce)
-			}
-
-		case hash := <-done:
-			// A pending import finished, remove all traces
-			delete(pending, hash)
-
-		case <-cycle:
-			// Clean up any expired block fetches
-			for hash, announce := range pending {
-				if time.Since(announce.time) > notifyFetchTimeout {
-					delete(pending, hash)
-				}
-			}
-			// Check if any notified blocks failed to arrive
-			for hash, all := range announces {
-				if time.Since(all[0].time) > notifyArriveTimeout {
-					announce := all[rand.Intn(len(all))]
-					if !pm.chainman.HasBlock(hash) {
-						request[announce.peer] = append(request[announce.peer], hash)
-						pending[hash] = announce
-					}
-					delete(announces, hash)
-				}
-			}
-			if len(request) == 0 {
-				break
-			}
-			// Send out all block requests
-			for peer, hashes := range request {
-				glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
-				go peer.requestBlocks(hashes)
-			}
-			request = make(map[*peer][]common.Hash)
-
-		case filter := <-pm.newBlockCh:
-			// Blocks arrived, extract any explicit fetches, return all else
-			var blocks types.Blocks
-			select {
-			case blocks = <-filter:
-			case <-pm.quitSync:
-				return
-			}
-
-			explicit, download := []*types.Block{}, []*types.Block{}
-			for _, block := range blocks {
-				hash := block.Hash()
-
-				// Filter explicitly requested blocks from hash announcements
-				if _, ok := pending[hash]; ok {
-					// Discard if already imported by other means
-					if !pm.chainman.HasBlock(hash) {
-						explicit = append(explicit, block)
-					} else {
-						delete(pending, hash)
-					}
-				} else {
-					download = append(download, block)
-				}
-			}
-
-			select {
-			case filter <- download:
-			case <-pm.quitSync:
-				return
-			}
-			// Create a closure with the retrieved blocks and origin peers
-			peers := make([]*peer, 0, len(explicit))
-			blocks = make([]*types.Block, 0, len(explicit))
-			for _, block := range explicit {
-				hash := block.Hash()
-				if announce := pending[hash]; announce != nil {
-					// Drop the block if it surely cannot fit
-					if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) {
-						// delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout?
-						continue
-					}
-					// Otherwise accumulate for import
-					peers = append(peers, announce.peer)
-					blocks = append(blocks, block)
-				}
-			}
-			// If any explicit fetches were replied to, import them
-			if count := len(blocks); count > 0 {
-				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
-				go func() {
-					// Make sure all hashes are cleaned up
-					for _, block := range blocks {
-						hash := block.Hash()
-						defer func() { done <- hash }()
-					}
-					// Try and actually import the blocks
-					for i := 0; i < len(blocks); i++ {
-						if err := pm.importBlock(peers[i], blocks[i], nil); err != nil {
-							glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
-							return
-						}
-					}
-				}()
-			}
-
-		case <-pm.quitSync:
-			return
-		}
-	}
-}
-
 // syncer is responsible for periodically synchronising with the network, both
-// downloading hashes and blocks as well as retrieving cached ones.
+// downloading hashes and blocks as well as handling the announcement handler.
 func (pm *ProtocolManager) syncer() {
-	// Abort any pending syncs if we terminate
+	// Start and ensure cleanup of sync mechanisms
+	pm.fetcher.Start()
+	defer pm.fetcher.Stop()
 	defer pm.downloader.Terminate()
 
+	// Wait for different events to fire synchronisation operations
 	forceSync := time.Tick(forceSyncCycle)
 	for {
 		select {
@@ -273,8 +145,7 @@ func (pm *ProtocolManager) syncer() {
 	}
 }
 
-// synchronise tries to sync up our local block chain with a remote peer, both
-// adding various sanity checks as well as wrapping it with various log entries.
+// synchronise tries to sync up our local block chain with a remote peer.
 func (pm *ProtocolManager) synchronise(peer *peer) {
 	// Short circuit if no peers are available
 	if peer == nil {
-- 
cgit v1.2.3