diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-17 05:19:09 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-06-18 20:56:08 +0800 |
commit | 2a7411bc9605dae9798ed29ed237e28c8fc98895 (patch) | |
tree | 119efc8c1709e05fdc0d2364f422594f128e7ca1 /eth/fetcher | |
parent | 497a7f1717a798e59c9550fbb58504b08fe13b21 (diff) | |
download | go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.tar go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.tar.gz go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.tar.bz2 go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.tar.lz go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.tar.xz go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.tar.zst go-tangerine-2a7411bc9605dae9798ed29ed237e28c8fc98895.zip |
eth/fetcher: fix premature queue cleanup, general polishes
Diffstat (limited to 'eth/fetcher')
-rw-r--r-- | eth/fetcher/fetcher.go | 82 |
1 files changed, 47 insertions, 35 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 76d3798cb..78d25f72e 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -61,6 +61,10 @@ type Fetcher struct { filter chan chan []*types.Block quit chan struct{} + // Announce states + announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching + fetching map[common.Hash]*announce // Announced blocks, currently fetching + // Block cache queue *prque.Prque // Queue containing the import operations (block number sorted) queued map[common.Hash]struct{} // Presence set of already queued blocks (to dedup imports) @@ -78,6 +82,8 @@ func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHei insert: make(chan *inject), filter: make(chan chan []*types.Block), quit: make(chan struct{}), + announced: make(map[common.Hash][]*announce), + fetching: make(map[common.Hash]*announce), queue: prque.New(), queued: make(map[common.Hash]struct{}), hasBlock: hasBlock, @@ -158,19 +164,16 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { // Loop is the main fetcher loop, checking and processing various notification // events. func (f *Fetcher) loop() { - announced := make(map[common.Hash][]*announce) - fetching := make(map[common.Hash]*announce) - // Iterate the block fetching until a quit is requested fetch := time.NewTimer(0) done := make(chan common.Hash) for { // Clean up any expired block fetches - for hash, announce := range fetching { + for hash, announce := range f.fetching { if time.Since(announce.time) > fetchTimeout { - delete(announced, hash) - delete(fetching, hash) + delete(f.announced, hash) + delete(f.fetching, hash) } } // Import any queued blocks that could potentially fit @@ -184,17 +187,16 @@ func (f *Fetcher) loop() { } // Otherwise if not known yet, try and import hash := op.block.Hash() - delete(f.queued, hash) if f.hasBlock(hash) { continue } // Block may just fit, try to import it - glog.V(logger.Debug).Infof("Peer %s: importing block %x", op.origin, hash.Bytes()[:4]) + glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", op.origin, op.block.NumberU64(), hash.Bytes()[:4]) go func() { defer func() { done <- hash }() if err := f.importBlock(op.origin, op.block); err != nil { - glog.V(logger.Detail).Infof("Peer %s: block %x import failed: %v", op.origin, hash.Bytes()[:4], err) + glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", op.origin, op.block.NumberU64(), hash.Bytes()[:4], err) return } }() @@ -208,14 +210,13 @@ func (f *Fetcher) loop() { case notification := <-f.notify: // A block was announced, schedule if it's not yet downloading glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4]) - if _, ok := fetching[notification.hash]; ok { + if _, ok := f.fetching[notification.hash]; ok { break } - if len(announced) == 0 { - glog.V(logger.Detail).Infof("Scheduling fetch in %v, at %v", arriveTimeout-time.Since(notification.time), notification.time.Add(arriveTimeout)) - fetch.Reset(arriveTimeout - time.Since(notification.time)) + f.announced[notification.hash] = append(f.announced[notification.hash], notification) + if len(f.announced) == 1 { + f.reschedule(fetch) } - announced[notification.hash] = append(announced[notification.hash], notification) case op := <-f.insert: // A direct block insertion was requested, try and fill any pending gaps @@ -223,39 +224,31 @@ func (f *Fetcher) loop() { case hash := <-done: // A pending import finished, remove all traces of the notification - delete(announced, hash) - delete(fetching, hash) + delete(f.announced, hash) + delete(f.fetching, hash) + delete(f.queued, hash) case <-fetch.C: // At least one block's timer ran out, check for needing retrieval request := make(map[string][]common.Hash) - for hash, announces := range announced { + for hash, announces := range f.announced { if time.Since(announces[0].time) > arriveTimeout { announce := announces[rand.Intn(len(announces))] if !f.hasBlock(hash) { request[announce.origin] = append(request[announce.origin], hash) - fetching[hash] = announce + f.fetching[hash] = announce } - delete(announced, hash) + delete(f.announced, hash) } } // Send out all block requests for peer, hashes := range request { glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes)) - go fetching[hashes[0]].fetch(hashes) + go f.fetching[hashes[0]].fetch(hashes) } // Schedule the next fetch if blocks are still pending - if len(announced) > 0 { - nearest := time.Now() - for _, announces := range announced { - if nearest.After(announces[0].time) { - nearest = announces[0].time - } - } - glog.V(logger.Detail).Infof("Rescheduling fetch in %v, at %v", arriveTimeout-time.Since(nearest), nearest.Add(arriveTimeout)) - fetch.Reset(arriveTimeout - time.Since(nearest)) - } + f.reschedule(fetch) case filter := <-f.filter: // Blocks arrived, extract any explicit fetches, return all else @@ -271,12 +264,12 @@ func (f *Fetcher) loop() { hash := block.Hash() // Filter explicitly requested blocks from hash announcements - if _, ok := fetching[hash]; ok { + if _, ok := f.fetching[hash]; ok { // Discard if already imported by other means if !f.hasBlock(hash) { explicit = append(explicit, block) } else { - delete(fetching, hash) + delete(f.fetching, hash) } } else { download = append(download, block) @@ -290,7 +283,7 @@ func (f *Fetcher) loop() { } // Schedule the retrieved blocks for ordered import for _, block := range explicit { - if announce := fetching[block.Hash()]; announce != nil { + if announce := f.fetching[block.Hash()]; announce != nil { f.enqueue(announce.origin, block) } } @@ -298,21 +291,40 @@ func (f *Fetcher) loop() { } } +// reschedule resets the specified fetch timer to the next announce timeout. +func (f *Fetcher) reschedule(fetch *time.Timer) { + // Short circuit if no blocks are announced + if len(f.announced) == 0 { + return + } + // Otherwise find the earliest expiring announcement + earliest := time.Now() + for _, announces := range f.announced { + if earliest.After(announces[0].time) { + earliest = announces[0].time + } + } + glog.V(logger.Detail).Infof("Scheduling next fetch in %v", arriveTimeout-time.Since(earliest)) + fetch.Reset(arriveTimeout - time.Since(earliest)) +} + // enqueue schedules a new future import operation, if the block to be imported // has not yet been seen. func (f *Fetcher) enqueue(peer string, block *types.Block) { + hash := block.Hash() + // Make sure the block isn't in some weird place if math.Abs(float64(f.chainHeight())-float64(block.NumberU64())) > maxQueueDist { + glog.Infof("Peer %s: discarded block #%d [%x] too far from head", peer, block.NumberU64(), hash.Bytes()[:4]) return } // Schedule the block for future importing - hash := block.Hash() if _, ok := f.queued[hash]; !ok { f.queued[hash] = struct{}{} f.queue.Push(&inject{origin: peer, block: block}, -float32(block.NumberU64())) if glog.V(logger.Detail) { - glog.Infof("Peer %s: queued block %x, total %v", peer, hash.Bytes()[:4], f.queue.Size()) + glog.Infof("Peer %s: queued block #%d [%x], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size()) } } } |