diff options
Diffstat (limited to 'swarm/network/fetcher.go')
-rw-r--r-- | swarm/network/fetcher.go | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go index 3043f6475..6b2175166 100644 --- a/swarm/network/fetcher.go +++ b/swarm/network/fetcher.go @@ -52,6 +52,7 @@ type Fetcher struct { requestC chan uint8 // channel for incoming requests (with the hopCount value in it) searchTimeout time.Duration skipCheck bool + ctx context.Context } type Request struct { @@ -109,14 +110,14 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { // contain the peers which are actively requesting this chunk, to make sure we // don't request back the chunks from them. // The created Fetcher is started and returned. -func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { - fetcher := NewFetcher(source, f.request, f.skipCheck) - go fetcher.run(ctx, peersToSkip) +func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher { + fetcher := NewFetcher(ctx, source, f.request, f.skipCheck) + go fetcher.run(peers) return fetcher } // NewFetcher creates a new Fetcher for the given chunk address using the given request function. -func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { +func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { return &Fetcher{ addr: addr, protoRequestFunc: rf, @@ -124,14 +125,15 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { requestC: make(chan uint8), searchTimeout: defaultSearchTimeout, skipCheck: skipCheck, + ctx: ctx, } } // Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally. -func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) { +func (f *Fetcher) Offer(source *enode.ID) { // First we need to have this select to make sure that we return if context is done select { - case <-ctx.Done(): + case <-f.ctx.Done(): return default: } @@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) { // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) select { case f.offerC <- source: - case <-ctx.Done(): + case <-f.ctx.Done(): } } // Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. -func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { +func (f *Fetcher) Request(hopCount uint8) { // First we need to have this select to make sure that we return if context is done select { - case <-ctx.Done(): + case <-f.ctx.Done(): return default: } @@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) { // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) select { case f.requestC <- hopCount + 1: - case <-ctx.Done(): + case <-f.ctx.Done(): } } // start prepares the Fetcher // it keeps the Fetcher alive within the lifecycle of the passed context -func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { +func (f *Fetcher) run(peers *sync.Map) { var ( doRequest bool // determines if retrieval is initiated in the current iteration wait *time.Timer // timer for search timeout @@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { doRequest = requested // all Fetcher context closed, can quit - case <-ctx.Done(): + case <-f.ctx.Done(): log.Trace("terminate fetcher", "request addr", f.addr) // TODO: send cancellations to all peers left over in peers map (i.e., those we requested from) return @@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { // need to issue a new request if doRequest { var err error - sources, err = f.doRequest(ctx, gone, peers, sources, hopCount) + sources, err = f.doRequest(gone, peers, sources, hopCount) if err != nil { log.Info("unable to request", "request addr", f.addr, "err", err) } @@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { // * the peer's address is added to the set of peers to skip // * the peer's address is removed from prospective sources, and // * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) -func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) { +func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) { var i int var sourceID *enode.ID var quit chan struct{} @@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki for i = 0; i < len(sources); i++ { req.Source = sources[i] var err error - sourceID, quit, err = f.protoRequestFunc(ctx, req) + sourceID, quit, err = f.protoRequestFunc(f.ctx, req) if err == nil { // remove the peer from known sources // Note: we can modify the source although we are looping on it, because we break from the loop immediately @@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki if !foundSource { req.Source = nil var err error - sourceID, quit, err = f.protoRequestFunc(ctx, req) + sourceID, quit, err = f.protoRequestFunc(f.ctx, req) if err != nil { // if no peers found to request from return sources, err @@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki select { case <-quit: gone <- sourceID - case <-ctx.Done(): + case <-f.ctx.Done(): } }() return sources, nil |