diff options
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r-- | swarm/storage/netstore.go | 382 |
1 files changed, 238 insertions, 144 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 96a7e51f7..de2d82d2b 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -18,181 +18,275 @@ package storage import ( "context" + "encoding/hex" + "fmt" + "sync" + "sync/atomic" "time" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/spancontext" - opentracing "github.com/opentracing/opentracing-go" + + lru "github.com/hashicorp/golang-lru" ) -var ( - // NetStore.Get timeout for get and get retries - // This is the maximum period that the Get will block. - // If it is reached, Get will return ErrChunkNotFound. - netStoreRetryTimeout = 30 * time.Second - // Minimal period between calling get method on NetStore - // on retry. It protects calling get very frequently if - // it returns ErrChunkNotFound very fast. - netStoreMinRetryDelay = 3 * time.Second - // Timeout interval before retrieval is timed out. - // It is used in NetStore.get on waiting for ReqC to be - // closed on a single retrieve request. - searchTimeout = 10 * time.Second +type ( + NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher ) -// NetStore implements the ChunkStore interface, -// this chunk access layer assumed 2 chunk stores -// local storage eg. LocalStore and network storage eg., NetStore -// access by calling network is blocking with a timeout +type NetFetcher interface { + Request(ctx context.Context) + Offer(ctx context.Context, source *discover.NodeID) +} + +// NetStore is an extension of local storage +// it implements the ChunkStore interface +// on request it initiates remote cloud retrieval using a fetcher +// fetchers are unique to a chunk and are stored in fetchers LRU memory cache +// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address type NetStore struct { - localStore *LocalStore - retrieve func(ctx context.Context, chunk *Chunk) error + mu sync.Mutex + store SyncChunkStore + fetchers *lru.Cache + NewNetFetcherFunc NewNetFetcherFunc + closeC chan struct{} } -func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore { - return &NetStore{localStore, retrieve} +// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a +// constructor function that can create a fetch function for a specific chunk address. +func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) { + fetchers, err := lru.New(defaultChunkRequestsCacheCapacity) + if err != nil { + return nil, err + } + return &NetStore{ + store: store, + fetchers: fetchers, + NewNetFetcherFunc: nnf, + closeC: make(chan struct{}), + }, nil } -// Get is the entrypoint for local retrieve requests -// waits for response or times out -// -// Get uses get method to retrieve request, but retries if the -// ErrChunkNotFound is returned by get, until the netStoreRetryTimeout -// is reached. -func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { - - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "netstore.get.global") - defer sp.Finish() - - timer := time.NewTimer(netStoreRetryTimeout) - defer timer.Stop() - - // result and resultC provide results from the goroutine - // where NetStore.get is called. - type result struct { - chunk *Chunk - err error +// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in +// the fetchers cache +func (n *NetStore) Put(ctx context.Context, ch Chunk) error { + n.mu.Lock() + defer n.mu.Unlock() + + // put to the chunk to the store, there should be no error + err := n.store.Put(ctx, ch) + if err != nil { + return err } - resultC := make(chan result) - - // quitC ensures that retring goroutine is terminated - // when this function returns. - quitC := make(chan struct{}) - defer close(quitC) - - // do retries in a goroutine so that the timer can - // force this method to return after the netStoreRetryTimeout. - go func() { - // limiter ensures that NetStore.get is not called more frequently - // then netStoreMinRetryDelay. If NetStore.get takes longer - // then netStoreMinRetryDelay, the next retry call will be - // without a delay. - limiter := time.NewTimer(netStoreMinRetryDelay) - defer limiter.Stop() - - for { - chunk, err := ns.get(ctx, addr, 0) - if err != ErrChunkNotFound { - // break retry only if the error is nil - // or other error then ErrChunkNotFound - select { - case <-quitC: - // Maybe NetStore.Get function has returned - // by the timer.C while we were waiting for the - // results. Terminate this goroutine. - case resultC <- result{chunk: chunk, err: err}: - // Send the result to the parrent goroutine. - } - return - - } - select { - case <-quitC: - // NetStore.Get function has returned, possibly - // by the timer.C, which makes this goroutine - // not needed. - return - case <-limiter.C: - } - // Reset the limiter for the next iteration. - limiter.Reset(netStoreMinRetryDelay) - log.Debug("NetStore.Get retry chunk", "key", addr) - } - }() - select { - case r := <-resultC: - return r.chunk, r.err - case <-timer.C: - return nil, ErrChunkNotFound + // if chunk is now put in the store, check if there was an active fetcher and call deliver on it + // (this delivers the chunk to requestors via the fetcher) + if f := n.getFetcher(ch.Address()); f != nil { + f.deliver(ctx, ch) + } + return nil +} + +// Get retrieves the chunk from the NetStore DPA synchronously. +// It calls NetStore.get, and if the chunk is not in local Storage +// it calls fetch with the request, which blocks until the chunk +// arrived or context is done +func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) { + chunk, fetch, err := n.get(rctx, ref) + if err != nil { + return nil, err + } + if chunk != nil { + return chunk, nil } + return fetch(rctx) } -// GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter -func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { - return ns.get(ctx, addr, timeout) +func (n *NetStore) BinIndex(po uint8) uint64 { + return n.store.BinIndex(po) } -func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { - if timeout == 0 { - timeout = searchTimeout +func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { + return n.store.Iterator(from, to, po, f) +} + +// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function, +// which returns after the chunk is available or the context is done +func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error { + chunk, fetch, _ := n.get(ctx, ref) + if chunk != nil { + return nil + } + return func(ctx context.Context) error { + _, err := fetch(ctx) + return err } +} - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "netstore.get") - defer sp.Finish() +// Close chunk store +func (n *NetStore) Close() { + close(n.closeC) + n.store.Close() + // TODO: loop through fetchers to cancel them +} - if ns.retrieve == nil { - chunk, err = ns.localStore.Get(ctx, addr) - if err == nil { - return chunk, nil - } - if err != ErrFetching { - return nil, err - } - } else { - var created bool - chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr) +// get attempts at retrieving the chunk from LocalStore +// If it is not found then using getOrCreateFetcher: +// 1. Either there is already a fetcher to retrieve it +// 2. A new fetcher is created and saved in the fetchers cache +// From here on, all Get will hit on this fetcher until the chunk is delivered +// or all fetcher contexts are done. +// It returns a chunk, a fetcher function and an error +// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk. +func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) { + n.mu.Lock() + defer n.mu.Unlock() - if chunk.ReqC == nil { - return chunk, nil + chunk, err := n.store.Get(ctx, ref) + if err != nil { + if err != ErrChunkNotFound { + log.Debug("Received error from LocalStore other than ErrNotFound", "err", err) } + // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one + // if it doesn't exist yet + f := n.getOrCreateFetcher(ref) + // If the caller needs the chunk, it has to use the returned fetch function to get it + return nil, f.Fetch, nil + } - if created { - err := ns.retrieve(ctx, chunk) - if err != nil { - // mark chunk request as failed so that we can retry it later - chunk.SetErrored(ErrChunkUnavailable) - return nil, err - } - } + return chunk, nil, nil +} + +// getOrCreateFetcher attempts at retrieving an existing fetchers +// if none exists, creates one and saves it in the fetchers cache +// caller must hold the lock +func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { + if f := n.getFetcher(ref); f != nil { + return f } - t := time.NewTicker(timeout) - defer t.Stop() + // no fetcher for the given address, we have to create a new one + key := hex.EncodeToString(ref) + // create the context during which fetching is kept alive + ctx, cancel := context.WithCancel(context.Background()) + // destroy is called when all requests finish + destroy := func() { + // remove fetcher from fetchers + n.fetchers.Remove(key) + // stop fetcher by cancelling context called when + // all requests cancelled/timedout or chunk is delivered + cancel() + } + // peers always stores all the peers which have an active request for the chunk. It is shared + // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because + // the peers which requested the chunk should not be requested to deliver it. + peers := &sync.Map{} - select { - case <-t.C: - // mark chunk request as failed so that we can retry - chunk.SetErrored(ErrChunkNotFound) - return nil, ErrChunkNotFound - case <-chunk.ReqC: + fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC) + n.fetchers.Add(key, fetcher) + + return fetcher +} + +// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists, +// otherwise it returns nil +func (n *NetStore) getFetcher(ref Address) *fetcher { + key := hex.EncodeToString(ref) + f, ok := n.fetchers.Get(key) + if ok { + return f.(*fetcher) } - chunk.SetErrored(nil) - return chunk, nil + return nil } -// Put is the entrypoint for local store requests coming from storeLoop -func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) { - ns.localStore.Put(ctx, chunk) +// RequestsCacheLen returns the current number of outgoing requests stored in the cache +func (n *NetStore) RequestsCacheLen() int { + return n.fetchers.Len() } -// Close chunk store -func (ns *NetStore) Close() { - ns.localStore.Close() +// One fetcher object is responsible to fetch one chunk for one address, and keep track of all the +// peers who have requested it and did not receive it yet. +type fetcher struct { + addr Address // address of chunk + chunk Chunk // fetcher can set the chunk on the fetcher + deliveredC chan struct{} // chan signalling chunk delivery to requests + cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore) + netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context + cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called + peers *sync.Map // the peers which asked for the chunk + requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called + deliverOnce *sync.Once // guarantees that we only close deliveredC once +} + +// newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually +// does the retrieval (in non-test cases this is coming from the network package). cancel function is +// called either +// 1. when the chunk has been fetched all peers have been either notified or their context has been done +// 2. the chunk has not been fetched but all context from all the requests has been done +// The peers map stores all the peers which have requested chunk. +func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { + cancelOnce := &sync.Once{} // cancel should only be called once + return &fetcher{ + addr: addr, + deliveredC: make(chan struct{}), + deliverOnce: &sync.Once{}, + cancelledC: closeC, + netFetcher: nf, + cancel: func() { + cancelOnce.Do(func() { + cancel() + }) + }, + peers: peers, + } +} + +// Fetch fetches the chunk synchronously, it is called by NetStore.Get is the chunk is not available +// locally. +func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { + atomic.AddInt32(&f.requestCnt, 1) + defer func() { + // if all the requests are done the fetcher can be cancelled + if atomic.AddInt32(&f.requestCnt, -1) == 0 { + f.cancel() + } + }() + + // The peer asking for the chunk. Store in the shared peers map, but delete after the request + // has been delivered + peer := rctx.Value("peer") + if peer != nil { + f.peers.Store(peer, time.Now()) + defer f.peers.Delete(peer) + } + + // If there is a source in the context then it is an offer, otherwise a request + sourceIF := rctx.Value("source") + if sourceIF != nil { + var source *discover.NodeID + id := discover.MustHexID(sourceIF.(string)) + source = &id + f.netFetcher.Offer(rctx, source) + } else { + f.netFetcher.Request(rctx) + } + + // wait until either the chunk is delivered or the context is done + select { + case <-rctx.Done(): + return nil, rctx.Err() + case <-f.deliveredC: + return f.chunk, nil + case <-f.cancelledC: + return nil, fmt.Errorf("fetcher cancelled") + } +} + +// deliver is called by NetStore.Put to notify all pending requests +func (f *fetcher) deliver(ctx context.Context, ch Chunk) { + f.deliverOnce.Do(func() { + f.chunk = ch + // closing the deliveredC channel will terminate ongoing requests + close(f.deliveredC) + }) } |