aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/fetcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/fetcher.go')
-rw-r--r--swarm/network/fetcher.go36
1 files changed, 19 insertions, 17 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index 3043f6475..6b2175166 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -52,6 +52,7 @@ type Fetcher struct {
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration
skipCheck bool
+ ctx context.Context
}
type Request struct {
@@ -109,14 +110,14 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
// contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them.
// The created Fetcher is started and returned.
-func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
- fetcher := NewFetcher(source, f.request, f.skipCheck)
- go fetcher.run(ctx, peersToSkip)
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
+ fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
+ go fetcher.run(peers)
return fetcher
}
// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
-func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{
addr: addr,
protoRequestFunc: rf,
@@ -124,14 +125,15 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
+ ctx: ctx,
}
}
// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
-func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
+func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done
select {
- case <-ctx.Done():
+ case <-f.ctx.Done():
return
default:
}
@@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.offerC <- source:
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
-func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
+func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
- case <-ctx.Done():
+ case <-f.ctx.Done():
return
default:
}
@@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.requestC <- hopCount + 1:
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}
// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
-func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+func (f *Fetcher) run(peers *sync.Map) {
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
@@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest = requested
// all Fetcher context closed, can quit
- case <-ctx.Done():
+ case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
@@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request
if doRequest {
var err error
- sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
+ sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
@@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
-func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
+func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
@@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
- sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
@@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
if !foundSource {
req.Source = nil
var err error
- sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil {
// if no peers found to request from
return sources, err
@@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
select {
case <-quit:
gone <- sourceID
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}()
return sources, nil