aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage
diff options
context:
space:
mode:
authorFerenc Szabo <frncmx@gmail.com>2018-11-13 14:41:01 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2018-11-13 14:41:01 +0800
commit8080265f3f591d33e127a924724a8bfe5ced6475 (patch)
treef5556b59dee64d399d61b6f7cb4e97bde69f29d0 /swarm/storage
parent1212c7b844e3ef13cbb5476b088eae27782535b4 (diff)
downloaddexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar
dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.gz
dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.bz2
dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.lz
dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.xz
dexon-8080265f3f591d33e127a924724a8bfe5ced6475.tar.zst
dexon-8080265f3f591d33e127a924724a8bfe5ced6475.zip
swarm/storage: fix access count on dbstore after cache hit (#17978)
Access count was not incremented when chunk was retrieved from cache. So the garbage collector might have deleted the most frequently accessed chunk from disk. Co-authored-by: Ferenc Szabo <ferenc.szabo@ethereum.org>
Diffstat (limited to 'swarm/storage')
-rw-r--r--swarm/storage/ldbstore.go48
-rw-r--r--swarm/storage/ldbstore_test.go41
-rw-r--r--swarm/storage/localstore.go1
-rw-r--r--swarm/storage/localstore_test.go65
4 files changed, 138 insertions, 17 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 9feb68741..46e040250 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -186,6 +186,20 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
return s, nil
}
+// MarkAccessed increments the access counter as a best effort for a chunk, so
+// the chunk won't get garbage collected.
+func (s *LDBStore) MarkAccessed(addr Address) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.closed {
+ return
+ }
+
+ proximity := s.po(addr)
+ s.tryAccessIdx(addr, proximity)
+}
+
// initialize and set values for processing of gc round
func (s *LDBStore) startGC(c int) {
@@ -349,6 +363,7 @@ func (s *LDBStore) collectGarbage() error {
s.delete(s.gc.batch.Batch, index, keyIdx, po)
singleIterationCount++
s.gc.count++
+ log.Trace("garbage collect enqueued chunk for deletion", "key", hash)
// break if target is not on max garbage batch boundary
if s.gc.count >= s.gc.target {
@@ -685,12 +700,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
idata, err := s.db.Get(ikey)
if err != nil {
s.doPut(chunk, &index, po)
- } else {
- log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po)
- decodeIndex(idata, &index)
}
- index.Access = s.accessCnt
- s.accessCnt++
idata = encodeIndex(&index)
s.batch.Put(ikey, idata)
@@ -723,7 +733,8 @@ func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
s.entryCnt++
dbEntryCount.Inc(1)
s.dataIdx++
-
+ index.Access = s.accessCnt
+ s.accessCnt++
cntKey := make([]byte, 2)
cntKey[0] = keyDistanceCnt
cntKey[1] = po
@@ -796,18 +807,23 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
}
}
-// try to find index; if found, update access cnt and return true
-func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool {
+// tryAccessIdx tries to find index entry. If found then increments the access
+// count for garbage collection and returns the index entry and true for found,
+// otherwise returns nil and false.
+func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
+ ikey := getIndexKey(addr)
idata, err := s.db.Get(ikey)
if err != nil {
- return false
+ return nil, false
}
+
+ index := new(dpaDBIndex)
decodeIndex(idata, index)
oldGCIdxKey := getGCIdxKey(index)
s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
- s.accessCnt++
index.Access = s.accessCnt
idata = encodeIndex(index)
+ s.accessCnt++
s.batch.Put(ikey, idata)
newGCIdxKey := getGCIdxKey(index)
newGCIdxData := getGCIdxValue(index, po, ikey)
@@ -817,7 +833,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool {
case s.batchesC <- struct{}{}:
default:
}
- return true
+ return index, true
}
// GetSchema is returning the current named schema of the datastore as read from LevelDB
@@ -858,12 +874,12 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error)
// TODO: To conform with other private methods of this object indices should not be updated
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
- var indx dpaDBIndex
if s.closed {
return nil, ErrDBClosed
}
proximity := s.po(addr)
- if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) {
+ index, found := s.tryAccessIdx(addr, proximity)
+ if found {
var data []byte
if s.getDataFunc != nil {
// if getDataFunc is defined, use it to retrieve the chunk data
@@ -874,12 +890,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
}
} else {
// default DbStore functionality to retrieve chunk data
- datakey := getDataKey(indx.Idx, proximity)
+ datakey := getDataKey(index.Idx, proximity)
data, err = s.db.Get(datakey)
- log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
+ log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
if err != nil {
log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
- s.deleteNow(&indx, getIndexKey(addr), s.po(addr))
+ s.deleteNow(index, getIndexKey(addr), s.po(addr))
return
}
}
diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go
index 48af8c57c..22213b12d 100644
--- a/swarm/storage/ldbstore_test.go
+++ b/swarm/storage/ldbstore_test.go
@@ -31,7 +31,6 @@ import (
ch "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
-
ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
)
@@ -105,6 +104,46 @@ func testDbStoreCorrect(n int, chunksize int64, mock bool, t *testing.T) {
testStoreCorrect(db, n, chunksize, t)
}
+func TestMarkAccessed(t *testing.T) {
+ db, cleanup, err := newTestDbStore(false, true)
+ defer cleanup()
+ if err != nil {
+ t.Fatalf("init dbStore failed: %v", err)
+ }
+
+ h := GenerateRandomChunk(ch.DefaultSize)
+
+ db.Put(context.Background(), h)
+
+ var index dpaDBIndex
+ addr := h.Address()
+ idxk := getIndexKey(addr)
+
+ idata, err := db.db.Get(idxk)
+ if err != nil {
+ t.Fatal(err)
+ }
+ decodeIndex(idata, &index)
+
+ if index.Access != 0 {
+ t.Fatalf("Expected the access index to be %d, but it is %d", 0, index.Access)
+ }
+
+ db.MarkAccessed(addr)
+ db.writeCurrentBatch()
+
+ idata, err = db.db.Get(idxk)
+ if err != nil {
+ t.Fatal(err)
+ }
+ decodeIndex(idata, &index)
+
+ if index.Access != 1 {
+ t.Fatalf("Expected the access index to be %d, but it is %d", 1, index.Access)
+ }
+
+}
+
func TestDbStoreRandom_1(t *testing.T) {
testDbStoreRandom(1, 0, false, t)
}
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
index 4fa6fb2f6..6971d759e 100644
--- a/swarm/storage/localstore.go
+++ b/swarm/storage/localstore.go
@@ -153,6 +153,7 @@ func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err e
if err == nil {
metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
+ go ls.DbStore.MarkAccessed(addr)
return chunk, nil
}
diff --git a/swarm/storage/localstore_test.go b/swarm/storage/localstore_test.go
index 10f43f30f..7a07726d1 100644
--- a/swarm/storage/localstore_test.go
+++ b/swarm/storage/localstore_test.go
@@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"testing"
+ "time"
ch "github.com/ethereum/go-ethereum/swarm/chunk"
)
@@ -144,3 +145,67 @@ func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs []
}
return hs, errs
}
+
+// TestGetFrequentlyAccessedChunkWontGetGarbageCollected tests that the most
+// frequently accessed chunk is not garbage collected from LDBStore, i.e.,
+// from disk when we are at the capacity and garbage collector runs. For that
+// we start putting random chunks into the DB while continuously accessing the
+// chunk we care about then check if we can still retrieve it from disk.
+func TestGetFrequentlyAccessedChunkWontGetGarbageCollected(t *testing.T) {
+ ldbCap := defaultGCRatio
+ store, cleanup := setupLocalStore(t, ldbCap)
+ defer cleanup()
+
+ var chunks []Chunk
+ for i := 0; i < ldbCap; i++ {
+ chunks = append(chunks, GenerateRandomChunk(ch.DefaultSize))
+ }
+
+ mostAccessed := chunks[0].Address()
+ for _, chunk := range chunks {
+ if err := store.Put(context.Background(), chunk); err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := store.Get(context.Background(), mostAccessed); err != nil {
+ t.Fatal(err)
+ }
+ // Add time for MarkAccessed() to be able to finish in a separate Goroutine
+ time.Sleep(1 * time.Millisecond)
+ }
+
+ store.DbStore.collectGarbage()
+ if _, err := store.DbStore.Get(context.Background(), mostAccessed); err != nil {
+ t.Logf("most frequntly accessed chunk not found on disk (key: %v)", mostAccessed)
+ t.Fatal(err)
+ }
+
+}
+
+func setupLocalStore(t *testing.T, ldbCap int) (ls *LocalStore, cleanup func()) {
+ t.Helper()
+
+ var err error
+ datadir, err := ioutil.TempDir("", "storage")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ params := &LocalStoreParams{
+ StoreParams: NewStoreParams(uint64(ldbCap), uint(ldbCap), nil, nil),
+ }
+ params.Init(datadir)
+
+ store, err := NewLocalStore(params, nil)
+ if err != nil {
+ _ = os.RemoveAll(datadir)
+ t.Fatal(err)
+ }
+
+ cleanup = func() {
+ store.Close()
+ _ = os.RemoveAll(datadir)
+ }
+
+ return store, cleanup
+}