From 7c2af1c11722dc3175a98342c060afcfaf6a275f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 16 Jun 2015 11:58:32 +0300 Subject: eth, eth/fetcher: separate notification sync mechanism --- eth/downloader/downloader.go | 1 + eth/fetcher/fetcher.go | 258 +++++++++++++++++++++++++++++++++++++++++++ eth/handler.go | 71 +++++------- eth/sync.go | 145 ++---------------------- 4 files changed, 293 insertions(+), 182 deletions(-) create mode 100644 eth/fetcher/fetcher.go (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 9866a5b46..18e5f50e8 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1,3 +1,4 @@ +// Package downloader contains the manual full chain synchronisation. package downloader import ( diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go new file mode 100644 index 000000000..19c53048c --- /dev/null +++ b/eth/fetcher/fetcher.go @@ -0,0 +1,258 @@ +// Package fetcher contains the block announcement based synchonisation. +package fetcher + +import ( + "errors" + "math/rand" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +const ( + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + fetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block +) + +var ( + errTerminated = errors.New("terminated") +) + +// hashCheckFn is a callback type for verifying a hash's presence in the local chain. +type hashCheckFn func(common.Hash) bool + +// blockRequesterFn is a callback type for sending a block retrieval request. +type blockRequesterFn func([]common.Hash) error + +// blockImporterFn is a callback type for trying to inject a block into the local chain. +type blockImporterFn func(peer string, block *types.Block) error + +// announce is the hash notification of the availability of a new block in the +// network. +type announce struct { + hash common.Hash // Hash of the block being announced + time time.Time // Timestamp of the announcement + + origin string // Identifier of the peer originating the notification + fetch blockRequesterFn // Fetcher function to retrieve +} + +// Fetcher is responsible for accumulating block announcements from various peers +// and scheduling them for retrieval. +type Fetcher struct { + // Various event channels + notify chan *announce + filter chan chan []*types.Block + quit chan struct{} + + // Callbacks + hasBlock hashCheckFn // Checks if a block is present in the chain + importBlock blockImporterFn // Injects a block from an origin peer into the chain +} + +// New creates a block fetcher to retrieve blocks based on hash announcements. +func New(hasBlock hashCheckFn, importBlock blockImporterFn) *Fetcher { + return &Fetcher{ + notify: make(chan *announce), + filter: make(chan chan []*types.Block), + quit: make(chan struct{}), + hasBlock: hasBlock, + importBlock: importBlock, + } +} + +// Start boots up the announcement based synchoniser, accepting and processing +// hash notifications and block fetches until termination requested. +func (f *Fetcher) Start() { + go f.loop() +} + +// Stop terminates the announcement based synchroniser, canceling all pending +// operations. +func (f *Fetcher) Stop() { + close(f.quit) +} + +// Notify announces the fetcher of the potential availability of a new block in +// the network. +func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher blockRequesterFn) error { + block := &announce{ + hash: hash, + time: time, + origin: peer, + fetch: fetcher, + } + select { + case f.notify <- block: + return nil + case <-f.quit: + return errTerminated + } +} + +// Filter extracts all the blocks that were explicitly requested by the fetcher, +// returning those that should be handled differently. +func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { + // Send the filter channel to the fetcher + filter := make(chan []*types.Block) + + select { + case f.filter <- filter: + case <-f.quit: + return nil + } + // Request the filtering of the block list + select { + case filter <- blocks: + case <-f.quit: + return nil + } + // Retrieve the blocks remaining after filtering + select { + case blocks := <-filter: + return blocks + case <-f.quit: + return nil + } +} + +// Loop is the main fetcher loop, checking and processing various notification +// events. +func (f *Fetcher) loop() { + announced := make(map[common.Hash][]*announce) + fetching := make(map[common.Hash]*announce) + fetch := time.NewTimer(0) + done := make(chan common.Hash) + + // Iterate the block fetching until a quit is requested + for { + // Clean up any expired block fetches + for hash, announce := range fetching { + if time.Since(announce.time) > fetchTimeout { + delete(announced, hash) + delete(fetching, hash) + } + } + // Wait for an outside event to occur + select { + case <-f.quit: + // Fetcher terminating, abort all operations + return + + case notification := <-f.notify: + // A block was announced, schedule if it's not yet downloading + glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4]) + if _, ok := fetching[notification.hash]; ok { + break + } + if len(announced) == 0 { + fetch.Reset(arriveTimeout) + } + announced[notification.hash] = append(announced[notification.hash], notification) + + case hash := <-done: + // A pending import finished, remove all traces of the notification + delete(announced, hash) + delete(fetching, hash) + + case <-fetch.C: + // At least one block's timer ran out, check for needing retrieval + request := make(map[string][]common.Hash) + + for hash, announces := range announced { + if time.Since(announces[0].time) > arriveTimeout { + announce := announces[rand.Intn(len(announces))] + if !f.hasBlock(hash) { + request[announce.origin] = append(request[announce.origin], hash) + fetching[hash] = announce + } + delete(announced, hash) + } + } + // Send out all block requests + for peer, hashes := range request { + glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes)) + go fetching[hashes[0]].fetch(hashes) + } + // Schedule the next fetch if blocks are still pending + if len(announced) > 0 { + nearest := time.Now() + for _, announces := range announced { + if nearest.Before(announces[0].time) { + nearest = announces[0].time + } + } + fetch.Reset(arriveTimeout + time.Since(nearest)) + } + + case filter := <-f.filter: + // Blocks arrived, extract any explicit fetches, return all else + var blocks types.Blocks + select { + case blocks = <-filter: + case <-f.quit: + return + } + + explicit, download := []*types.Block{}, []*types.Block{} + for _, block := range blocks { + hash := block.Hash() + + // Filter explicitly requested blocks from hash announcements + if _, ok := fetching[hash]; ok { + // Discard if already imported by other means + if !f.hasBlock(hash) { + explicit = append(explicit, block) + } else { + delete(fetching, hash) + } + } else { + download = append(download, block) + } + } + + select { + case filter <- download: + case <-f.quit: + return + } + // Create a closure with the retrieved blocks and origin peers + peers := make([]string, 0, len(explicit)) + blocks = make([]*types.Block, 0, len(explicit)) + for _, block := range explicit { + hash := block.Hash() + if announce := fetching[hash]; announce != nil { + // Drop the block if it surely cannot fit + if f.hasBlock(hash) || !f.hasBlock(block.ParentHash()) { + // delete(fetching, hash) // if we drop, it will re-fetch it, wait for timeout? + continue + } + // Otherwise accumulate for import + peers = append(peers, announce.origin) + blocks = append(blocks, block) + } + } + // If any explicit fetches were replied to, import them + if count := len(blocks); count > 0 { + glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks)) + go func() { + // Make sure all hashes are cleaned up + for _, block := range blocks { + hash := block.Hash() + defer func() { done <- hash }() + } + // Try and actually import the blocks + for i := 0; i < len(blocks); i++ { + if err := f.importBlock(peers[i], blocks[i]); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return + } + } + }() + } + } + } +} diff --git a/eth/handler.go b/eth/handler.go index ec4f2d53a..99ac4ce68 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -45,6 +47,7 @@ type ProtocolManager struct { txpool txPool chainman *core.ChainManager downloader *downloader.Downloader + fetcher *fetcher.Fetcher peers *peerSet SubProtocol p2p.Protocol @@ -54,11 +57,9 @@ type ProtocolManager struct { minedBlockSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - newHashCh chan []*blockAnnounce - newBlockCh chan chan []*types.Block - txsyncCh chan *txsync - quitSync chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} // wait group is used for graceful shutdowns during downloading // and processing @@ -69,30 +70,33 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager { + // Create the protocol manager and initialize peer handlers manager := &ProtocolManager{ - eventMux: mux, - txpool: txpool, - chainman: chainman, - peers: newPeerSet(), - newPeerCh: make(chan *peer, 1), - newHashCh: make(chan []*blockAnnounce, 1), - newBlockCh: make(chan chan []*types.Block), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + eventMux: mux, + txpool: txpool, + chainman: chainman, + peers: newPeerSet(), + newPeerCh: make(chan *peer, 1), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), Length: ProtocolLength, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(protocolVersion, networkId, p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) }, } + // Construct the different synchronisation mechanisms + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) + + importer := func(peer string, block *types.Block) error { + return manager.importBlock(manager.peers.Peer(peer), block, nil) + } + manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer) return manager } @@ -126,7 +130,6 @@ func (pm *ProtocolManager) Start() { // start sync handlers go pm.syncer() - go pm.fetcher() go pm.txsyncLoop() } @@ -291,20 +294,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - // Filter out any explicitly requested blocks (cascading select to get blocking back to peer) - filter := make(chan []*types.Block) - select { - case <-self.quitSync: - case self.newBlockCh <- filter: - select { - case <-self.quitSync: - case filter <- blocks: - select { - case <-self.quitSync: - case blocks := <-filter: - self.downloader.DeliverBlocks(p.id, blocks) - } - } + // Filter out any explicitly requested blocks, deliver the rest to the downloader + if blocks := self.fetcher.Filter(blocks); len(blocks) > 0 { + self.downloader.DeliverBlocks(p.id, blocks) } case NewBlockHashesMsg: @@ -327,19 +319,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { unknown = append(unknown, hash) } } - announces := make([]*blockAnnounce, len(unknown)) - for i, hash := range unknown { - announces[i] = &blockAnnounce{ - hash: hash, - peer: p, - time: time.Now(), - } - } - if len(announces) > 0 { - select { - case self.newHashCh <- announces: - case <-self.quitSync: - } + for _, hash := range unknown { + self.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks) } case NewBlockMsg: diff --git a/eth/sync.go b/eth/sync.go index 751bc1a2a..82abb725f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,11 +12,8 @@ import ( ) const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching - notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested - notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + minDesiredPeerCount = 5 // Amount of peers desired to start syncing // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. @@ -119,140 +116,15 @@ func (pm *ProtocolManager) txsyncLoop() { } } -// fetcher is responsible for collecting hash notifications, and periodically -// checking all unknown ones and individually fetching them. -func (pm *ProtocolManager) fetcher() { - announces := make(map[common.Hash][]*blockAnnounce) - request := make(map[*peer][]common.Hash) - pending := make(map[common.Hash]*blockAnnounce) - cycle := time.Tick(notifyCheckCycle) - done := make(chan common.Hash) - - // Iterate the block fetching until a quit is requested - for { - select { - case notifications := <-pm.newHashCh: - // A batch of hashes the notified, schedule them for retrieval - glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) - for _, announce := range notifications { - // Skip if it's already pending fetch - if _, ok := pending[announce.hash]; ok { - continue - } - // Otherwise queue up the peer as a potential source - announces[announce.hash] = append(announces[announce.hash], announce) - } - - case hash := <-done: - // A pending import finished, remove all traces - delete(pending, hash) - - case <-cycle: - // Clean up any expired block fetches - for hash, announce := range pending { - if time.Since(announce.time) > notifyFetchTimeout { - delete(pending, hash) - } - } - // Check if any notified blocks failed to arrive - for hash, all := range announces { - if time.Since(all[0].time) > notifyArriveTimeout { - announce := all[rand.Intn(len(all))] - if !pm.chainman.HasBlock(hash) { - request[announce.peer] = append(request[announce.peer], hash) - pending[hash] = announce - } - delete(announces, hash) - } - } - if len(request) == 0 { - break - } - // Send out all block requests - for peer, hashes := range request { - glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) - go peer.requestBlocks(hashes) - } - request = make(map[*peer][]common.Hash) - - case filter := <-pm.newBlockCh: - // Blocks arrived, extract any explicit fetches, return all else - var blocks types.Blocks - select { - case blocks = <-filter: - case <-pm.quitSync: - return - } - - explicit, download := []*types.Block{}, []*types.Block{} - for _, block := range blocks { - hash := block.Hash() - - // Filter explicitly requested blocks from hash announcements - if _, ok := pending[hash]; ok { - // Discard if already imported by other means - if !pm.chainman.HasBlock(hash) { - explicit = append(explicit, block) - } else { - delete(pending, hash) - } - } else { - download = append(download, block) - } - } - - select { - case filter <- download: - case <-pm.quitSync: - return - } - // Create a closure with the retrieved blocks and origin peers - peers := make([]*peer, 0, len(explicit)) - blocks = make([]*types.Block, 0, len(explicit)) - for _, block := range explicit { - hash := block.Hash() - if announce := pending[hash]; announce != nil { - // Drop the block if it surely cannot fit - if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) { - // delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout? - continue - } - // Otherwise accumulate for import - peers = append(peers, announce.peer) - blocks = append(blocks, block) - } - } - // If any explicit fetches were replied to, import them - if count := len(blocks); count > 0 { - glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks)) - go func() { - // Make sure all hashes are cleaned up - for _, block := range blocks { - hash := block.Hash() - defer func() { done <- hash }() - } - // Try and actually import the blocks - for i := 0; i < len(blocks); i++ { - if err := pm.importBlock(peers[i], blocks[i], nil); err != nil { - glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) - return - } - } - }() - } - - case <-pm.quitSync: - return - } - } -} - // syncer is responsible for periodically synchronising with the network, both -// downloading hashes and blocks as well as retrieving cached ones. +// downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { - // Abort any pending syncs if we terminate + // Start and ensure cleanup of sync mechanisms + pm.fetcher.Start() + defer pm.fetcher.Stop() defer pm.downloader.Terminate() + // Wait for different events to fire synchronisation operations forceSync := time.Tick(forceSyncCycle) for { select { @@ -273,8 +145,7 @@ func (pm *ProtocolManager) syncer() { } } -// synchronise tries to sync up our local block chain with a remote peer, both -// adding various sanity checks as well as wrapping it with various log entries. +// synchronise tries to sync up our local block chain with a remote peer. func (pm *ProtocolManager) synchronise(peer *peer) { // Short circuit if no peers are available if peer == nil { -- cgit v1.2.3