aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/localstore.go')
-rw-r--r--swarm/storage/localstore.go110
1 files changed, 37 insertions, 73 deletions
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
index 9e3474979..04701ee69 100644
--- a/swarm/storage/localstore.go
+++ b/swarm/storage/localstore.go
@@ -18,8 +18,6 @@ package storage
import (
"context"
- "encoding/binary"
- "fmt"
"path/filepath"
"sync"
@@ -97,123 +95,89 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) {
// when the chunk is stored in memstore.
// After the LDBStore.Put, it is ensured that the MemStore
// contains the chunk with the same data, but nil ReqC channel.
-func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) {
+func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error {
valid := true
// ls.Validators contains a list of one validator per chunk type.
// if one validator succeeds, then the chunk is valid
for _, v := range ls.Validators {
- if valid = v.Validate(chunk.Addr, chunk.SData); valid {
+ if valid = v.Validate(chunk.Address(), chunk.Data()); valid {
break
}
}
if !valid {
- log.Trace("invalid chunk", "addr", chunk.Addr, "len", len(chunk.SData))
- chunk.SetErrored(ErrChunkInvalid)
- chunk.markAsStored()
- return
+ return ErrChunkInvalid
}
- log.Trace("localstore.put", "addr", chunk.Addr)
-
+ log.Trace("localstore.put", "key", chunk.Address())
ls.mu.Lock()
defer ls.mu.Unlock()
- chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
-
- memChunk, err := ls.memStore.Get(ctx, chunk.Addr)
- switch err {
- case nil:
- if memChunk.ReqC == nil {
- chunk.markAsStored()
- return
- }
- case ErrChunkNotFound:
- default:
- chunk.SetErrored(err)
- return
+ _, err := ls.memStore.Get(ctx, chunk.Address())
+ if err == nil {
+ return nil
}
-
- ls.DbStore.Put(ctx, chunk)
-
- // chunk is no longer a request, but a chunk with data, so replace it in memStore
- newc := NewChunk(chunk.Addr, nil)
- newc.SData = chunk.SData
- newc.Size = chunk.Size
- newc.dbStoredC = chunk.dbStoredC
-
- ls.memStore.Put(ctx, newc)
-
- if memChunk != nil && memChunk.ReqC != nil {
- close(memChunk.ReqC)
+ if err != nil && err != ErrChunkNotFound {
+ return err
}
+ ls.memStore.Put(ctx, chunk)
+ err = ls.DbStore.Put(ctx, chunk)
+ return err
}
// Get(chunk *Chunk) looks up a chunk in the local stores
// This method is blocking until the chunk is retrieved
// so additional timeout may be needed to wrap this call if
// ChunkStores are remote and can have long latency
-func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
+func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) {
ls.mu.Lock()
defer ls.mu.Unlock()
return ls.get(ctx, addr)
}
-func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
+func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) {
chunk, err = ls.memStore.Get(ctx, addr)
+
+ if err != nil && err != ErrChunkNotFound {
+ metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
+ return nil, err
+ }
+
if err == nil {
- if chunk.ReqC != nil {
- select {
- case <-chunk.ReqC:
- default:
- metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1)
- return chunk, ErrFetching
- }
- }
metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
- return
+ return chunk, nil
}
+
metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1)
chunk, err = ls.DbStore.Get(ctx, addr)
if err != nil {
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
- return
+ return nil, err
}
- chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+
ls.memStore.Put(ctx, chunk)
- return
+ return chunk, nil
}
-// retrieve logic common for local and network chunk retrieval requests
-func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) {
- metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1)
-
+func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error {
ls.mu.Lock()
defer ls.mu.Unlock()
- var err error
- chunk, err = ls.get(ctx, addr)
- if err == nil && chunk.GetErrored() == nil {
- metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1)
- log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr))
- return chunk, false
+ _, err := ls.get(ctx, addr)
+ if err == nil {
+ return nil
}
- if err == ErrFetching && chunk.GetErrored() == nil {
- metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1)
- log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC))
- return chunk, false
+ return func(context.Context) error {
+ return err
}
- // no data and no request status
- metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1)
- log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr))
- chunk = NewChunk(addr, make(chan bool))
- ls.memStore.Put(ctx, chunk)
- return chunk, true
}
-// RequestsCacheLen returns the current number of outgoing requests stored in the cache
-func (ls *LocalStore) RequestsCacheLen() int {
- return ls.memStore.requests.Len()
+func (ls *LocalStore) BinIndex(po uint8) uint64 {
+ return ls.DbStore.BinIndex(po)
+}
+
+func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
+ return ls.DbStore.SyncIterator(from, to, po, f)
}
// Close the local store