diff options
author | Viktor TrĂ³n <viktor.tron@gmail.com> | 2018-06-22 05:00:43 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-22 05:00:43 +0800 |
commit | eaff89291ce998ba4bf9b9816ca8a15c8b85f440 (patch) | |
tree | c77d7a06627a1a7f578d0fec8e39788e66672e53 /swarm/storage/localstore.go | |
parent | d926bf2c7e3182d694c15829a37a0ca7331cd03c (diff) | |
parent | e187711c6545487d4cac3701f0f506bb536234e2 (diff) | |
download | dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.gz dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.bz2 dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.lz dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.xz dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.tar.zst dexon-eaff89291ce998ba4bf9b9816ca8a15c8b85f440.zip |
Merge pull request #17041 from ethersphere/swarm-network-rewrite-merge
Swarm POC3 - happy solstice
Diffstat (limited to 'swarm/storage/localstore.go')
-rw-r--r-- | swarm/storage/localstore.go | 202 |
1 files changed, 167 insertions, 35 deletions
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index ece0c8615..4c57086fa 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -18,76 +18,208 @@ package storage import ( "encoding/binary" + "fmt" + "path/filepath" + "sync" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/storage/mock" ) -//metrics variables -var ( - dbStorePutCounter = metrics.NewRegisteredCounter("storage.db.dbstore.put.count", nil) -) +type LocalStoreParams struct { + *StoreParams + ChunkDbPath string + Validators []ChunkValidator `toml:"-"` +} + +func NewDefaultLocalStoreParams() *LocalStoreParams { + return &LocalStoreParams{ + StoreParams: NewDefaultStoreParams(), + } +} + +//this can only finally be set after all config options (file, cmd line, env vars) +//have been evaluated +func (p *LocalStoreParams) Init(path string) { + if p.ChunkDbPath == "" { + p.ChunkDbPath = filepath.Join(path, "chunks") + } +} // LocalStore is a combination of inmemory db over a disk persisted db // implements a Get/Put with fallback (caching) logic using any 2 ChunkStores type LocalStore struct { - memStore ChunkStore - DbStore ChunkStore + Validators []ChunkValidator + memStore *MemStore + DbStore *LDBStore + mu sync.Mutex } // This constructor uses MemStore and DbStore as components -func NewLocalStore(hash SwarmHasher, params *StoreParams) (*LocalStore, error) { - dbStore, err := NewDbStore(params.ChunkDbPath, hash, params.DbCapacity, params.Radius) +func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalStore, error) { + ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath) + dbStore, err := NewMockDbStore(ldbparams, mockStore) if err != nil { return nil, err } return &LocalStore{ - memStore: NewMemStore(dbStore, params.CacheCapacity), - DbStore: dbStore, + memStore: NewMemStore(params.StoreParams, dbStore), + DbStore: dbStore, + Validators: params.Validators, }, nil } -func (self *LocalStore) CacheCounter() uint64 { - return uint64(self.memStore.(*MemStore).Counter()) -} - -func (self *LocalStore) DbCounter() uint64 { - return self.DbStore.(*DbStore).Counter() +func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { + ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath) + dbStore, err := NewLDBStore(ldbparams) + if err != nil { + return nil, err + } + localStore := &LocalStore{ + memStore: NewMemStore(params.StoreParams, dbStore), + DbStore: dbStore, + Validators: params.Validators, + } + return localStore, nil } -// LocalStore is itself a chunk store -// unsafe, in that the data is not integrity checked -func (self *LocalStore) Put(chunk *Chunk) { - chunk.dbStored = make(chan bool) - self.memStore.Put(chunk) - if chunk.wg != nil { - chunk.wg.Add(1) +// Put is responsible for doing validation and storage of the chunk +// by using configured ChunkValidators, MemStore and LDBStore. +// If the chunk is not valid, its GetErrored function will +// return ErrChunkInvalid. +// This method will check if the chunk is already in the MemStore +// and it will return it if it is. If there is an error from +// the MemStore.Get, it will be returned by calling GetErrored +// on the chunk. +// This method is responsible for closing Chunk.ReqC channel +// when the chunk is stored in memstore. +// After the LDBStore.Put, it is ensured that the MemStore +// contains the chunk with the same data, but nil ReqC channel. +func (ls *LocalStore) Put(chunk *Chunk) { + if l := len(chunk.SData); l < 9 { + log.Debug("incomplete chunk data", "addr", chunk.Addr, "length", l) + chunk.SetErrored(ErrChunkInvalid) + chunk.markAsStored() + return + } + valid := true + for _, v := range ls.Validators { + if valid = v.Validate(chunk.Addr, chunk.SData); valid { + break + } } - go func() { - dbStorePutCounter.Inc(1) - self.DbStore.Put(chunk) - if chunk.wg != nil { - chunk.wg.Done() + if !valid { + log.Trace("invalid content address", "addr", chunk.Addr) + chunk.SetErrored(ErrChunkInvalid) + chunk.markAsStored() + return + } + + log.Trace("localstore.put", "addr", chunk.Addr) + + ls.mu.Lock() + defer ls.mu.Unlock() + + chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) + + memChunk, err := ls.memStore.Get(chunk.Addr) + switch err { + case nil: + if memChunk.ReqC == nil { + chunk.markAsStored() + return } - }() + case ErrChunkNotFound: + default: + chunk.SetErrored(err) + return + } + + ls.DbStore.Put(chunk) + + // chunk is no longer a request, but a chunk with data, so replace it in memStore + newc := NewChunk(chunk.Addr, nil) + newc.SData = chunk.SData + newc.Size = chunk.Size + newc.dbStoredC = chunk.dbStoredC + + ls.memStore.Put(newc) + + if memChunk != nil && memChunk.ReqC != nil { + close(memChunk.ReqC) + } } // Get(chunk *Chunk) looks up a chunk in the local stores // This method is blocking until the chunk is retrieved // so additional timeout may be needed to wrap this call if // ChunkStores are remote and can have long latency -func (self *LocalStore) Get(key Key) (chunk *Chunk, err error) { - chunk, err = self.memStore.Get(key) +func (ls *LocalStore) Get(addr Address) (chunk *Chunk, err error) { + ls.mu.Lock() + defer ls.mu.Unlock() + + return ls.get(addr) +} + +func (ls *LocalStore) get(addr Address) (chunk *Chunk, err error) { + chunk, err = ls.memStore.Get(addr) if err == nil { + if chunk.ReqC != nil { + select { + case <-chunk.ReqC: + default: + metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1) + return chunk, ErrFetching + } + } + metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) return } - chunk, err = self.DbStore.Get(key) + metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) + chunk, err = ls.DbStore.Get(addr) if err != nil { + metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) return } chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - self.memStore.Put(chunk) + ls.memStore.Put(chunk) return } -// Close local store -func (self *LocalStore) Close() {} +// retrieve logic common for local and network chunk retrieval requests +func (ls *LocalStore) GetOrCreateRequest(addr Address) (chunk *Chunk, created bool) { + metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1) + + ls.mu.Lock() + defer ls.mu.Unlock() + + var err error + chunk, err = ls.get(addr) + if err == nil && chunk.GetErrored() == nil { + metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1) + log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr)) + return chunk, false + } + if err == ErrFetching && chunk.GetErrored() == nil { + metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1) + log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC)) + return chunk, false + } + // no data and no request status + metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1) + log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr)) + chunk = NewChunk(addr, make(chan bool)) + ls.memStore.Put(chunk) + return chunk, true +} + +// RequestsCacheLen returns the current number of outgoing requests stored in the cache +func (ls *LocalStore) RequestsCacheLen() int { + return ls.memStore.requests.Len() +} + +// Close the local store +func (ls *LocalStore) Close() { + ls.DbStore.Close() +} |