diff options
Diffstat (limited to 'swarm')
-rw-r--r-- | swarm/storage/ldbstore.go | 122 | ||||
-rw-r--r-- | swarm/storage/ldbstore_test.go | 140 | ||||
-rw-r--r-- | swarm/storage/localstore.go | 47 | ||||
-rw-r--r-- | swarm/storage/schema.go | 13 |
4 files changed, 268 insertions, 54 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 46e040250..fbae59fac 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -57,7 +57,6 @@ var ( var ( keyIndex = byte(0) - keyOldData = byte(1) keyAccessCnt = []byte{2} keyEntryCnt = []byte{3} keyDataIdx = []byte{4} @@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte { return val } +func parseGCIdxKey(key []byte) (byte, []byte) { + return key[0], key[1:] +} + func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) { index = &dpaDBIndex{ Idx: binary.BigEndian.Uint64(val[1:]), @@ -504,7 +507,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { } } -//Cleanup iterates over the database and deletes chunks if they pass the `f` condition +// Cleanup iterates over the database and deletes chunks if they pass the `f` condition func (s *LDBStore) Cleanup(f func(*chunk) bool) { var errorsFound, removed, total int @@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) { log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed)) } -func (s *LDBStore) ReIndex() { - //Iterates over the database and checks that there are no faulty chunks +// CleanGCIndex rebuilds the garbage collector index from scratch, while +// removing inconsistent elements, e.g., indices with missing data chunks. +// WARN: it's a pretty heavy, long running function. +func (s *LDBStore) CleanGCIndex() error { + s.lock.Lock() + defer s.lock.Unlock() + + batch := leveldb.Batch{} + + var okEntryCount uint64 + var totalEntryCount uint64 + + // throw out all gc indices, we will rebuild from cleaned index it := s.db.NewIterator() - startPosition := []byte{keyOldData} - it.Seek(startPosition) - var key []byte - var errorsFound, total int + it.Seek([]byte{keyGCIdx}) + var gcDeletes int + for it.Valid() { + rowType, _ := parseGCIdxKey(it.Key()) + if rowType != keyGCIdx { + break + } + batch.Delete(it.Key()) + gcDeletes++ + it.Next() + } + log.Debug("gc", "deletes", gcDeletes) + if err := s.db.Write(&batch); err != nil { + return err + } + + it.Seek([]byte{keyIndex}) + var idx dpaDBIndex + var poPtrs [256]uint64 for it.Valid() { - key = it.Key() - if (key == nil) || (key[0] != keyOldData) { + rowType, chunkHash := parseGCIdxKey(it.Key()) + if rowType != keyIndex { break } - data := it.Value() - hasher := s.hashfunc() - hasher.Write(data) - hash := hasher.Sum(nil) - - newKey := make([]byte, 10) - oldCntKey := make([]byte, 2) - newCntKey := make([]byte, 2) - oldCntKey[0] = keyDistanceCnt - newCntKey[0] = keyDistanceCnt - key[0] = keyData - key[1] = s.po(Address(key[1:])) - oldCntKey[1] = key[1] - newCntKey[1] = s.po(Address(newKey[1:])) - copy(newKey[2:], key[1:]) - newValue := append(hash, data...) - - batch := new(leveldb.Batch) - batch.Delete(key) - s.bucketCnt[oldCntKey[1]]-- - batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]])) - batch.Put(newKey, newValue) - s.bucketCnt[newCntKey[1]]++ - batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]])) - s.db.Write(batch) + err := decodeIndex(it.Value(), &idx) + if err != nil { + return fmt.Errorf("corrupt index: %v", err) + } + po := s.po(chunkHash) + + // if we don't find the data key, remove the entry + dataKey := getDataKey(idx.Idx, po) + _, err = s.db.Get(dataKey) + if err != nil { + log.Warn("deleting inconsistent index (missing data)", "key", chunkHash) + batch.Delete(it.Key()) + } else { + gcIdxKey := getGCIdxKey(&idx) + gcIdxData := getGCIdxValue(&idx, po, chunkHash) + batch.Put(gcIdxKey, gcIdxData) + log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData) + okEntryCount++ + if idx.Idx > poPtrs[po] { + poPtrs[po] = idx.Idx + } + } + totalEntryCount++ it.Next() } + it.Release() - log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total)) + log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len()) + + var entryCount [8]byte + binary.BigEndian.PutUint64(entryCount[:], okEntryCount) + batch.Put(keyEntryCnt, entryCount[:]) + var poKey [2]byte + poKey[0] = keyDistanceCnt + for i, poPtr := range poPtrs { + poKey[1] = uint8(i) + if poPtr == 0 { + batch.Delete(poKey[:]) + } else { + var idxCount [8]byte + binary.BigEndian.PutUint64(idxCount[:], poPtr) + batch.Put(poKey[:], idxCount[:]) + } + } + + return s.db.Write(&batch) } // Delete is removes a chunk and updates indices. @@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) { s.accessCnt++ s.batch.Put(ikey, idata) newGCIdxKey := getGCIdxKey(index) - newGCIdxData := getGCIdxValue(index, po, ikey) + newGCIdxData := getGCIdxValue(index, po, ikey[1:]) s.batch.Delete(oldGCIdxKey) s.batch.Put(newGCIdxKey, newGCIdxData) select { @@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) { data, err := s.db.Get(keySchema) if err != nil { if err == leveldb.ErrNotFound { - return "", nil + return DbSchemaNone, nil } return "", err } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 22213b12d..07557980c 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -19,6 +19,7 @@ package storage import ( "bytes" "context" + "encoding/binary" "fmt" "io/ioutil" "os" @@ -623,6 +624,145 @@ func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) { log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) } +func TestCleanIndex(t *testing.T) { + capacity := 5000 + n := 3 + + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(uint64(capacity)) + defer cleanup() + + chunks, err := mputRandomChunks(ldb, n, 4096) + if err != nil { + t.Fatal(err) + } + + // remove the data of the first chunk + po := ldb.po(chunks[0].Address()[:]) + dataKey := make([]byte, 10) + dataKey[0] = keyData + dataKey[1] = byte(po) + // dataKey[2:10] = first chunk has storageIdx 0 on [2:10] + if _, err := ldb.db.Get(dataKey); err != nil { + t.Fatal(err) + } + if err := ldb.db.Delete(dataKey); err != nil { + t.Fatal(err) + } + + // remove the gc index row for the first chunk + gcFirstCorrectKey := make([]byte, 9) + gcFirstCorrectKey[0] = keyGCIdx + if err := ldb.db.Delete(gcFirstCorrectKey); err != nil { + t.Fatal(err) + } + + // warp the gc data of the second chunk + // this data should be correct again after the clean + gcSecondCorrectKey := make([]byte, 9) + gcSecondCorrectKey[0] = keyGCIdx + binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1)) + gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey) + if err != nil { + t.Fatal(err) + } + warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1) + copy(warpedGCVal[1:], gcSecondCorrectVal) + if err := ldb.db.Delete(gcSecondCorrectKey); err != nil { + t.Fatal(err) + } + if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil { + t.Fatal(err) + } + + if err := ldb.CleanGCIndex(); err != nil { + t.Fatal(err) + } + + // the index without corresponding data should have been deleted + idxKey := make([]byte, 33) + idxKey[0] = keyIndex + copy(idxKey[1:], chunks[0].Address()) + if _, err := ldb.db.Get(idxKey); err == nil { + t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey) + } + + // the two other indices should be present + copy(idxKey[1:], chunks[1].Address()) + if _, err := ldb.db.Get(idxKey); err != nil { + t.Fatalf("expected chunk 1 idx to be present: %v", idxKey) + } + + copy(idxKey[1:], chunks[2].Address()) + if _, err := ldb.db.Get(idxKey); err != nil { + t.Fatalf("expected chunk 2 idx to be present: %v", idxKey) + } + + // first gc index should still be gone + if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil { + t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey) + } + + // second gc index should still be fixed + if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil { + t.Fatalf("expected gc 1 idx to be present: %v", idxKey) + } + + // third gc index should be unchanged + binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2)) + if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil { + t.Fatalf("expected gc 2 idx to be present: %v", idxKey) + } + + c, err := ldb.db.Get(keyEntryCnt) + if err != nil { + t.Fatalf("expected gc 2 idx to be present: %v", idxKey) + } + + // entrycount should now be one less + entryCount := binary.BigEndian.Uint64(c) + if entryCount != 2 { + t.Fatalf("expected entrycnt to be 2, was %d", c) + } + + // the chunks might accidentally be in the same bin + // if so that bin counter will now be 2 - the highest added index. + // if not, the total of them will be 3 + poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())} + if poBins[0] == poBins[1] { + poBins = poBins[:1] + } + + var binTotal uint64 + var currentBin [2]byte + currentBin[0] = keyDistanceCnt + if len(poBins) == 1 { + currentBin[1] = poBins[0] + c, err := ldb.db.Get(currentBin[:]) + if err != nil { + t.Fatalf("expected gc 2 idx to be present: %v", idxKey) + } + binCount := binary.BigEndian.Uint64(c) + if binCount != 2 { + t.Fatalf("expected entrycnt to be 2, was %d", binCount) + } + } else { + for _, bin := range poBins { + currentBin[1] = bin + c, err := ldb.db.Get(currentBin[:]) + if err != nil { + t.Fatalf("expected gc 2 idx to be present: %v", idxKey) + } + binCount := binary.BigEndian.Uint64(c) + binTotal += binCount + + } + if binTotal != 3 { + t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal) + } + } +} + func waitGc(ctx context.Context, ldb *LDBStore) { <-ldb.gc.runC ldb.gc.runC <- struct{}{} diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 6971d759e..fa98848dd 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -196,31 +196,48 @@ func (ls *LocalStore) Close() { // Migrate checks the datastore schema vs the runtime schema, and runs migrations if they don't match func (ls *LocalStore) Migrate() error { - schema, err := ls.DbStore.GetSchema() + actualDbSchema, err := ls.DbStore.GetSchema() if err != nil { log.Error(err.Error()) return err } - log.Debug("found schema", "schema", schema, "runtime-schema", CurrentDbSchema) - if schema != CurrentDbSchema { - // run migrations + log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema) - if schema == "" { - log.Debug("running migrations for", "schema", schema, "runtime-schema", CurrentDbSchema) + if actualDbSchema == CurrentDbSchema { + return nil + } + + if actualDbSchema == DbSchemaNone { + ls.migrateFromNoneToPurity() + actualDbSchema = DbSchemaPurity + } - // delete chunks that are not valid, i.e. chunks that do not pass any of the ls.Validators - ls.DbStore.Cleanup(func(c *chunk) bool { - return !ls.isValid(c) - }) + if err := ls.DbStore.PutSchema(actualDbSchema); err != nil { + return err + } - err := ls.DbStore.PutSchema(DbSchemaPurity) - if err != nil { - log.Error(err.Error()) - return err - } + if actualDbSchema == DbSchemaPurity { + if err := ls.migrateFromPurityToHalloween(); err != nil { + return err } + actualDbSchema = DbSchemaHalloween } + if err := ls.DbStore.PutSchema(actualDbSchema); err != nil { + return err + } return nil } + +func (ls *LocalStore) migrateFromNoneToPurity() { + // delete chunks that are not valid, i.e. chunks that do not pass + // any of the ls.Validators + ls.DbStore.Cleanup(func(c *chunk) bool { + return !ls.isValid(c) + }) +} + +func (ls *LocalStore) migrateFromPurityToHalloween() error { + return ls.DbStore.CleanGCIndex() +} diff --git a/swarm/storage/schema.go b/swarm/storage/schema.go index fb8498a29..91847ca0f 100644 --- a/swarm/storage/schema.go +++ b/swarm/storage/schema.go @@ -1,6 +1,17 @@ package storage +// The DB schema we want to use. The actual/current DB schema might differ +// until migrations are run. +const CurrentDbSchema = DbSchemaHalloween + +// There was a time when we had no schema at all. +const DbSchemaNone = "" + // "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5 const DbSchemaPurity = "purity" -const CurrentDbSchema = DbSchemaPurity +// "halloween" is here because we had a screw in the garbage collector index. +// Because of that we had to rebuild the GC index to get rid of erroneous +// entries and that takes a long time. This schema is used for bookkeeping, +// so rebuild index will run just once. +const DbSchemaHalloween = "halloween" |