Diffstat (limited to 'swarm/storage/localstore')
21 files changed, 986 insertions, 516 deletions
diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go new file mode 100644 index 000000000..411392b4e --- /dev/null +++ b/swarm/storage/localstore/export.go @@ -0,0 +1,204 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package localstore + +import ( + "archive/tar" + "context" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "sync" + + "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/shed" +) + +const ( + // filename in tar archive that holds the information + // about exported data format version + exportVersionFilename = ".swarm-export-version" + // legacy version for previous LDBStore + legacyExportVersion = "1" + // current export format version + currentExportVersion = "2" +) + +// Export writes a tar structured data to the writer of +// all chunks in the retrieval data index. It returns the +// number of chunks exported. +func (db *DB) Export(w io.Writer) (count int64, err error) { + tw := tar.NewWriter(w) + defer tw.Close() + + if err := tw.WriteHeader(&tar.Header{ + Name: exportVersionFilename, + Mode: 0644, + Size: int64(len(currentExportVersion)), + }); err != nil { + return 0, err + } + if _, err := tw.Write([]byte(currentExportVersion)); err != nil { + return 0, err + } + + err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + hdr := &tar.Header{ + Name: hex.EncodeToString(item.Address), + Mode: 0644, + Size: int64(len(item.Data)), + } + if err := tw.WriteHeader(hdr); err != nil { + return false, err + } + if _, err := tw.Write(item.Data); err != nil { + return false, err + } + count++ + return false, nil + }, nil) + + return count, err +} + +// Import reads a tar structured data from the reader and +// stores chunks in the database. It returns the number of +// chunks imported. 
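Export writes the version sentinel before any chunk entry, so a reader can detect the format from the first file. A minimal sketch that lists an exported archive using only the standard library (the archive file name is a placeholder); Import, defined next, consumes exactly this layout:

    package main

    import (
    	"archive/tar"
    	"fmt"
    	"io"
    	"os"
    )

    func main() {
    	f, err := os.Open("localstore-export.tar") // placeholder path
    	if err != nil {
    		panic(err)
    	}
    	defer f.Close()
    	tr := tar.NewReader(f)
    	for {
    		hdr, err := tr.Next()
    		if err == io.EOF {
    			break
    		}
    		if err != nil {
    			panic(err)
    		}
    		// first entry: ".swarm-export-version" containing "2",
    		// then one entry per chunk, named by its 64-char hex address
    		fmt.Println(hdr.Name, hdr.Size)
    	}
    }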
+func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { + tr := tar.NewReader(r) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errC := make(chan error) + doneC := make(chan struct{}) + tokenPool := make(chan struct{}, 100) + var wg sync.WaitGroup + go func() { + var ( + firstFile = true + // if exportVersionFilename file is not present + // assume legacy version + version = legacyExportVersion + ) + for { + hdr, err := tr.Next() + if err != nil { + if err == io.EOF { + break + } + select { + case errC <- err: + case <-ctx.Done(): + } + } + if firstFile { + firstFile = false + if hdr.Name == exportVersionFilename { + data, err := ioutil.ReadAll(tr) + if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } + version = string(data) + continue + } + } + + if len(hdr.Name) != 64 { + log.Warn("ignoring non-chunk file", "name", hdr.Name) + continue + } + + keybytes, err := hex.DecodeString(hdr.Name) + if err != nil { + log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) + continue + } + + data, err := ioutil.ReadAll(tr) + if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } + key := chunk.Address(keybytes) + + var ch chunk.Chunk + switch version { + case legacyExportVersion: + // LDBStore Export exported chunk data prefixed with the chunk key. + // That is not necessary, as the key is in the chunk filename, + // but backward compatibility needs to be preserved. + ch = chunk.NewChunk(key, data[32:]) + case currentExportVersion: + ch = chunk.NewChunk(key, data) + default: + select { + case errC <- fmt.Errorf("unsupported export data version %q", version): + case <-ctx.Done(): + } + } + tokenPool <- struct{}{} + wg.Add(1) + + go func() { + _, err := db.Put(ctx, chunk.ModePutUpload, ch) + select { + case errC <- err: + case <-ctx.Done(): + wg.Done() + <-tokenPool + default: + _, err := db.Put(ctx, chunk.ModePutUpload, ch) + if err != nil { + errC <- err + } + wg.Done() + <-tokenPool + } + }() + + count++ + } + wg.Wait() + close(doneC) + }() + + // wait for all chunks to be stored + for { + select { + case err := <-errC: + if err != nil { + return count, err + } + case <-ctx.Done(): + return count, ctx.Err() + default: + select { + case <-doneC: + return count, nil + default: + } + } + } +} diff --git a/swarm/storage/localstore/export_test.go b/swarm/storage/localstore/export_test.go new file mode 100644 index 000000000..d7f848f80 --- /dev/null +++ b/swarm/storage/localstore/export_test.go @@ -0,0 +1,80 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
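Together, Export and Import allow a whole localstore to be moved through a tar stream. A sketch of a migration helper under the signatures above, assuming the package's existing DB.Close method; the pipe plumbing, paths and package name are illustrative, not part of the commit:

    package localstoremigrate // hypothetical helper package

    import (
    	"io"

    	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
    )

    // Migrate streams one localstore into another via Export/Import.
    func Migrate(srcPath, dstPath string, baseKey []byte) (int64, error) {
    	src, err := localstore.New(srcPath, baseKey, nil)
    	if err != nil {
    		return 0, err
    	}
    	defer src.Close()

    	dst, err := localstore.New(dstPath, baseKey, nil)
    	if err != nil {
    		return 0, err
    	}
    	defer dst.Close()

    	pr, pw := io.Pipe()
    	go func() {
    		_, err := src.Export(pw)
    		pw.CloseWithError(err) // nil err closes the pipe with io.EOF
    	}()
    	// legacy=false: the archive was written by the current Export,
    	// so chunk data carries no 32-byte key prefix.
    	return dst.Import(pr, false)
    }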
+ +package localstore + +import ( + "bytes" + "context" + "testing" + + "github.com/ethereum/go-ethereum/swarm/chunk" +) + +// TestExportImport constructs two databases, one to put and export +// chunks and another one to import and validate that all chunks are +// imported. +func TestExportImport(t *testing.T) { + db1, cleanup1 := newTestDB(t, nil) + defer cleanup1() + + var chunkCount = 100 + + chunks := make(map[string][]byte, chunkCount) + for i := 0; i < chunkCount; i++ { + ch := generateTestRandomChunk() + + _, err := db1.Put(context.Background(), chunk.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + chunks[string(ch.Address())] = ch.Data() + } + + var buf bytes.Buffer + + c, err := db1.Export(&buf) + if err != nil { + t.Fatal(err) + } + wantChunksCount := int64(len(chunks)) + if c != wantChunksCount { + t.Errorf("got export count %v, want %v", c, wantChunksCount) + } + + db2, cleanup2 := newTestDB(t, nil) + defer cleanup2() + + c, err = db2.Import(&buf, false) + if err != nil { + t.Fatal(err) + } + if c != wantChunksCount { + t.Errorf("got import count %v, want %v", c, wantChunksCount) + } + + for a, want := range chunks { + addr := chunk.Address([]byte(a)) + ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr) + if err != nil { + t.Fatal(err) + } + got := ch.Data() + if !bytes.Equal(got, want) { + t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want) + } + } +} diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go index 84c4f596d..748e0d663 100644 --- a/swarm/storage/localstore/gc.go +++ b/swarm/storage/localstore/gc.go @@ -17,7 +17,10 @@ package localstore import ( + "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) @@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() { // the rest of the garbage as the batch size limit is reached. // This function is called in collectGarbageWorker. 
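collectGarbage, shown next, deletes chunks in batches until the gc size falls back to db.gcTarget(). The target derivation is not part of this diff; a sketch of the invariant, with a stand-in ratio constant:

    // gcTargetRatio is a stand-in; the real constant is defined
    // elsewhere in the package, not in this diff.
    const gcTargetRatio = 0.9

    func gcTarget(capacity uint64) uint64 {
    	return uint64(float64(capacity) * gcTargetRatio)
    }

    // The iteration below stops once gcSize-collectedCount <= target.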
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { + metricName := "localstore.gc" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + defer func() { + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + }() + batch := new(leveldb.Batch) target := db.gcTarget() @@ -86,12 +98,17 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { if err != nil { return 0, true, err } + metrics.GetOrRegisterGauge(metricName+".gcsize", nil).Update(int64(gcSize)) done = true err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { if gcSize-collectedCount <= target { return true, nil } + + metrics.GetOrRegisterGauge(metricName+".storets", nil).Update(item.StoreTimestamp) + metrics.GetOrRegisterGauge(metricName+".accessts", nil).Update(item.AccessTimestamp) + // delete from retrieve, pull, gc db.retrievalDataIndex.DeleteInBatch(batch, item) db.retrievalAccessIndex.DeleteInBatch(batch, item) @@ -109,11 +126,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { if err != nil { return 0, false, err } + metrics.GetOrRegisterCounter(metricName+".collected-count", nil).Inc(int64(collectedCount)) db.gcSize.PutInBatch(batch, gcSize-collectedCount) err = db.shed.WriteBatch(batch) if err != nil { + metrics.GetOrRegisterCounter(metricName+".writebatch.err", nil).Inc(1) return 0, false, err } return collectedCount, done, nil diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go index 081e0af80..4a6e0a5f4 100644 --- a/swarm/storage/localstore/gc_test.go +++ b/swarm/storage/localstore/gc_test.go @@ -17,6 +17,7 @@ package localstore import ( + "context" "io/ioutil" "math/rand" "os" @@ -63,26 +64,23 @@ func testDB_collectGarbageWorker(t *testing.T) { })() defer cleanupFunc() - uploader := db.NewPutter(ModePutUpload) - syncer := db.NewSetter(ModeSetSync) - addrs := make([]chunk.Address, 0) // upload random chunks for i := 0; i < chunkCount; i++ { - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = syncer.Set(chunk.Address()) + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } - addrs = append(addrs, chunk.Address()) + addrs = append(addrs, ch.Address()) } gcTarget := db.gcTarget() @@ -110,7 +108,7 @@ func testDB_collectGarbageWorker(t *testing.T) { // the first synced chunk should be removed t.Run("get the first synced chunk", func(t *testing.T) { - _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0]) if err != chunk.ErrChunkNotFound { t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound) } @@ -118,7 +116,7 @@ func testDB_collectGarbageWorker(t *testing.T) { // last synced chunk should not be removed t.Run("get most recent synced chunk", func(t *testing.T) { - _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1]) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1]) if err != nil { t.Fatal(err) } @@ -134,9 +132,6 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { }) defer cleanupFunc() - uploader := db.NewPutter(ModePutUpload) - syncer := db.NewSetter(ModeSetSync) - testHookCollectGarbageChan := make(chan uint64) defer setTestHookCollectGarbage(func(collectedCount uint64) { 
testHookCollectGarbageChan <- collectedCount @@ -146,19 +141,19 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // upload random chunks just up to the capacity for i := 0; i < int(db.capacity)-1; i++ { - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = syncer.Set(chunk.Address()) + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } - addrs = append(addrs, chunk.Address()) + addrs = append(addrs, ch.Address()) } // set update gc test hook to signal when @@ -172,7 +167,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // request the latest synced chunk // to prioritize it in the gc index // not to be collected - _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0]) if err != nil { t.Fatal(err) } @@ -191,11 +186,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // upload and sync another chunk to trigger // garbage collection ch := generateTestRandomChunk() - err = uploader.Put(ch) + _, err = db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = syncer.Set(ch.Address()) + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } @@ -235,7 +230,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // requested chunk should not be removed t.Run("get requested chunk", func(t *testing.T) { - _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0]) if err != nil { t.Fatal(err) } @@ -243,7 +238,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // the second synced chunk should be removed t.Run("get gc-ed chunk", func(t *testing.T) { - _, err := db.NewGetter(ModeGetRequest).Get(addrs[1]) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[1]) if err != chunk.ErrChunkNotFound { t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound) } @@ -251,7 +246,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // last synced chunk should not be removed t.Run("get most recent synced chunk", func(t *testing.T) { - _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1]) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1]) if err != nil { t.Fatal(err) } @@ -275,20 +270,17 @@ func TestDB_gcSize(t *testing.T) { t.Fatal(err) } - uploader := db.NewPutter(ModePutUpload) - syncer := db.NewSetter(ModeSetSync) - count := 100 for i := 0; i < count; i++ { - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - err = syncer.Set(chunk.Address()) + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go index cf19e4f6c..0f23aa10a 100644 --- a/swarm/storage/localstore/index_test.go +++ b/swarm/storage/localstore/index_test.go @@ -18,6 +18,7 @@ package localstore import ( "bytes" + "context" "math/rand" "testing" @@ -35,29 +36,22 @@ func TestDB_pullIndex(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - uploader := db.NewPutter(ModePutUpload) - chunkCount := 
50 chunks := make([]testIndexChunk, chunkCount) // upload random chunks for i := 0; i < chunkCount; i++ { - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } chunks[i] = testIndexChunk{ - Chunk: chunk, - // this timestamp is not the same as in - // the index, but given that uploads - // are sequential and that only ordering - // of events matter, this information is - // sufficient - storeTimestamp: now(), + Chunk: ch, + binID: uint64(i), } } @@ -70,10 +64,10 @@ func TestDB_pullIndex(t *testing.T) { if poi > poj { return false } - if chunks[i].storeTimestamp < chunks[j].storeTimestamp { + if chunks[i].binID < chunks[j].binID { return true } - if chunks[i].storeTimestamp > chunks[j].storeTimestamp { + if chunks[i].binID > chunks[j].binID { return false } return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1 @@ -87,23 +81,21 @@ func TestDB_gcIndex(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - uploader := db.NewPutter(ModePutUpload) - chunkCount := 50 chunks := make([]testIndexChunk, chunkCount) // upload random chunks for i := 0; i < chunkCount; i++ { - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } chunks[i] = testIndexChunk{ - Chunk: chunk, + Chunk: ch, } } @@ -123,9 +115,9 @@ func TestDB_gcIndex(t *testing.T) { })() t.Run("request unsynced", func(t *testing.T) { - chunk := chunks[1] + ch := chunks[1] - _, err := db.NewGetter(ModeGetRequest).Get(chunk.Address()) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } @@ -140,9 +132,9 @@ func TestDB_gcIndex(t *testing.T) { }) t.Run("sync one chunk", func(t *testing.T) { - chunk := chunks[0] + ch := chunks[0] - err := db.NewSetter(ModeSetSync).Set(chunk.Address()) + err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } @@ -154,10 +146,8 @@ func TestDB_gcIndex(t *testing.T) { }) t.Run("sync all chunks", func(t *testing.T) { - setter := db.NewSetter(ModeSetSync) - for i := range chunks { - err := setter.Set(chunks[i].Address()) + err := db.Set(context.Background(), chunk.ModeSetSync, chunks[i].Address()) if err != nil { t.Fatal(err) } @@ -171,7 +161,7 @@ func TestDB_gcIndex(t *testing.T) { t.Run("request one chunk", func(t *testing.T) { i := 6 - _, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address()) + _, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address()) if err != nil { t.Fatal(err) } @@ -189,14 +179,13 @@ func TestDB_gcIndex(t *testing.T) { }) t.Run("random chunk request", func(t *testing.T) { - requester := db.NewGetter(ModeGetRequest) rand.Shuffle(len(chunks), func(i, j int) { chunks[i], chunks[j] = chunks[j], chunks[i] }) - for _, chunk := range chunks { - _, err := requester.Get(chunk.Address()) + for _, ch := range chunks { + _, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } @@ -212,7 +201,7 @@ func TestDB_gcIndex(t *testing.T) { t.Run("remove one chunk", func(t *testing.T) { i := 3 - err := db.NewSetter(modeSetRemove).Set(chunks[i].Address()) + err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address()) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/localstore.go 
b/swarm/storage/localstore/localstore.go index 98d4c7881..3b0bd8a93 100644 --- a/swarm/storage/localstore/localstore.go +++ b/swarm/storage/localstore/localstore.go @@ -23,11 +23,15 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/ethereum/go-ethereum/swarm/storage/mock" ) +// DB implements chunk.Store. +var _ chunk.Store = &DB{} + var ( // ErrInvalidMode is retuned when an unknown Mode // is provided to the function. @@ -69,6 +73,10 @@ type DB struct { pullTriggers map[uint8][]chan struct{} pullTriggersMu sync.RWMutex + // binIDs stores the latest chunk serial ID for every + // proximity order bin + binIDs shed.Uint64Vector + // garbage collection index gcIndex shed.Index @@ -124,7 +132,10 @@ type Options struct { // One goroutine for writing batches is created. func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if o == nil { - o = new(Options) + // default options + o = &Options{ + Capacity: 5000000, + } } db = &DB{ capacity: o.Capacity, @@ -148,11 +159,23 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if err != nil { return nil, err } + // Identify current storage schema by arbitrary name. db.schemaName, err = db.shed.NewStringField("schema-name") if err != nil { return nil, err } + schemaName, err := db.schemaName.Get() + if err != nil { + return nil, err + } + if schemaName == "" { + // initial new localstore run + err := db.schemaName.Put(DbSchemaSanctuary) + if err != nil { + return nil, err + } + } // Persist gc size. db.gcSize, err = db.shed.NewUint64Field("gc-size") if err != nil { @@ -165,8 +188,9 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { ) if o.MockStore != nil { encodeValueFunc = func(fields shed.Item) (value []byte, err error) { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) err = o.MockStore.Put(fields.Address, fields.Data) if err != nil { return nil, err @@ -174,25 +198,28 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { return b, nil } decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) e.Data, err = o.MockStore.Get(keyItem.Address) return e, err } } else { encodeValueFunc = func(fields shed.Item) (value []byte, err error) { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) value = append(b, fields.Data...) return value, nil } decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) - e.Data = value[8:] + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) + e.Data = value[16:] return e, nil } } - // Index storing actual chunk address, data and store timestamp. - db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ + // Index storing actual chunk address, data and bin id. 
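The retrieval data index value grows from 8 to 16 bytes of metadata: bin ID first, store timestamp second, chunk payload last. A standalone sketch of the layout implemented by the encode/decode closures above (the index definition itself continues below):

    package layout // hypothetical package for illustration

    import "encoding/binary"

    // value = BinID (8 bytes, big endian) | StoreTimestamp (8 bytes) | Data
    func encodeValue(binID uint64, storeTimestamp int64, data []byte) []byte {
    	b := make([]byte, 16)
    	binary.BigEndian.PutUint64(b[:8], binID)
    	binary.BigEndian.PutUint64(b[8:16], uint64(storeTimestamp))
    	return append(b, data...)
    }

    func decodeValue(value []byte) (binID uint64, storeTimestamp int64, data []byte) {
    	binID = binary.BigEndian.Uint64(value[:8])
    	storeTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
    	return binID, storeTimestamp, value[16:]
    }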
+ db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, @@ -230,33 +257,37 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { return nil, err } // pull index allows history and live syncing per po bin - db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { key = make([]byte, 41) key[0] = db.po(fields.Address) - binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp)) - copy(key[9:], fields.Address[:]) + binary.BigEndian.PutUint64(key[1:9], fields.BinID) return key, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { - e.Address = key[9:] - e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9])) + e.BinID = binary.BigEndian.Uint64(key[1:9]) return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { - return nil, nil + return fields.Address, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.Address = value return e, nil }, }) if err != nil { return nil, err } + // create a vector for bin IDs + db.binIDs, err = db.shed.NewUint64Vector("bin-ids") + if err != nil { + return nil, err + } // create a pull syncing triggers used by SubscribePull function db.pullTriggers = make(map[uint8][]chan struct{}) // push index contains as yet unsynced chunks - db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{ + db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { key = make([]byte, 40) binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp)) @@ -281,17 +312,17 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { // create a push syncing triggers used by SubscribePush function db.pushTriggers = make([]chan struct{}, 0) // gc index for removable chunk ordered by ascending last access time - db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { b := make([]byte, 16, 16+len(fields.Address)) binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) - binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + binary.BigEndian.PutUint64(b[8:16], fields.BinID) key = append(b, fields.Address...) return key, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) - e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) + e.BinID = binary.BigEndian.Uint64(key[8:16]) e.Address = key[16:] return e, nil }, @@ -358,3 +389,12 @@ func init() { return time.Now().UTC().UnixNano() } } + +// totalTimeMetric logs a message about time between provided start time +// and the time when the function is called and sends a resetting timer metric +// with provided name appended with ".total-time". 
+func totalTimeMetric(name string, start time.Time) { + totalTime := time.Since(start) + log.Trace(name+" total time", "time", totalTime) + metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime) +} diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go index 42e762587..6dbc4b7ad 100644 --- a/swarm/storage/localstore/localstore_test.go +++ b/swarm/storage/localstore/localstore_test.go @@ -18,6 +18,7 @@ package localstore import ( "bytes" + "context" "fmt" "io/ioutil" "math/rand" @@ -59,23 +60,23 @@ func TestDB(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutUpload).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - got, err := db.NewGetter(ModeGetRequest).Get(chunk.Address()) + got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(got.Address(), chunk.Address()) { - t.Errorf("got address %x, want %x", got.Address(), chunk.Address()) + if !bytes.Equal(got.Address(), ch.Address()) { + t.Errorf("got address %x, want %x", got.Address(), ch.Address()) } - if !bytes.Equal(got.Data(), chunk.Data()) { - t.Errorf("got data %x, want %x", got.Data(), chunk.Data()) + if !bytes.Equal(got.Data(), ch.Data()) { + t.Errorf("got data %x, want %x", got.Data(), ch.Data()) } } @@ -113,19 +114,17 @@ func TestDB_updateGCSem(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutUpload).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - getter := db.NewGetter(ModeGetRequest) - // get more chunks then maxParallelUpdateGC // in time shorter then updateGCSleep for i := 0; i < 5; i++ { - _, err = getter.Get(chunk.Address()) + _, err = db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } @@ -237,71 +236,71 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim // newRetrieveIndexesTestWithAccess returns a test function that validates if the right // chunk values are in the retrieval indexes when access time must be stored. -func newRetrieveIndexesTestWithAccess(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { +func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { return func(t *testing.T) { - item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) + item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address())) if err != nil { t.Fatal(err) } - validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0) + validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0) if accessTimestamp > 0 { - item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address())) + item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address())) if err != nil { t.Fatal(err) } - validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp) + validateItem(t, item, ch.Address(), nil, 0, accessTimestamp) } } } // newPullIndexTest returns a test function that validates if the right // chunk values are in the pull index. 
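totalTimeMetric is applied uniformly across this commit: a counter on entry, a deferred resetting timer, and an error counter in a deferred closure. The recurring pattern, extracted as a sketch (someOp and its metric name are hypothetical):

    func (db *DB) someOp() (err error) {
    	metricName := "localstore.someOp" // hypothetical name
    	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
    	// time.Now() is evaluated here, at function entry, so the
    	// deferred call measures the full operation duration
    	defer totalTimeMetric(metricName, time.Now())
    	defer func() {
    		if err != nil {
    			metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
    		}
    	}()
    	// ... operation body ...
    	return nil
    }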
-func newPullIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { +func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) { return func(t *testing.T) { item, err := db.pullIndex.Get(shed.Item{ - Address: chunk.Address(), - StoreTimestamp: storeTimestamp, + Address: ch.Address(), + BinID: binID, }) if err != wantError { t.Errorf("got error %v, want %v", err, wantError) } if err == nil { - validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0) + validateItem(t, item, ch.Address(), nil, 0, 0) } } } // newPushIndexTest returns a test function that validates if the right // chunk values are in the push index. -func newPushIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { +func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { return func(t *testing.T) { item, err := db.pushIndex.Get(shed.Item{ - Address: chunk.Address(), + Address: ch.Address(), StoreTimestamp: storeTimestamp, }) if err != wantError { t.Errorf("got error %v, want %v", err, wantError) } if err == nil { - validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0) + validateItem(t, item, ch.Address(), nil, storeTimestamp, 0) } } } // newGCIndexTest returns a test function that validates if the right // chunk values are in the push index. -func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { +func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) { return func(t *testing.T) { item, err := db.gcIndex.Get(shed.Item{ Address: chunk.Address(), - StoreTimestamp: storeTimestamp, + BinID: binID, AccessTimestamp: accessTimestamp, }) if err != nil { t.Fatal(err) } - validateItem(t, item, chunk.Address(), nil, storeTimestamp, accessTimestamp) + validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp) } } @@ -349,7 +348,7 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) { // in database. It is used for index values validations. type testIndexChunk struct { chunk.Chunk - storeTimestamp int64 + binID uint64 } // testItemsOrder tests the order of chunks in the index. If sortFunc is not nil, diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go index a6353e141..efef82858 100644 --- a/swarm/storage/localstore/mode_get.go +++ b/swarm/storage/localstore/mode_get.go @@ -17,45 +17,35 @@ package localstore import ( + "context" + "fmt" + "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) -// ModeGet enumerates different Getter modes. -type ModeGet int - -// Getter modes. -const ( - // ModeGetRequest: when accessed for retrieval - ModeGetRequest ModeGet = iota - // ModeGetSync: when accessed for syncing or proof of custody request - ModeGetSync -) - -// Getter provides Get method to retrieve Chunks -// from database. -type Getter struct { - db *DB - mode ModeGet -} - -// NewGetter returns a new Getter on database -// with a specific Mode. -func (db *DB) NewGetter(mode ModeGet) *Getter { - return &Getter{ - mode: mode, - db: db, - } -} - // Get returns a chunk from the database. If the chunk is // not found chunk.ErrChunkNotFound will be returned. // All required indexes will be updated required by the -// Getter Mode. 
-func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) { - out, err := g.db.get(g.mode, addr) +// Getter Mode. Get is required to implement chunk.Store +// interface. +func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) { + metricName := fmt.Sprintf("localstore.Get.%s", mode) + + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + defer func() { + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + }() + + out, err := db.get(mode, addr) if err != nil { if err == leveldb.ErrNotFound { return nil, chunk.ErrChunkNotFound @@ -67,7 +57,7 @@ func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) { // get returns Item from the retrieval index // and updates other indexes. -func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) { +func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err error) { item := addressToItem(addr) out, err = db.retrievalDataIndex.Get(item) @@ -76,7 +66,7 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) { } switch mode { // update the access timestamp and gc index - case ModeGetRequest: + case chunk.ModeGetRequest: if db.updateGCSem != nil { // wait before creating new goroutines // if updateGCSem buffer id full @@ -90,8 +80,14 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) { // for a new goroutine defer func() { <-db.updateGCSem }() } + + metricName := "localstore.updateGC" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + err := db.updateGC(out) if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) log.Error("localstore update gc", "err", err) } // if gc update hook is defined, call it @@ -101,7 +97,8 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) { }() // no updates to indexes - case ModeGetSync: + case chunk.ModeGetSync: + case chunk.ModeGetLookup: default: return out, ErrInvalidMode } diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go index 28a70ee0c..217fa5d2d 100644 --- a/swarm/storage/localstore/mode_get_test.go +++ b/swarm/storage/localstore/mode_get_test.go @@ -18,8 +18,11 @@ package localstore import ( "bytes" + "context" "testing" "time" + + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestModeGetRequest validates ModeGetRequest index values on the provided DB. 
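In get(), ModeGetRequest schedules its gc-index update on a background goroutine, throttled through the optional updateGCSem channel. The throttling pattern in isolation, as a sketch (the buffer size is a stand-in for the package's real limit):

    const maxParallelUpdateGC = 1000 // stand-in value

    var updateGCSem = make(chan struct{}, maxParallelUpdateGC)

    func updateGCAsync(update func()) {
    	updateGCSem <- struct{}{} // blocks while the limit is reached
    	go func() {
    		defer func() { <-updateGCSem }() // free the slot on exit
    		update()
    	}()
    }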
@@ -32,15 +35,13 @@ func TestModeGetRequest(t *testing.T) { return uploadTimestamp })() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutUpload).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - requester := db.NewGetter(ModeGetRequest) - // set update gc test hook to signal when // update gc goroutine is done by sending to // testHookUpdateGCChan channel, which is @@ -52,22 +53,22 @@ func TestModeGetRequest(t *testing.T) { })() t.Run("get unsynced", func(t *testing.T) { - got, err := requester.Get(chunk.Address()) + got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } // wait for update gc goroutine to be done <-testHookUpdateGCChan - if !bytes.Equal(got.Address(), chunk.Address()) { - t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + if !bytes.Equal(got.Address(), ch.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address()) } - if !bytes.Equal(got.Data(), chunk.Data()) { - t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + if !bytes.Equal(got.Data(), ch.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data()) } - t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0)) + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) @@ -75,30 +76,30 @@ func TestModeGetRequest(t *testing.T) { }) // set chunk to synced state - err = db.NewSetter(ModeSetSync).Set(chunk.Address()) + err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address()) if err != nil { t.Fatal(err) } t.Run("first get", func(t *testing.T) { - got, err := requester.Get(chunk.Address()) + got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } // wait for update gc goroutine to be done <-testHookUpdateGCChan - if !bytes.Equal(got.Address(), chunk.Address()) { - t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + if !bytes.Equal(got.Address(), ch.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address()) } - if !bytes.Equal(got.Data(), chunk.Data()) { - t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + if !bytes.Equal(got.Data(), ch.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data()) } - t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, uploadTimestamp)) + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp)) - t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, uploadTimestamp)) + t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) @@ -111,24 +112,24 @@ func TestModeGetRequest(t *testing.T) { return accessTimestamp })() - got, err := requester.Get(chunk.Address()) + got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } // wait for update gc goroutine to be done <-testHookUpdateGCChan - if !bytes.Equal(got.Address(), chunk.Address()) { - t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + if !bytes.Equal(got.Address(), ch.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address()) } - if !bytes.Equal(got.Data(), chunk.Data()) { - 
t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + if !bytes.Equal(got.Data(), ch.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data()) } - t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, accessTimestamp)) + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, accessTimestamp)) - t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, accessTimestamp)) + t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) @@ -146,27 +147,27 @@ func TestModeGetSync(t *testing.T) { return uploadTimestamp })() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutUpload).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - got, err := db.NewGetter(ModeGetSync).Get(chunk.Address()) + got, err := db.Get(context.Background(), chunk.ModeGetSync, ch.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(got.Address(), chunk.Address()) { - t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + if !bytes.Equal(got.Address(), ch.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address()) } - if !bytes.Equal(got.Data(), chunk.Data()) { - t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + if !bytes.Equal(got.Data(), ch.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data()) } - t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0)) + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) diff --git a/swarm/storage/localstore/mode_has.go b/swarm/storage/localstore/mode_has.go index 90feaceef..a70ee31b2 100644 --- a/swarm/storage/localstore/mode_has.go +++ b/swarm/storage/localstore/mode_has.go @@ -17,23 +17,23 @@ package localstore import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" ) -// Hasser provides Has method to retrieve Chunks -// from database. -type Hasser struct { - db *DB -} +// Has returns true if the chunk is stored in database. +func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) { + metricName := "localstore.Has" -// NewHasser returns a new Hasser on database. -func (db *DB) NewHasser() *Hasser { - return &Hasser{ - db: db, - } -} + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) -// Has returns true if the chunk is stored in database. 
-func (h *Hasser) Has(addr chunk.Address) (bool, error) { - return h.db.retrievalDataIndex.Has(addressToItem(addr)) + has, err := db.retrievalDataIndex.Has(addressToItem(addr)) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + return has, err } diff --git a/swarm/storage/localstore/mode_has_test.go b/swarm/storage/localstore/mode_has_test.go index 332616ca2..043b21a2b 100644 --- a/swarm/storage/localstore/mode_has_test.go +++ b/swarm/storage/localstore/mode_has_test.go @@ -17,7 +17,10 @@ package localstore import ( + "context" "testing" + + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestHas validates that Hasser is returning true for @@ -26,16 +29,14 @@ func TestHas(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutUpload).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - hasser := db.NewHasser() - - has, err := hasser.Has(chunk.Address()) + has, err := db.Has(context.Background(), ch.Address()) if err != nil { t.Fatal(err) } @@ -45,7 +46,7 @@ func TestHas(t *testing.T) { missingChunk := generateTestRandomChunk() - has, err = hasser.Has(missingChunk.Address()) + has, err = db.Has(context.Background(), missingChunk.Address()) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go index 1599ca8e3..a8e355ad0 100644 --- a/swarm/storage/localstore/mode_put.go +++ b/swarm/storage/localstore/mode_put.go @@ -17,44 +17,31 @@ package localstore import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) -// ModePut enumerates different Putter modes. -type ModePut int - -// Putter modes. -const ( - // ModePutRequest: when a chunk is received as a result of retrieve request and delivery - ModePutRequest ModePut = iota - // ModePutSync: when a chunk is received via syncing - ModePutSync - // ModePutUpload: when a chunk is created by local upload - ModePutUpload -) +// Put stores the Chunk to database and depending +// on the Putter mode, it updates required indexes. +// Put is required to implement chunk.Store +// interface. +func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) { + metricName := fmt.Sprintf("localstore.Put.%s", mode) -// Putter provides Put method to store Chunks -// to database. -type Putter struct { - db *DB - mode ModePut -} + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) -// NewPutter returns a new Putter on database -// with a specific Mode. -func (db *DB) NewPutter(mode ModePut) *Putter { - return &Putter{ - mode: mode, - db: db, + exists, err = db.put(mode, chunkToItem(ch)) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) } -} - -// Put stores the Chunk to database and depending -// on the Putter mode, it updates required indexes. -func (p *Putter) Put(ch chunk.Chunk) (err error) { - return p.db.put(p.mode, chunkToItem(ch)) + return exists, err } // put stores Item to database and updates other @@ -62,7 +49,7 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) { // of this function for the same address in parallel. // Item fields Address and Data must not be // with their nil values. 
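Put now reports whether the chunk was already stored, letting callers distinguish fresh stores from repeats. A usage sketch (the store helper is illustrative):

    import (
    	"context"

    	"github.com/ethereum/go-ethereum/swarm/chunk"
    	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
    )

    func store(ctx context.Context, db *localstore.DB, ch chunk.Chunk) error {
    	exists, err := db.Put(ctx, chunk.ModePutUpload, ch)
    	if err != nil {
    		return err
    	}
    	if exists {
    		// duplicate upload: the data was already present and the
    		// push/pull indexes were not written again
    	}
    	return nil
    }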
-func (db *DB) put(mode ModePut, item shed.Item) (err error) { +func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) { // protect parallel updates db.batchMu.Lock() defer db.batchMu.Unlock() @@ -76,7 +63,7 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { var triggerPushFeed bool // signal push feed subscriptions to iterate switch mode { - case ModePutRequest: + case chunk.ModePutRequest: // put to indexes: retrieve, gc; it does not enter the syncpool // check if the chunk already is in the database @@ -84,20 +71,25 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { i, err := db.retrievalAccessIndex.Get(item) switch err { case nil: + exists = true item.AccessTimestamp = i.AccessTimestamp case leveldb.ErrNotFound: + exists = false // no chunk accesses default: - return err + return false, err } i, err = db.retrievalDataIndex.Get(item) switch err { case nil: + exists = true item.StoreTimestamp = i.StoreTimestamp + item.BinID = i.BinID case leveldb.ErrNotFound: // no chunk accesses + exists = false default: - return err + return false, err } if item.AccessTimestamp != 0 { // delete current entry from the gc index @@ -107,6 +99,12 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { if item.StoreTimestamp == 0 { item.StoreTimestamp = now() } + if item.BinID == 0 { + item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address))) + if err != nil { + return false, err + } + } // update access timestamp item.AccessTimestamp = now() // update retrieve access index @@ -117,36 +115,56 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { db.retrievalDataIndex.PutInBatch(batch, item) - case ModePutUpload: + case chunk.ModePutUpload: // put to indexes: retrieve, push, pull - item.StoreTimestamp = now() - db.retrievalDataIndex.PutInBatch(batch, item) - db.pullIndex.PutInBatch(batch, item) - triggerPullFeed = true - db.pushIndex.PutInBatch(batch, item) - triggerPushFeed = true + exists, err = db.retrievalDataIndex.Has(item) + if err != nil { + return false, err + } + if !exists { + item.StoreTimestamp = now() + item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address))) + if err != nil { + return false, err + } + db.retrievalDataIndex.PutInBatch(batch, item) + db.pullIndex.PutInBatch(batch, item) + triggerPullFeed = true + db.pushIndex.PutInBatch(batch, item) + triggerPushFeed = true + } - case ModePutSync: + case chunk.ModePutSync: // put to indexes: retrieve, pull - item.StoreTimestamp = now() - db.retrievalDataIndex.PutInBatch(batch, item) - db.pullIndex.PutInBatch(batch, item) - triggerPullFeed = true + exists, err = db.retrievalDataIndex.Has(item) + if err != nil { + return exists, err + } + if !exists { + item.StoreTimestamp = now() + item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address))) + if err != nil { + return false, err + } + db.retrievalDataIndex.PutInBatch(batch, item) + db.pullIndex.PutInBatch(batch, item) + triggerPullFeed = true + } default: - return ErrInvalidMode + return false, ErrInvalidMode } err = db.incGCSizeInBatch(batch, gcSizeChange) if err != nil { - return err + return false, err } err = db.shed.WriteBatch(batch) if err != nil { - return err + return false, err } if triggerPullFeed { db.triggerPullSubscriptions(db.po(item.Address)) @@ -154,5 +172,5 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { if triggerPushFeed { db.triggerPushSubscriptions() } - return nil + return exists, nil } diff --git a/swarm/storage/localstore/mode_put_test.go 
b/swarm/storage/localstore/mode_put_test.go index 8ecae1d2e..5376aa8b3 100644 --- a/swarm/storage/localstore/mode_put_test.go +++ b/swarm/storage/localstore/mode_put_test.go @@ -18,6 +18,7 @@ package localstore import ( "bytes" + "context" "fmt" "sync" "testing" @@ -31,9 +32,7 @@ func TestModePutRequest(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - putter := db.NewPutter(ModePutRequest) - - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() // keep the record when the chunk is stored var storeTimestamp int64 @@ -46,12 +45,12 @@ func TestModePutRequest(t *testing.T) { storeTimestamp = wantTimestamp - err := putter.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutRequest, ch) if err != nil { t.Fatal(err) } - t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp)) + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) @@ -64,12 +63,12 @@ func TestModePutRequest(t *testing.T) { return wantTimestamp })() - err := putter.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutRequest, ch) if err != nil { t.Fatal(err) } - t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, storeTimestamp, wantTimestamp)) + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, storeTimestamp, wantTimestamp)) t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) @@ -87,16 +86,16 @@ func TestModePutSync(t *testing.T) { return wantTimestamp })() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutSync).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutSync, ch) if err != nil { t.Fatal(err) } - t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0)) + t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0)) - t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil)) + t.Run("pull index", newPullIndexTest(db, ch, 1, nil)) } // TestModePutUpload validates ModePutUpload index values on the provided DB. 
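The pull index tests above now assert binID == 1 instead of a store timestamp: db.binIDs keeps one counter per proximity order bin, and the first increment of an empty counter yields 1 (an assumption about shed.Uint64Vector implied by these tests). Sketch:

    // first chunk stored in a bin:
    binID, err := db.binIDs.IncInBatch(batch, uint64(db.po(ch.Address())))
    // err == nil, binID == 1; later chunks in the same bin get 2, 3, ...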
@@ -109,18 +108,18 @@ func TestModePutUpload(t *testing.T) { return wantTimestamp })() - chunk := generateTestRandomChunk() + ch := generateTestRandomChunk() - err := db.NewPutter(ModePutUpload).Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } - t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0)) + t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0)) - t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil)) + t.Run("pull index", newPullIndexTest(db, ch, 1, nil)) - t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, nil)) + t.Run("push index", newPushIndexTest(db, ch, wantTimestamp, nil)) } // TestModePutUpload_parallel uploads chunks in parallel @@ -140,14 +139,13 @@ func TestModePutUpload_parallel(t *testing.T) { // start uploader workers for i := 0; i < workerCount; i++ { go func(i int) { - uploader := db.NewPutter(ModePutUpload) for { select { - case chunk, ok := <-chunkChan: + case ch, ok := <-chunkChan: if !ok { return } - err := uploader.Put(chunk) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) select { case errChan <- err: case <-doneChan: @@ -188,21 +186,85 @@ func TestModePutUpload_parallel(t *testing.T) { } // get every chunk and validate its data - getter := db.NewGetter(ModeGetRequest) - chunksMu.Lock() defer chunksMu.Unlock() - for _, chunk := range chunks { - got, err := getter.Get(chunk.Address()) + for _, ch := range chunks { + got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(got.Data(), chunk.Data()) { - t.Fatalf("got chunk %s data %x, want %x", chunk.Address().Hex(), got.Data(), chunk.Data()) + if !bytes.Equal(got.Data(), ch.Data()) { + t.Fatalf("got chunk %s data %x, want %x", ch.Address().Hex(), got.Data(), ch.Data()) } } } +// TestModePut_sameChunk puts the same chunk multiple times +// and validates that all relevant indexes have only one item +// in them. +func TestModePut_sameChunk(t *testing.T) { + ch := generateTestRandomChunk() + + for _, tc := range []struct { + name string + mode chunk.ModePut + pullIndex bool + pushIndex bool + }{ + { + name: "ModePutRequest", + mode: chunk.ModePutRequest, + pullIndex: false, + pushIndex: false, + }, + { + name: "ModePutUpload", + mode: chunk.ModePutUpload, + pullIndex: true, + pushIndex: true, + }, + { + name: "ModePutSync", + mode: chunk.ModePutSync, + pullIndex: true, + pushIndex: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + for i := 0; i < 10; i++ { + exists, err := db.Put(context.Background(), tc.mode, ch) + if err != nil { + t.Fatal(err) + } + switch exists { + case false: + if i != 0 { + t.Fatal("should not exist only on first Put") + } + case true: + if i == 0 { + t.Fatal("should exist on all cases other than the first one") + } + } + + count := func(b bool) (c int) { + if b { + return 1 + } + return 0 + } + + newItemsCountTest(db.retrievalDataIndex, 1)(t) + newItemsCountTest(db.pullIndex, count(tc.pullIndex))(t) + newItemsCountTest(db.pushIndex, count(tc.pushIndex))(t) + } + }) + } +} + // BenchmarkPutUpload runs a series of benchmarks that upload // a specific number of chunks in parallel. 
// @@ -270,7 +332,6 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) db, cleanupFunc := newTestDB(b, o) defer cleanupFunc() - uploader := db.NewPutter(ModePutUpload) chunks := make([]chunk.Chunk, count) for i := 0; i < count; i++ { chunks[i] = generateTestRandomChunk() @@ -286,7 +347,8 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) go func(i int) { defer func() { <-sem }() - errs <- uploader.Put(chunks[i]) + _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i]) + errs <- err }(i) } }() diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go index 83fcbea52..14b48a22e 100644 --- a/swarm/storage/localstore/mode_set.go +++ b/swarm/storage/localstore/mode_set.go @@ -17,51 +17,37 @@ package localstore import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/syndtr/goleveldb/leveldb" ) -// ModeSet enumerates different Setter modes. -type ModeSet int - -// Setter modes. -const ( - // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery - ModeSetAccess ModeSet = iota - // ModeSetSync: when push sync receipt is received - ModeSetSync - // modeSetRemove: when GC-d - // unexported as no external packages should remove chunks from database - modeSetRemove -) +// Set updates database indexes for a specific +// chunk represented by the address. +// Set is required to implement chunk.Store +// interface. +func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) { + metricName := fmt.Sprintf("localstore.Set.%s", mode) -// Setter sets the state of a particular -// Chunk in database by changing indexes. -type Setter struct { - db *DB - mode ModeSet -} + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) -// NewSetter returns a new Setter on database -// with a specific Mode. -func (db *DB) NewSetter(mode ModeSet) *Setter { - return &Setter{ - mode: mode, - db: db, + err = db.set(mode, addr) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) } -} - -// Set updates database indexes for a specific -// chunk represented by the address. -func (s *Setter) Set(addr chunk.Address) (err error) { - return s.db.set(s.mode, addr) + return err } // set updates database indexes for a specific // chunk represented by the address. // It acquires lockAddr to protect two calls // of this function for the same address in parallel. 
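With the Getter, Putter, Hasser and Setter types gone, all access modes referenced in this commit live in the chunk package. A consolidated view, with a typical Set call (ctx and ch as in the tests); the set() rewrite continues below:

    // modes used throughout this diff:
    //   chunk.ModeGetRequest, chunk.ModeGetSync, chunk.ModeGetLookup
    //   chunk.ModePutRequest, chunk.ModePutSync, chunk.ModePutUpload
    //   chunk.ModeSetAccess, chunk.ModeSetSync, chunk.ModeSetRemove

    // e.g. marking a chunk synced once a push sync receipt arrives:
    err := db.Set(ctx, chunk.ModeSetSync, ch.Address())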
diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go
index 83fcbea52..14b48a22e 100644
--- a/swarm/storage/localstore/mode_set.go
+++ b/swarm/storage/localstore/mode_set.go
@@ -17,51 +17,37 @@
 package localstore

 import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
 "github.com/ethereum/go-ethereum/swarm/chunk"
 "github.com/syndtr/goleveldb/leveldb"
 )

-// ModeSet enumerates different Setter modes.
-type ModeSet int
-
-// Setter modes.
-const (
- // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
- ModeSetAccess ModeSet = iota
- // ModeSetSync: when push sync receipt is received
- ModeSetSync
- // modeSetRemove: when GC-d
- // unexported as no external packages should remove chunks from database
- modeSetRemove
-)
+// Set updates database indexes for a specific
+// chunk represented by the address.
+// Set is required to implement chunk.Store
+// interface.
+func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
+ metricName := fmt.Sprintf("localstore.Set.%s", mode)

-// Setter sets the state of a particular
-// Chunk in database by changing indexes.
-type Setter struct {
- db *DB
- mode ModeSet
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())

-// NewSetter returns a new Setter on database
-// with a specific Mode.
-func (db *DB) NewSetter(mode ModeSet) *Setter {
- return &Setter{
- mode: mode,
- db: db,
+ err = db.set(mode, addr)
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
 }
-}
-
-// Set updates database indexes for a specific
-// chunk represented by the address.
-func (s *Setter) Set(addr chunk.Address) (err error) {
- return s.db.set(s.mode, addr)
+ return err
 }

 // set updates database indexes for a specific
 // chunk represented by the address.
 // It acquires lockAddr to protect two calls
 // of this function for the same address in parallel.
-func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
+func (db *DB) set(mode chunk.ModeSet, addr chunk.Address) (err error) {
 // protect parallel updates
 db.batchMu.Lock()
 defer db.batchMu.Unlock()
@@ -76,7 +62,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 item := addressToItem(addr)

 switch mode {
- case ModeSetAccess:
+ case chunk.ModeSetAccess:
 // add to pull, insert to gc

 // need to get access timestamp here as it is not
@@ -87,9 +73,14 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 switch err {
 case nil:
 item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
 case leveldb.ErrNotFound:
 db.pushIndex.DeleteInBatch(batch, item)
 item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.Inc(uint64(db.po(item.Address)))
+ if err != nil {
+ return err
+ }
 default:
 return err
 }
@@ -112,7 +103,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 db.gcIndex.PutInBatch(batch, item)
 gcSizeChange++

- case ModeSetSync:
+ case chunk.ModeSetSync:
 // delete from push, insert to gc

 // need to get access timestamp here as it is not
@@ -131,6 +122,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 return err
 }
 item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID

 i, err = db.retrievalAccessIndex.Get(item)
 switch err {
@@ -149,7 +141,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 db.gcIndex.PutInBatch(batch, item)
 gcSizeChange++

- case modeSetRemove:
+ case chunk.ModeSetRemove:
 // delete from retrieve, pull, gc

 // need to get access timestamp here as it is not
@@ -169,6 +161,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 return err
 }
 item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID

 db.retrievalDataIndex.DeleteInBatch(batch, item)
 db.retrievalAccessIndex.DeleteInBatch(batch, item)
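The rewritten mode_set.go collapses the Setter type into a single DB.Set method and, for chunks that never went through the retrieval index before, assigns a per-bin sequence number from the binIDs counter. A hedged migration sketch for callers; markSynced is an invented name, only db.Set and the mode constant come from the diff:

    package localstore

    import (
        "context"

        "github.com/ethereum/go-ethereum/swarm/chunk"
    )

    // markSynced replaces the removed db.NewSetter(ModeSetSync).Set(addr)
    // pattern with a single Set call; the mode constants now live in the
    // chunk package so external packages can use them.
    func markSynced(db *DB, addr chunk.Address) error {
        return db.Set(context.Background(), chunk.ModeSetSync, addr)
    }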
diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go
index 674aaabec..9ba62cd20 100644
--- a/swarm/storage/localstore/mode_set_test.go
+++ b/swarm/storage/localstore/mode_set_test.go
@@ -17,9 +17,11 @@
 package localstore

 import (
+ "context"
 "testing"
 "time"

+ "github.com/ethereum/go-ethereum/swarm/chunk"
 "github.com/syndtr/goleveldb/leveldb"
 )

@@ -28,23 +30,23 @@ func TestModeSetAccess(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()

 wantTimestamp := time.Now().UTC().UnixNano()
 defer setNow(func() (t int64) {
 return wantTimestamp
 })()

- err := db.NewSetter(ModeSetAccess).Set(chunk.Address())
+ err := db.Set(context.Background(), chunk.ModeSetAccess, ch.Address())
 if err != nil {
 t.Fatal(err)
 }

- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))

 t.Run("pull index count", newItemsCountTest(db.pullIndex, 1))

- t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, 1))

 t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))

@@ -56,28 +58,28 @@ func TestModeSetSync(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()

 wantTimestamp := time.Now().UTC().UnixNano()
 defer setNow(func() (t int64) {
 return wantTimestamp
 })()

- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }

- err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
 if err != nil {
 t.Fatal(err)
 }

- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp))

- t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, leveldb.ErrNotFound))
+ t.Run("push index", newPushIndexTest(db, ch, wantTimestamp, leveldb.ErrNotFound))

- t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, 1))

 t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))

@@ -89,40 +91,39 @@ func TestModeSetRemove(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()

- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }

- err = db.NewSetter(modeSetRemove).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetRemove, ch.Address())
 if err != nil {
 t.Fatal(err)
 }

 t.Run("retrieve indexes", func(t *testing.T) {
 wantErr := leveldb.ErrNotFound
- _, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ _, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
 if err != wantErr {
 t.Errorf("got error %v, want %v", err, wantErr)
 }
 t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))

 // access index should not be set
- _, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ _, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
 if err != wantErr {
 t.Errorf("got error %v, want %v", err, wantErr)
 }
 t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
 })

- t.Run("pull index", newPullIndexTest(db, chunk, 0, leveldb.ErrNotFound))
+ t.Run("pull index", newPullIndexTest(db, ch, 0, leveldb.ErrNotFound))

 t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

 t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))

 t.Run("gc size", newIndexGCSizeTest(db))
-
 }
diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go
index b08790124..4ca2e32e6 100644
--- a/swarm/storage/localstore/retrieval_index_test.go
+++ b/swarm/storage/localstore/retrieval_index_test.go
@@ -17,6 +17,7 @@
 package localstore

 import (
+ "context"
 "strconv"
 "testing"

@@ -61,17 +62,14 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
 b.StopTimer()
 db, cleanupFunc := newTestDB(b, o)
 defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
- requester := db.NewGetter(ModeGetRequest)
 addrs := make([]chunk.Address, count)
 for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ ch := generateTestRandomChunk()
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 b.Fatal(err)
 }
- addrs[i] = chunk.Address()
+ addrs[i] = ch.Address()
 }
 // set update gc test hook to signal when
 // update gc goroutine is done by sending to
@@ -85,12 +83,12 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
 b.StartTimer()

 for i := 0; i < count; i++ {
- err := syncer.Set(addrs[i])
+ err := db.Set(context.Background(), chunk.ModeSetSync, addrs[i])
 if err != nil {
 b.Fatal(err)
 }

- _, err = requester.Get(addrs[i])
+ _, err = db.Get(context.Background(), chunk.ModeGetRequest, addrs[i])
 if err != nil {
 b.Fatal(err)
 }
@@ -133,7 +131,6 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
 b.StopTimer()
 db, cleanupFunc := newTestDB(b, o)
 defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
 chunks := make([]chunk.Chunk, count)
 for i := 0; i < count; i++ {
 chunk := generateTestRandomChunk()
@@ -142,7 +139,7 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
 b.StartTimer()

 for i := 0; i < count; i++ {
- err := uploader.Put(chunks[i])
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
 if err != nil {
 b.Fatal(err)
 }
diff --git a/swarm/storage/localstore/schema.go b/swarm/storage/localstore/schema.go
new file mode 100644
index 000000000..538c75d1f
--- /dev/null
+++ b/swarm/storage/localstore/schema.go
@@ -0,0 +1,52 @@
+package localstore
+
+import (
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// The DB schema we want to use. The actual/current DB schema might differ
+// until migrations are run.
+const CurrentDbSchema = DbSchemaSanctuary
+
+// There was a time when we had no schema at all.
+const DbSchemaNone = ""
+
+// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
+const DbSchemaPurity = "purity"
+
+// "halloween" is here because we had a screw in the garbage collector index.
+// Because of that we had to rebuild the GC index to get rid of erroneous
+// entries and that takes a long time. This schema is used for bookkeeping,
+// so rebuild index will run just once.
+const DbSchemaHalloween = "halloween"
+
+const DbSchemaSanctuary = "sanctuary"
+
+// returns true if legacy database is in the datadir
+func IsLegacyDatabase(datadir string) bool {
+
+ var (
+ legacyDbSchemaKey = []byte{8}
+ )
+
+ db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128})
+ if err != nil {
+ log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err)
+ return false
+ }
+ defer db.Close()
+
+ data, err := db.Get(legacyDbSchemaKey, nil)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ // if we haven't found anything under the legacy db schema key- we are not on legacy
+ return false
+ }
+
+ log.Error("got an unexpected error fetching legacy name from the database", "err", err)
+ }
+ log.Trace("checking if database scheme is legacy", "schema name", string(data))
+ return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity
+}
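The new schema.go names the database generations so that startup code can recognize an old LDBStore directory before opening it with the new localstore. What a caller does with that answer is outside this diff; the helper below is a hypothetical sketch that uses only IsLegacyDatabase and the schema constants defined above:

    package localstore

    import "fmt"

    // openOrMigrate is an invented startup helper: the migration policy is
    // an assumption, only IsLegacyDatabase and the schema names come from
    // the file above.
    func openOrMigrate(datadir string) error {
        if IsLegacyDatabase(datadir) {
            // A legacy ("purity" or "halloween") store would have to be
            // exported from the old LDBStore and re-imported through
            // DB.Import before it can run as CurrentDbSchema ("sanctuary").
            return fmt.Errorf("legacy database in %s: export and re-import required", datadir)
        }
        // ... open the localstore DB as usual ...
        return nil
    }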
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index 0b96102e3..dd07add53 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -17,28 +17,34 @@
 package localstore

 import (
- "bytes"
 "context"
 "errors"
- "fmt"
 "sync"
+ "time"

 "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
 "github.com/ethereum/go-ethereum/swarm/chunk"
 "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
 "github.com/syndtr/goleveldb/leveldb"
 )

 // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
 // Pull syncing index can be only subscribed to a particular proximity order bin. If since
-// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
-// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be
-// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
+// is not 0, the iteration will start from the first item stored after that id. If until is not 0,
+// only chunks stored up to this id will be sent to the channel, and the returned channel will be
+// closed. The since-until interval is open on since side, and closed on until side: (since,until] <=> [since+1,until]. Returned stop
 // function will terminate current and further iterations without errors, and also close the returned channel.
 // Make sure that you check the second returned parameter from the channel to stop iteration when its value
 // is false.
-func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
- chunkDescriptors := make(chan ChunkDescriptor)
+func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
+ metricName := "localstore.SubscribePull"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
+ chunkDescriptors := make(chan chunk.Descriptor)
 trigger := make(chan struct{}, 1)

 db.pullTriggersMu.Lock()
@@ -59,18 +65,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
 var errStopSubscription = errors.New("stop subscription")

 go func() {
- // close the returned ChunkDescriptor channel at the end to
+ defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
+ // close the returned chunk.Descriptor channel at the end to
 // signal that the subscription is done
 defer close(chunkDescriptors)

 // sinceItem is the Item from which the next iteration
 // should start. The first iteration starts from the first Item.
 var sinceItem *shed.Item
- if since != nil {
+ if since > 0 {
 sinceItem = &shed.Item{
- Address: since.Address,
- StoreTimestamp: since.StoreTimestamp,
+ Address: db.addressInBin(bin),
+ BinID: since,
 }
 }
+ first := true // first iteration flag for SkipStartFromItem
 for {
 select {
 case <-trigger:
@@ -78,17 +86,23 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
 // - last index Item is reached
 // - subscription stop is called
 // - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+ sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))
+
+ iterStart := time.Now()
+ var count int
 err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
 select {
- case chunkDescriptors <- ChunkDescriptor{
- Address: item.Address,
- StoreTimestamp: item.StoreTimestamp,
+ case chunkDescriptors <- chunk.Descriptor{
+ Address: item.Address,
+ BinID: item.BinID,
 }:
+ count++
 // until chunk descriptor is sent
 // break the iteration
- if until != nil &&
- (item.StoreTimestamp >= until.StoreTimestamp ||
- bytes.Equal(item.Address, until.Address)) {
+ if until > 0 && item.BinID >= until {
 return true, errStopSubscription
 }
 // set next iteration start item
@@ -109,19 +123,34 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
 }, &shed.IterateOptions{
 StartFrom: sinceItem,
 // sinceItem was sent as the last Address in the previous
- // iterator call, skip it in this one
- SkipStartFromItem: true,
+ // iterator call, skip it in this one, but not the item with
+ // the provided since bin id as it should be sent to a channel
+ SkipStartFromItem: !first,
 Prefix: []byte{bin},
 })
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
 if err != nil {
 if err == errStopSubscription {
 // stop subscription without any errors
 // if until is reached
 return
 }
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
 log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
 return
 }
+ first = false
 case <-stopChan:
 // terminate the subscription
 // on stop
@@ -159,35 +188,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
 return chunkDescriptors, stop
 }

-// LastPullSubscriptionChunk returns ChunkDescriptor of the latest Chunk
+// LastPullSubscriptionBinID returns chunk bin id of the latest Chunk
 // in pull syncing index for a provided bin. If there are no chunks in
-// that bin, chunk.ErrChunkNotFound is returned.
-func (db *DB) LastPullSubscriptionChunk(bin uint8) (c *ChunkDescriptor, err error) {
+// that bin, 0 value is returned.
+func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
+ metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)
+
 item, err := db.pullIndex.Last([]byte{bin})
 if err != nil {
 if err == leveldb.ErrNotFound {
- return nil, chunk.ErrChunkNotFound
+ return 0, nil
 }
- return nil, err
+ return 0, err
 }
- return &ChunkDescriptor{
- Address: item.Address,
- StoreTimestamp: item.StoreTimestamp,
- }, nil
-}
-
-// ChunkDescriptor holds information required for Pull syncing. This struct
-// is provided by subscribing to pull index.
-type ChunkDescriptor struct {
- Address chunk.Address
- StoreTimestamp int64
-}
-
-func (c *ChunkDescriptor) String() string {
- if c == nil {
- return "none"
- }
- return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
+ return item.BinID, nil
 }

 // triggerPullSubscriptions is used internally for starting iterations
@@ -209,3 +223,12 @@ func (db *DB) triggerPullSubscriptions(bin uint8) {
 }
 }
 }
+
+// addressInBin returns an address that is in a specific
+// proximity order bin from database base key.
+func (db *DB) addressInBin(bin uint8) (addr chunk.Address) {
+ addr = append([]byte(nil), db.baseKey...)
+ b := bin / 8
+ addr[b] = addr[b] ^ (1 << (7 - bin%8))
+ return addr
+}
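SubscribePull now walks the pull index by uint64 bin IDs instead of ChunkDescriptor timestamps; per the doc comment above, the (since, until] window is open on the since side, and zero values leave the corresponding bound off. A minimal consumer sketch, assuming a *localstore.DB named db; drainBin and the cursor remark are illustrative, not part of the diff:

    package localstore

    import "context"

    // drainBin subscribes from the beginning of one bin (since=0) with no
    // upper bound (until=0); a real syncer would persist d.BinID as its
    // resume cursor for the next subscription.
    func drainBin(ctx context.Context, db *DB, bin uint8) error {
        ch, stop := db.SubscribePull(ctx, bin, 0, 0)
        defer stop()

        for {
            select {
            case d, ok := <-ch:
                if !ok {
                    // channel closed: until was reached or stop was called
                    return nil
                }
                // d.Address identifies the chunk; d.BinID orders it in the bin.
                _ = d
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }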
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index d5ddae02b..bf364ed44 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -25,6 +25,7 @@ import (
 "time"

 "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/shed"
 )

 // TestDB_SubscribePull uploads some chunks before and after
@@ -35,15 +36,13 @@ func TestDB_SubscribePull(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make(map[uint8][]chunk.Address)
 var addrsMu sync.Mutex
 var wantedChunksCount int

 // prepopulate database with some chunks
 // before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)

 // set a timeout on subscription
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -54,22 +53,22 @@ func TestDB_SubscribePull(t *testing.T) {
 errChan := make(chan error)

 for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
 defer stop()

 // receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
 }

 // upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)

 time.Sleep(200 * time.Millisecond)

 // upload some chunks after some short time
 // to ensure that subscription will include them
 // in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)

 checkErrChan(ctx, t, errChan, wantedChunksCount)
 }
@@ -82,15 +81,13 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make(map[uint8][]chunk.Address)
 var addrsMu sync.Mutex
 var wantedChunksCount int

 // prepopulate database with some chunks
 // before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)

 // set a timeout on subscription
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -106,23 +103,23 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
 // that all of them will write every address error to errChan
 for j := 0; j < subsCount; j++ {
 for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
 defer stop()

 // receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
 }
 }

 // upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)

 time.Sleep(200 * time.Millisecond)

 // upload some chunks after some short time
 // to ensure that subscription will include them
 // in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)

 checkErrChan(ctx, t, errChan, wantedChunksCount*subsCount)
 }
@@ -135,61 +132,52 @@ func TestDB_SubscribePull_since(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make(map[uint8][]chunk.Address)
 var addrsMu sync.Mutex
 var wantedChunksCount int

- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex

- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (first map[uint8]uint64) {
 addrsMu.Lock()
 defer addrsMu.Unlock()

- last = make(map[uint8]ChunkDescriptor)
+ first = make(map[uint8]uint64)
 for i := 0; i < count; i++ {
 ch := generateTestRandomChunk()

- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }

 bin := db.po(ch.Address())

- if _, ok := addrs[bin]; !ok {
- addrs[bin] = make([]chunk.Address, 0)
- }
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
+
 if wanted {
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]chunk.Address, 0)
+ }
 addrs[bin] = append(addrs[bin], ch.Address())
 wantedChunksCount++
- }

- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
-
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
+ if _, ok := first[bin]; !ok {
+ first[bin] = binIDCounter[bin]
+ }
 }
 }
- return last
+ return first
 }

 // prepopulate database with some chunks
 // before the subscription
- last := uploadRandomChunks(30, false)
+ uploadRandomChunks(30, false)

- uploadRandomChunks(25, true)
+ first := uploadRandomChunks(25, true)

 // set a timeout on subscription
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -200,21 +188,18 @@ func TestDB_SubscribePull_since(t *testing.T) {
 errChan := make(chan error)

 for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := last[bin]; ok {
- since = &c
+ since, ok := first[bin]
+ if !ok {
+ continue
 }
- ch, stop := db.SubscribePull(ctx, bin, since, nil)
+ ch, stop := db.SubscribePull(ctx, bin, since, 0)
 defer stop()

 // receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
 }

- // upload some chunks just after subscribe
- uploadRandomChunks(15, true)
-
 checkErrChan(ctx, t, errChan, wantedChunksCount)
 }
@@ -226,30 +211,22 @@ func TestDB_SubscribePull_until(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make(map[uint8][]chunk.Address)
 var addrsMu sync.Mutex
 var wantedChunksCount int

- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex

- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
 addrsMu.Lock()
 defer addrsMu.Unlock()

- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
 for i := 0; i < count; i++ {
 ch := generateTestRandomChunk()

- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }
@@ -264,14 +241,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
 wantedChunksCount++
 }

- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()

- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
 }
 return last
 }
@@ -295,11 +269,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
 if !ok {
 continue
 }
- ch, stop := db.SubscribePull(ctx, bin, nil, &until)
+ ch, stop := db.SubscribePull(ctx, bin, 0, until)
 defer stop()

 // receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
 }

 // upload some chunks just after subscribe
@@ -316,30 +290,22 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make(map[uint8][]chunk.Address)
 var addrsMu sync.Mutex
 var wantedChunksCount int

- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex

- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
 addrsMu.Lock()
 defer addrsMu.Unlock()

- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
 for i := 0; i < count; i++ {
 ch := generateTestRandomChunk()

- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }
@@ -354,14 +320,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
 wantedChunksCount++
 }

- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()

- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
 }
 return last
 }
@@ -387,9 +350,10 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
 errChan := make(chan error)

 for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := upload1[bin]; ok {
- since = &c
+ since, ok := upload1[bin]
+ if ok {
+ // start from the next uploaded chunk
+ since++
 }
 until, ok := upload2[bin]
 if !ok {
@@ -397,11 +361,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
 // skip this bin from testing
 continue
 }
- ch, stop := db.SubscribePull(ctx, bin, since, &until)
+ ch, stop := db.SubscribePull(ctx, bin, since, until)
 defer stop()

 // receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
 }

 // upload some chunks just after subscribe
@@ -412,14 +376,14 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {

 // uploadRandomChunksBin uploads random chunks to database and adds them to
 // the map of addresses ber bin.
-func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
+func uploadRandomChunksBin(t *testing.T, db *DB, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
 addrsMu.Lock()
 defer addrsMu.Unlock()

 for i := 0; i < count; i++ {
 ch := generateTestRandomChunk()

- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }
@@ -434,10 +398,10 @@ func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uin
 }
 }

-// readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and
-// sends error to errChan, even if it is nil, to count the number of ChunkDescriptors
+// readPullSubscriptionBin is a helper function that reads all chunk.Descriptors from a channel and
+// sends error to errChan, even if it is nil, to count the number of chunk.Descriptors
 // returned by the channel.
-func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
+func readPullSubscriptionBin(ctx context.Context, db *DB, bin uint8, ch <-chan chunk.Descriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
 var i int // address index
 for {
 select {
@@ -450,9 +414,20 @@ func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDesc
 if i+1 > len(addrs[bin]) {
 err = fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin)
 } else {
- want := addrs[bin][i]
- if !bytes.Equal(got.Address, want) {
- err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want)
+ addr := addrs[bin][i]
+ if !bytes.Equal(got.Address, addr) {
+ err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got.Address.Hex(), addr.Hex())
+ } else {
+ want, err := db.retrievalDataIndex.Get(shed.Item{
+ Address: addr,
+ })
+ if err != nil {
+ err = fmt.Errorf("got chunk (bin id %v in bin %v) from retrieval index %s: %v", i, bin, addrs[bin][i].Hex(), err)
+ } else {
+ if got.BinID != want.BinID {
+ err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got, want)
+ }
+ }
 }
 }
 addrsMu.Unlock()
@@ -486,27 +461,19 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC
 }
 }

-// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk
+// TestDB_LastPullSubscriptionBinID validates that LastPullSubscriptionBinID
 // is returning the last chunk descriptor for proximity order bins by
 // doing a few rounds of chunk uploads.
-func TestDB_LastPullSubscriptionChunk(t *testing.T) {
+func TestDB_LastPullSubscriptionBinID(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make(map[uint8][]chunk.Address)

- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex

- last := make(map[uint8]ChunkDescriptor)
+ last := make(map[uint8]uint64)

 // do a few rounds of uploads and check if
 // last pull subscription chunk is correct
@@ -516,7 +483,7 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
 for i := 0; i < count; i++ {
 ch := generateTestRandomChunk()

- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }
@@ -528,32 +495,42 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
 }
 addrs[bin] = append(addrs[bin], ch.Address())

- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()

- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
 }

 // check
 for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
 want, ok := last[bin]
- got, err := db.LastPullSubscriptionChunk(bin)
+ got, err := db.LastPullSubscriptionBinID(bin)
 if ok {
 if err != nil {
 t.Errorf("got unexpected error value %v", err)
 }
- if !bytes.Equal(got.Address, want.Address) {
- t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex())
- }
- } else {
- if err != chunk.ErrChunkNotFound {
- t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound)
- }
 }
+ if got != want {
+ t.Errorf("got last bin id %v, want %v", got, want)
+ }
+ }
+ }
+}
+
+// TestAddressInBin validates that function addressInBin
+// returns a valid address for every proximity order bin.
+func TestAddressInBin(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ for po := uint8(0); po < chunk.MaxPO; po++ {
+ addr := db.addressInBin(po)
+
+ got := db.po(addr)
+
+ if got != uint8(po) {
+ t.Errorf("got po %v, want %v", got, po)
 }
 }
 }
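TestAddressInBin above pins down the bit arithmetic in addressInBin: flipping the bit at position bin, counted from the most significant bit of the base key, yields an address that agrees with the base key on exactly bin leading bits, which is what db.po measures. A worked example with a made-up byte value; only the formula comes from the diff:

    // Hypothetical illustration of the addressInBin bit flip.
    func exampleBinFlip() {
        base := byte(0xB1)                 // 0b10110001, first byte of a base key
        bin := uint8(2)
        addr0 := base ^ (1 << (7 - bin%8)) // flips bit 5: 0b10110001 ^ 0b00100000 = 0b10010001
        // base and addr0 agree on their two most significant bits and differ
        // on the third, so the proximity order between the two keys is exactly 2.
        _ = addr0
    }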
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6f..f2463af2a 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -19,10 +19,15 @@ package localstore
 import (
 "context"
 "sync"
+ "time"

 "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
 "github.com/ethereum/go-ethereum/swarm/chunk"
 "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
 )

 // SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
@@ -30,6 +35,9 @@ import (
 // the returned channel without any errors. Make sure that you check the second returned parameter
 // from the channel to stop iteration when its value is false.
 func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
+ metricName := "localstore.SubscribePush"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
 chunks := make(chan chunk.Chunk)
 trigger := make(chan struct{}, 1)

@@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 var stopChanOnce sync.Once

 go func() {
+ defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
 // close the returned chunkInfo channel at the end to
 // signal that the subscription is done
 defer close(chunks)
@@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 // - last index Item is reached
 // - subscription stop is called
 // - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+
+ iterStart := time.Now()
+ var count int
 err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
 // get chunk data
 dataItem, err := db.retrievalDataIndex.Get(item)
@@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun

 select {
 case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ count++
 // set next iteration start item
 // when its chunk is successfully sent to channel
 sinceItem = &item
@@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 // iterator call, skip it in this one
 SkipStartFromItem: true,
 })
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
 if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
 log.Error("localstore push subscription iteration", "err", err)
 return
 }
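SubscribePush streams whole chunks (address plus data) for push syncing, with the metrics and tracing added above wrapped around each index iteration. A consumer sketch; pushAll and the send callback are hypothetical stand-ins for delivery to peers:

    package localstore

    import (
        "context"

        "github.com/ethereum/go-ethereum/swarm/chunk"
    )

    // pushAll forwards every chunk from the push subscription. Once a
    // delivery receipt comes back, the caller would mark the chunk with
    // db.Set(ctx, chunk.ModeSetSync, ch.Address()), tying this channel
    // to the ModeSetSync path shown earlier in this diff.
    func pushAll(ctx context.Context, db *DB, send func(chunk.Chunk) error) error {
        chunks, stop := db.SubscribePush(ctx)
        defer stop()

        for ch := range chunks {
            if err := send(ch); err != nil {
                return err
            }
        }
        return nil
    }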
diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go
index 30fb98eb2..6124a534b 100644
--- a/swarm/storage/localstore/subscription_push_test.go
+++ b/swarm/storage/localstore/subscription_push_test.go
@@ -34,8 +34,6 @@ func TestDB_SubscribePush(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 chunks := make([]chunk.Chunk, 0)
 var chunksMu sync.Mutex

@@ -44,14 +42,14 @@ func TestDB_SubscribePush(t *testing.T) {
 defer chunksMu.Unlock()

 for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()

- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }

- chunks = append(chunks, chunk)
+ chunks = append(chunks, ch)
 }
 }

@@ -122,8 +120,6 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
 db, cleanupFunc := newTestDB(t, nil)
 defer cleanupFunc()

- uploader := db.NewPutter(ModePutUpload)
-
 addrs := make([]chunk.Address, 0)
 var addrsMu sync.Mutex

@@ -132,14 +128,14 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
 defer addrsMu.Unlock()

 for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()

- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
 if err != nil {
 t.Fatal(err)
 }

- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
 }
 }