diff options
Diffstat (limited to 'swarm/storage/ldbstore.go')
-rw-r--r-- | swarm/storage/ldbstore.go | 122 |
1 files changed, 84 insertions, 38 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 46e040250..fbae59fac 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -57,7 +57,6 @@ var ( var ( keyIndex = byte(0) - keyOldData = byte(1) keyAccessCnt = []byte{2} keyEntryCnt = []byte{3} keyDataIdx = []byte{4} @@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte { return val } +func parseGCIdxKey(key []byte) (byte, []byte) { + return key[0], key[1:] +} + func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) { index = &dpaDBIndex{ Idx: binary.BigEndian.Uint64(val[1:]), @@ -504,7 +507,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { } } -//Cleanup iterates over the database and deletes chunks if they pass the `f` condition +// Cleanup iterates over the database and deletes chunks if they pass the `f` condition func (s *LDBStore) Cleanup(f func(*chunk) bool) { var errorsFound, removed, total int @@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) { log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed)) } -func (s *LDBStore) ReIndex() { - //Iterates over the database and checks that there are no faulty chunks +// CleanGCIndex rebuilds the garbage collector index from scratch, while +// removing inconsistent elements, e.g., indices with missing data chunks. +// WARN: it's a pretty heavy, long running function. +func (s *LDBStore) CleanGCIndex() error { + s.lock.Lock() + defer s.lock.Unlock() + + batch := leveldb.Batch{} + + var okEntryCount uint64 + var totalEntryCount uint64 + + // throw out all gc indices, we will rebuild from cleaned index it := s.db.NewIterator() - startPosition := []byte{keyOldData} - it.Seek(startPosition) - var key []byte - var errorsFound, total int + it.Seek([]byte{keyGCIdx}) + var gcDeletes int + for it.Valid() { + rowType, _ := parseGCIdxKey(it.Key()) + if rowType != keyGCIdx { + break + } + batch.Delete(it.Key()) + gcDeletes++ + it.Next() + } + log.Debug("gc", "deletes", gcDeletes) + if err := s.db.Write(&batch); err != nil { + return err + } + + it.Seek([]byte{keyIndex}) + var idx dpaDBIndex + var poPtrs [256]uint64 for it.Valid() { - key = it.Key() - if (key == nil) || (key[0] != keyOldData) { + rowType, chunkHash := parseGCIdxKey(it.Key()) + if rowType != keyIndex { break } - data := it.Value() - hasher := s.hashfunc() - hasher.Write(data) - hash := hasher.Sum(nil) - - newKey := make([]byte, 10) - oldCntKey := make([]byte, 2) - newCntKey := make([]byte, 2) - oldCntKey[0] = keyDistanceCnt - newCntKey[0] = keyDistanceCnt - key[0] = keyData - key[1] = s.po(Address(key[1:])) - oldCntKey[1] = key[1] - newCntKey[1] = s.po(Address(newKey[1:])) - copy(newKey[2:], key[1:]) - newValue := append(hash, data...) - - batch := new(leveldb.Batch) - batch.Delete(key) - s.bucketCnt[oldCntKey[1]]-- - batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]])) - batch.Put(newKey, newValue) - s.bucketCnt[newCntKey[1]]++ - batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]])) - s.db.Write(batch) + err := decodeIndex(it.Value(), &idx) + if err != nil { + return fmt.Errorf("corrupt index: %v", err) + } + po := s.po(chunkHash) + + // if we don't find the data key, remove the entry + dataKey := getDataKey(idx.Idx, po) + _, err = s.db.Get(dataKey) + if err != nil { + log.Warn("deleting inconsistent index (missing data)", "key", chunkHash) + batch.Delete(it.Key()) + } else { + gcIdxKey := getGCIdxKey(&idx) + gcIdxData := getGCIdxValue(&idx, po, chunkHash) + batch.Put(gcIdxKey, gcIdxData) + log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData) + okEntryCount++ + if idx.Idx > poPtrs[po] { + poPtrs[po] = idx.Idx + } + } + totalEntryCount++ it.Next() } + it.Release() - log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total)) + log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len()) + + var entryCount [8]byte + binary.BigEndian.PutUint64(entryCount[:], okEntryCount) + batch.Put(keyEntryCnt, entryCount[:]) + var poKey [2]byte + poKey[0] = keyDistanceCnt + for i, poPtr := range poPtrs { + poKey[1] = uint8(i) + if poPtr == 0 { + batch.Delete(poKey[:]) + } else { + var idxCount [8]byte + binary.BigEndian.PutUint64(idxCount[:], poPtr) + batch.Put(poKey[:], idxCount[:]) + } + } + + return s.db.Write(&batch) } // Delete is removes a chunk and updates indices. @@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) { s.accessCnt++ s.batch.Put(ikey, idata) newGCIdxKey := getGCIdxKey(index) - newGCIdxData := getGCIdxValue(index, po, ikey) + newGCIdxData := getGCIdxValue(index, po, ikey[1:]) s.batch.Delete(oldGCIdxKey) s.batch.Put(newGCIdxKey, newGCIdxData) select { @@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) { data, err := s.db.Get(keySchema) if err != nil { if err == leveldb.ErrNotFound { - return "", nil + return DbSchemaNone, nil } return "", err } |