aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/localstore.go')
-rw-r--r--swarm/storage/localstore.go202
1 files changed, 167 insertions, 35 deletions
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
index ece0c8615..4c57086fa 100644
--- a/swarm/storage/localstore.go
+++ b/swarm/storage/localstore.go
@@ -18,76 +18,208 @@ package storage
import (
"encoding/binary"
+ "fmt"
+ "path/filepath"
+ "sync"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
)
-//metrics variables
-var (
- dbStorePutCounter = metrics.NewRegisteredCounter("storage.db.dbstore.put.count", nil)
-)
+type LocalStoreParams struct {
+ *StoreParams
+ ChunkDbPath string
+ Validators []ChunkValidator `toml:"-"`
+}
+
+func NewDefaultLocalStoreParams() *LocalStoreParams {
+ return &LocalStoreParams{
+ StoreParams: NewDefaultStoreParams(),
+ }
+}
+
+//this can only finally be set after all config options (file, cmd line, env vars)
+//have been evaluated
+func (p *LocalStoreParams) Init(path string) {
+ if p.ChunkDbPath == "" {
+ p.ChunkDbPath = filepath.Join(path, "chunks")
+ }
+}
// LocalStore is a combination of inmemory db over a disk persisted db
// implements a Get/Put with fallback (caching) logic using any 2 ChunkStores
type LocalStore struct {
- memStore ChunkStore
- DbStore ChunkStore
+ Validators []ChunkValidator
+ memStore *MemStore
+ DbStore *LDBStore
+ mu sync.Mutex
}
// This constructor uses MemStore and DbStore as components
-func NewLocalStore(hash SwarmHasher, params *StoreParams) (*LocalStore, error) {
- dbStore, err := NewDbStore(params.ChunkDbPath, hash, params.DbCapacity, params.Radius)
+func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalStore, error) {
+ ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath)
+ dbStore, err := NewMockDbStore(ldbparams, mockStore)
if err != nil {
return nil, err
}
return &LocalStore{
- memStore: NewMemStore(dbStore, params.CacheCapacity),
- DbStore: dbStore,
+ memStore: NewMemStore(params.StoreParams, dbStore),
+ DbStore: dbStore,
+ Validators: params.Validators,
}, nil
}
-func (self *LocalStore) CacheCounter() uint64 {
- return uint64(self.memStore.(*MemStore).Counter())
-}
-
-func (self *LocalStore) DbCounter() uint64 {
- return self.DbStore.(*DbStore).Counter()
+func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) {
+ ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath)
+ dbStore, err := NewLDBStore(ldbparams)
+ if err != nil {
+ return nil, err
+ }
+ localStore := &LocalStore{
+ memStore: NewMemStore(params.StoreParams, dbStore),
+ DbStore: dbStore,
+ Validators: params.Validators,
+ }
+ return localStore, nil
}
-// LocalStore is itself a chunk store
-// unsafe, in that the data is not integrity checked
-func (self *LocalStore) Put(chunk *Chunk) {
- chunk.dbStored = make(chan bool)
- self.memStore.Put(chunk)
- if chunk.wg != nil {
- chunk.wg.Add(1)
+// Put is responsible for doing validation and storage of the chunk
+// by using configured ChunkValidators, MemStore and LDBStore.
+// If the chunk is not valid, its GetErrored function will
+// return ErrChunkInvalid.
+// This method will check if the chunk is already in the MemStore
+// and it will return it if it is. If there is an error from
+// the MemStore.Get, it will be returned by calling GetErrored
+// on the chunk.
+// This method is responsible for closing Chunk.ReqC channel
+// when the chunk is stored in memstore.
+// After the LDBStore.Put, it is ensured that the MemStore
+// contains the chunk with the same data, but nil ReqC channel.
+func (ls *LocalStore) Put(chunk *Chunk) {
+ if l := len(chunk.SData); l < 9 {
+ log.Debug("incomplete chunk data", "addr", chunk.Addr, "length", l)
+ chunk.SetErrored(ErrChunkInvalid)
+ chunk.markAsStored()
+ return
+ }
+ valid := true
+ for _, v := range ls.Validators {
+ if valid = v.Validate(chunk.Addr, chunk.SData); valid {
+ break
+ }
}
- go func() {
- dbStorePutCounter.Inc(1)
- self.DbStore.Put(chunk)
- if chunk.wg != nil {
- chunk.wg.Done()
+ if !valid {
+ log.Trace("invalid content address", "addr", chunk.Addr)
+ chunk.SetErrored(ErrChunkInvalid)
+ chunk.markAsStored()
+ return
+ }
+
+ log.Trace("localstore.put", "addr", chunk.Addr)
+
+ ls.mu.Lock()
+ defer ls.mu.Unlock()
+
+ chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+
+ memChunk, err := ls.memStore.Get(chunk.Addr)
+ switch err {
+ case nil:
+ if memChunk.ReqC == nil {
+ chunk.markAsStored()
+ return
}
- }()
+ case ErrChunkNotFound:
+ default:
+ chunk.SetErrored(err)
+ return
+ }
+
+ ls.DbStore.Put(chunk)
+
+ // chunk is no longer a request, but a chunk with data, so replace it in memStore
+ newc := NewChunk(chunk.Addr, nil)
+ newc.SData = chunk.SData
+ newc.Size = chunk.Size
+ newc.dbStoredC = chunk.dbStoredC
+
+ ls.memStore.Put(newc)
+
+ if memChunk != nil && memChunk.ReqC != nil {
+ close(memChunk.ReqC)
+ }
}
// Get(chunk *Chunk) looks up a chunk in the local stores
// This method is blocking until the chunk is retrieved
// so additional timeout may be needed to wrap this call if
// ChunkStores are remote and can have long latency
-func (self *LocalStore) Get(key Key) (chunk *Chunk, err error) {
- chunk, err = self.memStore.Get(key)
+func (ls *LocalStore) Get(addr Address) (chunk *Chunk, err error) {
+ ls.mu.Lock()
+ defer ls.mu.Unlock()
+
+ return ls.get(addr)
+}
+
+func (ls *LocalStore) get(addr Address) (chunk *Chunk, err error) {
+ chunk, err = ls.memStore.Get(addr)
if err == nil {
+ if chunk.ReqC != nil {
+ select {
+ case <-chunk.ReqC:
+ default:
+ metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1)
+ return chunk, ErrFetching
+ }
+ }
+ metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
return
}
- chunk, err = self.DbStore.Get(key)
+ metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1)
+ chunk, err = ls.DbStore.Get(addr)
if err != nil {
+ metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
return
}
chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
- self.memStore.Put(chunk)
+ ls.memStore.Put(chunk)
return
}
-// Close local store
-func (self *LocalStore) Close() {}
+// retrieve logic common for local and network chunk retrieval requests
+func (ls *LocalStore) GetOrCreateRequest(addr Address) (chunk *Chunk, created bool) {
+ metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1)
+
+ ls.mu.Lock()
+ defer ls.mu.Unlock()
+
+ var err error
+ chunk, err = ls.get(addr)
+ if err == nil && chunk.GetErrored() == nil {
+ metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1)
+ log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr))
+ return chunk, false
+ }
+ if err == ErrFetching && chunk.GetErrored() == nil {
+ metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1)
+ log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC))
+ return chunk, false
+ }
+ // no data and no request status
+ metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1)
+ log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr))
+ chunk = NewChunk(addr, make(chan bool))
+ ls.memStore.Put(chunk)
+ return chunk, true
+}
+
+// RequestsCacheLen returns the current number of outgoing requests stored in the cache
+func (ls *LocalStore) RequestsCacheLen() int {
+ return ls.memStore.requests.Len()
+}
+
+// Close the local store
+func (ls *LocalStore) Close() {
+ ls.DbStore.Close()
+}