aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--swarm/storage/ldbstore.go282
-rw-r--r--swarm/storage/ldbstore_test.go219
2 files changed, 358 insertions, 143 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 2a7f51cb3..49508911f 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -32,7 +32,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "sort"
"sync"
"github.com/ethereum/go-ethereum/metrics"
@@ -44,8 +43,13 @@ import (
)
const (
- gcArrayFreeRatio = 0.1
- maxGCitems = 5000 // max number of items to be gc'd per call to collectGarbage()
+ defaultGCRatio = 10
+ defaultMaxGCRound = 10000
+ defaultMaxGCBatch = 5000
+
+ wEntryCnt = 1 << 0
+ wIndexCnt = 1 << 1
+ wAccessCnt = 1 << 2
)
var (
@@ -61,6 +65,7 @@ var (
keyData = byte(6)
keyDistanceCnt = byte(7)
keySchema = []byte{8}
+ keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry
)
var (
@@ -68,7 +73,7 @@ var (
)
type gcItem struct {
- idx uint64
+ idx *dpaDBIndex
value uint64
idxKey []byte
po uint8
@@ -89,6 +94,16 @@ func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
}
}
+type garbage struct {
+ maxRound int // maximum number of chunks to delete in one garbage collection round
+ maxBatch int // maximum number of chunks to delete in one db request batch
+ ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db
+ count int // number of chunks deleted in running round
+ target int // number of chunks to delete in running round
+ batch *dbBatch // the delete batch
+ runC chan struct{} // struct in chan means gc is NOT running
+}
+
type LDBStore struct {
db *LDBDatabase
@@ -102,12 +117,12 @@ type LDBStore struct {
hashfunc SwarmHasher
po func(Address) uint8
- batchC chan bool
batchesC chan struct{}
closed bool
batch *dbBatch
lock sync.RWMutex
quit chan struct{}
+ gc *garbage
// Functions encodeDataFunc is used to bypass
// the default functionality of DbStore with
@@ -166,9 +181,33 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
data, _ = s.db.Get(keyDataIdx)
s.dataIdx = BytesToU64(data)
+ // set up garbage collection
+ s.gc = &garbage{
+ maxBatch: defaultMaxGCBatch,
+ maxRound: defaultMaxGCRound,
+ ratio: defaultGCRatio,
+ }
+
+ s.gc.runC = make(chan struct{}, 1)
+ s.gc.runC <- struct{}{}
+
return s, nil
}
+// initialize and set values for processing of gc round
+func (s *LDBStore) startGC(c int) {
+
+ s.gc.count = 0
+ // calculate the target number of deletions
+ if c >= s.gc.maxRound {
+ s.gc.target = s.gc.maxRound
+ } else {
+ s.gc.target = c / s.gc.ratio
+ }
+ s.gc.batch = newBatch()
+ log.Debug("startgc", "requested", c, "target", s.gc.target)
+}
+
// NewMockDbStore creates a new instance of DbStore with
// mockStore set to a provided value. If mockStore argument is nil,
// this function behaves exactly as NewDbStore.
@@ -225,6 +264,31 @@ func getDataKey(idx uint64, po uint8) []byte {
return key
}
+func getGCIdxKey(index *dpaDBIndex) []byte {
+ key := make([]byte, 9)
+ key[0] = keyGCIdx
+ binary.BigEndian.PutUint64(key[1:], index.Access)
+ return key
+}
+
+func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
+ val := make([]byte, 41) // po = 1, index.Index = 8, Address = 32
+ val[0] = po
+ binary.BigEndian.PutUint64(val[1:], index.Idx)
+ copy(val[9:], addr)
+ return val
+}
+
+func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
+ index = &dpaDBIndex{
+ Idx: binary.BigEndian.Uint64(val[1:]),
+ Access: binary.BigEndian.Uint64(accessCnt),
+ }
+ po = val[0]
+ addr = val[9:]
+ return
+}
+
func encodeIndex(index *dpaDBIndex) []byte {
data, _ := rlp.EncodeToBytes(index)
return data
@@ -247,55 +311,70 @@ func decodeData(addr Address, data []byte) (*chunk, error) {
return NewChunk(addr, data[32:]), nil
}
-func (s *LDBStore) collectGarbage(ratio float32) {
- log.Trace("collectGarbage", "ratio", ratio)
+func (s *LDBStore) collectGarbage() error {
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
+ // prevent duplicate gc from starting when one is already running
+ select {
+ case <-s.gc.runC:
+ default:
+ return nil
+ }
- it := s.db.NewIterator()
- defer it.Release()
+ s.lock.Lock()
+ entryCnt := s.entryCnt
+ s.lock.Unlock()
- garbage := []*gcItem{}
- gcnt := 0
+ metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
- for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() {
- itkey := it.Key()
+ // calculate the amount of chunks to collect and reset counter
+ s.startGC(int(entryCnt))
+ log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt)
- if (itkey == nil) || (itkey[0] != keyIndex) {
- break
- }
+ var totalDeleted int
+ for s.gc.count < s.gc.target {
+ it := s.db.NewIterator()
+ ok := it.Seek([]byte{keyGCIdx})
+ var singleIterationCount int
- // it.Key() contents change on next call to it.Next(), so we must copy it
- key := make([]byte, len(it.Key()))
- copy(key, it.Key())
+ // every batch needs a lock so we avoid entries changing accessidx in the meantime
+ s.lock.Lock()
+ for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() {
- val := it.Value()
+ // quit if no more access index keys
+ itkey := it.Key()
+ if (itkey == nil) || (itkey[0] != keyGCIdx) {
+ break
+ }
- var index dpaDBIndex
+ // get chunk data entry from access index
+ val := it.Value()
+ index, po, hash := parseGCIdxEntry(itkey[1:], val)
+ keyIdx := make([]byte, 33)
+ keyIdx[0] = keyIndex
+ copy(keyIdx[1:], hash)
- hash := key[1:]
- decodeIndex(val, &index)
- po := s.po(hash)
+ // add delete operation to batch
+ s.delete(s.gc.batch.Batch, index, keyIdx, po)
+ singleIterationCount++
+ s.gc.count++
- gci := &gcItem{
- idxKey: key,
- idx: index.Idx,
- value: index.Access, // the smaller, the more likely to be gc'd. see sort comparator below.
- po: po,
+ // break if target is not on max garbage batch boundary
+ if s.gc.count >= s.gc.target {
+ break
+ }
}
- garbage = append(garbage, gci)
- gcnt++
+ s.writeBatch(s.gc.batch, wEntryCnt)
+ s.lock.Unlock()
+ it.Release()
+ log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count)
}
- sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value })
-
- cutoff := int(float32(gcnt) * ratio)
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(cutoff))
+ s.gc.runC <- struct{}{}
+ log.Debug("garbage collect done", "c", s.gc.count)
- for i := 0; i < cutoff; i++ {
- s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po)
- }
+ metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(totalDeleted))
+ return nil
}
// Export writes all chunks from the store to a tar archive, returning the
@@ -474,7 +553,7 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) {
// if chunk is to be removed
if f(c) {
log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
- s.delete(index.Idx, getIndexKey(key[1:]), po)
+ s.deleteNow(&index, getIndexKey(key[1:]), po)
removed++
errorsFound++
}
@@ -526,24 +605,43 @@ func (s *LDBStore) ReIndex() {
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
}
-func (s *LDBStore) Delete(addr Address) {
+// Delete is removes a chunk and updates indices.
+// Is thread safe
+func (s *LDBStore) Delete(addr Address) error {
s.lock.Lock()
defer s.lock.Unlock()
ikey := getIndexKey(addr)
- var indx dpaDBIndex
- s.tryAccessIdx(ikey, &indx)
+ idata, err := s.db.Get(ikey)
+ if err != nil {
+ return err
+ }
- s.delete(indx.Idx, ikey, s.po(addr))
+ var idx dpaDBIndex
+ decodeIndex(idata, &idx)
+ proximity := s.po(addr)
+ return s.deleteNow(&idx, ikey, proximity)
}
-func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
+// executes one delete operation immediately
+// see *LDBStore.delete
+func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error {
+ batch := new(leveldb.Batch)
+ s.delete(batch, idx, idxKey, po)
+ return s.db.Write(batch)
+}
+
+// adds a delete chunk operation to the provided batch
+// if called directly, decrements entrycount regardless if the chunk exists upon deletion. Risk of wrap to max uint64
+func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) {
metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
- batch := new(leveldb.Batch)
+ gcIdxKey := getGCIdxKey(idx)
+ batch.Delete(gcIdxKey)
+ dataKey := getDataKey(idx.Idx, po)
+ batch.Delete(dataKey)
batch.Delete(idxKey)
- batch.Delete(getDataKey(idx, po))
s.entryCnt--
dbEntryCount.Dec(1)
cntKey := make([]byte, 2)
@@ -551,7 +649,6 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
cntKey[1] = po
batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
- s.db.Write(batch)
}
func (s *LDBStore) BinIndex(po uint8) uint64 {
@@ -572,6 +669,9 @@ func (s *LDBStore) CurrentStorageIndex() uint64 {
return s.dataIdx
}
+// Put adds a chunk to the database, adding indices and incrementing global counters.
+// If it already exists, it merely increments the access count of the existing entry.
+// Is thread safe
func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
log.Trace("ldbstore.put", "key", chunk.Address())
@@ -594,7 +694,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
if err != nil {
s.doPut(chunk, &index, po)
} else {
- log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address)
+ log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po)
decodeIndex(idata, &index)
}
index.Access = s.accessCnt
@@ -602,6 +702,10 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
idata = encodeIndex(&index)
s.batch.Put(ikey, idata)
+ // add the access-chunkindex index for garbage collection
+ gcIdxKey := getGCIdxKey(&index)
+ gcIdxData := getGCIdxValue(&index, po, chunk.Address())
+ s.batch.Put(gcIdxKey, gcIdxData)
s.lock.Unlock()
select {
@@ -617,7 +721,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
}
}
-// force putting into db, does not check access index
+// force putting into db, does not check or update necessary indices
func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
data := s.encodeDataFunc(chunk)
dkey := getDataKey(s.dataIdx, po)
@@ -659,38 +763,26 @@ func (s *LDBStore) writeCurrentBatch() error {
if l == 0 {
return nil
}
- e := s.entryCnt
- d := s.dataIdx
- a := s.accessCnt
s.batch = newBatch()
- b.err = s.writeBatch(b, e, d, a)
+ b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt)
close(b.c)
- for e > s.capacity {
- log.Trace("for >", "e", e, "s.capacity", s.capacity)
- // Collect garbage in a separate goroutine
- // to be able to interrupt this loop by s.quit.
- done := make(chan struct{})
- go func() {
- s.collectGarbage(gcArrayFreeRatio)
- log.Trace("collectGarbage closing done")
- close(done)
- }()
-
- select {
- case <-s.quit:
- return errors.New("CollectGarbage terminated due to quit")
- case <-done:
- }
- e = s.entryCnt
+ if s.entryCnt >= s.capacity {
+ go s.collectGarbage()
}
return nil
}
// must be called non concurrently
-func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error {
- b.Put(keyEntryCnt, U64ToBytes(entryCnt))
- b.Put(keyDataIdx, U64ToBytes(dataIdx))
- b.Put(keyAccessCnt, U64ToBytes(accessCnt))
+func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error {
+ if wFlag&wEntryCnt > 0 {
+ b.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+ }
+ if wFlag&wIndexCnt > 0 {
+ b.Put(keyDataIdx, U64ToBytes(s.dataIdx))
+ }
+ if wFlag&wAccessCnt > 0 {
+ b.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
+ }
l := b.Len()
if err := s.db.Write(b.Batch); err != nil {
return fmt.Errorf("unable to write batch: %v", err)
@@ -713,17 +805,22 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
}
// try to find index; if found, update access cnt and return true
-func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
+func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool {
idata, err := s.db.Get(ikey)
if err != nil {
return false
}
decodeIndex(idata, index)
+ oldGCIdxKey := getGCIdxKey(index)
s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
s.accessCnt++
index.Access = s.accessCnt
idata = encodeIndex(index)
s.batch.Put(ikey, idata)
+ newGCIdxKey := getGCIdxKey(index)
+ newGCIdxData := getGCIdxValue(index, po, ikey)
+ s.batch.Delete(oldGCIdxKey)
+ s.batch.Put(newGCIdxKey, newGCIdxData)
select {
case s.batchesC <- struct{}{}:
default:
@@ -755,6 +852,9 @@ func (s *LDBStore) PutSchema(schema string) error {
return s.db.Put(keySchema, []byte(schema))
}
+// Get retrieves the chunk matching the provided key from the database.
+// If the chunk entry does not exist, it returns an error
+// Updates access count and is thread safe
func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
log.Trace("ldbstore.get", "key", addr)
@@ -764,12 +864,14 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error)
return s.get(addr)
}
+// TODO: To conform with other private methods of this object indices should not be updated
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
var indx dpaDBIndex
if s.closed {
return nil, ErrDBClosed
}
- if s.tryAccessIdx(getIndexKey(addr), &indx) {
+ proximity := s.po(addr)
+ if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) {
var data []byte
if s.getDataFunc != nil {
// if getDataFunc is defined, use it to retrieve the chunk data
@@ -780,13 +882,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
}
} else {
// default DbStore functionality to retrieve chunk data
- proximity := s.po(addr)
datakey := getDataKey(indx.Idx, proximity)
data, err = s.db.Get(datakey)
log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
if err != nil {
log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
- s.delete(indx.Idx, getIndexKey(addr), s.po(addr))
+ s.deleteNow(&indx, getIndexKey(addr), s.po(addr))
return
}
}
@@ -813,33 +914,14 @@ func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []by
}
}
-func (s *LDBStore) updateAccessCnt(addr Address) {
-
- s.lock.Lock()
- defer s.lock.Unlock()
-
- var index dpaDBIndex
- s.tryAccessIdx(getIndexKey(addr), &index) // result_chn == nil, only update access cnt
-
-}
-
func (s *LDBStore) setCapacity(c uint64) {
s.lock.Lock()
defer s.lock.Unlock()
s.capacity = c
- if s.entryCnt > c {
- ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
- if ratio < gcArrayFreeRatio {
- ratio = gcArrayFreeRatio
- }
- if ratio > 1 {
- ratio = 1
- }
- for s.entryCnt > c {
- s.collectGarbage(ratio)
- }
+ for s.entryCnt > c {
+ s.collectGarbage()
}
}
diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go
index 14a42b5e3..48af8c57c 100644
--- a/swarm/storage/ldbstore_test.go
+++ b/swarm/storage/ldbstore_test.go
@@ -22,6 +22,8 @@ import (
"fmt"
"io/ioutil"
"os"
+ "strconv"
+ "strings"
"testing"
"time"
@@ -297,27 +299,73 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) {
}
// TestLDBStoreCollectGarbage tests that we can put more chunks than LevelDB's capacity, and
-// retrieve only some of them, because garbage collection must have cleared some of them
+// retrieve only some of them, because garbage collection must have partially cleared the store
+// Also tests that we can delete chunks and that we can trigger garbage collection
func TestLDBStoreCollectGarbage(t *testing.T) {
- capacity := 500
- n := 2000
+
+ // below max ronud
+ cap := defaultMaxGCRound / 2
+ t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage)
+ t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage)
+
+ // at max round
+ cap = defaultMaxGCRound
+ t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage)
+ t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage)
+
+ // more than max around, not on threshold
+ cap = defaultMaxGCRound * 1.1
+ t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage)
+ t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage)
+
+}
+
+func testLDBStoreCollectGarbage(t *testing.T) {
+ params := strings.Split(t.Name(), "/")
+ capacity, err := strconv.Atoi(params[2])
+ if err != nil {
+ t.Fatal(err)
+ }
+ n, err := strconv.Atoi(params[3])
+ if err != nil {
+ t.Fatal(err)
+ }
ldb, cleanup := newLDBStore(t)
ldb.setCapacity(uint64(capacity))
defer cleanup()
- chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize))
- if err != nil {
- t.Fatal(err.Error())
- }
- log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
+ // retrieve the gc round target count for the db capacity
+ ldb.startGC(capacity)
+ roundTarget := ldb.gc.target
+
+ // split put counts to gc target count threshold, and wait for gc to finish in between
+ var allChunks []Chunk
+ remaining := n
+ for remaining > 0 {
+ var putCount int
+ if remaining < roundTarget {
+ putCount = remaining
+ } else {
+ putCount = roundTarget
+ }
+ remaining -= putCount
+ chunks, err := mputRandomChunks(ldb, putCount, int64(ch.DefaultSize))
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ allChunks = append(allChunks, chunks...)
+ log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n)
- // wait for garbage collection to kick in on the responsible actor
- time.Sleep(1 * time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ waitGc(ctx, ldb)
+ }
+ // attempt gets on all put chunks
var missing int
- for _, ch := range chunks {
- ret, err := ldb.Get(context.Background(), ch.Address())
+ for _, ch := range allChunks {
+ ret, err := ldb.Get(context.TODO(), ch.Address())
if err == ErrChunkNotFound || err == ldberrors.ErrNotFound {
missing++
continue
@@ -333,8 +381,10 @@ func TestLDBStoreCollectGarbage(t *testing.T) {
log.Trace("got back chunk", "chunk", ret)
}
- if missing < n-capacity {
- t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", n-capacity, missing)
+ // all surplus chunks should be missing
+ expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget)
+ if missing != expectMissing {
+ t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", expectMissing, missing)
}
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
@@ -367,7 +417,6 @@ func TestLDBStoreAddRemove(t *testing.T) {
if i%2 == 0 {
// expect even chunks to be missing
if err == nil {
- // if err != ErrChunkNotFound {
t.Fatal("expected chunk to be missing, but got no error")
}
} else {
@@ -383,30 +432,48 @@ func TestLDBStoreAddRemove(t *testing.T) {
}
}
-// TestLDBStoreRemoveThenCollectGarbage tests that we can delete chunks and that we can trigger garbage collection
-func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) {
- capacity := 11
- surplus := 4
+func testLDBStoreRemoveThenCollectGarbage(t *testing.T) {
+
+ params := strings.Split(t.Name(), "/")
+ capacity, err := strconv.Atoi(params[2])
+ if err != nil {
+ t.Fatal(err)
+ }
+ n, err := strconv.Atoi(params[3])
+ if err != nil {
+ t.Fatal(err)
+ }
ldb, cleanup := newLDBStore(t)
+ defer cleanup()
ldb.setCapacity(uint64(capacity))
- n := capacity
-
- chunks := []Chunk{}
- for i := 0; i < n+surplus; i++ {
+ // put capacity count number of chunks
+ chunks := make([]Chunk, n)
+ for i := 0; i < n; i++ {
c := GenerateRandomChunk(ch.DefaultSize)
- chunks = append(chunks, c)
+ chunks[i] = c
log.Trace("generate random chunk", "idx", i, "chunk", c)
}
for i := 0; i < n; i++ {
- ldb.Put(context.TODO(), chunks[i])
+ err := ldb.Put(context.TODO(), chunks[i])
+ if err != nil {
+ t.Fatal(err)
+ }
}
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ waitGc(ctx, ldb)
+
// delete all chunks
+ // (only count the ones actually deleted, the rest will have been gc'd)
+ deletes := 0
for i := 0; i < n; i++ {
- ldb.Delete(chunks[i].Address())
+ if ldb.Delete(chunks[i].Address()) == nil {
+ deletes++
+ }
}
log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
@@ -415,37 +482,49 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) {
t.Fatalf("ldb.entrCnt expected 0 got %v", ldb.entryCnt)
}
- expAccessCnt := uint64(n * 2)
+ // the manual deletes will have increased accesscnt, so we need to add this when we verify the current count
+ expAccessCnt := uint64(n)
if ldb.accessCnt != expAccessCnt {
- t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.entryCnt)
+ t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.accessCnt)
}
- cleanup()
+ // retrieve the gc round target count for the db capacity
+ ldb.startGC(capacity)
+ roundTarget := ldb.gc.target
- ldb, cleanup = newLDBStore(t)
- capacity = 10
- ldb.setCapacity(uint64(capacity))
- defer cleanup()
-
- n = capacity + surplus
+ remaining := n
+ var puts int
+ for remaining > 0 {
+ var putCount int
+ if remaining < roundTarget {
+ putCount = remaining
+ } else {
+ putCount = roundTarget
+ }
+ remaining -= putCount
+ for putCount > 0 {
+ ldb.Put(context.TODO(), chunks[puts])
+ log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n, "puts", puts, "remaining", remaining, "roundtarget", roundTarget)
+ puts++
+ putCount--
+ }
- for i := 0; i < n; i++ {
- ldb.Put(context.TODO(), chunks[i])
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ waitGc(ctx, ldb)
}
- // wait for garbage collection
- time.Sleep(1 * time.Second)
-
// expect first surplus chunks to be missing, because they have the smallest access value
- for i := 0; i < surplus; i++ {
+ expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget)
+ for i := 0; i < expectMissing; i++ {
_, err := ldb.Get(context.TODO(), chunks[i].Address())
if err == nil {
- t.Fatal("expected surplus chunk to be missing, but got no error")
+ t.Fatalf("expected surplus chunk %d to be missing, but got no error", i)
}
}
// expect last chunks to be present, as they have the largest access value
- for i := surplus; i < surplus+capacity; i++ {
+ for i := expectMissing; i < n; i++ {
ret, err := ldb.Get(context.TODO(), chunks[i].Address())
if err != nil {
t.Fatalf("chunk %v: expected no error, but got %s", i, err)
@@ -455,3 +534,57 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) {
}
}
}
+
+// TestLDBStoreCollectGarbageAccessUnlikeIndex tests garbage collection where accesscount differs from indexcount
+func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
+
+ capacity := defaultMaxGCRound * 2
+ n := capacity - 1
+
+ ldb, cleanup := newLDBStore(t)
+ ldb.setCapacity(uint64(capacity))
+ defer cleanup()
+
+ chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize))
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
+
+ // set first added capacity/2 chunks to highest accesscount
+ for i := 0; i < capacity/2; i++ {
+ _, err := ldb.Get(context.TODO(), chunks[i].Address())
+ if err != nil {
+ t.Fatalf("fail add chunk #%d - %s: %v", i, chunks[i].Address(), err)
+ }
+ }
+ _, err = mputRandomChunks(ldb, 2, int64(ch.DefaultSize))
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+
+ // wait for garbage collection to kick in on the responsible actor
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ waitGc(ctx, ldb)
+
+ var missing int
+ for i, ch := range chunks[2 : capacity/2] {
+ ret, err := ldb.Get(context.TODO(), ch.Address())
+ if err == ErrChunkNotFound || err == ldberrors.ErrNotFound {
+ t.Fatalf("fail find chunk #%d - %s: %v", i, ch.Address(), err)
+ }
+
+ if !bytes.Equal(ret.Data(), ch.Data()) {
+ t.Fatal("expected to get the same data back, but got smth else")
+ }
+ log.Trace("got back chunk", "chunk", ret)
+ }
+
+ log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
+}
+
+func waitGc(ctx context.Context, ldb *LDBStore) {
+ <-ldb.gc.runC
+ ldb.gc.runC <- struct{}{}
+}