author     Péter Szilágyi <peterke@gmail.com>       2019-05-10 19:09:01 +0800
committer  GitHub <noreply@github.com>              2019-05-10 19:09:01 +0800
commit     494f5d448a1685d5de4cb1524b863cd1fc9a13b0 (patch)
tree       4db9d1afe4910c888f3488cd93e8537501d88314 /swarm/storage/localstore
parent     c94d582aa781b26412ba7d570f6707d193303a02 (diff)
parent     9b1543c282f39d452f611eeee0307bdf828e8bc2 (diff)
Merge pull request #19550 from ethersphere/swarm-rather-stable
swarm v0.4-rc1
Diffstat (limited to 'swarm/storage/localstore')
-rw-r--r--  swarm/storage/localstore/export.go                  204
-rw-r--r--  swarm/storage/localstore/export_test.go              80
-rw-r--r--  swarm/storage/localstore/gc.go                       19
-rw-r--r--  swarm/storage/localstore/gc_test.go                  48
-rw-r--r--  swarm/storage/localstore/index_test.go               49
-rw-r--r--  swarm/storage/localstore/localstore.go               80
-rw-r--r--  swarm/storage/localstore/localstore_test.go          55
-rw-r--r--  swarm/storage/localstore/mode_get.go                 63
-rw-r--r--  swarm/storage/localstore/mode_get_test.go            67
-rw-r--r--  swarm/storage/localstore/mode_has.go                 28
-rw-r--r--  swarm/storage/localstore/mode_has_test.go            13
-rw-r--r--  swarm/storage/localstore/mode_put.go                118
-rw-r--r--  swarm/storage/localstore/mode_put_test.go           116
-rw-r--r--  swarm/storage/localstore/mode_set.go                 63
-rw-r--r--  swarm/storage/localstore/mode_set_test.go            35
-rw-r--r--  swarm/storage/localstore/retrieval_index_test.go     17
-rw-r--r--  swarm/storage/localstore/schema.go                   52
-rw-r--r--  swarm/storage/localstore/subscription_pull.go       107
-rw-r--r--  swarm/storage/localstore/subscription_pull_test.go  243
-rw-r--r--  swarm/storage/localstore/subscription_push.go        29
-rw-r--r--  swarm/storage/localstore/subscription_push_test.go   16
21 files changed, 986 insertions, 516 deletions
diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go
new file mode 100644
index 000000000..411392b4e
--- /dev/null
+++ b/swarm/storage/localstore/export.go
@@ -0,0 +1,204 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "archive/tar"
+ "context"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+)
+
+const (
+ // filename in tar archive that holds the information
+ // about exported data format version
+ exportVersionFilename = ".swarm-export-version"
+ // legacy version for previous LDBStore
+ legacyExportVersion = "1"
+ // current export format version
+ currentExportVersion = "2"
+)
+
+// Export writes tar structured data of all chunks in the
+// retrieval data index to the writer. It returns the
+// number of chunks exported.
+func (db *DB) Export(w io.Writer) (count int64, err error) {
+ tw := tar.NewWriter(w)
+ defer tw.Close()
+
+ if err := tw.WriteHeader(&tar.Header{
+ Name: exportVersionFilename,
+ Mode: 0644,
+ Size: int64(len(currentExportVersion)),
+ }); err != nil {
+ return 0, err
+ }
+ if _, err := tw.Write([]byte(currentExportVersion)); err != nil {
+ return 0, err
+ }
+
+ err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ hdr := &tar.Header{
+ Name: hex.EncodeToString(item.Address),
+ Mode: 0644,
+ Size: int64(len(item.Data)),
+ }
+ if err := tw.WriteHeader(hdr); err != nil {
+ return false, err
+ }
+ if _, err := tw.Write(item.Data); err != nil {
+ return false, err
+ }
+ count++
+ return false, nil
+ }, nil)
+
+ return count, err
+}
+
+// Import reads tar structured data from the reader and
+// stores chunks in the database. It returns the number of
+// chunks imported.
+func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
+ tr := tar.NewReader(r)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ errC := make(chan error)
+ doneC := make(chan struct{})
+ tokenPool := make(chan struct{}, 100)
+ var wg sync.WaitGroup
+ go func() {
+ var (
+ firstFile = true
+ // if exportVersionFilename file is not present
+ // assume legacy version
+ version = legacyExportVersion
+ )
+ for {
+ hdr, err := tr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ if firstFile {
+ firstFile = false
+ if hdr.Name == exportVersionFilename {
+ data, err := ioutil.ReadAll(tr)
+ if err != nil {
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ version = string(data)
+ continue
+ }
+ }
+
+ if len(hdr.Name) != 64 {
+ log.Warn("ignoring non-chunk file", "name", hdr.Name)
+ continue
+ }
+
+ keybytes, err := hex.DecodeString(hdr.Name)
+ if err != nil {
+ log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
+ continue
+ }
+
+ data, err := ioutil.ReadAll(tr)
+ if err != nil {
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ key := chunk.Address(keybytes)
+
+ var ch chunk.Chunk
+ switch version {
+ case legacyExportVersion:
+ // LDBStore Export exported chunk data prefixed with the chunk key.
+ // That is not necessary, as the key is in the chunk filename,
+ // but backward compatibility needs to be preserved.
+ ch = chunk.NewChunk(key, data[32:])
+ case currentExportVersion:
+ ch = chunk.NewChunk(key, data)
+ default:
+ select {
+ case errC <- fmt.Errorf("unsupported export data version %q", version):
+ case <-ctx.Done():
+ }
+ }
+ tokenPool <- struct{}{}
+ wg.Add(1)
+
+ go func() {
+ _, err := db.Put(ctx, chunk.ModePutUpload, ch)
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ wg.Done()
+ <-tokenPool
+ default:
+ _, err := db.Put(ctx, chunk.ModePutUpload, ch)
+ if err != nil {
+ errC <- err
+ }
+ wg.Done()
+ <-tokenPool
+ }
+ }()
+
+ count++
+ }
+ wg.Wait()
+ close(doneC)
+ }()
+
+ // wait for all chunks to be stored
+ for {
+ select {
+ case err := <-errC:
+ if err != nil {
+ return count, err
+ }
+ case <-ctx.Done():
+ return count, ctx.Err()
+ default:
+ select {
+ case <-doneC:
+ return count, nil
+ default:
+ }
+ }
+ }
+}
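A minimal sketch of how the two new entry points fit together, assuming both stores were opened with localstore.New elsewhere; the migrate package and the CopyStore helper are illustrative, not part of this change:

package migrate

import (
	"bytes"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// CopyStore copies every chunk from src to dst through the tar export
// format introduced above.
func CopyStore(src, dst *localstore.DB) error {
	var buf bytes.Buffer

	// Export writes the ".swarm-export-version" marker followed by one
	// file per chunk, named by the hex-encoded chunk address.
	exported, err := src.Export(&buf)
	if err != nil {
		return err
	}

	// The second argument flags a legacy LDBStore export; archives
	// written by Export above carry a version marker, so false is used.
	imported, err := dst.Import(&buf, false)
	if err != nil {
		return err
	}
	fmt.Printf("copied %d of %d exported chunks\n", imported, exported)
	return nil
}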
diff --git a/swarm/storage/localstore/export_test.go b/swarm/storage/localstore/export_test.go
new file mode 100644
index 000000000..d7f848f80
--- /dev/null
+++ b/swarm/storage/localstore/export_test.go
@@ -0,0 +1,80 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "context"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+)
+
+// TestExportImport constructs two databases, one to put and export
+// chunks and another one to import and validate that all chunks are
+// imported.
+func TestExportImport(t *testing.T) {
+ db1, cleanup1 := newTestDB(t, nil)
+ defer cleanup1()
+
+ var chunkCount = 100
+
+ chunks := make(map[string][]byte, chunkCount)
+ for i := 0; i < chunkCount; i++ {
+ ch := generateTestRandomChunk()
+
+ _, err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ chunks[string(ch.Address())] = ch.Data()
+ }
+
+ var buf bytes.Buffer
+
+ c, err := db1.Export(&buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantChunksCount := int64(len(chunks))
+ if c != wantChunksCount {
+ t.Errorf("got export count %v, want %v", c, wantChunksCount)
+ }
+
+ db2, cleanup2 := newTestDB(t, nil)
+ defer cleanup2()
+
+ c, err = db2.Import(&buf, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != wantChunksCount {
+ t.Errorf("got import count %v, want %v", c, wantChunksCount)
+ }
+
+ for a, want := range chunks {
+ addr := chunk.Address([]byte(a))
+ ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got := ch.Data()
+ if !bytes.Equal(got, want) {
+ t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want)
+ }
+ }
+}
diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go
index 84c4f596d..748e0d663 100644
--- a/swarm/storage/localstore/gc.go
+++ b/swarm/storage/localstore/gc.go
@@ -17,7 +17,10 @@
package localstore
import (
+ "time"
+
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
+ metricName := "localstore.gc"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
+ defer func() {
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+ }
+ }()
+
batch := new(leveldb.Batch)
target := db.gcTarget()
@@ -86,12 +98,17 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return 0, true, err
}
+ metrics.GetOrRegisterGauge(metricName+".gcsize", nil).Update(int64(gcSize))
done = true
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if gcSize-collectedCount <= target {
return true, nil
}
+
+ metrics.GetOrRegisterGauge(metricName+".storets", nil).Update(item.StoreTimestamp)
+ metrics.GetOrRegisterGauge(metricName+".accessts", nil).Update(item.AccessTimestamp)
+
// delete from retrieve, pull, gc
db.retrievalDataIndex.DeleteInBatch(batch, item)
db.retrievalAccessIndex.DeleteInBatch(batch, item)
@@ -109,11 +126,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return 0, false, err
}
+ metrics.GetOrRegisterCounter(metricName+".collected-count", nil).Inc(int64(collectedCount))
db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch)
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".writebatch.err", nil).Inc(1)
return 0, false, err
}
return collectedCount, done, nil
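The instrumentation added to collectGarbage follows one pattern throughout this change: an invocation counter, a deferred total-time timer, and a deferred error counter closed over the named return value. Below is a sketch of that pattern on a hypothetical method (doSomething and its metric name are placeholders); totalTimeMetric is the helper added to localstore.go further down:

package localstore

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// doSomething illustrates the metrics pattern used by collectGarbage:
// count calls, record total run time, and count errors on the way out.
func (db *DB) doSomething() (err error) {
	metricName := "localstore.doSomething" // placeholder metric name
	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
	defer totalTimeMetric(metricName, time.Now())
	defer func() {
		if err != nil {
			metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
		}
	}()

	// ...the actual work goes here...
	return nil
}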
diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go
index 081e0af80..4a6e0a5f4 100644
--- a/swarm/storage/localstore/gc_test.go
+++ b/swarm/storage/localstore/gc_test.go
@@ -17,6 +17,7 @@
package localstore
import (
+ "context"
"io/ioutil"
"math/rand"
"os"
@@ -63,26 +64,23 @@ func testDB_collectGarbageWorker(t *testing.T) {
})()
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
-
addrs := make([]chunk.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
}
gcTarget := db.gcTarget()
@@ -110,7 +108,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
@@ -118,7 +116,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
@@ -134,9 +132,6 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
})
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
-
testHookCollectGarbageChan := make(chan uint64)
defer setTestHookCollectGarbage(func(collectedCount uint64) {
testHookCollectGarbageChan <- collectedCount
@@ -146,19 +141,19 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload random chunks just up to the capacity
for i := 0; i < int(db.capacity)-1; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
}
// set update gc test hook to signal when
@@ -172,7 +167,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// request the latest synced chunk
// to prioritize it in the gc index
// not to be collected
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
@@ -191,11 +186,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload and sync another chunk to trigger
// garbage collection
ch := generateTestRandomChunk()
- err = uploader.Put(ch)
+ _, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(ch.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -235,7 +230,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// requested chunk should not be removed
t.Run("get requested chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
@@ -243,7 +238,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// the second synced chunk should be removed
t.Run("get gc-ed chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[1])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[1])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
@@ -251,7 +246,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
@@ -275,20 +270,17 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err)
}
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
-
count := 100
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go
index cf19e4f6c..0f23aa10a 100644
--- a/swarm/storage/localstore/index_test.go
+++ b/swarm/storage/localstore/index_test.go
@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
+ "context"
"math/rand"
"testing"
@@ -35,29 +36,22 @@ func TestDB_pullIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
- Chunk: chunk,
- // this timestamp is not the same as in
- // the index, but given that uploads
- // are sequential and that only ordering
- // of events matter, this information is
- // sufficient
- storeTimestamp: now(),
+ Chunk: ch,
+ binID: uint64(i),
}
}
@@ -70,10 +64,10 @@ func TestDB_pullIndex(t *testing.T) {
if poi > poj {
return false
}
- if chunks[i].storeTimestamp < chunks[j].storeTimestamp {
+ if chunks[i].binID < chunks[j].binID {
return true
}
- if chunks[i].storeTimestamp > chunks[j].storeTimestamp {
+ if chunks[i].binID > chunks[j].binID {
return false
}
return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
@@ -87,23 +81,21 @@ func TestDB_gcIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
- Chunk: chunk,
+ Chunk: ch,
}
}
@@ -123,9 +115,9 @@ func TestDB_gcIndex(t *testing.T) {
})()
t.Run("request unsynced", func(t *testing.T) {
- chunk := chunks[1]
+ ch := chunks[1]
- _, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -140,9 +132,9 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("sync one chunk", func(t *testing.T) {
- chunk := chunks[0]
+ ch := chunks[0]
- err := db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -154,10 +146,8 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("sync all chunks", func(t *testing.T) {
- setter := db.NewSetter(ModeSetSync)
-
for i := range chunks {
- err := setter.Set(chunks[i].Address())
+ err := db.Set(context.Background(), chunk.ModeSetSync, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
@@ -171,7 +161,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request one chunk", func(t *testing.T) {
i := 6
- _, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address())
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
@@ -189,14 +179,13 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("random chunk request", func(t *testing.T) {
- requester := db.NewGetter(ModeGetRequest)
rand.Shuffle(len(chunks), func(i, j int) {
chunks[i], chunks[j] = chunks[j], chunks[i]
})
- for _, chunk := range chunks {
- _, err := requester.Get(chunk.Address())
+ for _, ch := range chunks {
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -212,7 +201,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("remove one chunk", func(t *testing.T) {
i := 3
- err := db.NewSetter(modeSetRemove).Set(chunks[i].Address())
+ err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
index 98d4c7881..3b0bd8a93 100644
--- a/swarm/storage/localstore/localstore.go
+++ b/swarm/storage/localstore/localstore.go
@@ -23,11 +23,15 @@ import (
"time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
)
+// DB implements chunk.Store.
+var _ chunk.Store = &DB{}
+
var (
// ErrInvalidMode is returned when an unknown Mode
// is provided to the function.
@@ -69,6 +73,10 @@ type DB struct {
pullTriggers map[uint8][]chan struct{}
pullTriggersMu sync.RWMutex
+ // binIDs stores the latest chunk serial ID for every
+ // proximity order bin
+ binIDs shed.Uint64Vector
+
// garbage collection index
gcIndex shed.Index
@@ -124,7 +132,10 @@ type Options struct {
// One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if o == nil {
- o = new(Options)
+ // default options
+ o = &Options{
+ Capacity: 5000000,
+ }
}
db = &DB{
capacity: o.Capacity,
@@ -148,11 +159,23 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if err != nil {
return nil, err
}
+
// Identify current storage schema by arbitrary name.
db.schemaName, err = db.shed.NewStringField("schema-name")
if err != nil {
return nil, err
}
+ schemaName, err := db.schemaName.Get()
+ if err != nil {
+ return nil, err
+ }
+ if schemaName == "" {
+ // initial new localstore run
+ err := db.schemaName.Put(DbSchemaSanctuary)
+ if err != nil {
+ return nil, err
+ }
+ }
// Persist gc size.
db.gcSize, err = db.shed.NewUint64Field("gc-size")
if err != nil {
@@ -165,8 +188,9 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
)
if o.MockStore != nil {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, 8)
- binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ b := make([]byte, 16)
+ binary.BigEndian.PutUint64(b[:8], fields.BinID)
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
err = o.MockStore.Put(fields.Address, fields.Data)
if err != nil {
return nil, err
@@ -174,25 +198,28 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return b, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+ e.BinID = binary.BigEndian.Uint64(value[:8])
e.Data, err = o.MockStore.Get(keyItem.Address)
return e, err
}
} else {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, 8)
- binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ b := make([]byte, 16)
+ binary.BigEndian.PutUint64(b[:8], fields.BinID)
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
- e.Data = value[8:]
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+ e.BinID = binary.BigEndian.Uint64(value[:8])
+ e.Data = value[16:]
return e, nil
}
}
- // Index storing actual chunk address, data and store timestamp.
- db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
+ // Index storing actual chunk address, data and bin id.
+ db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
@@ -230,33 +257,37 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return nil, err
}
// pull index allows history and live syncing per po bin
- db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 41)
key[0] = db.po(fields.Address)
- binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp))
- copy(key[9:], fields.Address[:])
+ binary.BigEndian.PutUint64(key[1:9], fields.BinID)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
- e.Address = key[9:]
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9]))
+ e.BinID = binary.BigEndian.Uint64(key[1:9])
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
- return nil, nil
+ return fields.Address, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.Address = value
return e, nil
},
})
if err != nil {
return nil, err
}
+ // create a vector for bin IDs
+ db.binIDs, err = db.shed.NewUint64Vector("bin-ids")
+ if err != nil {
+ return nil, err
+ }
// create a pull syncing triggers used by SubscribePull function
db.pullTriggers = make(map[uint8][]chan struct{})
// push index contains as yet unsynced chunks
- db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
@@ -281,17 +312,17 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan struct{}, 0)
// gc index for removable chunk ordered by ascending last access time
- db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
- binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
+ binary.BigEndian.PutUint64(b[8:16], fields.BinID)
key = append(b, fields.Address...)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
+ e.BinID = binary.BigEndian.Uint64(key[8:16])
e.Address = key[16:]
return e, nil
},
@@ -358,3 +389,12 @@ func init() {
return time.Now().UTC().UnixNano()
}
}
+
+// totalTimeMetric logs a message about time between provided start time
+// and the time when the function is called and sends a resetting timer metric
+// with provided name appended with ".total-time".
+func totalTimeMetric(name string, start time.Time) {
+ totalTime := time.Since(start)
+ log.Trace(name+" total time", "time", totalTime)
+ metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
+}
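The retrieval data index value now carries the bin ID ahead of the store timestamp and the chunk payload. A standalone sketch of that 8+8 byte layout, mirroring the encode and decode functions registered above (the helper names are hypothetical):

package localstore

import "encoding/binary"

// encodeRetrievalValue builds a value in the new layout:
// 8 bytes BinID | 8 bytes StoreTimestamp | chunk data.
func encodeRetrievalValue(binID uint64, storeTimestamp int64, data []byte) []byte {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[:8], binID)
	binary.BigEndian.PutUint64(b[8:16], uint64(storeTimestamp))
	return append(b, data...)
}

// decodeRetrievalValue reverses encodeRetrievalValue.
func decodeRetrievalValue(value []byte) (binID uint64, storeTimestamp int64, data []byte) {
	binID = binary.BigEndian.Uint64(value[:8])
	storeTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
	return binID, storeTimestamp, value[16:]
}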
diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go
index 42e762587..6dbc4b7ad 100644
--- a/swarm/storage/localstore/localstore_test.go
+++ b/swarm/storage/localstore/localstore_test.go
@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
+ "context"
"fmt"
"io/ioutil"
"math/rand"
@@ -59,23 +60,23 @@ func TestDB(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- got, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got data %x, want %x", got.Data(), ch.Data())
}
}
@@ -113,19 +114,17 @@ func TestDB_updateGCSem(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- getter := db.NewGetter(ModeGetRequest)
-
// get more chunks then maxParallelUpdateGC
// in time shorter then updateGCSleep
for i := 0; i < 5; i++ {
- _, err = getter.Get(chunk.Address())
+ _, err = db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -237,71 +236,71 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
// newRetrieveIndexesTestWithAccess returns a test function that validates if the right
// chunk values are in the retrieval indexes when access time must be stored.
-func newRetrieveIndexesTestWithAccess(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
- item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0)
if accessTimestamp > 0 {
- item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
+ validateItem(t, item, ch.Address(), nil, 0, accessTimestamp)
}
}
}
// newPullIndexTest returns a test function that validates if the right
// chunk values are in the pull index.
-func newPullIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.pullIndex.Get(shed.Item{
- Address: chunk.Address(),
- StoreTimestamp: storeTimestamp,
+ Address: ch.Address(),
+ BinID: binID,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
- validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), nil, 0, 0)
}
}
}
// newPushIndexTest returns a test function that validates if the right
// chunk values are in the push index.
-func newPushIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.pushIndex.Get(shed.Item{
- Address: chunk.Address(),
+ Address: ch.Address(),
StoreTimestamp: storeTimestamp,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
- validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), nil, storeTimestamp, 0)
}
}
}
// newGCIndexTest returns a test function that validates if the right
// chunk values are in the gc index.
-func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.gcIndex.Get(shed.Item{
Address: chunk.Address(),
- StoreTimestamp: storeTimestamp,
+ BinID: binID,
AccessTimestamp: accessTimestamp,
})
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), nil, storeTimestamp, accessTimestamp)
+ validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
}
}
@@ -349,7 +348,7 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
// in database. It is used for index values validations.
type testIndexChunk struct {
chunk.Chunk
- storeTimestamp int64
+ binID uint64
}
// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go
index a6353e141..efef82858 100644
--- a/swarm/storage/localstore/mode_get.go
+++ b/swarm/storage/localstore/mode_get.go
@@ -17,45 +17,35 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModeGet enumerates different Getter modes.
-type ModeGet int
-
-// Getter modes.
-const (
- // ModeGetRequest: when accessed for retrieval
- ModeGetRequest ModeGet = iota
- // ModeGetSync: when accessed for syncing or proof of custody request
- ModeGetSync
-)
-
-// Getter provides Get method to retrieve Chunks
-// from database.
-type Getter struct {
- db *DB
- mode ModeGet
-}
-
-// NewGetter returns a new Getter on database
-// with a specific Mode.
-func (db *DB) NewGetter(mode ModeGet) *Getter {
- return &Getter{
- mode: mode,
- db: db,
- }
-}
-
// Get returns a chunk from the database. If the chunk is
// not found chunk.ErrChunkNotFound will be returned.
// All required indexes will be updated required by the
-// Getter Mode.
-func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
- out, err := g.db.get(g.mode, addr)
+// Getter Mode. Get is required to implement chunk.Store
+// interface.
+func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
+ metricName := fmt.Sprintf("localstore.Get.%s", mode)
+
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
+
+ defer func() {
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+ }
+ }()
+
+ out, err := db.get(mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
@@ -67,7 +57,7 @@ func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
// get returns Item from the retrieval index
// and updates other indexes.
-func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
+func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err error) {
item := addressToItem(addr)
out, err = db.retrievalDataIndex.Get(item)
@@ -76,7 +66,7 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
}
switch mode {
// update the access timestamp and gc index
- case ModeGetRequest:
+ case chunk.ModeGetRequest:
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer is full
@@ -90,8 +80,14 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
// for a new goroutine
defer func() { <-db.updateGCSem }()
}
+
+ metricName := "localstore.updateGC"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
+
err := db.updateGC(out)
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
@@ -101,7 +97,8 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
}()
// no updates to indexes
- case ModeGetSync:
+ case chunk.ModeGetSync:
+ case chunk.ModeGetLookup:
default:
return out, ErrInvalidMode
}
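With the Getter type removed, callers pass a context and a chunk.ModeGet constant straight to DB.Get. A usage sketch under that assumption; the example package and the fetch helper are illustrative only:

package example

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// fetch retrieves a chunk for delivery; ModeGetRequest also schedules
// the gc index update in a background goroutine, as shown above.
func fetch(db *localstore.DB, addr chunk.Address) (chunk.Chunk, error) {
	ch, err := db.Get(context.Background(), chunk.ModeGetRequest, addr)
	if err == chunk.ErrChunkNotFound {
		return nil, fmt.Errorf("chunk %s is not stored locally", addr.Hex())
	}
	return ch, err
}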
diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go
index 28a70ee0c..217fa5d2d 100644
--- a/swarm/storage/localstore/mode_get_test.go
+++ b/swarm/storage/localstore/mode_get_test.go
@@ -18,8 +18,11 @@ package localstore
import (
"bytes"
+ "context"
"testing"
"time"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
)
// TestModeGetRequest validates ModeGetRequest index values on the provided DB.
@@ -32,15 +35,13 @@ func TestModeGetRequest(t *testing.T) {
return uploadTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- requester := db.NewGetter(ModeGetRequest)
-
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
@@ -52,22 +53,22 @@ func TestModeGetRequest(t *testing.T) {
})()
t.Run("get unsynced", func(t *testing.T) {
- got, err := requester.Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
@@ -75,30 +76,30 @@ func TestModeGetRequest(t *testing.T) {
})
// set chunk to synced state
- err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
t.Run("first get", func(t *testing.T) {
- got, err := requester.Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, uploadTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp))
- t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, uploadTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -111,24 +112,24 @@ func TestModeGetRequest(t *testing.T) {
return accessTimestamp
})()
- got, err := requester.Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, accessTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, accessTimestamp))
- t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, accessTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -146,27 +147,27 @@ func TestModeGetSync(t *testing.T) {
return uploadTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- got, err := db.NewGetter(ModeGetSync).Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
diff --git a/swarm/storage/localstore/mode_has.go b/swarm/storage/localstore/mode_has.go
index 90feaceef..a70ee31b2 100644
--- a/swarm/storage/localstore/mode_has.go
+++ b/swarm/storage/localstore/mode_has.go
@@ -17,23 +17,23 @@
package localstore
import (
+ "context"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
)
-// Hasser provides Has method to retrieve Chunks
-// from database.
-type Hasser struct {
- db *DB
-}
+// Has returns true if the chunk is stored in database.
+func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
+ metricName := "localstore.Has"
-// NewHasser returns a new Hasser on database.
-func (db *DB) NewHasser() *Hasser {
- return &Hasser{
- db: db,
- }
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// Has returns true if the chunk is stored in database.
-func (h *Hasser) Has(addr chunk.Address) (bool, error) {
- return h.db.retrievalDataIndex.Has(addressToItem(addr))
+ has, err := db.retrievalDataIndex.Has(addressToItem(addr))
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+ }
+ return has, err
}
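Has likewise becomes a plain DB method. A sketch of a de-duplicating write built on it; putIfMissing is an illustrative helper, not part of this change:

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// putIfMissing consults the retrieval data index through Has and only
// writes the chunk when it is not already stored.
func putIfMissing(db *localstore.DB, ch chunk.Chunk) error {
	has, err := db.Has(context.Background(), ch.Address())
	if err != nil {
		return err
	}
	if has {
		return nil // already stored, nothing to do
	}
	_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
	return err
}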
diff --git a/swarm/storage/localstore/mode_has_test.go b/swarm/storage/localstore/mode_has_test.go
index 332616ca2..043b21a2b 100644
--- a/swarm/storage/localstore/mode_has_test.go
+++ b/swarm/storage/localstore/mode_has_test.go
@@ -17,7 +17,10 @@
package localstore
import (
+ "context"
"testing"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
)
// TestHas validates that Hasser is returning true for
@@ -26,16 +29,14 @@ func TestHas(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- hasser := db.NewHasser()
-
- has, err := hasser.Has(chunk.Address())
+ has, err := db.Has(context.Background(), ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -45,7 +46,7 @@ func TestHas(t *testing.T) {
missingChunk := generateTestRandomChunk()
- has, err = hasser.Has(missingChunk.Address())
+ has, err = db.Has(context.Background(), missingChunk.Address())
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go
index 1599ca8e3..a8e355ad0 100644
--- a/swarm/storage/localstore/mode_put.go
+++ b/swarm/storage/localstore/mode_put.go
@@ -17,44 +17,31 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModePut enumerates different Putter modes.
-type ModePut int
-
-// Putter modes.
-const (
- // ModePutRequest: when a chunk is received as a result of retrieve request and delivery
- ModePutRequest ModePut = iota
- // ModePutSync: when a chunk is received via syncing
- ModePutSync
- // ModePutUpload: when a chunk is created by local upload
- ModePutUpload
-)
+// Put stores the Chunk to database and depending
+// on the Putter mode, it updates required indexes.
+// Put is required to implement chunk.Store
+// interface.
+func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
+ metricName := fmt.Sprintf("localstore.Put.%s", mode)
-// Putter provides Put method to store Chunks
-// to database.
-type Putter struct {
- db *DB
- mode ModePut
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// NewPutter returns a new Putter on database
-// with a specific Mode.
-func (db *DB) NewPutter(mode ModePut) *Putter {
- return &Putter{
- mode: mode,
- db: db,
+ exists, err = db.put(mode, chunkToItem(ch))
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
-}
-
-// Put stores the Chunk to database and depending
-// on the Putter mode, it updates required indexes.
-func (p *Putter) Put(ch chunk.Chunk) (err error) {
- return p.db.put(p.mode, chunkToItem(ch))
+ return exists, err
}
// put stores Item to database and updates other
@@ -62,7 +49,7 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) {
// of this function for the same address in parallel.
// Item fields Address and Data must not be
// with their nil values.
-func (db *DB) put(mode ModePut, item shed.Item) (err error) {
+func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
@@ -76,7 +63,7 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
var triggerPushFeed bool // signal push feed subscriptions to iterate
switch mode {
- case ModePutRequest:
+ case chunk.ModePutRequest:
// put to indexes: retrieve, gc; it does not enter the syncpool
// check if the chunk already is in the database
@@ -84,20 +71,25 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
+ exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
+ exists = false
// no chunk accesses
default:
- return err
+ return false, err
}
i, err = db.retrievalDataIndex.Get(item)
switch err {
case nil:
+ exists = true
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
case leveldb.ErrNotFound:
// no chunk accesses
+ exists = false
default:
- return err
+ return false, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
@@ -107,6 +99,12 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
+ if item.BinID == 0 {
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ }
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
@@ -117,36 +115,56 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
db.retrievalDataIndex.PutInBatch(batch, item)
- case ModePutUpload:
+ case chunk.ModePutUpload:
// put to indexes: retrieve, push, pull
- item.StoreTimestamp = now()
- db.retrievalDataIndex.PutInBatch(batch, item)
- db.pullIndex.PutInBatch(batch, item)
- triggerPullFeed = true
- db.pushIndex.PutInBatch(batch, item)
- triggerPushFeed = true
+ exists, err = db.retrievalDataIndex.Has(item)
+ if err != nil {
+ return false, err
+ }
+ if !exists {
+ item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ db.pushIndex.PutInBatch(batch, item)
+ triggerPushFeed = true
+ }
- case ModePutSync:
+ case chunk.ModePutSync:
// put to indexes: retrieve, pull
- item.StoreTimestamp = now()
- db.retrievalDataIndex.PutInBatch(batch, item)
- db.pullIndex.PutInBatch(batch, item)
- triggerPullFeed = true
+ exists, err = db.retrievalDataIndex.Has(item)
+ if err != nil {
+ return exists, err
+ }
+ if !exists {
+ item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ }
default:
- return ErrInvalidMode
+ return false, ErrInvalidMode
}
err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
- return err
+ return false, err
}
err = db.shed.WriteBatch(batch)
if err != nil {
- return err
+ return false, err
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
@@ -154,5 +172,5 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
if triggerPushFeed {
db.triggerPushSubscriptions()
}
- return nil
+ return exists, nil
}
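Put now reports whether the chunk was already present, and the upload and sync modes use that to skip duplicate index writes. A caller-side sketch; the storeUpload helper is illustrative only:

package example

import (
	"context"
	"log"

	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// storeUpload stores a locally uploaded chunk and logs when the chunk
// had already been stored (in which case no indexes were touched).
func storeUpload(db *localstore.DB, ch chunk.Chunk) error {
	exists, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
	if err != nil {
		return err
	}
	if exists {
		log.Printf("chunk %s was already stored", ch.Address().Hex())
	}
	return nil
}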
diff --git a/swarm/storage/localstore/mode_put_test.go b/swarm/storage/localstore/mode_put_test.go
index 8ecae1d2e..5376aa8b3 100644
--- a/swarm/storage/localstore/mode_put_test.go
+++ b/swarm/storage/localstore/mode_put_test.go
@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
+ "context"
"fmt"
"sync"
"testing"
@@ -31,9 +32,7 @@ func TestModePutRequest(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- putter := db.NewPutter(ModePutRequest)
-
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
// keep the record when the chunk is stored
var storeTimestamp int64
@@ -46,12 +45,12 @@ func TestModePutRequest(t *testing.T) {
storeTimestamp = wantTimestamp
- err := putter.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutRequest, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -64,12 +63,12 @@ func TestModePutRequest(t *testing.T) {
return wantTimestamp
})()
- err := putter.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutRequest, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, storeTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, storeTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -87,16 +86,16 @@ func TestModePutSync(t *testing.T) {
return wantTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutSync).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0))
- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
}
// TestModePutUpload validates ModePutUpload index values on the provided DB.
@@ -109,18 +108,18 @@ func TestModePutUpload(t *testing.T) {
return wantTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0))
- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
- t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("push index", newPushIndexTest(db, ch, wantTimestamp, nil))
}
// TestModePutUpload_parallel uploads chunks in parallel
@@ -140,14 +139,13 @@ func TestModePutUpload_parallel(t *testing.T) {
// start uploader workers
for i := 0; i < workerCount; i++ {
go func(i int) {
- uploader := db.NewPutter(ModePutUpload)
for {
select {
- case chunk, ok := <-chunkChan:
+ case ch, ok := <-chunkChan:
if !ok {
return
}
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
select {
case errChan <- err:
case <-doneChan:
@@ -188,21 +186,85 @@ func TestModePutUpload_parallel(t *testing.T) {
}
// get every chunk and validate its data
- getter := db.NewGetter(ModeGetRequest)
-
chunksMu.Lock()
defer chunksMu.Unlock()
- for _, chunk := range chunks {
- got, err := getter.Get(chunk.Address())
+ for _, ch := range chunks {
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Fatalf("got chunk %s data %x, want %x", chunk.Address().Hex(), got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Fatalf("got chunk %s data %x, want %x", ch.Address().Hex(), got.Data(), ch.Data())
}
}
}
+// TestModePut_sameChunk puts the same chunk multiple times
+// and validates that all relevant indexes have only one item
+// in them.
+func TestModePut_sameChunk(t *testing.T) {
+ ch := generateTestRandomChunk()
+
+ for _, tc := range []struct {
+ name string
+ mode chunk.ModePut
+ pullIndex bool
+ pushIndex bool
+ }{
+ {
+ name: "ModePutRequest",
+ mode: chunk.ModePutRequest,
+ pullIndex: false,
+ pushIndex: false,
+ },
+ {
+ name: "ModePutUpload",
+ mode: chunk.ModePutUpload,
+ pullIndex: true,
+ pushIndex: true,
+ },
+ {
+ name: "ModePutSync",
+ mode: chunk.ModePutSync,
+ pullIndex: true,
+ pushIndex: false,
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ for i := 0; i < 10; i++ {
+ exists, err := db.Put(context.Background(), tc.mode, ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ switch exists {
+ case false:
+ if i != 0 {
+ t.Fatal("should not exist only on first Put")
+ }
+ case true:
+ if i == 0 {
+ t.Fatal("should exist on all cases other than the first one")
+ }
+ }
+
+ count := func(b bool) (c int) {
+ if b {
+ return 1
+ }
+ return 0
+ }
+
+ newItemsCountTest(db.retrievalDataIndex, 1)(t)
+ newItemsCountTest(db.pullIndex, count(tc.pullIndex))(t)
+ newItemsCountTest(db.pushIndex, count(tc.pushIndex))(t)
+ }
+ })
+ }
+}
+
// BenchmarkPutUpload runs a series of benchmarks that upload
// a specific number of chunks in parallel.
//
@@ -270,7 +332,6 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int)
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
chunks := make([]chunk.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = generateTestRandomChunk()
@@ -286,7 +347,8 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int)
go func(i int) {
defer func() { <-sem }()
- errs <- uploader.Put(chunks[i])
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
+ errs <- err
}(i)
}
}()
diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go
index 83fcbea52..14b48a22e 100644
--- a/swarm/storage/localstore/mode_set.go
+++ b/swarm/storage/localstore/mode_set.go
@@ -17,51 +17,37 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModeSet enumerates different Setter modes.
-type ModeSet int
-
-// Setter modes.
-const (
- // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
- ModeSetAccess ModeSet = iota
- // ModeSetSync: when push sync receipt is received
- ModeSetSync
- // modeSetRemove: when GC-d
- // unexported as no external packages should remove chunks from database
- modeSetRemove
-)
+// Set updates database indexes for a specific
+// chunk represented by the address.
+// Set is required to implement chunk.Store
+// interface.
+func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
+ metricName := fmt.Sprintf("localstore.Set.%s", mode)
-// Setter sets the state of a particular
-// Chunk in database by changing indexes.
-type Setter struct {
- db *DB
- mode ModeSet
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// NewSetter returns a new Setter on database
-// with a specific Mode.
-func (db *DB) NewSetter(mode ModeSet) *Setter {
- return &Setter{
- mode: mode,
- db: db,
+ err = db.set(mode, addr)
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
-}
-
-// Set updates database indexes for a specific
-// chunk represented by the address.
-func (s *Setter) Set(addr chunk.Address) (err error) {
- return s.db.set(s.mode, addr)
+ return err
}
// set updates database indexes for a specific
// chunk represented by the address.
// It acquires lockAddr to protect two calls
// of this function for the same address in parallel.
-func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
+func (db *DB) set(mode chunk.ModeSet, addr chunk.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
@@ -76,7 +62,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
item := addressToItem(addr)
switch mode {
- case ModeSetAccess:
+ case chunk.ModeSetAccess:
// add to pull, insert to gc
// need to get access timestamp here as it is not
@@ -87,9 +73,14 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
switch err {
case nil:
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
case leveldb.ErrNotFound:
db.pushIndex.DeleteInBatch(batch, item)
item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.Inc(uint64(db.po(item.Address)))
+ if err != nil {
+ return err
+ }
default:
return err
}
@@ -112,7 +103,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++
- case ModeSetSync:
+ case chunk.ModeSetSync:
// delete from push, insert to gc
// need to get access timestamp here as it is not
@@ -131,6 +122,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
return err
}
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
i, err = db.retrievalAccessIndex.Get(item)
switch err {
@@ -149,7 +141,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++
- case modeSetRemove:
+ case chunk.ModeSetRemove:
// delete from retrieve, pull, gc
// need to get access timestamp here as it is not
@@ -169,6 +161,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
return err
}
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
db.retrievalDataIndex.DeleteInBatch(batch, item)
db.retrievalAccessIndex.DeleteInBatch(batch, item)
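Setter follows the same pattern as Putter: the set mode now lives in the chunk package and is passed per call. A short sketch of marking a chunk as push-synced, mirroring the calls used in the updated tests below (ch and db are assumed to exist):

	// after a push-sync receipt is received, move the chunk out of the push index
	// and make it eligible for garbage collection
	err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
	if err != nil {
		t.Fatal(err)
	}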
diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go
index 674aaabec..9ba62cd20 100644
--- a/swarm/storage/localstore/mode_set_test.go
+++ b/swarm/storage/localstore/mode_set_test.go
@@ -17,9 +17,11 @@
package localstore
import (
+ "context"
"testing"
"time"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -28,23 +30,23 @@ func TestModeSetAccess(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
- err := db.NewSetter(ModeSetAccess).Set(chunk.Address())
+ err := db.Set(context.Background(), chunk.ModeSetAccess, ch.Address())
if err != nil {
t.Fatal(err)
}
- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 1))
- t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -56,28 +58,28 @@ func TestModeSetSync(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp))
- t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, leveldb.ErrNotFound))
+ t.Run("push index", newPushIndexTest(db, ch, wantTimestamp, leveldb.ErrNotFound))
- t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -89,40 +91,39 @@ func TestModeSetRemove(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = db.NewSetter(modeSetRemove).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetRemove, ch.Address())
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", func(t *testing.T) {
wantErr := leveldb.ErrNotFound
- _, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ _, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
}
t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
// access index should not be set
- _, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ _, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
}
t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
})
- t.Run("pull index", newPullIndexTest(db, chunk, 0, leveldb.ErrNotFound))
+ t.Run("pull index", newPullIndexTest(db, ch, 0, leveldb.ErrNotFound))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
-
}
diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go
index b08790124..4ca2e32e6 100644
--- a/swarm/storage/localstore/retrieval_index_test.go
+++ b/swarm/storage/localstore/retrieval_index_test.go
@@ -17,6 +17,7 @@
package localstore
import (
+ "context"
"strconv"
"testing"
@@ -61,17 +62,14 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
- requester := db.NewGetter(ModeGetRequest)
addrs := make([]chunk.Address, count)
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ ch := generateTestRandomChunk()
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
b.Fatal(err)
}
- addrs[i] = chunk.Address()
+ addrs[i] = ch.Address()
}
// set update gc test hook to signal when
// update gc goroutine is done by sending to
@@ -85,12 +83,12 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StartTimer()
for i := 0; i < count; i++ {
- err := syncer.Set(addrs[i])
+ err := db.Set(context.Background(), chunk.ModeSetSync, addrs[i])
if err != nil {
b.Fatal(err)
}
- _, err = requester.Get(addrs[i])
+ _, err = db.Get(context.Background(), chunk.ModeGetRequest, addrs[i])
if err != nil {
b.Fatal(err)
}
@@ -133,7 +131,6 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
chunks := make([]chunk.Chunk, count)
for i := 0; i < count; i++ {
chunk := generateTestRandomChunk()
@@ -142,7 +139,7 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StartTimer()
for i := 0; i < count; i++ {
- err := uploader.Put(chunks[i])
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
if err != nil {
b.Fatal(err)
}
diff --git a/swarm/storage/localstore/schema.go b/swarm/storage/localstore/schema.go
new file mode 100644
index 000000000..538c75d1f
--- /dev/null
+++ b/swarm/storage/localstore/schema.go
@@ -0,0 +1,52 @@
+package localstore
+
+import (
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// The DB schema we want to use. The actual/current DB schema might differ
+// until migrations are run.
+const CurrentDbSchema = DbSchemaSanctuary
+
+// There was a time when we had no schema at all.
+const DbSchemaNone = ""
+
+// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
+const DbSchemaPurity = "purity"
+
+// "halloween" is here because we had a screw in the garbage collector index.
+// Because of that we had to rebuild the GC index to get rid of erroneous
+// entries and that takes a long time. This schema is used for bookkeeping,
+// so rebuild index will run just once.
+const DbSchemaHalloween = "halloween"
+
+// "sanctuary" is the current schema used by the localstore package.
+const DbSchemaSanctuary = "sanctuary"
+
+// IsLegacyDatabase returns true if a legacy chunk database is found in the datadir.
+func IsLegacyDatabase(datadir string) bool {
+
+ var (
+ legacyDbSchemaKey = []byte{8}
+ )
+
+ db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128})
+ if err != nil {
+ log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err)
+ return false
+ }
+ defer db.Close()
+
+ data, err := db.Get(legacyDbSchemaKey, nil)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ // if we haven't found anything under the legacy db schema key, we are not on a legacy database
+ return false
+ }
+
+ log.Error("got an unexpected error fetching legacy name from the database", "err", err)
+ }
+ log.Trace("checking if database scheme is legacy", "schema name", string(data))
+ return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity
+}
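How these schema names are meant to be consulted is not part of this file, but a plausible sketch of the caller side follows (only IsLegacyDatabase and the schema constants above are real; the surrounding decision logic is an assumption for illustration):

	// before opening the new localstore, check whether the datadir still holds
	// an old "purity" or "halloween" ldbstore layout that needs migration
	if localstore.IsLegacyDatabase(datadir) {
		log.Warn("legacy chunk database detected, migration required", "path", datadir)
	}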
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index 0b96102e3..dd07add53 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -17,28 +17,34 @@
package localstore
import (
- "bytes"
"context"
"errors"
- "fmt"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
-// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
-// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be
-// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
+// is not 0, the iteration will start from the first item stored after that id. If until is not 0,
+// only chunks stored up to this id will be sent to the channel, and the returned channel will be
+// closed. The since-until interval is open on the since side and closed on the until side: (since,until] <=> [since+1,until]. Returned stop
// function will terminate current and further iterations without errors, and also close the returned channel.
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false.
-func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
- chunkDescriptors := make(chan ChunkDescriptor)
+func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
+ metricName := "localstore.SubscribePull"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
+ chunkDescriptors := make(chan chunk.Descriptor)
trigger := make(chan struct{}, 1)
db.pullTriggersMu.Lock()
@@ -59,18 +65,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
var errStopSubscription = errors.New("stop subscription")
go func() {
- // close the returned ChunkDescriptor channel at the end to
+ defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
+ // close the returned chunk.Descriptor channel at the end to
// signal that the subscription is done
defer close(chunkDescriptors)
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
- if since != nil {
+ if since > 0 {
sinceItem = &shed.Item{
- Address: since.Address,
- StoreTimestamp: since.StoreTimestamp,
+ Address: db.addressInBin(bin),
+ BinID: since,
}
}
+ first := true // first iteration flag for SkipStartFromItem
for {
select {
case <-trigger:
@@ -78,17 +86,23 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
// - last index Item is reached
// - subscription stop is called
// - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+ sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))
+
+ iterStart := time.Now()
+ var count int
err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
select {
- case chunkDescriptors <- ChunkDescriptor{
- Address: item.Address,
- StoreTimestamp: item.StoreTimestamp,
+ case chunkDescriptors <- chunk.Descriptor{
+ Address: item.Address,
+ BinID: item.BinID,
}:
+ count++
// until chunk descriptor is sent
// break the iteration
- if until != nil &&
- (item.StoreTimestamp >= until.StoreTimestamp ||
- bytes.Equal(item.Address, until.Address)) {
+ if until > 0 && item.BinID >= until {
return true, errStopSubscription
}
// set next iteration start item
@@ -109,19 +123,34 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
}, &shed.IterateOptions{
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
- // iterator call, skip it in this one
- SkipStartFromItem: true,
+ // iterator call, skip it in this one, but not the item with
+ // the provided since bin id, as that one should still be sent to the channel
+ SkipStartFromItem: !first,
Prefix: []byte{bin},
})
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
if err != nil {
if err == errStopSubscription {
// stop subscription without any errors
// if until is reached
return
}
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
return
}
+ first = false
case <-stopChan:
// terminate the subscription
// on stop
@@ -159,35 +188,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
return chunkDescriptors, stop
}
-// LastPullSubscriptionChunk returns ChunkDescriptor of the latest Chunk
+// LastPullSubscriptionBinID returns the bin id of the latest Chunk
// in pull syncing index for a provided bin. If there are no chunks in
-// that bin, chunk.ErrChunkNotFound is returned.
-func (db *DB) LastPullSubscriptionChunk(bin uint8) (c *ChunkDescriptor, err error) {
+// that bin, a 0 value is returned.
+func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
+ metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)
+
item, err := db.pullIndex.Last([]byte{bin})
if err != nil {
if err == leveldb.ErrNotFound {
- return nil, chunk.ErrChunkNotFound
+ return 0, nil
}
- return nil, err
+ return 0, err
}
- return &ChunkDescriptor{
- Address: item.Address,
- StoreTimestamp: item.StoreTimestamp,
- }, nil
-}
-
-// ChunkDescriptor holds information required for Pull syncing. This struct
-// is provided by subscribing to pull index.
-type ChunkDescriptor struct {
- Address chunk.Address
- StoreTimestamp int64
-}
-
-func (c *ChunkDescriptor) String() string {
- if c == nil {
- return "none"
- }
- return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
+ return item.BinID, nil
}
// triggerPullSubscriptions is used internally for starting iterations
@@ -209,3 +223,12 @@ func (db *DB) triggerPullSubscriptions(bin uint8) {
}
}
}
+
+// addressInBin returns an address that is in a specific
+// proximity order bin relative to the database base key.
+func (db *DB) addressInBin(bin uint8) (addr chunk.Address) {
+ addr = append([]byte(nil), db.baseKey...)
+ b := bin / 8
+ addr[b] = addr[b] ^ (1 << (7 - bin%8))
+ return addr
+}
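A hedged usage sketch of the reworked pull subscription: it keys on per-bin ids rather than store timestamps, and LastPullSubscriptionBinID gives the cursor a syncer can persist between sessions. The loop below is illustrative; only SubscribePull, LastPullSubscriptionBinID and chunk.Descriptor come from the code above, while saveCursor and handle are hypothetical helpers:

	// remember how far this bin has been synced so a later session can resume
	last, err := db.LastPullSubscriptionBinID(bin)
	if err != nil {
		return err
	}
	saveCursor(bin, last) // hypothetical persistence helper

	// stream descriptors for one bin; until == 0 keeps the subscription open
	descriptors, stop := db.SubscribePull(ctx, bin, 0, 0)
	defer stop()
	for d := range descriptors {
		handle(d.Address, d.BinID) // hypothetical consumer
	}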
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index d5ddae02b..bf364ed44 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/shed"
)
// TestDB_SubscribePull uploads some chunks before and after
@@ -35,15 +36,13 @@ func TestDB_SubscribePull(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
// prepopulate database with some chunks
// before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -54,22 +53,22 @@ func TestDB_SubscribePull(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after some short time
// to ensure that subscription will include them
// in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
@@ -82,15 +81,13 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
// prepopulate database with some chunks
// before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -106,23 +103,23 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
// that all of them will write every address error to errChan
for j := 0; j < subsCount; j++ {
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
}
// upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after some short time
// to ensure that subscription will include them
// in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)
checkErrChan(ctx, t, errChan, wantedChunksCount*subsCount)
}
@@ -135,61 +132,52 @@ func TestDB_SubscribePull_since(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (first map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ first = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
bin := db.po(ch.Address())
- if _, ok := addrs[bin]; !ok {
- addrs[bin] = make([]chunk.Address, 0)
- }
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
+
if wanted {
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]chunk.Address, 0)
+ }
addrs[bin] = append(addrs[bin], ch.Address())
wantedChunksCount++
- }
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
-
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
+ if _, ok := first[bin]; !ok {
+ first[bin] = binIDCounter[bin]
+ }
}
}
- return last
+ return first
}
// prepopulate database with some chunks
// before the subscription
- last := uploadRandomChunks(30, false)
+ uploadRandomChunks(30, false)
- uploadRandomChunks(25, true)
+ first := uploadRandomChunks(25, true)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -200,21 +188,18 @@ func TestDB_SubscribePull_since(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := last[bin]; ok {
- since = &c
+ since, ok := first[bin]
+ if !ok {
+ continue
}
- ch, stop := db.SubscribePull(ctx, bin, since, nil)
+ ch, stop := db.SubscribePull(ctx, bin, since, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
- // upload some chunks just after subscribe
- uploadRandomChunks(15, true)
-
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
@@ -226,30 +211,22 @@ func TestDB_SubscribePull_until(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -264,14 +241,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
wantedChunksCount++
}
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
return last
}
@@ -295,11 +269,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
if !ok {
continue
}
- ch, stop := db.SubscribePull(ctx, bin, nil, &until)
+ ch, stop := db.SubscribePull(ctx, bin, 0, until)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
@@ -316,30 +290,22 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -354,14 +320,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
wantedChunksCount++
}
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
return last
}
@@ -387,9 +350,10 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := upload1[bin]; ok {
- since = &c
+ since, ok := upload1[bin]
+ if ok {
+ // start from the next uploaded chunk
+ since++
}
until, ok := upload2[bin]
if !ok {
@@ -397,11 +361,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
// skip this bin from testing
continue
}
- ch, stop := db.SubscribePull(ctx, bin, since, &until)
+ ch, stop := db.SubscribePull(ctx, bin, since, until)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
@@ -412,14 +376,14 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
// uploadRandomChunksBin uploads random chunks to database and adds them to
the map of addresses per bin.
-func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
+func uploadRandomChunksBin(t *testing.T, db *DB, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
addrsMu.Lock()
defer addrsMu.Unlock()
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -434,10 +398,10 @@ func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uin
}
}
-// readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and
-// sends error to errChan, even if it is nil, to count the number of ChunkDescriptors
+// readPullSubscriptionBin is a helper function that reads all chunk.Descriptors from a channel and
+// sends error to errChan, even if it is nil, to count the number of chunk.Descriptors
// returned by the channel.
-func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
+func readPullSubscriptionBin(ctx context.Context, db *DB, bin uint8, ch <-chan chunk.Descriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
var i int // address index
for {
select {
@@ -450,9 +414,20 @@ func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDesc
if i+1 > len(addrs[bin]) {
err = fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin)
} else {
- want := addrs[bin][i]
- if !bytes.Equal(got.Address, want) {
- err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want)
+ addr := addrs[bin][i]
+ if !bytes.Equal(got.Address, addr) {
+ err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got.Address.Hex(), addr.Hex())
+ } else {
+ want, err := db.retrievalDataIndex.Get(shed.Item{
+ Address: addr,
+ })
+ if err != nil {
+ err = fmt.Errorf("got chunk (bin id %v in bin %v) from retrieval index %s: %v", i, bin, addrs[bin][i].Hex(), err)
+ } else {
+ if got.BinID != want.BinID {
+ err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got, want)
+ }
+ }
}
}
addrsMu.Unlock()
@@ -486,27 +461,19 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC
}
}
-// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk
+// TestDB_LastPullSubscriptionBinID validates that LastPullSubscriptionBinID
// is returning the last chunk bin id for proximity order bins by
// doing a few rounds of chunk uploads.
-func TestDB_LastPullSubscriptionChunk(t *testing.T) {
+func TestDB_LastPullSubscriptionBinID(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- last := make(map[uint8]ChunkDescriptor)
+ last := make(map[uint8]uint64)
// do a few rounds of uploads and check if
// last pull subscription chunk is correct
@@ -516,7 +483,7 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -528,32 +495,42 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
}
addrs[bin] = append(addrs[bin], ch.Address())
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
// check
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
want, ok := last[bin]
- got, err := db.LastPullSubscriptionChunk(bin)
+ got, err := db.LastPullSubscriptionBinID(bin)
if ok {
if err != nil {
t.Errorf("got unexpected error value %v", err)
}
- if !bytes.Equal(got.Address, want.Address) {
- t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex())
- }
- } else {
- if err != chunk.ErrChunkNotFound {
- t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound)
- }
}
+ if got != want {
+ t.Errorf("got last bin id %v, want %v", got, want)
+ }
+ }
+ }
+}
+
+// TestAddressInBin validates that function addressInBin
+// returns a valid address for every proximity order bin.
+func TestAddressInBin(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ for po := uint8(0); po < chunk.MaxPO; po++ {
+ addr := db.addressInBin(po)
+
+ got := db.po(addr)
+
+ if got != uint8(po) {
+ t.Errorf("got po %v, want %v", got, po)
}
}
}
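Worked through for one bin, as an illustrative calculation rather than part of the change: for bin = 3, addressInBin copies the base key, picks byte b = 3/8 = 0 and XORs it with 1 << (7 - 3%8) = 0x10, flipping bit 3 counted from the most significant bit of the address. The first bit in which the result differs from the base key is therefore bit 3, so db.po reports proximity order 3, which is exactly what TestAddressInBin asserts for every bin up to chunk.MaxPO.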
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6f..f2463af2a 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -19,10 +19,15 @@ package localstore
import (
"context"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
)
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
@@ -30,6 +35,9 @@ import (
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
+ metricName := "localstore.SubscribePush"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
chunks := make(chan chunk.Chunk)
trigger := make(chan struct{}, 1)
@@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
var stopChanOnce sync.Once
go func() {
+ defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
@@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// - last index Item is reached
// - subscription stop is called
// - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+
+ iterStart := time.Now()
+ var count int
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// get chunk data
dataItem, err := db.retrievalDataIndex.Get(item)
@@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
@@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// iterator call, skip it in this one
SkipStartFromItem: true,
})
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore push subscription iteration", "err", err)
return
}
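SubscribePush delivers whole chunks rather than descriptors, since its consumer is the push-sync protocol that needs the chunk data. A rough consumer sketch under that assumption (deliver is a hypothetical handler; the chunk is later acknowledged with chunk.ModeSetSync, which the push index tests below exercise):

	chunks, stop := db.SubscribePush(ctx)
	defer stop()
	for ch := range chunks {
		// hand the chunk to push syncing; once a receipt arrives,
		// db.Set(ctx, chunk.ModeSetSync, ch.Address()) clears it from the push index
		deliver(ch)
	}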
diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go
index 30fb98eb2..6124a534b 100644
--- a/swarm/storage/localstore/subscription_push_test.go
+++ b/swarm/storage/localstore/subscription_push_test.go
@@ -34,8 +34,6 @@ func TestDB_SubscribePush(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
chunks := make([]chunk.Chunk, 0)
var chunksMu sync.Mutex
@@ -44,14 +42,14 @@ func TestDB_SubscribePush(t *testing.T) {
defer chunksMu.Unlock()
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- chunks = append(chunks, chunk)
+ chunks = append(chunks, ch)
}
}
@@ -122,8 +120,6 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make([]chunk.Address, 0)
var addrsMu sync.Mutex
@@ -132,14 +128,14 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
defer addrsMu.Unlock()
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
}
}