aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/netstore.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r--swarm/storage/netstore.go382
1 files changed, 238 insertions, 144 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index 96a7e51f7..de2d82d2b 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -18,181 +18,275 @@ package storage
import (
"context"
+ "encoding/hex"
+ "fmt"
+ "sync"
+ "sync/atomic"
"time"
+ "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
- opentracing "github.com/opentracing/opentracing-go"
+
+ lru "github.com/hashicorp/golang-lru"
)
-var (
- // NetStore.Get timeout for get and get retries
- // This is the maximum period that the Get will block.
- // If it is reached, Get will return ErrChunkNotFound.
- netStoreRetryTimeout = 30 * time.Second
- // Minimal period between calling get method on NetStore
- // on retry. It protects calling get very frequently if
- // it returns ErrChunkNotFound very fast.
- netStoreMinRetryDelay = 3 * time.Second
- // Timeout interval before retrieval is timed out.
- // It is used in NetStore.get on waiting for ReqC to be
- // closed on a single retrieve request.
- searchTimeout = 10 * time.Second
+type (
+ NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher
)
-// NetStore implements the ChunkStore interface,
-// this chunk access layer assumed 2 chunk stores
-// local storage eg. LocalStore and network storage eg., NetStore
-// access by calling network is blocking with a timeout
+type NetFetcher interface {
+ Request(ctx context.Context)
+ Offer(ctx context.Context, source *discover.NodeID)
+}
+
+// NetStore is an extension of local storage
+// it implements the ChunkStore interface
+// on request it initiates remote cloud retrieval using a fetcher
+// fetchers are unique to a chunk and are stored in fetchers LRU memory cache
+// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address
type NetStore struct {
- localStore *LocalStore
- retrieve func(ctx context.Context, chunk *Chunk) error
+ mu sync.Mutex
+ store SyncChunkStore
+ fetchers *lru.Cache
+ NewNetFetcherFunc NewNetFetcherFunc
+ closeC chan struct{}
}
-func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore {
- return &NetStore{localStore, retrieve}
+// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a
+// constructor function that can create a fetch function for a specific chunk address.
+func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) {
+ fetchers, err := lru.New(defaultChunkRequestsCacheCapacity)
+ if err != nil {
+ return nil, err
+ }
+ return &NetStore{
+ store: store,
+ fetchers: fetchers,
+ NewNetFetcherFunc: nnf,
+ closeC: make(chan struct{}),
+ }, nil
}
-// Get is the entrypoint for local retrieve requests
-// waits for response or times out
-//
-// Get uses get method to retrieve request, but retries if the
-// ErrChunkNotFound is returned by get, until the netStoreRetryTimeout
-// is reached.
-func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
-
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "netstore.get.global")
- defer sp.Finish()
-
- timer := time.NewTimer(netStoreRetryTimeout)
- defer timer.Stop()
-
- // result and resultC provide results from the goroutine
- // where NetStore.get is called.
- type result struct {
- chunk *Chunk
- err error
+// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in
+// the fetchers cache
+func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ // put to the chunk to the store, there should be no error
+ err := n.store.Put(ctx, ch)
+ if err != nil {
+ return err
}
- resultC := make(chan result)
-
- // quitC ensures that retring goroutine is terminated
- // when this function returns.
- quitC := make(chan struct{})
- defer close(quitC)
-
- // do retries in a goroutine so that the timer can
- // force this method to return after the netStoreRetryTimeout.
- go func() {
- // limiter ensures that NetStore.get is not called more frequently
- // then netStoreMinRetryDelay. If NetStore.get takes longer
- // then netStoreMinRetryDelay, the next retry call will be
- // without a delay.
- limiter := time.NewTimer(netStoreMinRetryDelay)
- defer limiter.Stop()
-
- for {
- chunk, err := ns.get(ctx, addr, 0)
- if err != ErrChunkNotFound {
- // break retry only if the error is nil
- // or other error then ErrChunkNotFound
- select {
- case <-quitC:
- // Maybe NetStore.Get function has returned
- // by the timer.C while we were waiting for the
- // results. Terminate this goroutine.
- case resultC <- result{chunk: chunk, err: err}:
- // Send the result to the parrent goroutine.
- }
- return
-
- }
- select {
- case <-quitC:
- // NetStore.Get function has returned, possibly
- // by the timer.C, which makes this goroutine
- // not needed.
- return
- case <-limiter.C:
- }
- // Reset the limiter for the next iteration.
- limiter.Reset(netStoreMinRetryDelay)
- log.Debug("NetStore.Get retry chunk", "key", addr)
- }
- }()
- select {
- case r := <-resultC:
- return r.chunk, r.err
- case <-timer.C:
- return nil, ErrChunkNotFound
+ // if chunk is now put in the store, check if there was an active fetcher and call deliver on it
+ // (this delivers the chunk to requestors via the fetcher)
+ if f := n.getFetcher(ch.Address()); f != nil {
+ f.deliver(ctx, ch)
+ }
+ return nil
+}
+
+// Get retrieves the chunk from the NetStore DPA synchronously.
+// It calls NetStore.get, and if the chunk is not in local Storage
+// it calls fetch with the request, which blocks until the chunk
+// arrived or context is done
+func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) {
+ chunk, fetch, err := n.get(rctx, ref)
+ if err != nil {
+ return nil, err
+ }
+ if chunk != nil {
+ return chunk, nil
}
+ return fetch(rctx)
}
-// GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter
-func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
- return ns.get(ctx, addr, timeout)
+func (n *NetStore) BinIndex(po uint8) uint64 {
+ return n.store.BinIndex(po)
}
-func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
- if timeout == 0 {
- timeout = searchTimeout
+func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
+ return n.store.Iterator(from, to, po, f)
+}
+
+// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function,
+// which returns after the chunk is available or the context is done
+func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error {
+ chunk, fetch, _ := n.get(ctx, ref)
+ if chunk != nil {
+ return nil
+ }
+ return func(ctx context.Context) error {
+ _, err := fetch(ctx)
+ return err
}
+}
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "netstore.get")
- defer sp.Finish()
+// Close chunk store
+func (n *NetStore) Close() {
+ close(n.closeC)
+ n.store.Close()
+ // TODO: loop through fetchers to cancel them
+}
- if ns.retrieve == nil {
- chunk, err = ns.localStore.Get(ctx, addr)
- if err == nil {
- return chunk, nil
- }
- if err != ErrFetching {
- return nil, err
- }
- } else {
- var created bool
- chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr)
+// get attempts at retrieving the chunk from LocalStore
+// If it is not found then using getOrCreateFetcher:
+// 1. Either there is already a fetcher to retrieve it
+// 2. A new fetcher is created and saved in the fetchers cache
+// From here on, all Get will hit on this fetcher until the chunk is delivered
+// or all fetcher contexts are done.
+// It returns a chunk, a fetcher function and an error
+// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk.
+func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) {
+ n.mu.Lock()
+ defer n.mu.Unlock()
- if chunk.ReqC == nil {
- return chunk, nil
+ chunk, err := n.store.Get(ctx, ref)
+ if err != nil {
+ if err != ErrChunkNotFound {
+ log.Debug("Received error from LocalStore other than ErrNotFound", "err", err)
}
+ // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
+ // if it doesn't exist yet
+ f := n.getOrCreateFetcher(ref)
+ // If the caller needs the chunk, it has to use the returned fetch function to get it
+ return nil, f.Fetch, nil
+ }
- if created {
- err := ns.retrieve(ctx, chunk)
- if err != nil {
- // mark chunk request as failed so that we can retry it later
- chunk.SetErrored(ErrChunkUnavailable)
- return nil, err
- }
- }
+ return chunk, nil, nil
+}
+
+// getOrCreateFetcher attempts at retrieving an existing fetchers
+// if none exists, creates one and saves it in the fetchers cache
+// caller must hold the lock
+func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
+ if f := n.getFetcher(ref); f != nil {
+ return f
}
- t := time.NewTicker(timeout)
- defer t.Stop()
+ // no fetcher for the given address, we have to create a new one
+ key := hex.EncodeToString(ref)
+ // create the context during which fetching is kept alive
+ ctx, cancel := context.WithCancel(context.Background())
+ // destroy is called when all requests finish
+ destroy := func() {
+ // remove fetcher from fetchers
+ n.fetchers.Remove(key)
+ // stop fetcher by cancelling context called when
+ // all requests cancelled/timedout or chunk is delivered
+ cancel()
+ }
+ // peers always stores all the peers which have an active request for the chunk. It is shared
+ // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because
+ // the peers which requested the chunk should not be requested to deliver it.
+ peers := &sync.Map{}
- select {
- case <-t.C:
- // mark chunk request as failed so that we can retry
- chunk.SetErrored(ErrChunkNotFound)
- return nil, ErrChunkNotFound
- case <-chunk.ReqC:
+ fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC)
+ n.fetchers.Add(key, fetcher)
+
+ return fetcher
+}
+
+// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists,
+// otherwise it returns nil
+func (n *NetStore) getFetcher(ref Address) *fetcher {
+ key := hex.EncodeToString(ref)
+ f, ok := n.fetchers.Get(key)
+ if ok {
+ return f.(*fetcher)
}
- chunk.SetErrored(nil)
- return chunk, nil
+ return nil
}
-// Put is the entrypoint for local store requests coming from storeLoop
-func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) {
- ns.localStore.Put(ctx, chunk)
+// RequestsCacheLen returns the current number of outgoing requests stored in the cache
+func (n *NetStore) RequestsCacheLen() int {
+ return n.fetchers.Len()
}
-// Close chunk store
-func (ns *NetStore) Close() {
- ns.localStore.Close()
+// One fetcher object is responsible to fetch one chunk for one address, and keep track of all the
+// peers who have requested it and did not receive it yet.
+type fetcher struct {
+ addr Address // address of chunk
+ chunk Chunk // fetcher can set the chunk on the fetcher
+ deliveredC chan struct{} // chan signalling chunk delivery to requests
+ cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore)
+ netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context
+ cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called
+ peers *sync.Map // the peers which asked for the chunk
+ requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
+ deliverOnce *sync.Once // guarantees that we only close deliveredC once
+}
+
+// newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually
+// does the retrieval (in non-test cases this is coming from the network package). cancel function is
+// called either
+// 1. when the chunk has been fetched all peers have been either notified or their context has been done
+// 2. the chunk has not been fetched but all context from all the requests has been done
+// The peers map stores all the peers which have requested chunk.
+func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
+ cancelOnce := &sync.Once{} // cancel should only be called once
+ return &fetcher{
+ addr: addr,
+ deliveredC: make(chan struct{}),
+ deliverOnce: &sync.Once{},
+ cancelledC: closeC,
+ netFetcher: nf,
+ cancel: func() {
+ cancelOnce.Do(func() {
+ cancel()
+ })
+ },
+ peers: peers,
+ }
+}
+
+// Fetch fetches the chunk synchronously, it is called by NetStore.Get is the chunk is not available
+// locally.
+func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
+ atomic.AddInt32(&f.requestCnt, 1)
+ defer func() {
+ // if all the requests are done the fetcher can be cancelled
+ if atomic.AddInt32(&f.requestCnt, -1) == 0 {
+ f.cancel()
+ }
+ }()
+
+ // The peer asking for the chunk. Store in the shared peers map, but delete after the request
+ // has been delivered
+ peer := rctx.Value("peer")
+ if peer != nil {
+ f.peers.Store(peer, time.Now())
+ defer f.peers.Delete(peer)
+ }
+
+ // If there is a source in the context then it is an offer, otherwise a request
+ sourceIF := rctx.Value("source")
+ if sourceIF != nil {
+ var source *discover.NodeID
+ id := discover.MustHexID(sourceIF.(string))
+ source = &id
+ f.netFetcher.Offer(rctx, source)
+ } else {
+ f.netFetcher.Request(rctx)
+ }
+
+ // wait until either the chunk is delivered or the context is done
+ select {
+ case <-rctx.Done():
+ return nil, rctx.Err()
+ case <-f.deliveredC:
+ return f.chunk, nil
+ case <-f.cancelledC:
+ return nil, fmt.Errorf("fetcher cancelled")
+ }
+}
+
+// deliver is called by NetStore.Put to notify all pending requests
+func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
+ f.deliverOnce.Do(func() {
+ f.chunk = ch
+ // closing the deliveredC channel will terminate ongoing requests
+ close(f.deliveredC)
+ })
}