diff options
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r-- | swarm/storage/netstore.go | 43 |
1 files changed, 34 insertions, 9 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 16bc48a9a..202af2bf5 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -34,8 +34,8 @@ type ( ) type NetFetcher interface { - Request(ctx context.Context, hopCount uint8) - Offer(ctx context.Context, source *enode.ID) + Request(hopCount uint8) + Offer(source *enode.ID) } // NetStore is an extension of local storage @@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont func (n *NetStore) Close() { close(n.closeC) n.store.Close() - // TODO: loop through fetchers to cancel them + + wg := sync.WaitGroup{} + for _, key := range n.fetchers.Keys() { + if f, ok := n.fetchers.Get(key); ok { + if fetch, ok := f.(*fetcher); ok { + wg.Add(1) + go func(fetch *fetcher) { + defer wg.Done() + fetch.cancel() + + select { + case <-fetch.deliveredC: + case <-fetch.cancelledC: + } + }(fetch) + } + } + } + wg.Wait() } // get attempts at retrieving the chunk from LocalStore @@ -150,7 +168,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co } // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one // if it doesn't exist yet - f := n.getOrCreateFetcher(ref) + f := n.getOrCreateFetcher(ctx, ref) // If the caller needs the chunk, it has to use the returned fetch function to get it return nil, f.Fetch, nil } @@ -158,10 +176,17 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co return chunk, nil, nil } +// Has is the storage layer entry point to query the underlying +// database to return if it has a chunk or not. +// Called from the DebugAPI +func (n *NetStore) Has(ctx context.Context, ref Address) bool { + return n.store.Has(ctx, ref) +} + // getOrCreateFetcher attempts at retrieving an existing fetchers // if none exists, creates one and saves it in the fetchers cache // caller must hold the lock -func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { +func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher { if f := n.getFetcher(ref); f != nil { return f } @@ -169,7 +194,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { // no fetcher for the given address, we have to create a new one key := hex.EncodeToString(ref) // create the context during which fetching is kept alive - ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout) + cctx, cancel := context.WithTimeout(ctx, fetcherTimeout) // destroy is called when all requests finish destroy := func() { // remove fetcher from fetchers @@ -183,7 +208,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { // the peers which requested the chunk should not be requested to deliver it. peers := &sync.Map{} - fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC) + fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC) n.fetchers.Add(key, fetcher) return fetcher @@ -271,9 +296,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil { return nil, err } - f.netFetcher.Offer(rctx, &source) + f.netFetcher.Offer(&source) } else { - f.netFetcher.Request(rctx, hopCount) + f.netFetcher.Request(hopCount) } // wait until either the chunk is delivered or the context is done |