aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/netstore.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r--swarm/storage/netstore.go43
1 files changed, 34 insertions, 9 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index 16bc48a9a..202af2bf5 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -34,8 +34,8 @@ type (
)
type NetFetcher interface {
- Request(ctx context.Context, hopCount uint8)
- Offer(ctx context.Context, source *enode.ID)
+ Request(hopCount uint8)
+ Offer(source *enode.ID)
}
// NetStore is an extension of local storage
@@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont
func (n *NetStore) Close() {
close(n.closeC)
n.store.Close()
- // TODO: loop through fetchers to cancel them
+
+ wg := sync.WaitGroup{}
+ for _, key := range n.fetchers.Keys() {
+ if f, ok := n.fetchers.Get(key); ok {
+ if fetch, ok := f.(*fetcher); ok {
+ wg.Add(1)
+ go func(fetch *fetcher) {
+ defer wg.Done()
+ fetch.cancel()
+
+ select {
+ case <-fetch.deliveredC:
+ case <-fetch.cancelledC:
+ }
+ }(fetch)
+ }
+ }
+ }
+ wg.Wait()
}
// get attempts at retrieving the chunk from LocalStore
@@ -150,7 +168,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co
}
// The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
// if it doesn't exist yet
- f := n.getOrCreateFetcher(ref)
+ f := n.getOrCreateFetcher(ctx, ref)
// If the caller needs the chunk, it has to use the returned fetch function to get it
return nil, f.Fetch, nil
}
@@ -158,10 +176,17 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co
return chunk, nil, nil
}
+// Has is the storage layer entry point to query the underlying
+// database to return if it has a chunk or not.
+// Called from the DebugAPI
+func (n *NetStore) Has(ctx context.Context, ref Address) bool {
+ return n.store.Has(ctx, ref)
+}
+
// getOrCreateFetcher attempts at retrieving an existing fetchers
// if none exists, creates one and saves it in the fetchers cache
// caller must hold the lock
-func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
+func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher {
if f := n.getFetcher(ref); f != nil {
return f
}
@@ -169,7 +194,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
// no fetcher for the given address, we have to create a new one
key := hex.EncodeToString(ref)
// create the context during which fetching is kept alive
- ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout)
+ cctx, cancel := context.WithTimeout(ctx, fetcherTimeout)
// destroy is called when all requests finish
destroy := func() {
// remove fetcher from fetchers
@@ -183,7 +208,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
// the peers which requested the chunk should not be requested to deliver it.
peers := &sync.Map{}
- fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC)
+ fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
n.fetchers.Add(key, fetcher)
return fetcher
@@ -271,9 +296,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
return nil, err
}
- f.netFetcher.Offer(rctx, &source)
+ f.netFetcher.Offer(&source)
} else {
- f.netFetcher.Request(rctx, hopCount)
+ f.netFetcher.Request(hopCount)
}
// wait until either the chunk is delivered or the context is done