diff options
Diffstat (limited to 'swarm/storage/localstore.go')
-rw-r--r-- | swarm/storage/localstore.go | 110 |
1 files changed, 37 insertions, 73 deletions
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 9e3474979..04701ee69 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -18,8 +18,6 @@ package storage import ( "context" - "encoding/binary" - "fmt" "path/filepath" "sync" @@ -97,123 +95,89 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { // when the chunk is stored in memstore. // After the LDBStore.Put, it is ensured that the MemStore // contains the chunk with the same data, but nil ReqC channel. -func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) { +func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error { valid := true // ls.Validators contains a list of one validator per chunk type. // if one validator succeeds, then the chunk is valid for _, v := range ls.Validators { - if valid = v.Validate(chunk.Addr, chunk.SData); valid { + if valid = v.Validate(chunk.Address(), chunk.Data()); valid { break } } if !valid { - log.Trace("invalid chunk", "addr", chunk.Addr, "len", len(chunk.SData)) - chunk.SetErrored(ErrChunkInvalid) - chunk.markAsStored() - return + return ErrChunkInvalid } - log.Trace("localstore.put", "addr", chunk.Addr) - + log.Trace("localstore.put", "key", chunk.Address()) ls.mu.Lock() defer ls.mu.Unlock() - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - - memChunk, err := ls.memStore.Get(ctx, chunk.Addr) - switch err { - case nil: - if memChunk.ReqC == nil { - chunk.markAsStored() - return - } - case ErrChunkNotFound: - default: - chunk.SetErrored(err) - return + _, err := ls.memStore.Get(ctx, chunk.Address()) + if err == nil { + return nil } - - ls.DbStore.Put(ctx, chunk) - - // chunk is no longer a request, but a chunk with data, so replace it in memStore - newc := NewChunk(chunk.Addr, nil) - newc.SData = chunk.SData - newc.Size = chunk.Size - newc.dbStoredC = chunk.dbStoredC - - ls.memStore.Put(ctx, newc) - - if memChunk != nil && memChunk.ReqC != nil { - close(memChunk.ReqC) + if err != nil && err != ErrChunkNotFound { + return err } + ls.memStore.Put(ctx, chunk) + err = ls.DbStore.Put(ctx, chunk) + return err } // Get(chunk *Chunk) looks up a chunk in the local stores // This method is blocking until the chunk is retrieved // so additional timeout may be needed to wrap this call if // ChunkStores are remote and can have long latency -func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) { ls.mu.Lock() defer ls.mu.Unlock() return ls.get(ctx, addr) } -func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) { chunk, err = ls.memStore.Get(ctx, addr) + + if err != nil && err != ErrChunkNotFound { + metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) + return nil, err + } + if err == nil { - if chunk.ReqC != nil { - select { - case <-chunk.ReqC: - default: - metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1) - return chunk, ErrFetching - } - } metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) - return + return chunk, nil } + metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) chunk, err = ls.DbStore.Get(ctx, addr) if err != nil { metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) - return + return nil, err } - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) + ls.memStore.Put(ctx, chunk) - return + return chunk, nil } -// retrieve logic common for local and network chunk retrieval requests -func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1) - +func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error { ls.mu.Lock() defer ls.mu.Unlock() - var err error - chunk, err = ls.get(ctx, addr) - if err == nil && chunk.GetErrored() == nil { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr)) - return chunk, false + _, err := ls.get(ctx, addr) + if err == nil { + return nil } - if err == ErrFetching && chunk.GetErrored() == nil { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC)) - return chunk, false + return func(context.Context) error { + return err } - // no data and no request status - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr)) - chunk = NewChunk(addr, make(chan bool)) - ls.memStore.Put(ctx, chunk) - return chunk, true } -// RequestsCacheLen returns the current number of outgoing requests stored in the cache -func (ls *LocalStore) RequestsCacheLen() int { - return ls.memStore.requests.Len() +func (ls *LocalStore) BinIndex(po uint8) uint64 { + return ls.DbStore.BinIndex(po) +} + +func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { + return ls.DbStore.SyncIterator(from, to, po, f) } // Close the local store |