diff options
Diffstat (limited to 'swarm/storage/netstore.go')
-rw-r--r-- | swarm/storage/netstore.go | 45 |
1 files changed, 16 insertions, 29 deletions
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 7741b8f7b..b675384ce 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/opentracing/opentracing-go" @@ -49,8 +50,8 @@ type NetFetcher interface { // fetchers are unique to a chunk and are stored in fetchers LRU memory cache // fetchFuncFactory is a factory object to create a fetch function for a specific chunk address type NetStore struct { + chunk.Store mu sync.Mutex - store SyncChunkStore fetchers *lru.Cache NewNetFetcherFunc NewNetFetcherFunc closeC chan struct{} @@ -60,13 +61,13 @@ var fetcherTimeout = 2 * time.Minute // timeout to cancel the fetcher even if re // NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a // constructor function that can create a fetch function for a specific chunk address. -func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) { +func NewNetStore(store chunk.Store, nnf NewNetFetcherFunc) (*NetStore, error) { fetchers, err := lru.New(defaultChunkRequestsCacheCapacity) if err != nil { return nil, err } return &NetStore{ - store: store, + Store: store, fetchers: fetchers, NewNetFetcherFunc: nnf, closeC: make(chan struct{}), @@ -75,14 +76,14 @@ func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) // Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in // the fetchers cache -func (n *NetStore) Put(ctx context.Context, ch Chunk) error { +func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool, error) { n.mu.Lock() defer n.mu.Unlock() // put to the chunk to the store, there should be no error - err := n.store.Put(ctx, ch) + exists, err := n.Store.Put(ctx, mode, ch) if err != nil { - return err + return exists, err } // if chunk is now put in the store, check if there was an active fetcher and call deliver on it @@ -92,15 +93,15 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error { log.Trace("n.getFetcher deliver", "ref", ch.Address()) f.deliver(ctx, ch) } - return nil + return exists, nil } // Get retrieves the chunk from the NetStore DPA synchronously. // It calls NetStore.get, and if the chunk is not in local Storage // it calls fetch with the request, which blocks until the chunk // arrived or context is done -func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) { - chunk, fetch, err := n.get(rctx, ref) +func (n *NetStore) Get(rctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, error) { + chunk, fetch, err := n.get(rctx, mode, ref) if err != nil { return nil, err } @@ -118,18 +119,10 @@ func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) { return fetch(rctx) } -func (n *NetStore) BinIndex(po uint8) uint64 { - return n.store.BinIndex(po) -} - -func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { - return n.store.Iterator(from, to, po, f) -} - // FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function, // which returns after the chunk is available or the context is done func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error { - chunk, fetch, _ := n.get(ctx, ref) + chunk, fetch, _ := n.get(ctx, chunk.ModeGetRequest, ref) if chunk != nil { return nil } @@ -140,9 +133,8 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont } // Close chunk store -func (n *NetStore) Close() { +func (n *NetStore) Close() (err error) { close(n.closeC) - n.store.Close() wg := sync.WaitGroup{} for _, key := range n.fetchers.Keys() { @@ -162,6 +154,8 @@ func (n *NetStore) Close() { } } wg.Wait() + + return n.Store.Close() } // get attempts at retrieving the chunk from LocalStore @@ -172,11 +166,11 @@ func (n *NetStore) Close() { // or all fetcher contexts are done. // It returns a chunk, a fetcher function and an error // If chunk is nil, the returned fetch function needs to be called with a context to return the chunk. -func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) { +func (n *NetStore) get(ctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, func(context.Context) (Chunk, error), error) { n.mu.Lock() defer n.mu.Unlock() - chunk, err := n.store.Get(ctx, ref) + chunk, err := n.Store.Get(ctx, mode, ref) if err != nil { // TODO: Fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped. if err != ErrChunkNotFound && err != leveldb.ErrNotFound { @@ -192,13 +186,6 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co return chunk, nil, nil } -// Has is the storage layer entry point to query the underlying -// database to return if it has a chunk or not. -// Called from the DebugAPI -func (n *NetStore) Has(ctx context.Context, ref Address) bool { - return n.store.Has(ctx, ref) -} - // getOrCreateFetcher attempts at retrieving an existing fetchers // if none exists, creates one and saves it in the fetchers cache // caller must hold the lock |