diff options
Diffstat (limited to 'eth/sync.go')
-rw-r--r-- | eth/sync.go | 147 |
1 files changed, 9 insertions, 138 deletions
diff --git a/eth/sync.go b/eth/sync.go index a3b177a4d..82abb725f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,11 +12,8 @@ import ( ) const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching - notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested - notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + minDesiredPeerCount = 5 // Amount of peers desired to start syncing // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. @@ -119,140 +116,15 @@ func (pm *ProtocolManager) txsyncLoop() { } } -// fetcher is responsible for collecting hash notifications, and periodically -// checking all unknown ones and individually fetching them. -func (pm *ProtocolManager) fetcher() { - announces := make(map[common.Hash][]*blockAnnounce) - request := make(map[*peer][]common.Hash) - pending := make(map[common.Hash]*blockAnnounce) - cycle := time.Tick(notifyCheckCycle) - done := make(chan common.Hash) - - // Iterate the block fetching until a quit is requested - for { - select { - case notifications := <-pm.newHashCh: - // A batch of hashes the notified, schedule them for retrieval - glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) - for _, announce := range notifications { - // Skip if it's already pending fetch - if _, ok := pending[announce.hash]; ok { - continue - } - // Otherwise queue up the peer as a potential source - announces[announce.hash] = append(announces[announce.hash], announce) - } - - case hash := <-done: - // A pending import finished, remove all traces - delete(pending, hash) - - case <-cycle: - // Clean up any expired block fetches - for hash, announce := range pending { - if time.Since(announce.time) > notifyFetchTimeout { - delete(pending, hash) - } - } - // Check if any notified blocks failed to arrive - for hash, all := range announces { - if time.Since(all[0].time) > notifyArriveTimeout { - announce := all[rand.Intn(len(all))] - if !pm.chainman.HasBlock(hash) { - request[announce.peer] = append(request[announce.peer], hash) - pending[hash] = announce - } - delete(announces, hash) - } - } - if len(request) == 0 { - break - } - // Send out all block requests - for peer, hashes := range request { - glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) - go peer.requestBlocks(hashes) - } - request = make(map[*peer][]common.Hash) - - case filter := <-pm.newBlockCh: - // Blocks arrived, extract any explicit fetches, return all else - var blocks types.Blocks - select { - case blocks = <-filter: - case <-pm.quitSync: - return - } - - explicit, download := []*types.Block{}, []*types.Block{} - for _, block := range blocks { - hash := block.Hash() - - // Filter explicitly requested blocks from hash announcements - if _, ok := pending[hash]; ok { - // Discard if already imported by other means - if !pm.chainman.HasBlock(hash) { - explicit = append(explicit, block) - } else { - delete(pending, hash) - } - } else { - download = append(download, block) - } - } - - select { - case filter <- download: - case <-pm.quitSync: - return - } - // Create a closure with the retrieved blocks and origin peers - peers := make([]*peer, 0, len(explicit)) - blocks = make([]*types.Block, 0, len(explicit)) - for _, block := range explicit { - hash := block.Hash() - if announce := pending[hash]; announce != nil { - // Drop the block if it surely cannot fit - if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) { - // delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout? - continue - } - // Otherwise accumulate for import - peers = append(peers, announce.peer) - blocks = append(blocks, block) - } - } - // If any explicit fetches were replied to, import them - if count := len(blocks); count > 0 { - glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks)) - go func() { - // Make sure all hashes are cleaned up - for _, block := range blocks { - hash := block.Hash() - defer func() { done <- hash }() - } - // Try and actually import the blocks - for i := 0; i < len(blocks); i++ { - if err := pm.importBlock(peers[i], blocks[i], nil); err != nil { - glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) - return - } - } - }() - } - - case <-pm.quitSync: - return - } - } -} - // syncer is responsible for periodically synchronising with the network, both -// downloading hashes and blocks as well as retrieving cached ones. +// downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { - // Abort any pending syncs if we terminate - defer pm.downloader.Cancel() + // Start and ensure cleanup of sync mechanisms + pm.fetcher.Start() + defer pm.fetcher.Stop() + defer pm.downloader.Terminate() + // Wait for different events to fire synchronisation operations forceSync := time.Tick(forceSyncCycle) for { select { @@ -273,8 +145,7 @@ func (pm *ProtocolManager) syncer() { } } -// synchronise tries to sync up our local block chain with a remote peer, both -// adding various sanity checks as well as wrapping it with various log entries. +// synchronise tries to sync up our local block chain with a remote peer. func (pm *ProtocolManager) synchronise(peer *peer) { // Short circuit if no peers are available if peer == nil { |