author     Péter Szilágyi <peterke@gmail.com>  2019-02-20 16:48:12 +0800
committer  GitHub <noreply@github.com>         2019-02-20 16:48:12 +0800
commit     c942700427557e3ff6de3aaf6b916e2f056c1ec2 (patch)
tree       cadf68e7206d6de42b1eefc6967214cf86e35ff2 /swarm/network/fetcher.go
parent     7fa3509e2eaf1a4ebc12344590e5699406690f15 (diff)
parent     cde35439e058b4f9579830fec9fb65ae0b998346 (diff)
Merge pull request #19029 from holiman/update1.8 (tag: v1.8.23)
Update1.8
Diffstat (limited to 'swarm/network/fetcher.go')
-rw-r--r--  swarm/network/fetcher.go  70
1 file changed, 39 insertions, 31 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index 6aed57e22..6b2175166 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -26,20 +26,23 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/storage"
 )
 
-var searchTimeout = 1 * time.Second
+const (
+	defaultSearchTimeout = 1 * time.Second
+	// maximum number of forwarded requests (hops), to make sure requests are not
+	// forwarded forever in peer loops
+	maxHopCount uint8 = 20
+)
 
 // Time to consider peer to be skipped.
 // Also used in stream delivery.
 var RequestTimeout = 10 * time.Second
 
-var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
-
 type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
 
 // Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
 // keeps it alive until all active requests are completed. This can happen:
 // 1. either because the chunk is delivered
-// 2. or becuse the requestor cancelled/timed out
+// 2. or because the requester cancelled/timed out
 // Fetcher self destroys itself after it is completed.
 // TODO: cancel all forward requests after termination
 type Fetcher struct {
@@ -47,7 +50,9 @@ type Fetcher struct {
 	addr          storage.Address // the address of the chunk to be fetched
 	offerC        chan *enode.ID  // channel of sources (peer node id strings)
 	requestC      chan uint8      // channel for incoming requests (with the hopCount value in it)
+	searchTimeout time.Duration
 	skipCheck     bool
+	ctx           context.Context
 }
 
 type Request struct {
@@ -79,7 +84,7 @@ func (r *Request) SkipPeer(nodeID string) bool {
 	}
 	t, ok := val.(time.Time)
 	if ok && time.Now().After(t.Add(RequestTimeout)) {
-		// deadine expired
+		// deadline expired
 		r.peersToSkip.Delete(nodeID)
 		return false
 	}
@@ -100,32 +105,35 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
 	}
 }
 
-// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
-// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
-// this chunk, to make sure we don't request back the chunks from them.
+// New constructs a new Fetcher, for the given chunk. All peers in peersToSkip
+// are not requested to deliver the given chunk. peersToSkip should always
+// contain the peers which are actively requesting this chunk, to make sure we
+// don't request back the chunks from them.
 // The created Fetcher is started and returned.
-func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
-	fetcher := NewFetcher(source, f.request, f.skipCheck)
-	go fetcher.run(ctx, peersToSkip)
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
+	fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
+	go fetcher.run(peers)
 	return fetcher
 }
 
 // NewFetcher creates a new Fetcher for the given chunk address using the given request function.
-func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
 	return &Fetcher{
 		addr:             addr,
 		protoRequestFunc: rf,
 		offerC:           make(chan *enode.ID),
 		requestC:         make(chan uint8),
+		searchTimeout:    defaultSearchTimeout,
 		skipCheck:        skipCheck,
+		ctx:              ctx,
 	}
 }
 
 // Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
-func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
+func (f *Fetcher) Offer(source *enode.ID) {
 	// First we need to have this select to make sure that we return if context is done
 	select {
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 		return
 	default:
 	}
@@ -134,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
 	// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
 	select {
 	case f.offerC <- source:
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 	}
 }
 
 // Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
-func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
+func (f *Fetcher) Request(hopCount uint8) {
 	// First we need to have this select to make sure that we return if context is done
 	select {
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 		return
 	default:
 	}
@@ -156,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
 	// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
 	select {
 	case f.requestC <- hopCount + 1:
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 	}
 }
 
 // start prepares the Fetcher
 // it keeps the Fetcher alive within the lifecycle of the passed context
-func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+func (f *Fetcher) run(peers *sync.Map) {
 	var (
 		doRequest bool        // determines if retrieval is initiated in the current iteration
 		wait      *time.Timer // timer for search timeout
@@ -176,7 +184,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 	// loop that keeps the fetching process alive
 	// after every request a timer is set. If this goes off we request again from another peer
 	// note that the previous request is still alive and has the chance to deliver, so
-	// rerequesting extends the search. ie.,
+	// requesting again extends the search. ie.,
 	// if a peer we requested from is gone we issue a new request, so the number of active
 	// requests never decreases
 	for {
@@ -209,20 +217,20 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 		// search timeout: too much time passed since the last request,
 		// extend the search to a new peer if we can find one
 		case <-waitC:
-			log.Trace("search timed out: rerequesting", "request addr", f.addr)
+			log.Trace("search timed out: requesting", "request addr", f.addr)
 			doRequest = requested
 
 		// all Fetcher context closed, can quit
-		case <-ctx.Done():
+		case <-f.ctx.Done():
 			log.Trace("terminate fetcher", "request addr", f.addr)
-			// TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
+			// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
 			return
 		}
 
 		// need to issue a new request
 		if doRequest {
 			var err error
-			sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
+			sources, err = f.doRequest(gone, peers, sources, hopCount)
 			if err != nil {
 				log.Info("unable to request", "request addr", f.addr, "err", err)
 			}
@@ -231,7 +239,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 		// if wait channel is not set, set it to a timer
 		if requested {
 			if wait == nil {
-				wait = time.NewTimer(searchTimeout)
+				wait = time.NewTimer(f.searchTimeout)
 				defer wait.Stop()
 				waitC = wait.C
 			} else {
@@ -242,8 +250,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 					default:
 					}
 				}
-				// reset the timer to go off after searchTimeout
-				wait.Reset(searchTimeout)
+				// reset the timer to go off after defaultSearchTimeout
+				wait.Reset(f.searchTimeout)
 			}
 		}
 		doRequest = false
@@ -260,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 // * the peer's address is added to the set of peers to skip
 // * the peer's address is removed from prospective sources, and
 // * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
-func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
+func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
 	var i int
 	var sourceID *enode.ID
 	var quit chan struct{}
@@ -277,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 	for i = 0; i < len(sources); i++ {
 		req.Source = sources[i]
 		var err error
-		sourceID, quit, err = f.protoRequestFunc(ctx, req)
+		sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
 		if err == nil {
 			// remove the peer from known sources
 			// Note: we can modify the source although we are looping on it, because we break from the loop immediately
@@ -291,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 	if !foundSource {
 		req.Source = nil
 		var err error
-		sourceID, quit, err = f.protoRequestFunc(ctx, req)
+		sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
 		if err != nil {
 			// if no peers found to request from
 			return sources, err
@@ -308,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 		select {
 		case <-quit:
 			gone <- sourceID
-		case <-ctx.Done():
+		case <-f.ctx.Done():
 		}
 	}()
 	return sources, nil
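A note on the two-step select in Offer and Request above: a single select with both a send case and a <-ctx.Done() case chooses uniformly at random among ready cases, so it could still perform the send even after the context was cancelled. The leading non-blocking select (with an empty default) guarantees an early return on a cancelled context before the send is ever attempted. A minimal self-contained sketch of the same pattern; the deliver function and its names are hypothetical, not part of this patch:

package main

import (
	"context"
	"fmt"
	"time"
)

// deliver mirrors the shape of Fetcher.Offer/Request: first a non-blocking
// check of ctx.Done(), then a blocking send that also honours cancellation.
func deliver(ctx context.Context, ch chan<- int, v int) bool {
	select {
	case <-ctx.Done():
		return false // already cancelled: never attempt the send
	default:
	}
	select {
	case ch <- v:
		return true
	case <-ctx.Done():
		return false // cancelled while blocked on the send
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ch := make(chan int)              // unbuffered: the send blocks until a receiver is ready
	fmt.Println(deliver(ctx, ch, 42)) // no receiver, so this prints false after the timeout
}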
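The stop-drain-reset sequence around wait in run() follows the time.Timer contract of this era: calling Reset on a timer that has already fired, without draining its channel, leaves a stale value in wait.C that would trigger the next wait immediately. A standalone sketch of that sequence; resetTimer is a hypothetical helper, not a function in the patch:

package main

import (
	"fmt"
	"time"
)

// resetTimer mirrors the wait.Stop / drain / wait.Reset dance in run().
func resetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		// the timer already fired; drain the channel in case the
		// value has not been received yet
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

func main() {
	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(20 * time.Millisecond) // let the timer fire without reading t.C
	resetTimer(t, 30*time.Millisecond)
	fmt.Println("fired again at:", <-t.C) // fires ~30ms after the reset, not immediately
}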
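The maxHopCount constant moved into the const block caps forwarding: Request enqueues hopCount + 1, so a request circling through a peer loop accumulates hops and can be dropped rather than forwarded forever. The enforcement point is outside this diff; the sketch below only illustrates the idea, and forward/relay are hypothetical stand-ins for a node relaying a request:

package main

import "fmt"

// maxHopCount caps request forwarding, as in the const block above.
const maxHopCount uint8 = 20

// forward drops a request once it has travelled too many hops; otherwise it
// relays it with an incremented count, mirroring `f.requestC <- hopCount + 1`.
func forward(hopCount uint8, relay func(uint8)) {
	if hopCount >= maxHopCount {
		return // drop instead of forwarding forever
	}
	relay(hopCount + 1)
}

func main() {
	hops := 0
	var relay func(uint8)
	relay = func(h uint8) { hops++; forward(h, relay) } // worst case: every peer forwards back
	forward(0, relay)
	fmt.Println("hops before the request died out:", hops) // 20
}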