aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/ldbstore.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/ldbstore.go')
-rw-r--r--swarm/storage/ldbstore.go122
1 files changed, 84 insertions, 38 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 46e040250..fbae59fac 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -57,7 +57,6 @@ var (
var (
keyIndex = byte(0)
- keyOldData = byte(1)
keyAccessCnt = []byte{2}
keyEntryCnt = []byte{3}
keyDataIdx = []byte{4}
@@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
return val
}
+func parseGCIdxKey(key []byte) (byte, []byte) {
+ return key[0], key[1:]
+}
+
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
index = &dpaDBIndex{
Idx: binary.BigEndian.Uint64(val[1:]),
@@ -504,7 +507,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
}
}
-//Cleanup iterates over the database and deletes chunks if they pass the `f` condition
+// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
var errorsFound, removed, total int
@@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) {
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
}
-func (s *LDBStore) ReIndex() {
- //Iterates over the database and checks that there are no faulty chunks
+// CleanGCIndex rebuilds the garbage collector index from scratch, while
+// removing inconsistent elements, e.g., indices with missing data chunks.
+// WARN: it's a pretty heavy, long running function.
+func (s *LDBStore) CleanGCIndex() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ batch := leveldb.Batch{}
+
+ var okEntryCount uint64
+ var totalEntryCount uint64
+
+ // throw out all gc indices, we will rebuild from cleaned index
it := s.db.NewIterator()
- startPosition := []byte{keyOldData}
- it.Seek(startPosition)
- var key []byte
- var errorsFound, total int
+ it.Seek([]byte{keyGCIdx})
+ var gcDeletes int
+ for it.Valid() {
+ rowType, _ := parseGCIdxKey(it.Key())
+ if rowType != keyGCIdx {
+ break
+ }
+ batch.Delete(it.Key())
+ gcDeletes++
+ it.Next()
+ }
+ log.Debug("gc", "deletes", gcDeletes)
+ if err := s.db.Write(&batch); err != nil {
+ return err
+ }
+
+ it.Seek([]byte{keyIndex})
+ var idx dpaDBIndex
+ var poPtrs [256]uint64
for it.Valid() {
- key = it.Key()
- if (key == nil) || (key[0] != keyOldData) {
+ rowType, chunkHash := parseGCIdxKey(it.Key())
+ if rowType != keyIndex {
break
}
- data := it.Value()
- hasher := s.hashfunc()
- hasher.Write(data)
- hash := hasher.Sum(nil)
-
- newKey := make([]byte, 10)
- oldCntKey := make([]byte, 2)
- newCntKey := make([]byte, 2)
- oldCntKey[0] = keyDistanceCnt
- newCntKey[0] = keyDistanceCnt
- key[0] = keyData
- key[1] = s.po(Address(key[1:]))
- oldCntKey[1] = key[1]
- newCntKey[1] = s.po(Address(newKey[1:]))
- copy(newKey[2:], key[1:])
- newValue := append(hash, data...)
-
- batch := new(leveldb.Batch)
- batch.Delete(key)
- s.bucketCnt[oldCntKey[1]]--
- batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
- batch.Put(newKey, newValue)
- s.bucketCnt[newCntKey[1]]++
- batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
- s.db.Write(batch)
+ err := decodeIndex(it.Value(), &idx)
+ if err != nil {
+ return fmt.Errorf("corrupt index: %v", err)
+ }
+ po := s.po(chunkHash)
+
+ // if we don't find the data key, remove the entry
+ dataKey := getDataKey(idx.Idx, po)
+ _, err = s.db.Get(dataKey)
+ if err != nil {
+ log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
+ batch.Delete(it.Key())
+ } else {
+ gcIdxKey := getGCIdxKey(&idx)
+ gcIdxData := getGCIdxValue(&idx, po, chunkHash)
+ batch.Put(gcIdxKey, gcIdxData)
+ log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
+ okEntryCount++
+ if idx.Idx > poPtrs[po] {
+ poPtrs[po] = idx.Idx
+ }
+ }
+ totalEntryCount++
it.Next()
}
+
it.Release()
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
+ log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
+
+ var entryCount [8]byte
+ binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
+ batch.Put(keyEntryCnt, entryCount[:])
+ var poKey [2]byte
+ poKey[0] = keyDistanceCnt
+ for i, poPtr := range poPtrs {
+ poKey[1] = uint8(i)
+ if poPtr == 0 {
+ batch.Delete(poKey[:])
+ } else {
+ var idxCount [8]byte
+ binary.BigEndian.PutUint64(idxCount[:], poPtr)
+ batch.Put(poKey[:], idxCount[:])
+ }
+ }
+
+ return s.db.Write(&batch)
}
// Delete is removes a chunk and updates indices.
@@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
s.accessCnt++
s.batch.Put(ikey, idata)
newGCIdxKey := getGCIdxKey(index)
- newGCIdxData := getGCIdxValue(index, po, ikey)
+ newGCIdxData := getGCIdxValue(index, po, ikey[1:])
s.batch.Delete(oldGCIdxKey)
s.batch.Put(newGCIdxKey, newGCIdxData)
select {
@@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) {
data, err := s.db.Get(keySchema)
if err != nil {
if err == leveldb.ErrNotFound {
- return "", nil
+ return DbSchemaNone, nil
}
return "", err
}