diff options
Diffstat (limited to 'swarm/storage/ldbstore.go')
-rw-r--r-- | swarm/storage/ldbstore.go | 282 |
1 files changed, 182 insertions, 100 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 2a7f51cb3..49508911f 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -32,7 +32,6 @@ import ( "fmt" "io" "io/ioutil" - "sort" "sync" "github.com/ethereum/go-ethereum/metrics" @@ -44,8 +43,13 @@ import ( ) const ( - gcArrayFreeRatio = 0.1 - maxGCitems = 5000 // max number of items to be gc'd per call to collectGarbage() + defaultGCRatio = 10 + defaultMaxGCRound = 10000 + defaultMaxGCBatch = 5000 + + wEntryCnt = 1 << 0 + wIndexCnt = 1 << 1 + wAccessCnt = 1 << 2 ) var ( @@ -61,6 +65,7 @@ var ( keyData = byte(6) keyDistanceCnt = byte(7) keySchema = []byte{8} + keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry ) var ( @@ -68,7 +73,7 @@ var ( ) type gcItem struct { - idx uint64 + idx *dpaDBIndex value uint64 idxKey []byte po uint8 @@ -89,6 +94,16 @@ func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams { } } +type garbage struct { + maxRound int // maximum number of chunks to delete in one garbage collection round + maxBatch int // maximum number of chunks to delete in one db request batch + ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db + count int // number of chunks deleted in running round + target int // number of chunks to delete in running round + batch *dbBatch // the delete batch + runC chan struct{} // struct in chan means gc is NOT running +} + type LDBStore struct { db *LDBDatabase @@ -102,12 +117,12 @@ type LDBStore struct { hashfunc SwarmHasher po func(Address) uint8 - batchC chan bool batchesC chan struct{} closed bool batch *dbBatch lock sync.RWMutex quit chan struct{} + gc *garbage // Functions encodeDataFunc is used to bypass // the default functionality of DbStore with @@ -166,9 +181,33 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { data, _ = s.db.Get(keyDataIdx) s.dataIdx = BytesToU64(data) + // set up garbage collection + s.gc = &garbage{ + maxBatch: defaultMaxGCBatch, + maxRound: defaultMaxGCRound, + ratio: defaultGCRatio, + } + + s.gc.runC = make(chan struct{}, 1) + s.gc.runC <- struct{}{} + return s, nil } +// initialize and set values for processing of gc round +func (s *LDBStore) startGC(c int) { + + s.gc.count = 0 + // calculate the target number of deletions + if c >= s.gc.maxRound { + s.gc.target = s.gc.maxRound + } else { + s.gc.target = c / s.gc.ratio + } + s.gc.batch = newBatch() + log.Debug("startgc", "requested", c, "target", s.gc.target) +} + // NewMockDbStore creates a new instance of DbStore with // mockStore set to a provided value. If mockStore argument is nil, // this function behaves exactly as NewDbStore. @@ -225,6 +264,31 @@ func getDataKey(idx uint64, po uint8) []byte { return key } +func getGCIdxKey(index *dpaDBIndex) []byte { + key := make([]byte, 9) + key[0] = keyGCIdx + binary.BigEndian.PutUint64(key[1:], index.Access) + return key +} + +func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte { + val := make([]byte, 41) // po = 1, index.Index = 8, Address = 32 + val[0] = po + binary.BigEndian.PutUint64(val[1:], index.Idx) + copy(val[9:], addr) + return val +} + +func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) { + index = &dpaDBIndex{ + Idx: binary.BigEndian.Uint64(val[1:]), + Access: binary.BigEndian.Uint64(accessCnt), + } + po = val[0] + addr = val[9:] + return +} + func encodeIndex(index *dpaDBIndex) []byte { data, _ := rlp.EncodeToBytes(index) return data @@ -247,55 +311,70 @@ func decodeData(addr Address, data []byte) (*chunk, error) { return NewChunk(addr, data[32:]), nil } -func (s *LDBStore) collectGarbage(ratio float32) { - log.Trace("collectGarbage", "ratio", ratio) +func (s *LDBStore) collectGarbage() error { - metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1) + // prevent duplicate gc from starting when one is already running + select { + case <-s.gc.runC: + default: + return nil + } - it := s.db.NewIterator() - defer it.Release() + s.lock.Lock() + entryCnt := s.entryCnt + s.lock.Unlock() - garbage := []*gcItem{} - gcnt := 0 + metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1) - for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() { - itkey := it.Key() + // calculate the amount of chunks to collect and reset counter + s.startGC(int(entryCnt)) + log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt) - if (itkey == nil) || (itkey[0] != keyIndex) { - break - } + var totalDeleted int + for s.gc.count < s.gc.target { + it := s.db.NewIterator() + ok := it.Seek([]byte{keyGCIdx}) + var singleIterationCount int - // it.Key() contents change on next call to it.Next(), so we must copy it - key := make([]byte, len(it.Key())) - copy(key, it.Key()) + // every batch needs a lock so we avoid entries changing accessidx in the meantime + s.lock.Lock() + for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() { - val := it.Value() + // quit if no more access index keys + itkey := it.Key() + if (itkey == nil) || (itkey[0] != keyGCIdx) { + break + } - var index dpaDBIndex + // get chunk data entry from access index + val := it.Value() + index, po, hash := parseGCIdxEntry(itkey[1:], val) + keyIdx := make([]byte, 33) + keyIdx[0] = keyIndex + copy(keyIdx[1:], hash) - hash := key[1:] - decodeIndex(val, &index) - po := s.po(hash) + // add delete operation to batch + s.delete(s.gc.batch.Batch, index, keyIdx, po) + singleIterationCount++ + s.gc.count++ - gci := &gcItem{ - idxKey: key, - idx: index.Idx, - value: index.Access, // the smaller, the more likely to be gc'd. see sort comparator below. - po: po, + // break if target is not on max garbage batch boundary + if s.gc.count >= s.gc.target { + break + } } - garbage = append(garbage, gci) - gcnt++ + s.writeBatch(s.gc.batch, wEntryCnt) + s.lock.Unlock() + it.Release() + log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count) } - sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value }) - - cutoff := int(float32(gcnt) * ratio) - metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(cutoff)) + s.gc.runC <- struct{}{} + log.Debug("garbage collect done", "c", s.gc.count) - for i := 0; i < cutoff; i++ { - s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po) - } + metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(totalDeleted)) + return nil } // Export writes all chunks from the store to a tar archive, returning the @@ -474,7 +553,7 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) { // if chunk is to be removed if f(c) { log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) - s.delete(index.Idx, getIndexKey(key[1:]), po) + s.deleteNow(&index, getIndexKey(key[1:]), po) removed++ errorsFound++ } @@ -526,24 +605,43 @@ func (s *LDBStore) ReIndex() { log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total)) } -func (s *LDBStore) Delete(addr Address) { +// Delete is removes a chunk and updates indices. +// Is thread safe +func (s *LDBStore) Delete(addr Address) error { s.lock.Lock() defer s.lock.Unlock() ikey := getIndexKey(addr) - var indx dpaDBIndex - s.tryAccessIdx(ikey, &indx) + idata, err := s.db.Get(ikey) + if err != nil { + return err + } - s.delete(indx.Idx, ikey, s.po(addr)) + var idx dpaDBIndex + decodeIndex(idata, &idx) + proximity := s.po(addr) + return s.deleteNow(&idx, ikey, proximity) } -func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { +// executes one delete operation immediately +// see *LDBStore.delete +func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error { + batch := new(leveldb.Batch) + s.delete(batch, idx, idxKey, po) + return s.db.Write(batch) +} + +// adds a delete chunk operation to the provided batch +// if called directly, decrements entrycount regardless if the chunk exists upon deletion. Risk of wrap to max uint64 +func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) { metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1) - batch := new(leveldb.Batch) + gcIdxKey := getGCIdxKey(idx) + batch.Delete(gcIdxKey) + dataKey := getDataKey(idx.Idx, po) + batch.Delete(dataKey) batch.Delete(idxKey) - batch.Delete(getDataKey(idx, po)) s.entryCnt-- dbEntryCount.Dec(1) cntKey := make([]byte, 2) @@ -551,7 +649,6 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { cntKey[1] = po batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) batch.Put(cntKey, U64ToBytes(s.bucketCnt[po])) - s.db.Write(batch) } func (s *LDBStore) BinIndex(po uint8) uint64 { @@ -572,6 +669,9 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } +// Put adds a chunk to the database, adding indices and incrementing global counters. +// If it already exists, it merely increments the access count of the existing entry. +// Is thread safe func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) log.Trace("ldbstore.put", "key", chunk.Address()) @@ -594,7 +694,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { if err != nil { s.doPut(chunk, &index, po) } else { - log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address) + log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po) decodeIndex(idata, &index) } index.Access = s.accessCnt @@ -602,6 +702,10 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { idata = encodeIndex(&index) s.batch.Put(ikey, idata) + // add the access-chunkindex index for garbage collection + gcIdxKey := getGCIdxKey(&index) + gcIdxData := getGCIdxValue(&index, po, chunk.Address()) + s.batch.Put(gcIdxKey, gcIdxData) s.lock.Unlock() select { @@ -617,7 +721,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { } } -// force putting into db, does not check access index +// force putting into db, does not check or update necessary indices func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { data := s.encodeDataFunc(chunk) dkey := getDataKey(s.dataIdx, po) @@ -659,38 +763,26 @@ func (s *LDBStore) writeCurrentBatch() error { if l == 0 { return nil } - e := s.entryCnt - d := s.dataIdx - a := s.accessCnt s.batch = newBatch() - b.err = s.writeBatch(b, e, d, a) + b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt) close(b.c) - for e > s.capacity { - log.Trace("for >", "e", e, "s.capacity", s.capacity) - // Collect garbage in a separate goroutine - // to be able to interrupt this loop by s.quit. - done := make(chan struct{}) - go func() { - s.collectGarbage(gcArrayFreeRatio) - log.Trace("collectGarbage closing done") - close(done) - }() - - select { - case <-s.quit: - return errors.New("CollectGarbage terminated due to quit") - case <-done: - } - e = s.entryCnt + if s.entryCnt >= s.capacity { + go s.collectGarbage() } return nil } // must be called non concurrently -func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error { - b.Put(keyEntryCnt, U64ToBytes(entryCnt)) - b.Put(keyDataIdx, U64ToBytes(dataIdx)) - b.Put(keyAccessCnt, U64ToBytes(accessCnt)) +func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error { + if wFlag&wEntryCnt > 0 { + b.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) + } + if wFlag&wIndexCnt > 0 { + b.Put(keyDataIdx, U64ToBytes(s.dataIdx)) + } + if wFlag&wAccessCnt > 0 { + b.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) + } l := b.Len() if err := s.db.Write(b.Batch); err != nil { return fmt.Errorf("unable to write batch: %v", err) @@ -713,17 +805,22 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { } // try to find index; if found, update access cnt and return true -func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { +func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool { idata, err := s.db.Get(ikey) if err != nil { return false } decodeIndex(idata, index) + oldGCIdxKey := getGCIdxKey(index) s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) s.accessCnt++ index.Access = s.accessCnt idata = encodeIndex(index) s.batch.Put(ikey, idata) + newGCIdxKey := getGCIdxKey(index) + newGCIdxData := getGCIdxValue(index, po, ikey) + s.batch.Delete(oldGCIdxKey) + s.batch.Put(newGCIdxKey, newGCIdxData) select { case s.batchesC <- struct{}{}: default: @@ -755,6 +852,9 @@ func (s *LDBStore) PutSchema(schema string) error { return s.db.Put(keySchema, []byte(schema)) } +// Get retrieves the chunk matching the provided key from the database. +// If the chunk entry does not exist, it returns an error +// Updates access count and is thread safe func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) @@ -764,12 +864,14 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) return s.get(addr) } +// TODO: To conform with other private methods of this object indices should not be updated func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { var indx dpaDBIndex if s.closed { return nil, ErrDBClosed } - if s.tryAccessIdx(getIndexKey(addr), &indx) { + proximity := s.po(addr) + if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) { var data []byte if s.getDataFunc != nil { // if getDataFunc is defined, use it to retrieve the chunk data @@ -780,13 +882,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { } } else { // default DbStore functionality to retrieve chunk data - proximity := s.po(addr) datakey := getDataKey(indx.Idx, proximity) data, err = s.db.Get(datakey) log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity) if err != nil { log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err) - s.delete(indx.Idx, getIndexKey(addr), s.po(addr)) + s.deleteNow(&indx, getIndexKey(addr), s.po(addr)) return } } @@ -813,33 +914,14 @@ func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []by } } -func (s *LDBStore) updateAccessCnt(addr Address) { - - s.lock.Lock() - defer s.lock.Unlock() - - var index dpaDBIndex - s.tryAccessIdx(getIndexKey(addr), &index) // result_chn == nil, only update access cnt - -} - func (s *LDBStore) setCapacity(c uint64) { s.lock.Lock() defer s.lock.Unlock() s.capacity = c - if s.entryCnt > c { - ratio := float32(1.01) - float32(c)/float32(s.entryCnt) - if ratio < gcArrayFreeRatio { - ratio = gcArrayFreeRatio - } - if ratio > 1 { - ratio = 1 - } - for s.entryCnt > c { - s.collectGarbage(ratio) - } + for s.entryCnt > c { + s.collectGarbage() } } |