From 9ed166c196b07047299579e5ea2b6ece26aec5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 8 Jun 2015 20:38:39 +0300 Subject: eth: split and handle explicitly vs. download requested blocks --- eth/sync.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) (limited to 'eth/sync.go') diff --git a/eth/sync.go b/eth/sync.go index 1a1cbdb47..f761f3cd1 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -12,6 +12,16 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 +) + // blockAnnounce is the hash notification of the availability of a new block in // the network. type blockAnnounce struct { @@ -25,6 +35,7 @@ type blockAnnounce struct { func (pm *ProtocolManager) fetcher() { announces := make(map[common.Hash]*blockAnnounce) request := make(map[*peer][]common.Hash) + pending := make(map[common.Hash]*blockAnnounce) cycle := time.Tick(notifyCheckCycle) // Iterate the block fetching until a quit is requested @@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() { } case <-cycle: + // Clean up any expired block fetches + for hash, announce := range pending { + if time.Since(announce.time) > notifyFetchTimeout { + delete(pending, hash) + } + } // Check if any notified blocks failed to arrive for hash, announce := range announces { if time.Since(announce.time) > notifyArriveTimeout { if !pm.chainman.HasBlock(hash) { request[announce.peer] = append(request[announce.peer], hash) + pending[hash] = announce } delete(announces, hash) } @@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() { } request = make(map[*peer][]common.Hash) + case filter := <-pm.newBlockCh: + // Blocks arrived, extract any explicit requests, return all else + var blocks types.Blocks + select { + case blocks = <-filter: + case <-pm.quitSync: + return + } + + fetch, sync := []*types.Block{}, []*types.Block{} + for _, block := range blocks { + hash := block.Hash() + if _, ok := pending[hash]; ok { + fetch = append(fetch, block) + } else { + sync = append(sync, block) + } + } + + select { + case filter <- sync: + case <-pm.quitSync: + return + } + // If any explicit fetches were replied to, import them + if len(fetch) > 0 { + go func() { + for _, block := range fetch { + if announce := pending[block.Hash()]; announce != nil { + if err := pm.importBlock(announce.peer, block, nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return + } + } + } + }() + } + case <-pm.quitSync: return } -- cgit v1.2.3