diff options
author | Ferenc Szabo <frncmx@gmail.com> | 2018-11-13 14:41:01 +0800 |
---|---|---|
committer | Viktor TrĂ³n <viktor.tron@gmail.com> | 2018-11-13 14:41:01 +0800 |
commit | 8080265f3f591d33e127a924724a8bfe5ced6475 (patch) | |
tree | f5556b59dee64d399d61b6f7cb4e97bde69f29d0 /swarm/storage | |
parent | 1212c7b844e3ef13cbb5476b088eae27782535b4 (diff) | |
download | dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.gz dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.bz2 dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.lz dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.xz dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.zst dexon-8080265f3f591d33e127a924724a8bfe5ced6475.zip |
swarm/storage: fix access count on dbstore after cache hit (#17978)
Access count was not incremented when chunk was retrieved
from cache. So the garbage collector might have deleted the most
frequently accessed chunk from disk.
Co-authored-by: Ferenc Szabo <ferenc.szabo@ethereum.org>
Diffstat (limited to 'swarm/storage')
-rw-r--r-- | swarm/storage/ldbstore.go | 48 | ||||
-rw-r--r-- | swarm/storage/ldbstore_test.go | 41 | ||||
-rw-r--r-- | swarm/storage/localstore.go | 1 | ||||
-rw-r--r-- | swarm/storage/localstore_test.go | 65 |
4 files changed, 138 insertions, 17 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 9feb68741..46e040250 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -186,6 +186,20 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { return s, nil } +// MarkAccessed increments the access counter as a best effort for a chunk, so +// the chunk won't get garbage collected. +func (s *LDBStore) MarkAccessed(addr Address) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.closed { + return + } + + proximity := s.po(addr) + s.tryAccessIdx(addr, proximity) +} + // initialize and set values for processing of gc round func (s *LDBStore) startGC(c int) { @@ -349,6 +363,7 @@ func (s *LDBStore) collectGarbage() error { s.delete(s.gc.batch.Batch, index, keyIdx, po) singleIterationCount++ s.gc.count++ + log.Trace("garbage collect enqueued chunk for deletion", "key", hash) // break if target is not on max garbage batch boundary if s.gc.count >= s.gc.target { @@ -685,12 +700,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { idata, err := s.db.Get(ikey) if err != nil { s.doPut(chunk, &index, po) - } else { - log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po) - decodeIndex(idata, &index) } - index.Access = s.accessCnt - s.accessCnt++ idata = encodeIndex(&index) s.batch.Put(ikey, idata) @@ -723,7 +733,8 @@ func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { s.entryCnt++ dbEntryCount.Inc(1) s.dataIdx++ - + index.Access = s.accessCnt + s.accessCnt++ cntKey := make([]byte, 2) cntKey[0] = keyDistanceCnt cntKey[1] = po @@ -796,18 +807,23 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { } } -// try to find index; if found, update access cnt and return true -func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool { +// tryAccessIdx tries to find index entry. If found then increments the access +// count for garbage collection and returns the index entry and true for found, +// otherwise returns nil and false. +func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) { + ikey := getIndexKey(addr) idata, err := s.db.Get(ikey) if err != nil { - return false + return nil, false } + + index := new(dpaDBIndex) decodeIndex(idata, index) oldGCIdxKey := getGCIdxKey(index) s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) - s.accessCnt++ index.Access = s.accessCnt idata = encodeIndex(index) + s.accessCnt++ s.batch.Put(ikey, idata) newGCIdxKey := getGCIdxKey(index) newGCIdxData := getGCIdxValue(index, po, ikey) @@ -817,7 +833,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool { case s.batchesC <- struct{}{}: default: } - return true + return index, true } // GetSchema is returning the current named schema of the datastore as read from LevelDB @@ -858,12 +874,12 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) // TODO: To conform with other private methods of this object indices should not be updated func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { - var indx dpaDBIndex if s.closed { return nil, ErrDBClosed } proximity := s.po(addr) - if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) { + index, found := s.tryAccessIdx(addr, proximity) + if found { var data []byte if s.getDataFunc != nil { // if getDataFunc is defined, use it to retrieve the chunk data @@ -874,12 +890,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { } } else { // default DbStore functionality to retrieve chunk data - datakey := getDataKey(indx.Idx, proximity) + datakey := getDataKey(index.Idx, proximity) data, err = s.db.Get(datakey) - log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity) + log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity) if err != nil { log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err) - s.deleteNow(&indx, getIndexKey(addr), s.po(addr)) + s.deleteNow(index, getIndexKey(addr), s.po(addr)) return } } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 48af8c57c..22213b12d 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -31,7 +31,6 @@ import ( ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" - ldberrors "github.com/syndtr/goleveldb/leveldb/errors" ) @@ -105,6 +104,46 @@ func testDbStoreCorrect(n int, chunksize int64, mock bool, t *testing.T) { testStoreCorrect(db, n, chunksize, t) } +func TestMarkAccessed(t *testing.T) { + db, cleanup, err := newTestDbStore(false, true) + defer cleanup() + if err != nil { + t.Fatalf("init dbStore failed: %v", err) + } + + h := GenerateRandomChunk(ch.DefaultSize) + + db.Put(context.Background(), h) + + var index dpaDBIndex + addr := h.Address() + idxk := getIndexKey(addr) + + idata, err := db.db.Get(idxk) + if err != nil { + t.Fatal(err) + } + decodeIndex(idata, &index) + + if index.Access != 0 { + t.Fatalf("Expected the access index to be %d, but it is %d", 0, index.Access) + } + + db.MarkAccessed(addr) + db.writeCurrentBatch() + + idata, err = db.db.Get(idxk) + if err != nil { + t.Fatal(err) + } + decodeIndex(idata, &index) + + if index.Access != 1 { + t.Fatalf("Expected the access index to be %d, but it is %d", 1, index.Access) + } + +} + func TestDbStoreRandom_1(t *testing.T) { testDbStoreRandom(1, 0, false, t) } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 4fa6fb2f6..6971d759e 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -153,6 +153,7 @@ func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err e if err == nil { metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) + go ls.DbStore.MarkAccessed(addr) return chunk, nil } diff --git a/swarm/storage/localstore_test.go b/swarm/storage/localstore_test.go index 10f43f30f..7a07726d1 100644 --- a/swarm/storage/localstore_test.go +++ b/swarm/storage/localstore_test.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "testing" + "time" ch "github.com/ethereum/go-ethereum/swarm/chunk" ) @@ -144,3 +145,67 @@ func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs [] } return hs, errs } + +// TestGetFrequentlyAccessedChunkWontGetGarbageCollected tests that the most +// frequently accessed chunk is not garbage collected from LDBStore, i.e., +// from disk when we are at the capacity and garbage collector runs. For that +// we start putting random chunks into the DB while continuously accessing the +// chunk we care about then check if we can still retrieve it from disk. +func TestGetFrequentlyAccessedChunkWontGetGarbageCollected(t *testing.T) { + ldbCap := defaultGCRatio + store, cleanup := setupLocalStore(t, ldbCap) + defer cleanup() + + var chunks []Chunk + for i := 0; i < ldbCap; i++ { + chunks = append(chunks, GenerateRandomChunk(ch.DefaultSize)) + } + + mostAccessed := chunks[0].Address() + for _, chunk := range chunks { + if err := store.Put(context.Background(), chunk); err != nil { + t.Fatal(err) + } + + if _, err := store.Get(context.Background(), mostAccessed); err != nil { + t.Fatal(err) + } + // Add time for MarkAccessed() to be able to finish in a separate Goroutine + time.Sleep(1 * time.Millisecond) + } + + store.DbStore.collectGarbage() + if _, err := store.DbStore.Get(context.Background(), mostAccessed); err != nil { + t.Logf("most frequntly accessed chunk not found on disk (key: %v)", mostAccessed) + t.Fatal(err) + } + +} + +func setupLocalStore(t *testing.T, ldbCap int) (ls *LocalStore, cleanup func()) { + t.Helper() + + var err error + datadir, err := ioutil.TempDir("", "storage") + if err != nil { + t.Fatal(err) + } + + params := &LocalStoreParams{ + StoreParams: NewStoreParams(uint64(ldbCap), uint(ldbCap), nil, nil), + } + params.Init(datadir) + + store, err := NewLocalStore(params, nil) + if err != nil { + _ = os.RemoveAll(datadir) + t.Fatal(err) + } + + cleanup = func() { + store.Close() + _ = os.RemoveAll(datadir) + } + + return store, cleanup +} |