aboutsummaryrefslogtreecommitdiffstats
path: root/swarm
diff options
context:
space:
mode:
Diffstat (limited to 'swarm')
-rw-r--r--swarm/storage/ldbstore.go122
-rw-r--r--swarm/storage/ldbstore_test.go140
-rw-r--r--swarm/storage/localstore.go47
-rw-r--r--swarm/storage/schema.go13
4 files changed, 268 insertions, 54 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 46e040250..fbae59fac 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -57,7 +57,6 @@ var (
var (
keyIndex = byte(0)
- keyOldData = byte(1)
keyAccessCnt = []byte{2}
keyEntryCnt = []byte{3}
keyDataIdx = []byte{4}
@@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
return val
}
+func parseGCIdxKey(key []byte) (byte, []byte) {
+ return key[0], key[1:]
+}
+
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
index = &dpaDBIndex{
Idx: binary.BigEndian.Uint64(val[1:]),
@@ -504,7 +507,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
}
}
-//Cleanup iterates over the database and deletes chunks if they pass the `f` condition
+// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
var errorsFound, removed, total int
@@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) {
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
}
-func (s *LDBStore) ReIndex() {
- //Iterates over the database and checks that there are no faulty chunks
+// CleanGCIndex rebuilds the garbage collector index from scratch, while
+// removing inconsistent elements, e.g., indices with missing data chunks.
+// WARN: it's a pretty heavy, long running function.
+func (s *LDBStore) CleanGCIndex() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ batch := leveldb.Batch{}
+
+ var okEntryCount uint64
+ var totalEntryCount uint64
+
+ // throw out all gc indices, we will rebuild from cleaned index
it := s.db.NewIterator()
- startPosition := []byte{keyOldData}
- it.Seek(startPosition)
- var key []byte
- var errorsFound, total int
+ it.Seek([]byte{keyGCIdx})
+ var gcDeletes int
+ for it.Valid() {
+ rowType, _ := parseGCIdxKey(it.Key())
+ if rowType != keyGCIdx {
+ break
+ }
+ batch.Delete(it.Key())
+ gcDeletes++
+ it.Next()
+ }
+ log.Debug("gc", "deletes", gcDeletes)
+ if err := s.db.Write(&batch); err != nil {
+ return err
+ }
+
+ it.Seek([]byte{keyIndex})
+ var idx dpaDBIndex
+ var poPtrs [256]uint64
for it.Valid() {
- key = it.Key()
- if (key == nil) || (key[0] != keyOldData) {
+ rowType, chunkHash := parseGCIdxKey(it.Key())
+ if rowType != keyIndex {
break
}
- data := it.Value()
- hasher := s.hashfunc()
- hasher.Write(data)
- hash := hasher.Sum(nil)
-
- newKey := make([]byte, 10)
- oldCntKey := make([]byte, 2)
- newCntKey := make([]byte, 2)
- oldCntKey[0] = keyDistanceCnt
- newCntKey[0] = keyDistanceCnt
- key[0] = keyData
- key[1] = s.po(Address(key[1:]))
- oldCntKey[1] = key[1]
- newCntKey[1] = s.po(Address(newKey[1:]))
- copy(newKey[2:], key[1:])
- newValue := append(hash, data...)
-
- batch := new(leveldb.Batch)
- batch.Delete(key)
- s.bucketCnt[oldCntKey[1]]--
- batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
- batch.Put(newKey, newValue)
- s.bucketCnt[newCntKey[1]]++
- batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
- s.db.Write(batch)
+ err := decodeIndex(it.Value(), &idx)
+ if err != nil {
+ return fmt.Errorf("corrupt index: %v", err)
+ }
+ po := s.po(chunkHash)
+
+ // if we don't find the data key, remove the entry
+ dataKey := getDataKey(idx.Idx, po)
+ _, err = s.db.Get(dataKey)
+ if err != nil {
+ log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
+ batch.Delete(it.Key())
+ } else {
+ gcIdxKey := getGCIdxKey(&idx)
+ gcIdxData := getGCIdxValue(&idx, po, chunkHash)
+ batch.Put(gcIdxKey, gcIdxData)
+ log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
+ okEntryCount++
+ if idx.Idx > poPtrs[po] {
+ poPtrs[po] = idx.Idx
+ }
+ }
+ totalEntryCount++
it.Next()
}
+
it.Release()
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
+ log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
+
+ var entryCount [8]byte
+ binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
+ batch.Put(keyEntryCnt, entryCount[:])
+ var poKey [2]byte
+ poKey[0] = keyDistanceCnt
+ for i, poPtr := range poPtrs {
+ poKey[1] = uint8(i)
+ if poPtr == 0 {
+ batch.Delete(poKey[:])
+ } else {
+ var idxCount [8]byte
+ binary.BigEndian.PutUint64(idxCount[:], poPtr)
+ batch.Put(poKey[:], idxCount[:])
+ }
+ }
+
+ return s.db.Write(&batch)
}
// Delete is removes a chunk and updates indices.
@@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
s.accessCnt++
s.batch.Put(ikey, idata)
newGCIdxKey := getGCIdxKey(index)
- newGCIdxData := getGCIdxValue(index, po, ikey)
+ newGCIdxData := getGCIdxValue(index, po, ikey[1:])
s.batch.Delete(oldGCIdxKey)
s.batch.Put(newGCIdxKey, newGCIdxData)
select {
@@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) {
data, err := s.db.Get(keySchema)
if err != nil {
if err == leveldb.ErrNotFound {
- return "", nil
+ return DbSchemaNone, nil
}
return "", err
}
diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go
index 22213b12d..07557980c 100644
--- a/swarm/storage/ldbstore_test.go
+++ b/swarm/storage/ldbstore_test.go
@@ -19,6 +19,7 @@ package storage
import (
"bytes"
"context"
+ "encoding/binary"
"fmt"
"io/ioutil"
"os"
@@ -623,6 +624,145 @@ func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
}
+func TestCleanIndex(t *testing.T) {
+ capacity := 5000
+ n := 3
+
+ ldb, cleanup := newLDBStore(t)
+ ldb.setCapacity(uint64(capacity))
+ defer cleanup()
+
+ chunks, err := mputRandomChunks(ldb, n, 4096)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // remove the data of the first chunk
+ po := ldb.po(chunks[0].Address()[:])
+ dataKey := make([]byte, 10)
+ dataKey[0] = keyData
+ dataKey[1] = byte(po)
+ // dataKey[2:10] = first chunk has storageIdx 0 on [2:10]
+ if _, err := ldb.db.Get(dataKey); err != nil {
+ t.Fatal(err)
+ }
+ if err := ldb.db.Delete(dataKey); err != nil {
+ t.Fatal(err)
+ }
+
+ // remove the gc index row for the first chunk
+ gcFirstCorrectKey := make([]byte, 9)
+ gcFirstCorrectKey[0] = keyGCIdx
+ if err := ldb.db.Delete(gcFirstCorrectKey); err != nil {
+ t.Fatal(err)
+ }
+
+ // warp the gc data of the second chunk
+ // this data should be correct again after the clean
+ gcSecondCorrectKey := make([]byte, 9)
+ gcSecondCorrectKey[0] = keyGCIdx
+ binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1))
+ gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+ warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1)
+ copy(warpedGCVal[1:], gcSecondCorrectVal)
+ if err := ldb.db.Delete(gcSecondCorrectKey); err != nil {
+ t.Fatal(err)
+ }
+ if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := ldb.CleanGCIndex(); err != nil {
+ t.Fatal(err)
+ }
+
+ // the index without corresponding data should have been deleted
+ idxKey := make([]byte, 33)
+ idxKey[0] = keyIndex
+ copy(idxKey[1:], chunks[0].Address())
+ if _, err := ldb.db.Get(idxKey); err == nil {
+ t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey)
+ }
+
+ // the two other indices should be present
+ copy(idxKey[1:], chunks[1].Address())
+ if _, err := ldb.db.Get(idxKey); err != nil {
+ t.Fatalf("expected chunk 1 idx to be present: %v", idxKey)
+ }
+
+ copy(idxKey[1:], chunks[2].Address())
+ if _, err := ldb.db.Get(idxKey); err != nil {
+ t.Fatalf("expected chunk 2 idx to be present: %v", idxKey)
+ }
+
+ // first gc index should still be gone
+ if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil {
+ t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey)
+ }
+
+ // second gc index should still be fixed
+ if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
+ t.Fatalf("expected gc 1 idx to be present: %v", idxKey)
+ }
+
+ // third gc index should be unchanged
+ binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2))
+ if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
+ t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
+ }
+
+ c, err := ldb.db.Get(keyEntryCnt)
+ if err != nil {
+ t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
+ }
+
+ // entrycount should now be one less
+ entryCount := binary.BigEndian.Uint64(c)
+ if entryCount != 2 {
+ t.Fatalf("expected entrycnt to be 2, was %d", c)
+ }
+
+ // the chunks might accidentally be in the same bin
+ // if so that bin counter will now be 2 - the highest added index.
+ // if not, the total of them will be 3
+ poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())}
+ if poBins[0] == poBins[1] {
+ poBins = poBins[:1]
+ }
+
+ var binTotal uint64
+ var currentBin [2]byte
+ currentBin[0] = keyDistanceCnt
+ if len(poBins) == 1 {
+ currentBin[1] = poBins[0]
+ c, err := ldb.db.Get(currentBin[:])
+ if err != nil {
+ t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
+ }
+ binCount := binary.BigEndian.Uint64(c)
+ if binCount != 2 {
+ t.Fatalf("expected entrycnt to be 2, was %d", binCount)
+ }
+ } else {
+ for _, bin := range poBins {
+ currentBin[1] = bin
+ c, err := ldb.db.Get(currentBin[:])
+ if err != nil {
+ t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
+ }
+ binCount := binary.BigEndian.Uint64(c)
+ binTotal += binCount
+
+ }
+ if binTotal != 3 {
+ t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
+ }
+ }
+}
+
func waitGc(ctx context.Context, ldb *LDBStore) {
<-ldb.gc.runC
ldb.gc.runC <- struct{}{}
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
index 6971d759e..fa98848dd 100644
--- a/swarm/storage/localstore.go
+++ b/swarm/storage/localstore.go
@@ -196,31 +196,48 @@ func (ls *LocalStore) Close() {
// Migrate checks the datastore schema vs the runtime schema, and runs migrations if they don't match
func (ls *LocalStore) Migrate() error {
- schema, err := ls.DbStore.GetSchema()
+ actualDbSchema, err := ls.DbStore.GetSchema()
if err != nil {
log.Error(err.Error())
return err
}
- log.Debug("found schema", "schema", schema, "runtime-schema", CurrentDbSchema)
- if schema != CurrentDbSchema {
- // run migrations
+ log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema)
- if schema == "" {
- log.Debug("running migrations for", "schema", schema, "runtime-schema", CurrentDbSchema)
+ if actualDbSchema == CurrentDbSchema {
+ return nil
+ }
+
+ if actualDbSchema == DbSchemaNone {
+ ls.migrateFromNoneToPurity()
+ actualDbSchema = DbSchemaPurity
+ }
- // delete chunks that are not valid, i.e. chunks that do not pass any of the ls.Validators
- ls.DbStore.Cleanup(func(c *chunk) bool {
- return !ls.isValid(c)
- })
+ if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
+ return err
+ }
- err := ls.DbStore.PutSchema(DbSchemaPurity)
- if err != nil {
- log.Error(err.Error())
- return err
- }
+ if actualDbSchema == DbSchemaPurity {
+ if err := ls.migrateFromPurityToHalloween(); err != nil {
+ return err
}
+ actualDbSchema = DbSchemaHalloween
}
+ if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
+ return err
+ }
return nil
}
+
+func (ls *LocalStore) migrateFromNoneToPurity() {
+ // delete chunks that are not valid, i.e. chunks that do not pass
+ // any of the ls.Validators
+ ls.DbStore.Cleanup(func(c *chunk) bool {
+ return !ls.isValid(c)
+ })
+}
+
+func (ls *LocalStore) migrateFromPurityToHalloween() error {
+ return ls.DbStore.CleanGCIndex()
+}
diff --git a/swarm/storage/schema.go b/swarm/storage/schema.go
index fb8498a29..91847ca0f 100644
--- a/swarm/storage/schema.go
+++ b/swarm/storage/schema.go
@@ -1,6 +1,17 @@
package storage
+// The DB schema we want to use. The actual/current DB schema might differ
+// until migrations are run.
+const CurrentDbSchema = DbSchemaHalloween
+
+// There was a time when we had no schema at all.
+const DbSchemaNone = ""
+
// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
const DbSchemaPurity = "purity"
-const CurrentDbSchema = DbSchemaPurity
+// "halloween" is here because we had a screw in the garbage collector index.
+// Because of that we had to rebuild the GC index to get rid of erroneous
+// entries and that takes a long time. This schema is used for bookkeeping,
+// so rebuild index will run just once.
+const DbSchemaHalloween = "halloween"