Diffstat (limited to 'swarm/storage')
-rw-r--r--  swarm/storage/chunker_test.go | 21
-rw-r--r--  swarm/storage/common_test.go | 65
-rw-r--r--  swarm/storage/database.go | 82
-rw-r--r--  swarm/storage/feed/handler.go | 12
-rw-r--r--  swarm/storage/feed/handler_test.go | 13
-rw-r--r--  swarm/storage/feed/testutil.go | 15
-rw-r--r--  swarm/storage/filestore.go | 41
-rw-r--r--  swarm/storage/filestore_test.go | 67
-rw-r--r--  swarm/storage/hasherstore.go | 15
-rw-r--r--  swarm/storage/hasherstore_test.go | 8
-rw-r--r--  swarm/storage/ldbstore.go | 1082
-rw-r--r--  swarm/storage/ldbstore_test.go | 788
-rw-r--r--  swarm/storage/localstore.go | 251
-rw-r--r--  swarm/storage/localstore/export.go | 204
-rw-r--r--  swarm/storage/localstore/export_test.go | 80
-rw-r--r--  swarm/storage/localstore/gc.go | 19
-rw-r--r--  swarm/storage/localstore/gc_test.go | 48
-rw-r--r--  swarm/storage/localstore/index_test.go | 49
-rw-r--r--  swarm/storage/localstore/localstore.go | 80
-rw-r--r--  swarm/storage/localstore/localstore_test.go | 55
-rw-r--r--  swarm/storage/localstore/mode_get.go | 63
-rw-r--r--  swarm/storage/localstore/mode_get_test.go | 67
-rw-r--r--  swarm/storage/localstore/mode_has.go | 28
-rw-r--r--  swarm/storage/localstore/mode_has_test.go | 13
-rw-r--r--  swarm/storage/localstore/mode_put.go | 118
-rw-r--r--  swarm/storage/localstore/mode_put_test.go | 116
-rw-r--r--  swarm/storage/localstore/mode_set.go | 63
-rw-r--r--  swarm/storage/localstore/mode_set_test.go | 35
-rw-r--r--  swarm/storage/localstore/retrieval_index_test.go | 17
-rw-r--r--  swarm/storage/localstore/schema.go | 52
-rw-r--r--  swarm/storage/localstore/subscription_pull.go | 107
-rw-r--r--  swarm/storage/localstore/subscription_pull_test.go | 243
-rw-r--r--  swarm/storage/localstore/subscription_push.go | 29
-rw-r--r--  swarm/storage/localstore/subscription_push_test.go | 16
-rw-r--r--  swarm/storage/localstore_test.go | 244
-rw-r--r--  swarm/storage/memstore.go | 92
-rw-r--r--  swarm/storage/memstore_test.go | 158
-rw-r--r--  swarm/storage/netstore.go | 45
-rw-r--r--  swarm/storage/netstore_test.go | 130
-rw-r--r--  swarm/storage/pyramid.go | 13
-rw-r--r--  swarm/storage/schema.go | 17
-rw-r--r--  swarm/storage/types.go | 44
42 files changed, 1236 insertions, 3469 deletions
diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go
index 9a1259444..a0fe2e769 100644
--- a/swarm/storage/chunker_test.go
+++ b/swarm/storage/chunker_test.go
@@ -24,6 +24,7 @@ import (
"io"
"testing"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/testutil"
"golang.org/x/crypto/sha3"
)
@@ -42,8 +43,10 @@ type chunkerTester struct {
t test
}
+var mockTag = chunk.NewTag(0, "mock-tag", 0)
+
func newTestHasherStore(store ChunkStore, hash string) *hasherStore {
- return NewHasherStore(store, MakeHashFunc(hash), false)
+ return NewHasherStore(store, MakeHashFunc(hash), false, chunk.NewTag(0, "test-tag", 0))
}
func testRandomBrokenData(n int, tester *chunkerTester) {
@@ -91,7 +94,7 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester)
var err error
ctx := context.TODO()
if usePyramid {
- addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter)
+ addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter, mockTag)
} else {
addr, wait, err = TreeSplit(ctx, data, int64(n), putGetter)
}
@@ -188,7 +191,7 @@ func TestDataAppend(t *testing.T) {
putGetter := newTestHasherStore(store, SHA3Hash)
ctx := context.TODO()
- addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag)
if err != nil {
tester.t.Fatalf(err.Error())
}
@@ -208,7 +211,7 @@ func TestDataAppend(t *testing.T) {
}
putGetter = newTestHasherStore(store, SHA3Hash)
- newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter)
+ newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter, mockTag)
if err != nil {
tester.t.Fatalf(err.Error())
}
@@ -278,7 +281,7 @@ func benchmarkSplitJoin(n int, t *testing.B) {
putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
ctx := context.TODO()
- key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag)
if err != nil {
t.Fatalf(err.Error())
}
@@ -335,7 +338,7 @@ func benchmarkSplitPyramidBMT(n int, t *testing.B) {
putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash)
ctx := context.Background()
- _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag)
if err != nil {
t.Fatalf(err.Error())
}
@@ -353,7 +356,7 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash)
ctx := context.Background()
- _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag)
if err != nil {
t.Fatalf(err.Error())
}
@@ -374,7 +377,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
putGetter := newTestHasherStore(store, SHA3Hash)
ctx := context.Background()
- key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
+ key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag)
if err != nil {
t.Fatalf(err.Error())
}
@@ -384,7 +387,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
}
putGetter = newTestHasherStore(store, SHA3Hash)
- _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter)
+ _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter, mockTag)
if err != nil {
t.Fatalf(err.Error())
}
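Note: every splitter entry point now threads a *chunk.Tag through to the putter, as the hunks above show. A minimal sketch of the new call shape, written as if it were another test in package storage (the test name and the 4 KB payload are illustrative, not part of this change):

    // Illustrative sketch only, assumed to live next to chunker_test.go in package storage.
    package storage

    import (
    	"bytes"
    	"context"
    	"testing"

    	"github.com/ethereum/go-ethereum/swarm/chunk"
    )

    func TestSplitWithTag(t *testing.T) {
    	// the tag is passed both to the hasherStore and to the splitter
    	tag := chunk.NewTag(0, "example-tag", 0)
    	putGetter := NewHasherStore(NewMapChunkStore(), MakeHashFunc(SHA3Hash), false, tag)

    	data := bytes.NewReader(make([]byte, 4096))
    	addr, wait, err := PyramidSplit(context.TODO(), data, putGetter, putGetter, tag)
    	if err != nil {
    		t.Fatal(err)
    	}
    	// wait flushes outstanding chunk writes before the address is usable
    	if err := wait(context.TODO()); err != nil {
    		t.Fatal(err)
    	}
    	_ = addr
    }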
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index c4d187b62..100e778a3 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -22,8 +22,6 @@ import (
"flag"
"fmt"
"io"
- "io/ioutil"
- "os"
"sync"
"testing"
"time"
@@ -59,30 +57,6 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader
}
}
-func newLDBStore(t *testing.T) (*LDBStore, func()) {
- dir, err := ioutil.TempDir("", "bzz-storage-test")
- if err != nil {
- t.Fatal(err)
- }
- log.Trace("memstore.tempdir", "dir", dir)
-
- ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir)
- db, err := NewLDBStore(ldbparams)
- if err != nil {
- t.Fatal(err)
- }
-
- cleanup := func() {
- db.Close()
- err := os.RemoveAll(dir)
- if err != nil {
- t.Fatal(err)
- }
- }
-
- return db, cleanup
-}
-
func mputRandomChunks(store ChunkStore, n int) ([]Chunk, error) {
return mput(store, n, GenerateRandomChunk)
}
@@ -94,14 +68,15 @@ func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
for i := int64(0); i < int64(n); i++ {
- chunk := f(chunk.DefaultSize)
+ ch := f(chunk.DefaultSize)
go func() {
+ _, err := store.Put(ctx, chunk.ModePutUpload, ch)
select {
- case errc <- store.Put(ctx, chunk):
+ case errc <- err:
case <-ctx.Done():
}
}()
- hs = append(hs, chunk)
+ hs = append(hs, ch)
}
// wait for all chunks to be stored
@@ -123,13 +98,13 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error)
go func(h Address) {
defer wg.Done()
// TODO: write timeout with context
- chunk, err := store.Get(context.TODO(), h)
+ ch, err := store.Get(context.TODO(), chunk.ModeGetRequest, h)
if err != nil {
errc <- err
return
}
if f != nil {
- err = f(h, chunk)
+ err = f(h, ch)
if err != nil {
errc <- err
return
@@ -250,14 +225,15 @@ func NewMapChunkStore() *MapChunkStore {
}
}
-func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error {
+func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
+ _, exists := m.chunks[ch.Address().Hex()]
m.chunks[ch.Address().Hex()] = ch
- return nil
+ return exists, nil
}
-func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
+func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
m.mu.RLock()
defer m.mu.RUnlock()
chunk := m.chunks[ref.Hex()]
@@ -268,15 +244,28 @@ func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
}
// Need to implement Has from SyncChunkStore
-func (m *MapChunkStore) Has(ctx context.Context, ref Address) bool {
+func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err error) {
m.mu.RLock()
defer m.mu.RUnlock()
- _, has := m.chunks[ref.Hex()]
- return has
+ _, has = m.chunks[ref.Hex()]
+ return has, nil
+}
+
+func (m *MapChunkStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
+ return nil
+}
+
+func (m *MapChunkStore) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
+ return 0, nil
}
-func (m *MapChunkStore) Close() {
+func (m *MapChunkStore) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
+ return nil, nil
+}
+
+func (m *MapChunkStore) Close() error {
+ return nil
}
func chunkAddresses(chunks []Chunk) []Address {
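The updated MapChunkStore above exercises the whole mode-aware store surface: Put takes a chunk.ModePut and reports whether the chunk was already known, Get takes a chunk.ModeGet, and Has returns an error alongside the boolean. A compact sketch of how a test might drive it (hypothetical test name, same package):

    // Illustrative sketch only, in package storage alongside common_test.go.
    package storage

    import (
    	"context"
    	"testing"

    	"github.com/ethereum/go-ethereum/swarm/chunk"
    )

    func TestMapChunkStoreModes(t *testing.T) {
    	store := NewMapChunkStore()
    	ch := GenerateRandomChunk(chunk.DefaultSize)
    	ctx := context.Background()

    	// Put now takes a mode and reports whether the chunk was already known
    	seen, err := store.Put(ctx, chunk.ModePutUpload, ch)
    	if err != nil {
    		t.Fatal(err)
    	}
    	if seen {
    		t.Fatal("fresh chunk reported as seen")
    	}
    	// a second put of the same chunk is reported as seen
    	if seen, _ = store.Put(ctx, chunk.ModePutUpload, ch); !seen {
    		t.Fatal("duplicate chunk not reported as seen")
    	}

    	// Get takes a mode, and Has now returns an error alongside the boolean
    	if _, err := store.Get(ctx, chunk.ModeGetRequest, ch.Address()); err != nil {
    		t.Fatal(err)
    	}
    	if has, err := store.Has(ctx, ch.Address()); err != nil || !has {
    		t.Fatalf("has = %v, err = %v", has, err)
    	}
    }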
diff --git a/swarm/storage/database.go b/swarm/storage/database.go
deleted file mode 100644
index 12367b905..000000000
--- a/swarm/storage/database.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package storage
-
-// this is a clone of an earlier state of the ethereum ethdb/database
-// no need for queueing/caching
-
-import (
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/opt"
-)
-
-const openFileLimit = 128
-
-type LDBDatabase struct {
- db *leveldb.DB
-}
-
-func NewLDBDatabase(file string) (*LDBDatabase, error) {
- // Open the db
- db, err := leveldb.OpenFile(file, &opt.Options{OpenFilesCacheCapacity: openFileLimit})
- if err != nil {
- return nil, err
- }
-
- database := &LDBDatabase{db: db}
-
- return database, nil
-}
-
-func (db *LDBDatabase) Put(key []byte, value []byte) error {
- metrics.GetOrRegisterCounter("ldbdatabase.put", nil).Inc(1)
-
- return db.db.Put(key, value, nil)
-}
-
-func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
- metrics.GetOrRegisterCounter("ldbdatabase.get", nil).Inc(1)
-
- dat, err := db.db.Get(key, nil)
- if err != nil {
- return nil, err
- }
- return dat, nil
-}
-
-func (db *LDBDatabase) Delete(key []byte) error {
- return db.db.Delete(key, nil)
-}
-
-func (db *LDBDatabase) NewIterator() iterator.Iterator {
- metrics.GetOrRegisterCounter("ldbdatabase.newiterator", nil).Inc(1)
-
- return db.db.NewIterator(nil, nil)
-}
-
-func (db *LDBDatabase) Write(batch *leveldb.Batch) error {
- metrics.GetOrRegisterCounter("ldbdatabase.write", nil).Inc(1)
-
- return db.db.Write(batch, nil)
-}
-
-func (db *LDBDatabase) Close() {
- // Close the leveldb database
- db.db.Close()
-}
diff --git a/swarm/storage/feed/handler.go b/swarm/storage/feed/handler.go
index 61124e2db..0f6f2ba34 100644
--- a/swarm/storage/feed/handler.go
+++ b/swarm/storage/feed/handler.go
@@ -24,6 +24,8 @@ import (
"fmt"
"sync"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
"github.com/ethereum/go-ethereum/swarm/log"
@@ -189,7 +191,7 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
defer cancel()
- chunk, err := h.chunkStore.Get(ctx, id.Addr())
+ ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, id.Addr())
if err != nil {
if err == context.DeadlineExceeded { // chunk not found
return nil, nil
@@ -198,7 +200,7 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
}
var request Request
- if err := request.fromChunk(chunk); err != nil {
+ if err := request.fromChunk(ch); err != nil {
return nil, nil
}
if request.Time <= timeLimit {
@@ -257,14 +259,14 @@ func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Ad
return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
}
- chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
+ ch, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
if err != nil {
return nil, err
}
// send the chunk
- h.chunkStore.Put(ctx, chunk)
- log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
+ h.chunkStore.Put(ctx, chunk.ModePutUpload, ch)
+ log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", ch.Data())
// update our feed updates map cache entry if the new update is older than the one we have, if we have it.
if feedUpdate != nil && r.Epoch.After(feedUpdate.Epoch) {
feedUpdate.Epoch = r.Epoch
diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go
index 2f8a52453..c4f6fe689 100644
--- a/swarm/storage/feed/handler_test.go
+++ b/swarm/storage/feed/handler_test.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
+ "github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
var (
@@ -400,9 +401,7 @@ func TestValidatorInStore(t *testing.T) {
}
defer os.RemoveAll(datadir)
- handlerParams := storage.NewDefaultLocalStoreParams()
- handlerParams.Init(datadir)
- store, err := storage.NewLocalStore(handlerParams, nil)
+ localstore, err := localstore.New(datadir, make([]byte, 32), nil)
if err != nil {
t.Fatal(err)
}
@@ -410,7 +409,7 @@ func TestValidatorInStore(t *testing.T) {
// set up Swarm feeds handler and add is as a validator to the localstore
fhParams := &HandlerParams{}
fh := NewHandler(fhParams)
- store.Validators = append(store.Validators, fh)
+ store := chunk.NewValidatorStore(localstore, fh)
// create content addressed chunks, one good, one faulty
chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2)
@@ -447,15 +446,15 @@ func TestValidatorInStore(t *testing.T) {
}
// put the chunks in the store and check their error status
- err = store.Put(context.Background(), goodChunk)
+ _, err = store.Put(context.Background(), chunk.ModePutUpload, goodChunk)
if err == nil {
t.Fatal("expected error on good content address chunk with feed update validator only, but got nil")
}
- err = store.Put(context.Background(), badChunk)
+ _, err = store.Put(context.Background(), chunk.ModePutUpload, badChunk)
if err == nil {
t.Fatal("expected error on bad content address chunk with feed update validator only, but got nil")
}
- err = store.Put(context.Background(), uglyChunk)
+ _, err = store.Put(context.Background(), chunk.ModePutUpload, uglyChunk)
if err != nil {
t.Fatalf("expected no error on feed update chunk with feed update validator only, but got: %s", err)
}
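The LocalStore.Validators field is gone; validation is now layered over any chunk store via chunk.NewValidatorStore, as in the test above. A condensed sketch of that wiring (hypothetical test name; the temp-directory handling mirrors TestValidatorInStore):

    // Illustrative sketch only, in package feed alongside handler_test.go.
    package feed

    import (
    	"context"
    	"io/ioutil"
    	"os"
    	"testing"

    	"github.com/ethereum/go-ethereum/swarm/chunk"
    	"github.com/ethereum/go-ethereum/swarm/storage"
    	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
    )

    func TestFeedValidatorWiring(t *testing.T) {
    	dir, err := ioutil.TempDir("", "swarm-feed-")
    	if err != nil {
    		t.Fatal(err)
    	}
    	defer os.RemoveAll(dir)

    	// the bare localstore performs no content validation of its own
    	db, err := localstore.New(dir, make([]byte, 32), nil)
    	if err != nil {
    		t.Fatal(err)
    	}
    	defer db.Close()

    	// validators wrap the store instead of being appended to a Validators field
    	fh := NewHandler(&HandlerParams{})
    	store := chunk.NewValidatorStore(db, fh)

    	// with only the feed validator installed, a content-addressed chunk is rejected
    	ch := storage.GenerateRandomChunks(chunk.DefaultSize, 1)[0]
    	if _, err := store.Put(context.Background(), chunk.ModePutUpload, ch); err == nil {
    		t.Fatal("expected the feed validator to reject a content chunk")
    	}
    }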
diff --git a/swarm/storage/feed/testutil.go b/swarm/storage/feed/testutil.go
index caa39d9ff..db2d989e1 100644
--- a/swarm/storage/feed/testutil.go
+++ b/swarm/storage/feed/testutil.go
@@ -18,12 +18,13 @@ package feed
import (
"context"
- "fmt"
"path/filepath"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
const (
@@ -53,14 +54,14 @@ func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetF
func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) {
path := filepath.Join(datadir, testDbDirName)
fh := NewHandler(params)
- localstoreparams := storage.NewDefaultLocalStoreParams()
- localstoreparams.Init(path)
- localStore, err := storage.NewLocalStore(localstoreparams, nil)
+
+ db, err := localstore.New(filepath.Join(path, "chunks"), make([]byte, 32), nil)
if err != nil {
- return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err)
+ return nil, err
}
- localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)))
- localStore.Validators = append(localStore.Validators, fh)
+
+ localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
+
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, err
diff --git a/swarm/storage/filestore.go b/swarm/storage/filestore.go
index 0bad944ee..dc096e56c 100644
--- a/swarm/storage/filestore.go
+++ b/swarm/storage/filestore.go
@@ -21,6 +21,9 @@ import (
"io"
"sort"
"sync"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
/*
@@ -44,6 +47,7 @@ const (
type FileStore struct {
ChunkStore
hashFunc SwarmHasher
+ tags *chunk.Tags
}
type FileStoreParams struct {
@@ -57,22 +61,20 @@ func NewFileStoreParams() *FileStoreParams {
}
// for testing locally
-func NewLocalFileStore(datadir string, basekey []byte) (*FileStore, error) {
- params := NewDefaultLocalStoreParams()
- params.Init(datadir)
- localStore, err := NewLocalStore(params, nil)
+func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, error) {
+ localStore, err := localstore.New(datadir, basekey, nil)
if err != nil {
return nil, err
}
- localStore.Validators = append(localStore.Validators, NewContentAddressValidator(MakeHashFunc(DefaultHash)))
- return NewFileStore(localStore, NewFileStoreParams()), nil
+ return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), nil
}
-func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
+func NewFileStore(store ChunkStore, params *FileStoreParams, tags *chunk.Tags) *FileStore {
hashFunc := MakeHashFunc(params.Hash)
return &FileStore{
ChunkStore: store,
hashFunc: hashFunc,
+ tags: tags,
}
}
@@ -83,7 +85,11 @@ func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
// It returns a reader with the chunk data and whether the content was encrypted
func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) {
isEncrypted = len(addr) > f.hashFunc().Size()
- getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted)
+ tag, err := f.tags.GetFromContext(ctx)
+ if err != nil {
+ tag = chunk.NewTag(0, "ephemeral-retrieval-tag", 0)
+ }
+ getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted, tag)
reader = TreeJoin(ctx, addr, getter, 0)
return
}
@@ -91,8 +97,17 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu
// Store is a public API. Main entry point for document storage directly. Used by the
// FS-aware API and httpaccess
func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) {
- putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt)
- return PyramidSplit(ctx, data, putter, putter)
+ tag, err := f.tags.GetFromContext(ctx)
+ if err != nil {
+ // some of the parts of the codebase, namely the manifest trie, do not store the context
+ // of the original request nor the tag with the trie, recalculating the trie hence
+ // loses the tag uid. thus we create an ephemeral tag here for that purpose
+
+ tag = chunk.NewTag(0, "", 0)
+ //return nil, nil, err
+ }
+ putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag)
+ return PyramidSplit(ctx, data, putter, putter, tag)
}
func (f *FileStore) HashSize() int {
@@ -101,12 +116,14 @@ func (f *FileStore) HashSize() int {
// GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file
func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) {
+ tag := chunk.NewTag(0, "ephemeral-tag", 0) //this tag is just a mock ephemeral tag since we don't want to save these results
+
// create a special kind of putter, which only will store the references
putter := &hashExplorer{
- hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt),
+ hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag),
}
// do the actual splitting anyway, no way around it
- _, wait, err := PyramidSplit(ctx, data, putter, putter)
+ _, wait, err := PyramidSplit(ctx, data, putter, putter, tag)
if err != nil {
return nil, err
}
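FileStore now carries a chunk.Tags registry and resolves the upload tag from the request context, falling back to an ephemeral tag when none is attached (as the comment in Store explains). A minimal sketch of the new wiring from the caller's side (the helper name is hypothetical):

    // Illustrative sketch only, in package storage.
    package storage

    import (
    	"bytes"
    	"context"

    	"github.com/ethereum/go-ethereum/swarm/chunk"
    	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
    )

    // storeFile shows a FileStore constructed over a localstore with a tags registry.
    func storeFile(dir string, data []byte) (Address, error) {
    	localStore, err := localstore.New(dir, make([]byte, 32), nil)
    	if err != nil {
    		return nil, err
    	}
    	defer localStore.Close()

    	// Store resolves the tag from ctx, or creates an ephemeral one if none is set
    	fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags())

    	ctx := context.TODO()
    	addr, wait, err := fileStore.Store(ctx, bytes.NewReader(data), int64(len(data)), false)
    	if err != nil {
    		return nil, err
    	}
    	// wait until all chunks have been written to the store
    	if err := wait(ctx); err != nil {
    		return nil, err
    	}
    	return addr, nil
    }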
diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go
index 06c4be1d7..d0a167a24 100644
--- a/swarm/storage/filestore_test.go
+++ b/swarm/storage/filestore_test.go
@@ -22,8 +22,11 @@ import (
"io"
"io/ioutil"
"os"
+ "path/filepath"
"testing"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/storage/localstore"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
@@ -35,21 +38,18 @@ func TestFileStorerandom(t *testing.T) {
}
func testFileStoreRandom(toEncrypt bool, t *testing.T) {
- tdb, cleanup, err := newTestDbStore(false, false)
- defer cleanup()
+ dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
+ t.Fatal(err)
}
- db := tdb.LDBStore
- db.setCapacity(50000)
- memStore := NewMemStore(NewDefaultStoreParams(), db)
- localStore := &LocalStore{
- memStore: memStore,
- DbStore: db,
+ defer os.RemoveAll(dir)
+ localStore, err := localstore.New(dir, make([]byte, 32), nil)
+ if err != nil {
+ t.Fatal(err)
}
+ defer localStore.Close()
- fileStore := NewFileStore(localStore, NewFileStoreParams())
- defer os.RemoveAll("/tmp/bzz")
+ fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags())
slice := testutil.RandomBytes(1, testDataSize)
ctx := context.TODO()
@@ -76,9 +76,8 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
if !bytes.Equal(slice, resultSlice) {
t.Fatalf("Comparison error.")
}
- ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666)
- ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666)
- localStore.memStore = NewMemStore(NewDefaultStoreParams(), db)
+ ioutil.WriteFile(filepath.Join(dir, "slice.bzz.16M"), slice, 0666)
+ ioutil.WriteFile(filepath.Join(dir, "result.bzz.16M"), resultSlice, 0666)
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
@@ -104,18 +103,18 @@ func TestFileStoreCapacity(t *testing.T) {
}
func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
- tdb, cleanup, err := newTestDbStore(false, false)
- defer cleanup()
+ dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
+ t.Fatal(err)
}
- db := tdb.LDBStore
- memStore := NewMemStore(NewDefaultStoreParams(), db)
- localStore := &LocalStore{
- memStore: memStore,
- DbStore: db,
+ defer os.RemoveAll(dir)
+ localStore, err := localstore.New(dir, make([]byte, 32), nil)
+ if err != nil {
+ t.Fatal(err)
}
- fileStore := NewFileStore(localStore, NewFileStoreParams())
+ defer localStore.Close()
+
+ fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags())
slice := testutil.RandomBytes(1, testDataSize)
ctx := context.TODO()
key, wait, err := fileStore.Store(ctx, bytes.NewReader(slice), testDataSize, toEncrypt)
@@ -141,10 +140,6 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
if !bytes.Equal(slice, resultSlice) {
t.Fatalf("Comparison error.")
}
- // Clear memStore
- memStore.setCapacity(0)
- // check whether it is, indeed, empty
- fileStore.ChunkStore = memStore
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
@@ -177,18 +172,18 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
// TestGetAllReferences only tests that GetAllReferences returns an expected
// number of references for a given file
func TestGetAllReferences(t *testing.T) {
- tdb, cleanup, err := newTestDbStore(false, false)
- defer cleanup()
+ dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
+ t.Fatal(err)
}
- db := tdb.LDBStore
- memStore := NewMemStore(NewDefaultStoreParams(), db)
- localStore := &LocalStore{
- memStore: memStore,
- DbStore: db,
+ defer os.RemoveAll(dir)
+ localStore, err := localstore.New(dir, make([]byte, 32), nil)
+ if err != nil {
+ t.Fatal(err)
}
- fileStore := NewFileStore(localStore, NewFileStoreParams())
+ defer localStore.Close()
+
+ fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags())
// testRuns[i] and expectedLen[i] are dataSize and expected length respectively
testRuns := []int{1024, 8192, 16000, 30000, 1000000}
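The three tests above repeat the same temp-directory plus localstore setup. A hypothetical helper factoring it out (the helper name is illustrative and not part of this diff; the *localstore.DB return type is assumed from the localstore package):

    // Illustrative sketch only, in package storage alongside filestore_test.go.
    package storage

    import (
    	"io/ioutil"
    	"os"
    	"testing"

    	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
    )

    // newTestLocalStore creates a localstore in a temp directory and returns a cleanup func.
    func newTestLocalStore(t *testing.T) (*localstore.DB, func()) {
    	t.Helper()
    	dir, err := ioutil.TempDir("", "swarm-storage-")
    	if err != nil {
    		t.Fatal(err)
    	}
    	localStore, err := localstore.New(dir, make([]byte, 32), nil)
    	if err != nil {
    		os.RemoveAll(dir)
    		t.Fatal(err)
    	}
    	cleanup := func() {
    		localStore.Close()
    		os.RemoveAll(dir)
    	}
    	return localStore, cleanup
    }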
diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go
index 345ce7430..1e702f11a 100644
--- a/swarm/storage/hasherstore.go
+++ b/swarm/storage/hasherstore.go
@@ -28,6 +28,7 @@ import (
type hasherStore struct {
store ChunkStore
+ tag *chunk.Tag
toEncrypt bool
hashFunc SwarmHasher
hashSize int // content hash size
@@ -44,7 +45,7 @@ type hasherStore struct {
// NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces.
// With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore
// and the hasherStore will take core of encryption/decryption of data if necessary
-func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore {
+func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool, tag *chunk.Tag) *hasherStore {
hashSize := hashFunc().Size()
refSize := int64(hashSize)
if toEncrypt {
@@ -53,6 +54,7 @@ func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *has
h := &hasherStore{
store: store,
+ tag: tag,
toEncrypt: toEncrypt,
hashFunc: hashFunc,
hashSize: hashSize,
@@ -93,7 +95,7 @@ func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error)
return nil, err
}
- chunk, err := h.store.Get(ctx, addr)
+ chunk, err := h.store.Get(ctx, chunk.ModeGetRequest, addr)
if err != nil {
return nil, err
}
@@ -239,11 +241,16 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio
return encryption.New(key, int(chunk.DefaultSize), 0, sha3.NewLegacyKeccak256)
}
-func (h *hasherStore) storeChunk(ctx context.Context, chunk Chunk) {
+func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
atomic.AddUint64(&h.nrChunks, 1)
go func() {
+ seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
+ h.tag.Inc(chunk.StateStored)
+ if seen {
+ h.tag.Inc(chunk.StateSeen)
+ }
select {
- case h.errC <- h.store.Put(ctx, chunk):
+ case h.errC <- err:
case <-h.quitC:
}
}()
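storeChunk now feeds the tag's progress counters: every put bumps StateStored, and chunks the backing store had already seen additionally bump StateSeen. A standalone sketch of that accounting (hypothetical helper name):

    // Illustrative sketch only, in package storage.
    package storage

    import (
    	"context"

    	"github.com/ethereum/go-ethereum/swarm/chunk"
    )

    // putCounted mirrors the accounting done in hasherStore.storeChunk.
    func putCounted(ctx context.Context, store ChunkStore, tag *chunk.Tag, ch Chunk) error {
    	seen, err := store.Put(ctx, chunk.ModePutUpload, ch)
    	// the chunk has left the splitter pipeline, so it counts as stored...
    	tag.Inc(chunk.StateStored)
    	// ...and additionally as seen if the store already had it
    	if seen {
    		tag.Inc(chunk.StateSeen)
    	}
    	return err
    }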
diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go
index 22cf98d0e..9dfd7ab1d 100644
--- a/swarm/storage/hasherstore_test.go
+++ b/swarm/storage/hasherstore_test.go
@@ -21,9 +21,9 @@ import (
"context"
"testing"
- "github.com/ethereum/go-ethereum/swarm/storage/encryption"
-
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/storage/encryption"
)
func TestHasherStore(t *testing.T) {
@@ -43,7 +43,7 @@ func TestHasherStore(t *testing.T) {
for _, tt := range tests {
chunkStore := NewMapChunkStore()
- hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt)
+ hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 0))
// Put two random chunks into the hasherStore
chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
@@ -107,7 +107,7 @@ func TestHasherStore(t *testing.T) {
}
// Check if chunk data in store is encrypted or not
- chunkInStore, err := chunkStore.Get(ctx, hash1)
+ chunkInStore, err := chunkStore.Get(ctx, chunk.ModeGetRequest, hash1)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
deleted file mode 100644
index fd5ec9e30..000000000
--- a/swarm/storage/ldbstore.go
+++ /dev/null
@@ -1,1082 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-// disk storage layer for the package bzz
-// DbStore implements the ChunkStore interface and is used by the FileStore as
-// persistent storage of chunks
-// it implements purging based on access count allowing for external control of
-// max capacity
-
-package storage
-
-import (
- "archive/tar"
- "bytes"
- "context"
- "encoding/binary"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "sync"
-
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/storage/mock"
- "github.com/syndtr/goleveldb/leveldb"
-)
-
-const (
- defaultGCRatio = 10
- defaultMaxGCRound = 10000
- defaultMaxGCBatch = 5000
-
- wEntryCnt = 1 << 0
- wIndexCnt = 1 << 1
- wAccessCnt = 1 << 2
-)
-
-var (
- dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
-)
-
-var (
- keyIndex = byte(0)
- keyAccessCnt = []byte{2}
- keyEntryCnt = []byte{3}
- keyDataIdx = []byte{4}
- keyData = byte(6)
- keyDistanceCnt = byte(7)
- keySchema = []byte{8}
- keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry
-)
-
-var (
- ErrDBClosed = errors.New("LDBStore closed")
-)
-
-type LDBStoreParams struct {
- *StoreParams
- Path string
- Po func(Address) uint8
-}
-
-// NewLDBStoreParams constructs LDBStoreParams with the specified values.
-func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
- return &LDBStoreParams{
- StoreParams: storeparams,
- Path: path,
- Po: func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey, k[:])) },
- }
-}
-
-type garbage struct {
- maxRound int // maximum number of chunks to delete in one garbage collection round
- maxBatch int // maximum number of chunks to delete in one db request batch
- ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db
- count int // number of chunks deleted in running round
- target int // number of chunks to delete in running round
- batch *dbBatch // the delete batch
- runC chan struct{} // struct in chan means gc is NOT running
-}
-
-type LDBStore struct {
- db *LDBDatabase
-
- // this should be stored in db, accessed transactionally
- entryCnt uint64 // number of items in the LevelDB
- accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
- dataIdx uint64 // similar to entryCnt, but we only increment it
- capacity uint64
- bucketCnt []uint64
-
- hashfunc SwarmHasher
- po func(Address) uint8
-
- batchesC chan struct{}
- closed bool
- batch *dbBatch
- lock sync.RWMutex
- quit chan struct{}
- gc *garbage
-
- // Functions encodeDataFunc is used to bypass
- // the default functionality of DbStore with
- // mock.NodeStore for testing purposes.
- encodeDataFunc func(chunk Chunk) []byte
- // If getDataFunc is defined, it will be used for
- // retrieving the chunk data instead from the local
- // LevelDB database.
- getDataFunc func(key Address) (data []byte, err error)
-}
-
-type dbBatch struct {
- *leveldb.Batch
- err error
- c chan struct{}
-}
-
-func newBatch() *dbBatch {
- return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})}
-}
-
-// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
-// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
-// a function different from the one that is actually used.
-func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
- s = new(LDBStore)
- s.hashfunc = params.Hash
- s.quit = make(chan struct{})
-
- s.batchesC = make(chan struct{}, 1)
- go s.writeBatches()
- s.batch = newBatch()
- // associate encodeData with default functionality
- s.encodeDataFunc = encodeData
-
- s.db, err = NewLDBDatabase(params.Path)
- if err != nil {
- return nil, err
- }
-
- s.po = params.Po
- s.setCapacity(params.DbCapacity)
-
- s.bucketCnt = make([]uint64, 0x100)
- for i := 0; i < 0x100; i++ {
- k := make([]byte, 2)
- k[0] = keyDistanceCnt
- k[1] = uint8(i)
- cnt, _ := s.db.Get(k)
- s.bucketCnt[i] = BytesToU64(cnt)
- }
- data, _ := s.db.Get(keyEntryCnt)
- s.entryCnt = BytesToU64(data)
- data, _ = s.db.Get(keyAccessCnt)
- s.accessCnt = BytesToU64(data)
- data, _ = s.db.Get(keyDataIdx)
- s.dataIdx = BytesToU64(data)
-
- // set up garbage collection
- s.gc = &garbage{
- maxBatch: defaultMaxGCBatch,
- maxRound: defaultMaxGCRound,
- ratio: defaultGCRatio,
- }
-
- s.gc.runC = make(chan struct{}, 1)
- s.gc.runC <- struct{}{}
-
- return s, nil
-}
-
-// MarkAccessed increments the access counter as a best effort for a chunk, so
-// the chunk won't get garbage collected.
-func (s *LDBStore) MarkAccessed(addr Address) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- if s.closed {
- return
- }
-
- proximity := s.po(addr)
- s.tryAccessIdx(addr, proximity)
-}
-
-// initialize and set values for processing of gc round
-func (s *LDBStore) startGC(c int) {
-
- s.gc.count = 0
- // calculate the target number of deletions
- if c >= s.gc.maxRound {
- s.gc.target = s.gc.maxRound
- } else {
- s.gc.target = c / s.gc.ratio
- }
- s.gc.batch = newBatch()
- log.Debug("startgc", "requested", c, "target", s.gc.target)
-}
-
-// NewMockDbStore creates a new instance of DbStore with
-// mockStore set to a provided value. If mockStore argument is nil,
-// this function behaves exactly as NewDbStore.
-func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
- s, err = NewLDBStore(params)
- if err != nil {
- return nil, err
- }
-
- // replace put and get with mock store functionality
- if mockStore != nil {
- s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
- s.getDataFunc = newMockGetDataFunc(mockStore)
- }
- return
-}
-
-type dpaDBIndex struct {
- Idx uint64
- Access uint64
-}
-
-func BytesToU64(data []byte) uint64 {
- if len(data) < 8 {
- return 0
- }
- return binary.BigEndian.Uint64(data)
-}
-
-func U64ToBytes(val uint64) []byte {
- data := make([]byte, 8)
- binary.BigEndian.PutUint64(data, val)
- return data
-}
-
-func getIndexKey(hash Address) []byte {
- hashSize := len(hash)
- key := make([]byte, hashSize+1)
- key[0] = keyIndex
- copy(key[1:], hash[:])
- return key
-}
-
-func getDataKey(idx uint64, po uint8) []byte {
- key := make([]byte, 10)
- key[0] = keyData
- key[1] = po
- binary.BigEndian.PutUint64(key[2:], idx)
-
- return key
-}
-
-func getGCIdxKey(index *dpaDBIndex) []byte {
- key := make([]byte, 9)
- key[0] = keyGCIdx
- binary.BigEndian.PutUint64(key[1:], index.Access)
- return key
-}
-
-func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
- val := make([]byte, 41) // po = 1, index.Index = 8, Address = 32
- val[0] = po
- binary.BigEndian.PutUint64(val[1:], index.Idx)
- copy(val[9:], addr)
- return val
-}
-
-func parseIdxKey(key []byte) (byte, []byte) {
- return key[0], key[1:]
-}
-
-func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
- index = &dpaDBIndex{
- Idx: binary.BigEndian.Uint64(val[1:]),
- Access: binary.BigEndian.Uint64(accessCnt),
- }
- po = val[0]
- addr = val[9:]
- return
-}
-
-func encodeIndex(index *dpaDBIndex) []byte {
- data, _ := rlp.EncodeToBytes(index)
- return data
-}
-
-func encodeData(chunk Chunk) []byte {
- // Always create a new underlying array for the returned byte slice.
- // The chunk.Address array may be used in the returned slice which
- // may be changed later in the code or by the LevelDB, resulting
- // that the Address is changed as well.
- return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...)
-}
-
-func decodeIndex(data []byte, index *dpaDBIndex) error {
- dec := rlp.NewStream(bytes.NewReader(data), 0)
- return dec.Decode(index)
-}
-
-func decodeData(addr Address, data []byte) (Chunk, error) {
- return NewChunk(addr, data[32:]), nil
-}
-
-func (s *LDBStore) collectGarbage() error {
- // prevent duplicate gc from starting when one is already running
- select {
- case <-s.gc.runC:
- default:
- return nil
- }
-
- s.lock.Lock()
- entryCnt := s.entryCnt
- s.lock.Unlock()
-
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
-
- // calculate the amount of chunks to collect and reset counter
- s.startGC(int(entryCnt))
- log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt)
-
- for s.gc.count < s.gc.target {
- it := s.db.NewIterator()
- ok := it.Seek([]byte{keyGCIdx})
- var singleIterationCount int
-
- // every batch needs a lock so we avoid entries changing accessidx in the meantime
- s.lock.Lock()
- for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() {
-
- // quit if no more access index keys
- itkey := it.Key()
- if (itkey == nil) || (itkey[0] != keyGCIdx) {
- break
- }
-
- // get chunk data entry from access index
- val := it.Value()
- index, po, hash := parseGCIdxEntry(itkey[1:], val)
- keyIdx := make([]byte, 33)
- keyIdx[0] = keyIndex
- copy(keyIdx[1:], hash)
-
- // add delete operation to batch
- s.delete(s.gc.batch.Batch, index, keyIdx, po)
- singleIterationCount++
- s.gc.count++
- log.Trace("garbage collect enqueued chunk for deletion", "key", hash)
-
- // break if target is not on max garbage batch boundary
- if s.gc.count >= s.gc.target {
- break
- }
- }
-
- s.writeBatch(s.gc.batch, wEntryCnt)
- log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count)
- s.lock.Unlock()
- it.Release()
- }
-
- metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(s.gc.count))
- log.Debug("garbage collect done", "c", s.gc.count)
- s.gc.runC <- struct{}{}
-
- return nil
-}
-
-// Export writes all chunks from the store to a tar archive, returning the
-// number of chunks written.
-func (s *LDBStore) Export(out io.Writer) (int64, error) {
- tw := tar.NewWriter(out)
- defer tw.Close()
-
- it := s.db.NewIterator()
- defer it.Release()
- var count int64
- for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
- key := it.Key()
- if (key == nil) || (key[0] != keyIndex) {
- break
- }
-
- var index dpaDBIndex
-
- hash := key[1:]
- decodeIndex(it.Value(), &index)
- po := s.po(hash)
- datakey := getDataKey(index.Idx, po)
- log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
- data, err := s.db.Get(datakey)
- if err != nil {
- log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
- continue
- }
-
- hdr := &tar.Header{
- Name: hex.EncodeToString(hash),
- Mode: 0644,
- Size: int64(len(data)),
- }
- if err := tw.WriteHeader(hdr); err != nil {
- return count, err
- }
- if _, err := tw.Write(data); err != nil {
- return count, err
- }
- count++
- }
-
- return count, nil
-}
-
-// Import reads chunks into the store from a tar archive, returning the number
-// of chunks read.
-func (s *LDBStore) Import(in io.Reader) (int64, error) {
- tr := tar.NewReader(in)
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- countC := make(chan int64)
- errC := make(chan error)
- var count int64
- go func() {
- for {
- hdr, err := tr.Next()
- if err == io.EOF {
- break
- } else if err != nil {
- select {
- case errC <- err:
- case <-ctx.Done():
- }
- }
-
- if len(hdr.Name) != 64 {
- log.Warn("ignoring non-chunk file", "name", hdr.Name)
- continue
- }
-
- keybytes, err := hex.DecodeString(hdr.Name)
- if err != nil {
- log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
- continue
- }
-
- data, err := ioutil.ReadAll(tr)
- if err != nil {
- select {
- case errC <- err:
- case <-ctx.Done():
- }
- }
- key := Address(keybytes)
- chunk := NewChunk(key, data[32:])
-
- go func() {
- select {
- case errC <- s.Put(ctx, chunk):
- case <-ctx.Done():
- }
- }()
-
- count++
- }
- countC <- count
- }()
-
- // wait for all chunks to be stored
- i := int64(0)
- var total int64
- for {
- select {
- case err := <-errC:
- if err != nil {
- return count, err
- }
- i++
- case total = <-countC:
- case <-ctx.Done():
- return i, ctx.Err()
- }
- if total > 0 && i == total {
- return total, nil
- }
- }
-}
-
-// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
-func (s *LDBStore) Cleanup(f func(Chunk) bool) {
- var errorsFound, removed, total int
-
- it := s.db.NewIterator()
- defer it.Release()
- for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
- key := it.Key()
- if (key == nil) || (key[0] != keyIndex) {
- break
- }
- total++
- var index dpaDBIndex
- err := decodeIndex(it.Value(), &index)
- if err != nil {
- log.Warn("Cannot decode")
- errorsFound++
- continue
- }
- hash := key[1:]
- po := s.po(hash)
- datakey := getDataKey(index.Idx, po)
- data, err := s.db.Get(datakey)
- if err != nil {
- found := false
-
- // The highest possible proximity is 255, so exit loop upon overflow.
- for po = uint8(1); po != 0; po++ {
- datakey = getDataKey(index.Idx, po)
- data, err = s.db.Get(datakey)
- if err == nil {
- found = true
- break
- }
- }
-
- if !found {
- log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
- errorsFound++
- continue
- }
- }
-
- ck := data[:32]
- c, err := decodeData(ck, data)
- if err != nil {
- log.Error("decodeData error", "err", err)
- continue
- }
-
- sdata := c.Data()
-
- cs := int64(binary.LittleEndian.Uint64(sdata[:8]))
- log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(sdata), "size", cs)
-
- // if chunk is to be removed
- if f(c) {
- log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(sdata), "size", cs)
- s.deleteNow(&index, getIndexKey(key[1:]), po)
- removed++
- errorsFound++
- }
- }
-
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
-}
-
-// CleanGCIndex rebuilds the garbage collector index from scratch, while
-// removing inconsistent elements, e.g., indices with missing data chunks.
-// WARN: it's a pretty heavy, long running function.
-func (s *LDBStore) CleanGCIndex() error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- batch := leveldb.Batch{}
-
- var okEntryCount uint64
- var totalEntryCount uint64
-
- // throw out all gc indices, we will rebuild from cleaned index
- it := s.db.NewIterator()
- it.Seek([]byte{keyGCIdx})
- var gcDeletes int
- for it.Valid() {
- rowType, _ := parseIdxKey(it.Key())
- if rowType != keyGCIdx {
- break
- }
- batch.Delete(it.Key())
- gcDeletes++
- it.Next()
- }
- log.Debug("gc", "deletes", gcDeletes)
- if err := s.db.Write(&batch); err != nil {
- return err
- }
- batch.Reset()
-
- it.Release()
-
- // corrected po index pointer values
- var poPtrs [256]uint64
-
- // set to true if chunk count not on 4096 iteration boundary
- var doneIterating bool
-
- // last key index in previous iteration
- lastIdxKey := []byte{keyIndex}
-
- // counter for debug output
- var cleanBatchCount int
-
- // go through all key index entries
- for !doneIterating {
- cleanBatchCount++
- var idxs []dpaDBIndex
- var chunkHashes [][]byte
- var pos []uint8
- it := s.db.NewIterator()
-
- it.Seek(lastIdxKey)
-
- // 4096 is just a nice number, don't look for any hidden meaning here...
- var i int
- for i = 0; i < 4096; i++ {
-
- // this really shouldn't happen unless database is empty
- // but let's keep it to be safe
- if !it.Valid() {
- doneIterating = true
- break
- }
-
- // if it's not keyindex anymore we're done iterating
- rowType, chunkHash := parseIdxKey(it.Key())
- if rowType != keyIndex {
- doneIterating = true
- break
- }
-
- // decode the retrieved index
- var idx dpaDBIndex
- err := decodeIndex(it.Value(), &idx)
- if err != nil {
- return fmt.Errorf("corrupt index: %v", err)
- }
- po := s.po(chunkHash)
- lastIdxKey = it.Key()
-
- // if we don't find the data key, remove the entry
- // if we find it, add to the array of new gc indices to create
- dataKey := getDataKey(idx.Idx, po)
- _, err = s.db.Get(dataKey)
- if err != nil {
- log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
- batch.Delete(it.Key())
- } else {
- idxs = append(idxs, idx)
- chunkHashes = append(chunkHashes, chunkHash)
- pos = append(pos, po)
- okEntryCount++
- if idx.Idx > poPtrs[po] {
- poPtrs[po] = idx.Idx
- }
- }
- totalEntryCount++
- it.Next()
- }
- it.Release()
-
- // flush the key index corrections
- err := s.db.Write(&batch)
- if err != nil {
- return err
- }
- batch.Reset()
-
- // add correct gc indices
- for i, okIdx := range idxs {
- gcIdxKey := getGCIdxKey(&okIdx)
- gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
- batch.Put(gcIdxKey, gcIdxData)
- log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
- }
-
- // flush them
- err = s.db.Write(&batch)
- if err != nil {
- return err
- }
- batch.Reset()
-
- log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
- }
-
- log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
-
- // lastly add updated entry count
- var entryCount [8]byte
- binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
- batch.Put(keyEntryCnt, entryCount[:])
-
- // and add the new po index pointers
- var poKey [2]byte
- poKey[0] = keyDistanceCnt
- for i, poPtr := range poPtrs {
- poKey[1] = uint8(i)
- if poPtr == 0 {
- batch.Delete(poKey[:])
- } else {
- var idxCount [8]byte
- binary.BigEndian.PutUint64(idxCount[:], poPtr)
- batch.Put(poKey[:], idxCount[:])
- }
- }
-
- // if you made it this far your harddisk has survived. Congratulations
- return s.db.Write(&batch)
-}
-
-// Delete is removes a chunk and updates indices.
-// Is thread safe
-func (s *LDBStore) Delete(addr Address) error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- ikey := getIndexKey(addr)
-
- idata, err := s.db.Get(ikey)
- if err != nil {
- return err
- }
-
- var idx dpaDBIndex
- decodeIndex(idata, &idx)
- proximity := s.po(addr)
- return s.deleteNow(&idx, ikey, proximity)
-}
-
-// executes one delete operation immediately
-// see *LDBStore.delete
-func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error {
- batch := new(leveldb.Batch)
- s.delete(batch, idx, idxKey, po)
- return s.db.Write(batch)
-}
-
-// adds a delete chunk operation to the provided batch
-// if called directly, decrements entrycount regardless if the chunk exists upon deletion. Risk of wrap to max uint64
-func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) {
- metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
-
- gcIdxKey := getGCIdxKey(idx)
- batch.Delete(gcIdxKey)
- dataKey := getDataKey(idx.Idx, po)
- batch.Delete(dataKey)
- batch.Delete(idxKey)
- s.entryCnt--
- dbEntryCount.Dec(1)
- cntKey := make([]byte, 2)
- cntKey[0] = keyDistanceCnt
- cntKey[1] = po
- batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
- batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
-}
-
-func (s *LDBStore) BinIndex(po uint8) uint64 {
- s.lock.RLock()
- defer s.lock.RUnlock()
- return s.bucketCnt[po]
-}
-
-// Put adds a chunk to the database, adding indices and incrementing global counters.
-// If it already exists, it merely increments the access count of the existing entry.
-// Is thread safe
-func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
- metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
- log.Trace("ldbstore.put", "key", chunk.Address())
-
- ikey := getIndexKey(chunk.Address())
- var index dpaDBIndex
-
- po := s.po(chunk.Address())
-
- s.lock.Lock()
-
- if s.closed {
- s.lock.Unlock()
- return ErrDBClosed
- }
- batch := s.batch
-
- log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
- _, err := s.db.Get(ikey)
- if err != nil {
- s.doPut(chunk, &index, po)
- }
- idata := encodeIndex(&index)
- s.batch.Put(ikey, idata)
-
- // add the access-chunkindex index for garbage collection
- gcIdxKey := getGCIdxKey(&index)
- gcIdxData := getGCIdxValue(&index, po, chunk.Address())
- s.batch.Put(gcIdxKey, gcIdxData)
- s.lock.Unlock()
-
- select {
- case s.batchesC <- struct{}{}:
- default:
- }
-
- select {
- case <-batch.c:
- return batch.err
- case <-ctx.Done():
- return ctx.Err()
- }
-}
-
-// force putting into db, does not check or update necessary indices
-func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
- data := s.encodeDataFunc(chunk)
- dkey := getDataKey(s.dataIdx, po)
- s.batch.Put(dkey, data)
- index.Idx = s.dataIdx
- s.bucketCnt[po] = s.dataIdx
- s.entryCnt++
- dbEntryCount.Inc(1)
- s.dataIdx++
- index.Access = s.accessCnt
- s.accessCnt++
- cntKey := make([]byte, 2)
- cntKey[0] = keyDistanceCnt
- cntKey[1] = po
- s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
-}
-
-func (s *LDBStore) writeBatches() {
- for {
- select {
- case <-s.quit:
- log.Debug("DbStore: quit batch write loop")
- return
- case <-s.batchesC:
- err := s.writeCurrentBatch()
- if err != nil {
- log.Debug("DbStore: quit batch write loop", "err", err.Error())
- return
- }
- }
- }
-
-}
-
-func (s *LDBStore) writeCurrentBatch() error {
- s.lock.Lock()
- defer s.lock.Unlock()
- b := s.batch
- l := b.Len()
- if l == 0 {
- return nil
- }
- s.batch = newBatch()
- b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt)
- close(b.c)
- if s.entryCnt >= s.capacity {
- go s.collectGarbage()
- }
- return nil
-}
-
-// must be called non concurrently
-func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error {
- if wFlag&wEntryCnt > 0 {
- b.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
- }
- if wFlag&wIndexCnt > 0 {
- b.Put(keyDataIdx, U64ToBytes(s.dataIdx))
- }
- if wFlag&wAccessCnt > 0 {
- b.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
- }
- l := b.Len()
- if err := s.db.Write(b.Batch); err != nil {
- return fmt.Errorf("unable to write batch: %v", err)
- }
- log.Trace(fmt.Sprintf("batch write (%d entries)", l))
- return nil
-}
-
-// newMockEncodeDataFunc returns a function that stores the chunk data
-// to a mock store to bypass the default functionality encodeData.
-// The constructed function always returns the nil data, as DbStore does
-// not need to store the data, but still need to create the index.
-func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
- return func(chunk Chunk) []byte {
- if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil {
- log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
- }
- return chunk.Address()[:]
- }
-}
-
-// tryAccessIdx tries to find index entry. If found then increments the access
-// count for garbage collection and returns the index entry and true for found,
-// otherwise returns nil and false.
-func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
- ikey := getIndexKey(addr)
- idata, err := s.db.Get(ikey)
- if err != nil {
- return nil, false
- }
-
- index := new(dpaDBIndex)
- decodeIndex(idata, index)
- oldGCIdxKey := getGCIdxKey(index)
- s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
- index.Access = s.accessCnt
- idata = encodeIndex(index)
- s.accessCnt++
- s.batch.Put(ikey, idata)
- newGCIdxKey := getGCIdxKey(index)
- newGCIdxData := getGCIdxValue(index, po, ikey[1:])
- s.batch.Delete(oldGCIdxKey)
- s.batch.Put(newGCIdxKey, newGCIdxData)
- select {
- case s.batchesC <- struct{}{}:
- default:
- }
- return index, true
-}
-
-// GetSchema is returning the current named schema of the datastore as read from LevelDB
-func (s *LDBStore) GetSchema() (string, error) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- data, err := s.db.Get(keySchema)
- if err != nil {
- if err == leveldb.ErrNotFound {
- return DbSchemaNone, nil
- }
- return "", err
- }
-
- return string(data), nil
-}
-
-// PutSchema is saving a named schema to the LevelDB datastore
-func (s *LDBStore) PutSchema(schema string) error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- return s.db.Put(keySchema, []byte(schema))
-}
-
-// Get retrieves the chunk matching the provided key from the database.
-// If the chunk entry does not exist, it returns an error
-// Updates access count and is thread safe
-func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
- metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
- log.Trace("ldbstore.get", "key", addr)
-
- s.lock.Lock()
- defer s.lock.Unlock()
- return s.get(addr)
-}
-
-// Has queries the underlying DB if a chunk with the given address is stored
-// Returns true if the chunk is found, false if not
-func (s *LDBStore) Has(_ context.Context, addr Address) bool {
- s.lock.RLock()
- defer s.lock.RUnlock()
-
- ikey := getIndexKey(addr)
- _, err := s.db.Get(ikey)
-
- return err == nil
-}
-
-// TODO: To conform with other private methods of this object indices should not be updated
-func (s *LDBStore) get(addr Address) (chunk Chunk, err error) {
- if s.closed {
- return nil, ErrDBClosed
- }
- proximity := s.po(addr)
- index, found := s.tryAccessIdx(addr, proximity)
- if found {
- var data []byte
- if s.getDataFunc != nil {
- // if getDataFunc is defined, use it to retrieve the chunk data
- log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
- data, err = s.getDataFunc(addr)
- if err != nil {
- return
- }
- } else {
- // default DbStore functionality to retrieve chunk data
- datakey := getDataKey(index.Idx, proximity)
- data, err = s.db.Get(datakey)
- log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
- if err != nil {
- log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
- s.deleteNow(index, getIndexKey(addr), s.po(addr))
- if err == leveldb.ErrNotFound {
- return nil, ErrChunkNotFound
- }
- return nil, err
- }
- }
-
- return decodeData(addr, data)
- } else {
- err = ErrChunkNotFound
- }
-
- return
-}
-
-// newMockGetFunc returns a function that reads chunk data from
-// the mock database, which is used as the value for DbStore.getFunc
-// to bypass the default functionality of DbStore with a mock store.
-func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
- return func(addr Address) (data []byte, err error) {
- data, err = mockStore.Get(addr)
- if err == mock.ErrNotFound {
- // preserve ErrChunkNotFound error
- err = ErrChunkNotFound
- }
- return data, err
- }
-}
-
-func (s *LDBStore) setCapacity(c uint64) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- s.capacity = c
-
- for s.entryCnt > c {
- s.collectGarbage()
- }
-}
-
-func (s *LDBStore) Close() {
- close(s.quit)
- s.lock.Lock()
- s.closed = true
- s.lock.Unlock()
- // force writing out current batch
- s.writeCurrentBatch()
- s.db.Close()
-}
-
-// SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
-func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
- metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
-
- sincekey := getDataKey(since, po)
- untilkey := getDataKey(until, po)
- it := s.db.NewIterator()
- defer it.Release()
-
- for ok := it.Seek(sincekey); ok; ok = it.Next() {
- metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
-
- dbkey := it.Key()
- if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
- break
- }
- key := make([]byte, 32)
- val := it.Value()
- copy(key, val[:32])
- if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
- break
- }
- }
- return it.Error()
-}
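The deleted SyncIterator above walks the data keys of one proximity-order bin in storage-index order. The stand-alone Go sketch below shows that key layout; the helper name and the prefix value are illustrative assumptions, not the identifiers or constants used by LDBStore.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// keyDataPrefix stands in for the real keyData constant; its value here is made up.
const keyDataPrefix = byte(0x02)

// makeDataKey builds prefix|po|idx, the shape the iterator seeks over.
func makeDataKey(idx uint64, po uint8) []byte {
	key := make([]byte, 10)
	key[0] = keyDataPrefix
	key[1] = po
	binary.BigEndian.PutUint64(key[2:], idx)
	return key
}

func main() {
	since := makeDataKey(0, 3)
	until := makeDataKey(100, 3)
	// The 8-byte big-endian index makes lexicographic comparison equal to
	// numeric comparison, so iteration can stop as soon as a key compares
	// greater than until or leaves the bin.
	fmt.Println(bytes.Compare(since, until) < 0) // true
}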
diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go
deleted file mode 100644
index 1cd4947be..000000000
--- a/swarm/storage/ldbstore_test.go
+++ /dev/null
@@ -1,788 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package storage
-
-import (
- "bytes"
- "context"
- "encoding/binary"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "strings"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/swarm/chunk"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
- "github.com/ethereum/go-ethereum/swarm/testutil"
- ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
-)
-
-type testDbStore struct {
- *LDBStore
- dir string
-}
-
-func newTestDbStore(mock bool, trusted bool) (*testDbStore, func(), error) {
- dir, err := ioutil.TempDir("", "bzz-storage-test")
- if err != nil {
- return nil, func() {}, err
- }
-
- var db *LDBStore
- storeparams := NewDefaultStoreParams()
- params := NewLDBStoreParams(storeparams, dir)
- params.Po = testPoFunc
-
- if mock {
- globalStore := mem.NewGlobalStore()
- addr := common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed")
- mockStore := globalStore.NewNodeStore(addr)
-
- db, err = NewMockDbStore(params, mockStore)
- } else {
- db, err = NewLDBStore(params)
- }
-
- cleanup := func() {
- if db != nil {
- db.Close()
- }
- err = os.RemoveAll(dir)
- if err != nil {
- panic(fmt.Sprintf("db cleanup failed: %v", err))
- }
- }
-
- return &testDbStore{db, dir}, cleanup, err
-}
-
-func testPoFunc(k Address) (ret uint8) {
- basekey := make([]byte, 32)
- return uint8(Proximity(basekey, k[:]))
-}
-
-func testDbStoreRandom(n int, mock bool, t *testing.T) {
- db, cleanup, err := newTestDbStore(mock, true)
- defer cleanup()
- if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
- }
- testStoreRandom(db, n, t)
-}
-
-func testDbStoreCorrect(n int, mock bool, t *testing.T) {
- db, cleanup, err := newTestDbStore(mock, false)
- defer cleanup()
- if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
- }
- testStoreCorrect(db, n, t)
-}
-
-func TestMarkAccessed(t *testing.T) {
- db, cleanup, err := newTestDbStore(false, true)
- defer cleanup()
- if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
- }
-
- h := GenerateRandomChunk(chunk.DefaultSize)
-
- db.Put(context.Background(), h)
-
- var index dpaDBIndex
- addr := h.Address()
- idxk := getIndexKey(addr)
-
- idata, err := db.db.Get(idxk)
- if err != nil {
- t.Fatal(err)
- }
- decodeIndex(idata, &index)
-
- if index.Access != 0 {
- t.Fatalf("Expected the access index to be %d, but it is %d", 0, index.Access)
- }
-
- db.MarkAccessed(addr)
- db.writeCurrentBatch()
-
- idata, err = db.db.Get(idxk)
- if err != nil {
- t.Fatal(err)
- }
- decodeIndex(idata, &index)
-
- if index.Access != 1 {
- t.Fatalf("Expected the access index to be %d, but it is %d", 1, index.Access)
- }
-
-}
-
-func TestDbStoreRandom_1(t *testing.T) {
- testDbStoreRandom(1, false, t)
-}
-
-func TestDbStoreCorrect_1(t *testing.T) {
- testDbStoreCorrect(1, false, t)
-}
-
-func TestDbStoreRandom_1k(t *testing.T) {
- testDbStoreRandom(1000, false, t)
-}
-
-func TestDbStoreCorrect_1k(t *testing.T) {
- testDbStoreCorrect(1000, false, t)
-}
-
-func TestMockDbStoreRandom_1(t *testing.T) {
- testDbStoreRandom(1, true, t)
-}
-
-func TestMockDbStoreCorrect_1(t *testing.T) {
- testDbStoreCorrect(1, true, t)
-}
-
-func TestMockDbStoreRandom_1k(t *testing.T) {
- testDbStoreRandom(1000, true, t)
-}
-
-func TestMockDbStoreCorrect_1k(t *testing.T) {
- testDbStoreCorrect(1000, true, t)
-}
-
-func testDbStoreNotFound(t *testing.T, mock bool) {
- db, cleanup, err := newTestDbStore(mock, false)
- defer cleanup()
- if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
- }
-
- _, err = db.Get(context.TODO(), ZeroAddr)
- if err != ErrChunkNotFound {
- t.Errorf("Expected ErrChunkNotFound, got %v", err)
- }
-}
-
-func TestDbStoreNotFound(t *testing.T) {
- testDbStoreNotFound(t, false)
-}
-func TestMockDbStoreNotFound(t *testing.T) {
- testDbStoreNotFound(t, true)
-}
-
-func testIterator(t *testing.T, mock bool) {
- var i int
- var poc uint
- chunkcount := 32
- chunkkeys := NewAddressCollection(chunkcount)
- chunkkeysResults := NewAddressCollection(chunkcount)
-
- db, cleanup, err := newTestDbStore(mock, false)
- defer cleanup()
- if err != nil {
- t.Fatalf("init dbStore failed: %v", err)
- }
-
- chunks := GenerateRandomChunks(chunk.DefaultSize, chunkcount)
-
- for i = 0; i < len(chunks); i++ {
- chunkkeys[i] = chunks[i].Address()
- err := db.Put(context.TODO(), chunks[i])
- if err != nil {
- t.Fatalf("dbStore.Put failed: %v", err)
- }
- }
-
- for i = 0; i < len(chunkkeys); i++ {
- log.Trace(fmt.Sprintf("Chunk array pos %d/%d: '%v'", i, chunkcount, chunkkeys[i]))
- }
- i = 0
- for poc = 0; poc <= 255; poc++ {
- err := db.SyncIterator(0, uint64(chunkkeys.Len()), uint8(poc), func(k Address, n uint64) bool {
- log.Trace(fmt.Sprintf("Got key %v number %d poc %d", k, n, uint8(poc)))
- chunkkeysResults[n] = k
- i++
- return true
- })
- if err != nil {
- t.Fatalf("Iterator call failed: %v", err)
- }
- }
-
- for i = 0; i < chunkcount; i++ {
- if !bytes.Equal(chunkkeys[i], chunkkeysResults[i]) {
- t.Fatalf("Chunk put #%d key '%v' does not match iterator's key '%v'", i, chunkkeys[i], chunkkeysResults[i])
- }
- }
-
-}
-
-func TestIterator(t *testing.T) {
- testIterator(t, false)
-}
-func TestMockIterator(t *testing.T) {
- testIterator(t, true)
-}
-
-func benchmarkDbStorePut(n int, mock bool, b *testing.B) {
- db, cleanup, err := newTestDbStore(mock, true)
- defer cleanup()
- if err != nil {
- b.Fatalf("init dbStore failed: %v", err)
- }
- benchmarkStorePut(db, n, b)
-}
-
-func benchmarkDbStoreGet(n int, mock bool, b *testing.B) {
- db, cleanup, err := newTestDbStore(mock, true)
- defer cleanup()
- if err != nil {
- b.Fatalf("init dbStore failed: %v", err)
- }
- benchmarkStoreGet(db, n, b)
-}
-
-func BenchmarkDbStorePut_500(b *testing.B) {
- benchmarkDbStorePut(500, false, b)
-}
-
-func BenchmarkDbStoreGet_500(b *testing.B) {
- benchmarkDbStoreGet(500, false, b)
-}
-
-func BenchmarkMockDbStorePut_500(b *testing.B) {
- benchmarkDbStorePut(500, true, b)
-}
-
-func BenchmarkMockDbStoreGet_500(b *testing.B) {
- benchmarkDbStoreGet(500, true, b)
-}
-
-// TestLDBStoreWithoutCollectGarbage tests that we can put a number of random chunks in the LevelDB store, and
-// retrieve them, provided we don't hit the garbage collection
-func TestLDBStoreWithoutCollectGarbage(t *testing.T) {
- capacity := 50
- n := 10
-
- ldb, cleanup := newLDBStore(t)
- ldb.setCapacity(uint64(capacity))
- defer cleanup()
-
- chunks, err := mputRandomChunks(ldb, n)
- if err != nil {
- t.Fatal(err.Error())
- }
-
- log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
-
- for _, ch := range chunks {
- ret, err := ldb.Get(context.TODO(), ch.Address())
- if err != nil {
- t.Fatal(err)
- }
-
- if !bytes.Equal(ret.Data(), ch.Data()) {
- t.Fatal("expected to get the same data back, but got smth else")
- }
- }
-
- if ldb.entryCnt != uint64(n) {
- t.Fatalf("expected entryCnt to be equal to %v, but got %v", n, ldb.entryCnt)
- }
-
- if ldb.accessCnt != uint64(2*n) {
- t.Fatalf("expected accessCnt to be equal to %v, but got %v", 2*n, ldb.accessCnt)
- }
-}
-
-// TestLDBStoreCollectGarbage tests that we can put more chunks than LevelDB's capacity, and
-// retrieve only some of them, because garbage collection must have partially cleared the store
-// Also tests that we can delete chunks and that we can trigger garbage collection
-func TestLDBStoreCollectGarbage(t *testing.T) {
-
-	// below max round
- initialCap := defaultMaxGCRound / 100
- cap := initialCap / 2
- t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage)
-
- if testutil.RaceEnabled {
- t.Skip("only the simplest case run as others are flaky with race")
-		// Note: some tests fail consistently, even locally, when run with `-race`
- }
-
- t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage)
-
- // at max round
- cap = initialCap
- t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage)
- t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage)
-
-	// more than max round, not on threshold
- cap = initialCap + 500
- t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage)
- t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage)
-
-}
-
-func testLDBStoreCollectGarbage(t *testing.T) {
- params := strings.Split(t.Name(), "/")
- capacity, err := strconv.Atoi(params[2])
- if err != nil {
- t.Fatal(err)
- }
- n, err := strconv.Atoi(params[3])
- if err != nil {
- t.Fatal(err)
- }
-
- ldb, cleanup := newLDBStore(t)
- ldb.setCapacity(uint64(capacity))
- defer cleanup()
-
- // retrieve the gc round target count for the db capacity
- ldb.startGC(capacity)
- roundTarget := ldb.gc.target
-
- // split put counts to gc target count threshold, and wait for gc to finish in between
- var allChunks []Chunk
- remaining := n
- for remaining > 0 {
- var putCount int
- if remaining < roundTarget {
- putCount = remaining
- } else {
- putCount = roundTarget
- }
- remaining -= putCount
- chunks, err := mputRandomChunks(ldb, putCount)
- if err != nil {
- t.Fatal(err.Error())
- }
- allChunks = append(allChunks, chunks...)
- ldb.lock.RLock()
- log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n)
- ldb.lock.RUnlock()
-
- waitGc(ldb)
- }
-
- // attempt gets on all put chunks
- var missing int
- for _, ch := range allChunks {
- ret, err := ldb.Get(context.TODO(), ch.Address())
- if err == ErrChunkNotFound || err == ldberrors.ErrNotFound {
- missing++
- continue
- }
- if err != nil {
- t.Fatal(err)
- }
-
- if !bytes.Equal(ret.Data(), ch.Data()) {
- t.Fatal("expected to get the same data back, but got smth else")
- }
-
- log.Trace("got back chunk", "chunk", ret)
- }
-
- // all surplus chunks should be missing
- expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget)
- if missing != expectMissing {
- t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", expectMissing, missing)
- }
-
- log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
-}
-
-// TestLDBStoreAddRemove tests that we can put and then delete a given chunk
-func TestLDBStoreAddRemove(t *testing.T) {
- ldb, cleanup := newLDBStore(t)
- ldb.setCapacity(200)
- defer cleanup()
-
- n := 100
- chunks, err := mputRandomChunks(ldb, n)
- if err != nil {
- t.Fatalf(err.Error())
- }
-
- for i := 0; i < n; i++ {
- // delete all even index chunks
- if i%2 == 0 {
- ldb.Delete(chunks[i].Address())
- }
- }
-
- log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
-
- for i := 0; i < n; i++ {
- ret, err := ldb.Get(context.TODO(), chunks[i].Address())
-
- if i%2 == 0 {
- // expect even chunks to be missing
- if err == nil {
- t.Fatal("expected chunk to be missing, but got no error")
- }
- } else {
- // expect odd chunks to be retrieved successfully
- if err != nil {
- t.Fatalf("expected no error, but got %s", err)
- }
-
- if !bytes.Equal(ret.Data(), chunks[i].Data()) {
- t.Fatal("expected to get the same data back, but got smth else")
- }
- }
- }
-}
-
-func testLDBStoreRemoveThenCollectGarbage(t *testing.T) {
- t.Skip("flaky with -race flag")
-
- params := strings.Split(t.Name(), "/")
- capacity, err := strconv.Atoi(params[2])
- if err != nil {
- t.Fatal(err)
- }
- n, err := strconv.Atoi(params[3])
- if err != nil {
- t.Fatal(err)
- }
-
- ldb, cleanup := newLDBStore(t)
- defer cleanup()
- ldb.setCapacity(uint64(capacity))
-
- // put capacity count number of chunks
- chunks := make([]Chunk, n)
- for i := 0; i < n; i++ {
- c := GenerateRandomChunk(chunk.DefaultSize)
- chunks[i] = c
- log.Trace("generate random chunk", "idx", i, "chunk", c)
- }
-
- for i := 0; i < n; i++ {
- err := ldb.Put(context.TODO(), chunks[i])
- if err != nil {
- t.Fatal(err)
- }
- }
-
- waitGc(ldb)
-
- // delete all chunks
- // (only count the ones actually deleted, the rest will have been gc'd)
- deletes := 0
- for i := 0; i < n; i++ {
- if ldb.Delete(chunks[i].Address()) == nil {
- deletes++
- }
- }
-
- log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
-
- if ldb.entryCnt != 0 {
-		t.Fatalf("ldb.entryCnt expected 0 got %v", ldb.entryCnt)
- }
-
- // the manual deletes will have increased accesscnt, so we need to add this when we verify the current count
- expAccessCnt := uint64(n)
- if ldb.accessCnt != expAccessCnt {
- t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.accessCnt)
- }
-
- // retrieve the gc round target count for the db capacity
- ldb.startGC(capacity)
- roundTarget := ldb.gc.target
-
- remaining := n
- var puts int
- for remaining > 0 {
- var putCount int
- if remaining < roundTarget {
- putCount = remaining
- } else {
- putCount = roundTarget
- }
- remaining -= putCount
- for putCount > 0 {
- ldb.Put(context.TODO(), chunks[puts])
- ldb.lock.RLock()
- log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n, "puts", puts, "remaining", remaining, "roundtarget", roundTarget)
- ldb.lock.RUnlock()
- puts++
- putCount--
- }
-
- waitGc(ldb)
- }
-
- // expect first surplus chunks to be missing, because they have the smallest access value
- expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget)
- for i := 0; i < expectMissing; i++ {
- _, err := ldb.Get(context.TODO(), chunks[i].Address())
- if err == nil {
- t.Fatalf("expected surplus chunk %d to be missing, but got no error", i)
- }
- }
-
- // expect last chunks to be present, as they have the largest access value
- for i := expectMissing; i < n; i++ {
- ret, err := ldb.Get(context.TODO(), chunks[i].Address())
- if err != nil {
- t.Fatalf("chunk %v: expected no error, but got %s", i, err)
- }
- if !bytes.Equal(ret.Data(), chunks[i].Data()) {
- t.Fatal("expected to get the same data back, but got smth else")
- }
- }
-}
-
-// TestLDBStoreCollectGarbageAccessUnlikeIndex tests garbage collection where accesscount differs from indexcount
-func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
-
- capacity := defaultMaxGCRound / 100 * 2
- n := capacity - 1
-
- ldb, cleanup := newLDBStore(t)
- ldb.setCapacity(uint64(capacity))
- defer cleanup()
-
- chunks, err := mputRandomChunks(ldb, n)
- if err != nil {
- t.Fatal(err.Error())
- }
- log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
-
- // set first added capacity/2 chunks to highest accesscount
- for i := 0; i < capacity/2; i++ {
- _, err := ldb.Get(context.TODO(), chunks[i].Address())
- if err != nil {
- t.Fatalf("fail add chunk #%d - %s: %v", i, chunks[i].Address(), err)
- }
- }
- _, err = mputRandomChunks(ldb, 2)
- if err != nil {
- t.Fatal(err.Error())
- }
-
- // wait for garbage collection to kick in on the responsible actor
- waitGc(ldb)
-
- var missing int
- for i, ch := range chunks[2 : capacity/2] {
- ret, err := ldb.Get(context.TODO(), ch.Address())
- if err == ErrChunkNotFound || err == ldberrors.ErrNotFound {
- t.Fatalf("fail find chunk #%d - %s: %v", i, ch.Address(), err)
- }
-
- if !bytes.Equal(ret.Data(), ch.Data()) {
- t.Fatal("expected to get the same data back, but got smth else")
- }
- log.Trace("got back chunk", "chunk", ret)
- }
-
- log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
-}
-
-func TestCleanIndex(t *testing.T) {
- if testutil.RaceEnabled {
- t.Skip("disabled because it times out with race detector")
- }
-
- capacity := 5000
- n := 3
-
- ldb, cleanup := newLDBStore(t)
- ldb.setCapacity(uint64(capacity))
- defer cleanup()
-
- chunks, err := mputRandomChunks(ldb, n)
- if err != nil {
- t.Fatal(err)
- }
-
- // remove the data of the first chunk
- po := ldb.po(chunks[0].Address()[:])
- dataKey := make([]byte, 10)
- dataKey[0] = keyData
- dataKey[1] = byte(po)
-	// dataKey[2:10] is left zero: the first chunk has storage index 0
- if _, err := ldb.db.Get(dataKey); err != nil {
- t.Fatal(err)
- }
- if err := ldb.db.Delete(dataKey); err != nil {
- t.Fatal(err)
- }
-
- // remove the gc index row for the first chunk
- gcFirstCorrectKey := make([]byte, 9)
- gcFirstCorrectKey[0] = keyGCIdx
- if err := ldb.db.Delete(gcFirstCorrectKey); err != nil {
- t.Fatal(err)
- }
-
- // warp the gc data of the second chunk
- // this data should be correct again after the clean
- gcSecondCorrectKey := make([]byte, 9)
- gcSecondCorrectKey[0] = keyGCIdx
- binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1))
- gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey)
- if err != nil {
- t.Fatal(err)
- }
- warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1)
- copy(warpedGCVal[1:], gcSecondCorrectVal)
- if err := ldb.db.Delete(gcSecondCorrectKey); err != nil {
- t.Fatal(err)
- }
- if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil {
- t.Fatal(err)
- }
-
- if err := ldb.CleanGCIndex(); err != nil {
- t.Fatal(err)
- }
-
- // the index without corresponding data should have been deleted
- idxKey := make([]byte, 33)
- idxKey[0] = keyIndex
- copy(idxKey[1:], chunks[0].Address())
- if _, err := ldb.db.Get(idxKey); err == nil {
- t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey)
- }
-
- // the two other indices should be present
- copy(idxKey[1:], chunks[1].Address())
- if _, err := ldb.db.Get(idxKey); err != nil {
- t.Fatalf("expected chunk 1 idx to be present: %v", idxKey)
- }
-
- copy(idxKey[1:], chunks[2].Address())
- if _, err := ldb.db.Get(idxKey); err != nil {
- t.Fatalf("expected chunk 2 idx to be present: %v", idxKey)
- }
-
- // first gc index should still be gone
- if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil {
- t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey)
- }
-
- // second gc index should still be fixed
- if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
- t.Fatalf("expected gc 1 idx to be present: %v", idxKey)
- }
-
- // third gc index should be unchanged
- binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2))
- if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
- t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
- }
-
- c, err := ldb.db.Get(keyEntryCnt)
- if err != nil {
- t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
- }
-
- // entrycount should now be one less
- entryCount := binary.BigEndian.Uint64(c)
- if entryCount != 2 {
-		t.Fatalf("expected entrycnt to be 2, was %d", entryCount)
- }
-
- // the chunks might accidentally be in the same bin
- // if so that bin counter will now be 2 - the highest added index.
- // if not, the total of them will be 3
- poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())}
- if poBins[0] == poBins[1] {
- poBins = poBins[:1]
- }
-
- var binTotal uint64
- var currentBin [2]byte
- currentBin[0] = keyDistanceCnt
- if len(poBins) == 1 {
- currentBin[1] = poBins[0]
- c, err := ldb.db.Get(currentBin[:])
- if err != nil {
- t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
- }
- binCount := binary.BigEndian.Uint64(c)
- if binCount != 2 {
- t.Fatalf("expected entrycnt to be 2, was %d", binCount)
- }
- } else {
- for _, bin := range poBins {
- currentBin[1] = bin
- c, err := ldb.db.Get(currentBin[:])
- if err != nil {
- t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
- }
- binCount := binary.BigEndian.Uint64(c)
- binTotal += binCount
-
- }
- if binTotal != 3 {
- t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
- }
- }
-
- // check that the iterator quits properly
- chunks, err = mputRandomChunks(ldb, 4100)
- if err != nil {
- t.Fatal(err)
- }
-
- po = ldb.po(chunks[4099].Address()[:])
- dataKey = make([]byte, 10)
- dataKey[0] = keyData
- dataKey[1] = byte(po)
- binary.BigEndian.PutUint64(dataKey[2:], 4099+3)
- if _, err := ldb.db.Get(dataKey); err != nil {
- t.Fatal(err)
- }
- if err := ldb.db.Delete(dataKey); err != nil {
- t.Fatal(err)
- }
-
- if err := ldb.CleanGCIndex(); err != nil {
- t.Fatal(err)
- }
-
- // entrycount should now be one less of added chunks
- c, err = ldb.db.Get(keyEntryCnt)
- if err != nil {
- t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
- }
- entryCount = binary.BigEndian.Uint64(c)
- if entryCount != 4099+2 {
-		t.Fatalf("expected entrycnt to be %d, was %d", 4099+2, entryCount)
- }
-}
-
-// Note: waitGc does not guarantee that we wait 1 GC round; it only
-// guarantees that if the GC is running we wait for that run to finish
-// ticket: https://github.com/ethersphere/go-ethereum/issues/1151
-func waitGc(ldb *LDBStore) {
- <-ldb.gc.runC
- ldb.gc.runC <- struct{}{}
-}
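The garbage-collection tests above all derive the expected number of evicted chunks from the same expression: one GC round plus as many full rounds as fit into the surplus n - capacity. A stand-alone sketch with made-up numbers (not the test defaults, which depend on defaultMaxGCRound):

package main

import "fmt"

// expectMissing mirrors the arithmetic used by the tests.
func expectMissing(n, capacity, roundTarget int) int {
	return roundTarget + (((n - capacity) / roundTarget) * roundTarget)
}

func main() {
	// illustrative values only: capacity 50, 200 chunks put, gc round target 20
	fmt.Println(expectMissing(200, 50, 20)) // 160
}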
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
deleted file mode 100644
index a8f6f037f..000000000
--- a/swarm/storage/localstore.go
+++ /dev/null
@@ -1,251 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package storage
-
-import (
- "context"
- "path/filepath"
- "sync"
-
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/storage/mock"
-)
-
-type LocalStoreParams struct {
- *StoreParams
- ChunkDbPath string
- Validators []ChunkValidator `toml:"-"`
-}
-
-func NewDefaultLocalStoreParams() *LocalStoreParams {
- return &LocalStoreParams{
- StoreParams: NewDefaultStoreParams(),
- }
-}
-
-// Init sets the chunk db path if it has not been configured yet; this can only
-// be done after all config options (file, cmd line, env vars) have been evaluated.
-func (p *LocalStoreParams) Init(path string) {
- if p.ChunkDbPath == "" {
- p.ChunkDbPath = filepath.Join(path, "chunks")
- }
-}
-
-// LocalStore is a combination of an in-memory db layered over a disk-persisted db;
-// it implements Get/Put with fallback (caching) logic using any two ChunkStores.
-type LocalStore struct {
- Validators []ChunkValidator
- memStore *MemStore
- DbStore *LDBStore
- mu sync.Mutex
-}
-
-// This constructor uses MemStore and DbStore as components
-func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalStore, error) {
- ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath)
- dbStore, err := NewMockDbStore(ldbparams, mockStore)
- if err != nil {
- return nil, err
- }
- return &LocalStore{
- memStore: NewMemStore(params.StoreParams, dbStore),
- DbStore: dbStore,
- Validators: params.Validators,
- }, nil
-}
-
-func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) {
- ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath)
- dbStore, err := NewLDBStore(ldbparams)
- if err != nil {
- return nil, err
- }
- localStore := &LocalStore{
- memStore: NewMemStore(params.StoreParams, dbStore),
- DbStore: dbStore,
- Validators: params.Validators,
- }
- return localStore, nil
-}
-
-// isValid returns true if chunk passes any of the LocalStore Validators.
-// isValid also returns true if LocalStore has no Validators.
-func (ls *LocalStore) isValid(chunk Chunk) bool {
- // by default chunks are valid. if we have 0 validators, then all chunks are valid.
- valid := true
-
- // ls.Validators contains a list of one validator per chunk type.
- // if one validator succeeds, then the chunk is valid
- for _, v := range ls.Validators {
- if valid = v.Validate(chunk); valid {
- break
- }
- }
- return valid
-}
-
-// Put validates and stores the chunk using the configured
-// ChunkValidators, MemStore and LDBStore.
-// If the chunk is not valid, ErrChunkInvalid is returned.
-// If the chunk is already present in the MemStore it is not
-// stored again; any other error from MemStore.Get is returned
-// to the caller. Otherwise the chunk is stored in the MemStore
-// and then persisted through LDBStore.Put.
-func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error {
- if !ls.isValid(chunk) {
- return ErrChunkInvalid
- }
-
- log.Trace("localstore.put", "key", chunk.Address())
- ls.mu.Lock()
- defer ls.mu.Unlock()
-
- _, err := ls.memStore.Get(ctx, chunk.Address())
- if err == nil {
- return nil
- }
- if err != nil && err != ErrChunkNotFound {
- return err
- }
- ls.memStore.Put(ctx, chunk)
- err = ls.DbStore.Put(ctx, chunk)
- return err
-}
-
-// Has queries the underlying DbStore if a chunk with the given address
-// is being stored there.
-// Returns true if it is stored, false if not
-func (ls *LocalStore) Has(ctx context.Context, addr Address) bool {
- return ls.DbStore.Has(ctx, addr)
-}
-
-// Get looks up a chunk in the local stores.
-// This method blocks until the chunk is retrieved, so an additional
-// timeout may be needed to wrap this call if ChunkStores are remote
-// and can have long latency.
-func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) {
- ls.mu.Lock()
- defer ls.mu.Unlock()
-
- return ls.get(ctx, addr)
-}
-
-func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) {
- chunk, err = ls.memStore.Get(ctx, addr)
-
- if err != nil && err != ErrChunkNotFound {
- metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
- return nil, err
- }
-
- if err == nil {
- metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
- go ls.DbStore.MarkAccessed(addr)
- return chunk, nil
- }
-
- metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1)
- chunk, err = ls.DbStore.Get(ctx, addr)
- if err != nil {
- metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
- return nil, err
- }
-
- ls.memStore.Put(ctx, chunk)
- return chunk, nil
-}
-
-func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error {
- ls.mu.Lock()
- defer ls.mu.Unlock()
-
- _, err := ls.get(ctx, addr)
- if err == nil {
- return nil
- }
- return func(context.Context) error {
- return err
- }
-}
-
-func (ls *LocalStore) BinIndex(po uint8) uint64 {
- return ls.DbStore.BinIndex(po)
-}
-
-func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
- return ls.DbStore.SyncIterator(from, to, po, f)
-}
-
-// Close the local store
-func (ls *LocalStore) Close() {
- ls.DbStore.Close()
-}
-
-// Migrate checks the datastore schema vs the runtime schema and runs
-// migrations if they don't match
-func (ls *LocalStore) Migrate() error {
- actualDbSchema, err := ls.DbStore.GetSchema()
- if err != nil {
- log.Error(err.Error())
- return err
- }
-
- if actualDbSchema == CurrentDbSchema {
- return nil
- }
-
- log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema)
-
- if actualDbSchema == DbSchemaNone {
- ls.migrateFromNoneToPurity()
- actualDbSchema = DbSchemaPurity
- }
-
- if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
- return err
- }
-
- if actualDbSchema == DbSchemaPurity {
- if err := ls.migrateFromPurityToHalloween(); err != nil {
- return err
- }
- actualDbSchema = DbSchemaHalloween
- }
-
- if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
- return err
- }
- return nil
-}
-
-func (ls *LocalStore) migrateFromNoneToPurity() {
- // delete chunks that are not valid, i.e. chunks that do not pass
- // any of the ls.Validators
- ls.DbStore.Cleanup(func(c Chunk) bool {
- return !ls.isValid(c)
- })
-}
-
-func (ls *LocalStore) migrateFromPurityToHalloween() error {
- return ls.DbStore.CleanGCIndex()
-}
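The deleted LocalStore.get is a read-through cache: serve from the MemStore when possible, otherwise fetch from the LDBStore and repopulate the memory store. The stand-alone sketch below shows the same pattern with simplified stand-in types; it is an illustration, not the removed implementation.

package main

import (
	"context"
	"errors"
	"fmt"
)

var errNotFound = errors.New("chunk not found")

// store is a simplified stand-in for the ChunkStore interfaces used by LocalStore.
type store map[string][]byte

func (s store) Get(_ context.Context, addr string) ([]byte, error) {
	data, ok := s[addr]
	if !ok {
		return nil, errNotFound
	}
	return data, nil
}

func (s store) Put(_ context.Context, addr string, data []byte) { s[addr] = data }

// readThrough serves from the cache when possible, otherwise it falls back to
// the persistent store and repopulates the cache.
func readThrough(ctx context.Context, cache, persistent store, addr string) ([]byte, error) {
	if data, err := cache.Get(ctx, addr); err == nil {
		return data, nil // cache hit
	} else if !errors.Is(err, errNotFound) {
		return nil, err // a real failure, not just a miss
	}
	data, err := persistent.Get(ctx, addr)
	if err != nil {
		return nil, err
	}
	cache.Put(ctx, addr, data) // the next read is served from memory
	return data, nil
}

func main() {
	cache, disk := store{}, store{"addr": []byte("payload")}
	data, err := readThrough(context.Background(), cache, disk, "addr")
	fmt.Println(string(data), err, len(cache)) // payload <nil> 1
}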
diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go
new file mode 100644
index 000000000..411392b4e
--- /dev/null
+++ b/swarm/storage/localstore/export.go
@@ -0,0 +1,204 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "archive/tar"
+ "context"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+)
+
+const (
+ // filename in tar archive that holds the information
+ // about exported data format version
+ exportVersionFilename = ".swarm-export-version"
+ // legacy version for previous LDBStore
+ legacyExportVersion = "1"
+ // current export format version
+ currentExportVersion = "2"
+)
+
+// Export writes all chunks in the retrieval data index to the given
+// writer as a tar archive. It returns the number of
+// chunks exported.
+func (db *DB) Export(w io.Writer) (count int64, err error) {
+ tw := tar.NewWriter(w)
+ defer tw.Close()
+
+ if err := tw.WriteHeader(&tar.Header{
+ Name: exportVersionFilename,
+ Mode: 0644,
+ Size: int64(len(currentExportVersion)),
+ }); err != nil {
+ return 0, err
+ }
+ if _, err := tw.Write([]byte(currentExportVersion)); err != nil {
+ return 0, err
+ }
+
+ err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ hdr := &tar.Header{
+ Name: hex.EncodeToString(item.Address),
+ Mode: 0644,
+ Size: int64(len(item.Data)),
+ }
+ if err := tw.WriteHeader(hdr); err != nil {
+ return false, err
+ }
+ if _, err := tw.Write(item.Data); err != nil {
+ return false, err
+ }
+ count++
+ return false, nil
+ }, nil)
+
+ return count, err
+}
+
+// Import reads chunks from a tar archive on the given reader and
+// stores them in the database. It returns the number of
+// chunks imported.
+func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
+ tr := tar.NewReader(r)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ errC := make(chan error)
+ doneC := make(chan struct{})
+ tokenPool := make(chan struct{}, 100)
+ var wg sync.WaitGroup
+ go func() {
+ var (
+ firstFile = true
+ // if exportVersionFilename file is not present
+ // assume legacy version
+ version = legacyExportVersion
+ )
+ for {
+ hdr, err := tr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ if firstFile {
+ firstFile = false
+ if hdr.Name == exportVersionFilename {
+ data, err := ioutil.ReadAll(tr)
+ if err != nil {
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ version = string(data)
+ continue
+ }
+ }
+
+ if len(hdr.Name) != 64 {
+ log.Warn("ignoring non-chunk file", "name", hdr.Name)
+ continue
+ }
+
+ keybytes, err := hex.DecodeString(hdr.Name)
+ if err != nil {
+ log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
+ continue
+ }
+
+ data, err := ioutil.ReadAll(tr)
+ if err != nil {
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ }
+ }
+ key := chunk.Address(keybytes)
+
+ var ch chunk.Chunk
+ switch version {
+ case legacyExportVersion:
+ // LDBStore Export exported chunk data prefixed with the chunk key.
+ // That is not necessary, as the key is in the chunk filename,
+ // but backward compatibility needs to be preserved.
+ ch = chunk.NewChunk(key, data[32:])
+ case currentExportVersion:
+ ch = chunk.NewChunk(key, data)
+ default:
+ select {
+ case errC <- fmt.Errorf("unsupported export data version %q", version):
+ case <-ctx.Done():
+ }
+ }
+ tokenPool <- struct{}{}
+ wg.Add(1)
+
+ go func() {
+ _, err := db.Put(ctx, chunk.ModePutUpload, ch)
+ select {
+ case errC <- err:
+ case <-ctx.Done():
+ wg.Done()
+ <-tokenPool
+ default:
+ _, err := db.Put(ctx, chunk.ModePutUpload, ch)
+ if err != nil {
+ errC <- err
+ }
+ wg.Done()
+ <-tokenPool
+ }
+ }()
+
+ count++
+ }
+ wg.Wait()
+ close(doneC)
+ }()
+
+ // wait for all chunks to be stored
+ for {
+ select {
+ case err := <-errC:
+ if err != nil {
+ return count, err
+ }
+ case <-ctx.Done():
+ return count, ctx.Err()
+ default:
+ select {
+ case <-doneC:
+ return count, nil
+ default:
+ }
+ }
+ }
+}
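The archive written by Export is plain tar: a leading .swarm-export-version entry followed by one entry per chunk, named by the hex-encoded chunk address and holding the chunk data. Any tar reader can inspect such an export; the stand-alone sketch below merely lists the entries and is not a tool shipped with the package.

package main

import (
	"archive/tar"
	"fmt"
	"io"
	"os"
)

func main() {
	tr := tar.NewReader(os.Stdin)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return
		}
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		// 64 hex characters name a 32-byte chunk address; anything else is
		// metadata such as the export version file.
		fmt.Printf("%-64s %d bytes\n", hdr.Name, hdr.Size)
		if _, err := io.Copy(io.Discard, tr); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}
}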
diff --git a/swarm/storage/localstore/export_test.go b/swarm/storage/localstore/export_test.go
new file mode 100644
index 000000000..d7f848f80
--- /dev/null
+++ b/swarm/storage/localstore/export_test.go
@@ -0,0 +1,80 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "context"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+)
+
+// TestExportImport constructs two databases, one to put and export
+// chunks and another one to import and validate that all chunks are
+// imported.
+func TestExportImport(t *testing.T) {
+ db1, cleanup1 := newTestDB(t, nil)
+ defer cleanup1()
+
+ var chunkCount = 100
+
+ chunks := make(map[string][]byte, chunkCount)
+ for i := 0; i < chunkCount; i++ {
+ ch := generateTestRandomChunk()
+
+ _, err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ chunks[string(ch.Address())] = ch.Data()
+ }
+
+ var buf bytes.Buffer
+
+ c, err := db1.Export(&buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantChunksCount := int64(len(chunks))
+ if c != wantChunksCount {
+ t.Errorf("got export count %v, want %v", c, wantChunksCount)
+ }
+
+ db2, cleanup2 := newTestDB(t, nil)
+ defer cleanup2()
+
+ c, err = db2.Import(&buf, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != wantChunksCount {
+ t.Errorf("got import count %v, want %v", c, wantChunksCount)
+ }
+
+ for a, want := range chunks {
+ addr := chunk.Address([]byte(a))
+ ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got := ch.Data()
+ if !bytes.Equal(got, want) {
+ t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want)
+ }
+ }
+}
diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go
index 84c4f596d..748e0d663 100644
--- a/swarm/storage/localstore/gc.go
+++ b/swarm/storage/localstore/gc.go
@@ -17,7 +17,10 @@
package localstore
import (
+ "time"
+
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
+ metricName := "localstore.gc"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
+ defer func() {
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+ }
+ }()
+
batch := new(leveldb.Batch)
target := db.gcTarget()
@@ -86,12 +98,17 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return 0, true, err
}
+ metrics.GetOrRegisterGauge(metricName+".gcsize", nil).Update(int64(gcSize))
done = true
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if gcSize-collectedCount <= target {
return true, nil
}
+
+ metrics.GetOrRegisterGauge(metricName+".storets", nil).Update(item.StoreTimestamp)
+ metrics.GetOrRegisterGauge(metricName+".accessts", nil).Update(item.AccessTimestamp)
+
// delete from retrieve, pull, gc
db.retrievalDataIndex.DeleteInBatch(batch, item)
db.retrievalAccessIndex.DeleteInBatch(batch, item)
@@ -109,11 +126,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil {
return 0, false, err
}
+ metrics.GetOrRegisterCounter(metricName+".collected-count", nil).Inc(int64(collectedCount))
db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch)
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".writebatch.err", nil).Inc(1)
return 0, false, err
}
return collectedCount, done, nil
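The instrumentation added to collectGarbage relies on arguments of a deferred call being evaluated when the defer statement executes, so defer totalTimeMetric(metricName, time.Now()) captures the start time up front and measures on return. A minimal stand-alone sketch of the idiom, with a log line standing in for the metrics registry:

package main

import (
	"fmt"
	"time"
)

// totalTime plays the role of totalTimeMetric: it receives the start time that
// was captured when the defer statement ran and reports the elapsed duration.
func totalTime(name string, start time.Time) {
	fmt.Printf("%s total time: %s\n", name, time.Since(start))
}

func collect() {
	defer totalTime("localstore.gc", time.Now()) // time.Now() evaluated here, at defer time
	time.Sleep(50 * time.Millisecond)            // stands in for the real GC work
}

func main() { collect() }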
diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go
index 081e0af80..4a6e0a5f4 100644
--- a/swarm/storage/localstore/gc_test.go
+++ b/swarm/storage/localstore/gc_test.go
@@ -17,6 +17,7 @@
package localstore
import (
+ "context"
"io/ioutil"
"math/rand"
"os"
@@ -63,26 +64,23 @@ func testDB_collectGarbageWorker(t *testing.T) {
})()
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
-
addrs := make([]chunk.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
}
gcTarget := db.gcTarget()
@@ -110,7 +108,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
@@ -118,7 +116,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
@@ -134,9 +132,6 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
})
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
-
testHookCollectGarbageChan := make(chan uint64)
defer setTestHookCollectGarbage(func(collectedCount uint64) {
testHookCollectGarbageChan <- collectedCount
@@ -146,19 +141,19 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload random chunks just up to the capacity
for i := 0; i < int(db.capacity)-1; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
}
// set update gc test hook to signal when
@@ -172,7 +167,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// request the latest synced chunk
// to prioritize it in the gc index
// not to be collected
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
@@ -191,11 +186,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload and sync another chunk to trigger
// garbage collection
ch := generateTestRandomChunk()
- err = uploader.Put(ch)
+ _, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(ch.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -235,7 +230,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// requested chunk should not be removed
t.Run("get requested chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
@@ -243,7 +238,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// the second synced chunk should be removed
t.Run("get gc-ed chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[1])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[1])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
@@ -251,7 +246,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
- _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
@@ -275,20 +270,17 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err)
}
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
-
count := 100
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = syncer.Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go
index cf19e4f6c..0f23aa10a 100644
--- a/swarm/storage/localstore/index_test.go
+++ b/swarm/storage/localstore/index_test.go
@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
+ "context"
"math/rand"
"testing"
@@ -35,29 +36,22 @@ func TestDB_pullIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
- Chunk: chunk,
- // this timestamp is not the same as in
- // the index, but given that uploads
- // are sequential and that only ordering
- // of events matter, this information is
- // sufficient
- storeTimestamp: now(),
+ Chunk: ch,
+ binID: uint64(i),
}
}
@@ -70,10 +64,10 @@ func TestDB_pullIndex(t *testing.T) {
if poi > poj {
return false
}
- if chunks[i].storeTimestamp < chunks[j].storeTimestamp {
+ if chunks[i].binID < chunks[j].binID {
return true
}
- if chunks[i].storeTimestamp > chunks[j].storeTimestamp {
+ if chunks[i].binID > chunks[j].binID {
return false
}
return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
@@ -87,23 +81,21 @@ func TestDB_gcIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
- Chunk: chunk,
+ Chunk: ch,
}
}
@@ -123,9 +115,9 @@ func TestDB_gcIndex(t *testing.T) {
})()
t.Run("request unsynced", func(t *testing.T) {
- chunk := chunks[1]
+ ch := chunks[1]
- _, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -140,9 +132,9 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("sync one chunk", func(t *testing.T) {
- chunk := chunks[0]
+ ch := chunks[0]
- err := db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -154,10 +146,8 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("sync all chunks", func(t *testing.T) {
- setter := db.NewSetter(ModeSetSync)
-
for i := range chunks {
- err := setter.Set(chunks[i].Address())
+ err := db.Set(context.Background(), chunk.ModeSetSync, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
@@ -171,7 +161,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request one chunk", func(t *testing.T) {
i := 6
- _, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address())
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
@@ -189,14 +179,13 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("random chunk request", func(t *testing.T) {
- requester := db.NewGetter(ModeGetRequest)
rand.Shuffle(len(chunks), func(i, j int) {
chunks[i], chunks[j] = chunks[j], chunks[i]
})
- for _, chunk := range chunks {
- _, err := requester.Get(chunk.Address())
+ for _, ch := range chunks {
+ _, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -212,7 +201,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("remove one chunk", func(t *testing.T) {
i := 3
- err := db.NewSetter(modeSetRemove).Set(chunks[i].Address())
+ err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
index 98d4c7881..3b0bd8a93 100644
--- a/swarm/storage/localstore/localstore.go
+++ b/swarm/storage/localstore/localstore.go
@@ -23,11 +23,15 @@ import (
"time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
)
+// DB implements chunk.Store.
+var _ chunk.Store = &DB{}
+
var (
// ErrInvalidMode is retuned when an unknown Mode
// is provided to the function.
@@ -69,6 +73,10 @@ type DB struct {
pullTriggers map[uint8][]chan struct{}
pullTriggersMu sync.RWMutex
+ // binIDs stores the latest chunk serial ID for every
+ // proximity order bin
+ binIDs shed.Uint64Vector
+
// garbage collection index
gcIndex shed.Index
@@ -124,7 +132,10 @@ type Options struct {
// One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if o == nil {
- o = new(Options)
+ // default options
+ o = &Options{
+ Capacity: 5000000,
+ }
}
db = &DB{
capacity: o.Capacity,
@@ -148,11 +159,23 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if err != nil {
return nil, err
}
+
// Identify current storage schema by arbitrary name.
db.schemaName, err = db.shed.NewStringField("schema-name")
if err != nil {
return nil, err
}
+ schemaName, err := db.schemaName.Get()
+ if err != nil {
+ return nil, err
+ }
+ if schemaName == "" {
+ // initial new localstore run
+ err := db.schemaName.Put(DbSchemaSanctuary)
+ if err != nil {
+ return nil, err
+ }
+ }
// Persist gc size.
db.gcSize, err = db.shed.NewUint64Field("gc-size")
if err != nil {
@@ -165,8 +188,9 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
)
if o.MockStore != nil {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, 8)
- binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ b := make([]byte, 16)
+ binary.BigEndian.PutUint64(b[:8], fields.BinID)
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
err = o.MockStore.Put(fields.Address, fields.Data)
if err != nil {
return nil, err
@@ -174,25 +198,28 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return b, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+ e.BinID = binary.BigEndian.Uint64(value[:8])
e.Data, err = o.MockStore.Get(keyItem.Address)
return e, err
}
} else {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, 8)
- binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ b := make([]byte, 16)
+ binary.BigEndian.PutUint64(b[:8], fields.BinID)
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
- e.Data = value[8:]
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+ e.BinID = binary.BigEndian.Uint64(value[:8])
+ e.Data = value[16:]
return e, nil
}
}
- // Index storing actual chunk address, data and store timestamp.
- db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
+ // Index storing actual chunk address, data and bin id.
+ db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
@@ -230,33 +257,37 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return nil, err
}
// pull index allows history and live syncing per po bin
- db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 41)
key[0] = db.po(fields.Address)
- binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp))
- copy(key[9:], fields.Address[:])
+ binary.BigEndian.PutUint64(key[1:9], fields.BinID)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
- e.Address = key[9:]
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9]))
+ e.BinID = binary.BigEndian.Uint64(key[1:9])
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
- return nil, nil
+ return fields.Address, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.Address = value
return e, nil
},
})
if err != nil {
return nil, err
}
+ // create a vector for bin IDs
+ db.binIDs, err = db.shed.NewUint64Vector("bin-ids")
+ if err != nil {
+ return nil, err
+ }
// create a pull syncing triggers used by SubscribePull function
db.pullTriggers = make(map[uint8][]chan struct{})
// push index contains as yet unsynced chunks
- db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
@@ -281,17 +312,17 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan struct{}, 0)
// gc index for removable chunk ordered by ascending last access time
- db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
- binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
+ binary.BigEndian.PutUint64(b[8:16], fields.BinID)
key = append(b, fields.Address...)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
+ e.BinID = binary.BigEndian.Uint64(key[8:16])
e.Address = key[16:]
return e, nil
},
@@ -358,3 +389,12 @@ func init() {
return time.Now().UTC().UnixNano()
}
}
+
+// totalTimeMetric logs the time elapsed since the provided start time and
+// sends it as a resetting timer metric under the provided name with
+// ".total-time" appended.
+func totalTimeMetric(name string, start time.Time) {
+ totalTime := time.Since(start)
+ log.Trace(name+" total time", "time", totalTime)
+ metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
+}
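The schema change above replaces store timestamps with per-bin serial IDs: the pull index key becomes one proximity-order byte plus an 8-byte big-endian bin ID, and the chunk address moves into the value. The stand-alone sketch below shows a simplified encode/decode pair for that layout; the type and function names are illustrative, not the shed identifiers.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// item is a simplified stand-in for shed.Item with only the fields the pull
// index cares about.
type item struct {
	po      uint8
	binID   uint64
	address []byte
}

// encodeKey builds the PO|BinID key: one proximity-order byte followed by the
// 8-byte big-endian bin ID.
func encodeKey(it item) []byte {
	key := make([]byte, 9)
	key[0] = it.po
	binary.BigEndian.PutUint64(key[1:9], it.binID)
	return key
}

// decode reverses encodeKey and reads the chunk address from the value.
func decode(key, value []byte) item {
	return item{
		po:      key[0],
		binID:   binary.BigEndian.Uint64(key[1:9]),
		address: value,
	}
}

func main() {
	in := item{po: 7, binID: 42, address: []byte("chunk-address")}
	out := decode(encodeKey(in), in.address)
	fmt.Println(out.po == in.po, out.binID == in.binID, bytes.Equal(out.address, in.address)) // true true true
}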
diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go
index 42e762587..6dbc4b7ad 100644
--- a/swarm/storage/localstore/localstore_test.go
+++ b/swarm/storage/localstore/localstore_test.go
@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
+ "context"
"fmt"
"io/ioutil"
"math/rand"
@@ -59,23 +60,23 @@ func TestDB(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- got, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got data %x, want %x", got.Data(), ch.Data())
}
}
@@ -113,19 +114,17 @@ func TestDB_updateGCSem(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- getter := db.NewGetter(ModeGetRequest)
-
// get more chunks then maxParallelUpdateGC
// in time shorter then updateGCSleep
for i := 0; i < 5; i++ {
- _, err = getter.Get(chunk.Address())
+ _, err = db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -237,71 +236,71 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
// newRetrieveIndexesTestWithAccess returns a test function that validates if the right
// chunk values are in the retrieval indexes when access time must be stored.
-func newRetrieveIndexesTestWithAccess(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
- item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0)
if accessTimestamp > 0 {
- item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
+ validateItem(t, item, ch.Address(), nil, 0, accessTimestamp)
}
}
}
// newPullIndexTest returns a test function that validates if the right
// chunk values are in the pull index.
-func newPullIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.pullIndex.Get(shed.Item{
- Address: chunk.Address(),
- StoreTimestamp: storeTimestamp,
+ Address: ch.Address(),
+ BinID: binID,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
- validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), nil, 0, 0)
}
}
}
// newPushIndexTest returns a test function that validates if the right
// chunk values are in the push index.
-func newPushIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.pushIndex.Get(shed.Item{
- Address: chunk.Address(),
+ Address: ch.Address(),
StoreTimestamp: storeTimestamp,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
- validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), nil, storeTimestamp, 0)
}
}
}
// newGCIndexTest returns a test function that validates if the right
// chunk values are in the gc index.
-func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.gcIndex.Get(shed.Item{
Address: chunk.Address(),
- StoreTimestamp: storeTimestamp,
+ BinID: binID,
AccessTimestamp: accessTimestamp,
})
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), nil, storeTimestamp, accessTimestamp)
+ validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
}
}
@@ -349,7 +348,7 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
// in database. It is used for index values validations.
type testIndexChunk struct {
chunk.Chunk
- storeTimestamp int64
+ binID uint64
}
// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go
index a6353e141..efef82858 100644
--- a/swarm/storage/localstore/mode_get.go
+++ b/swarm/storage/localstore/mode_get.go
@@ -17,45 +17,35 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModeGet enumerates different Getter modes.
-type ModeGet int
-
-// Getter modes.
-const (
- // ModeGetRequest: when accessed for retrieval
- ModeGetRequest ModeGet = iota
- // ModeGetSync: when accessed for syncing or proof of custody request
- ModeGetSync
-)
-
-// Getter provides Get method to retrieve Chunks
-// from database.
-type Getter struct {
- db *DB
- mode ModeGet
-}
-
-// NewGetter returns a new Getter on database
-// with a specific Mode.
-func (db *DB) NewGetter(mode ModeGet) *Getter {
- return &Getter{
- mode: mode,
- db: db,
- }
-}
-
// Get returns a chunk from the database. If the chunk is
// not found chunk.ErrChunkNotFound will be returned.
// All required indexes will be updated as required by the
-// Getter Mode.
-func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
- out, err := g.db.get(g.mode, addr)
+// Getter Mode. Get is required to implement chunk.Store
+// interface.
+func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
+ metricName := fmt.Sprintf("localstore.Get.%s", mode)
+
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
+
+ defer func() {
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+ }
+ }()
+
+ out, err := db.get(mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
@@ -67,7 +57,7 @@ func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
// get returns Item from the retrieval index
// and updates other indexes.
-func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
+func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err error) {
item := addressToItem(addr)
out, err = db.retrievalDataIndex.Get(item)
@@ -76,7 +66,7 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
}
switch mode {
// update the access timestamp and gc index
- case ModeGetRequest:
+ case chunk.ModeGetRequest:
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer is full
@@ -90,8 +80,14 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
// for a new goroutine
defer func() { <-db.updateGCSem }()
}
+
+ metricName := "localstore.updateGC"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
+
err := db.updateGC(out)
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
@@ -101,7 +97,8 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
}()
// no updates to indexes
- case ModeGetSync:
+ case chunk.ModeGetSync:
+ case chunk.ModeGetLookup:
default:
return out, ErrInvalidMode
}
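
Since mode_get.go removes the Getter type entirely, callers now go through the DB method directly. Below is a hedged usage sketch in a hypothetical example package; the package name and the getChunk helper are made up, while db.Get, chunk.ModeGetRequest and chunk.ErrChunkNotFound are the identifiers introduced or referenced in the hunk above.

    package example

    import (
        "context"

        "github.com/ethereum/go-ethereum/swarm/chunk"
        "github.com/ethereum/go-ethereum/swarm/storage/localstore"
    )

    // getChunk shows the new call shape: a context and an explicit
    // chunk.ModeGet value replace the removed Getter constructor.
    func getChunk(ctx context.Context, db *localstore.DB, addr chunk.Address) (chunk.Chunk, error) {
        ch, err := db.Get(ctx, chunk.ModeGetRequest, addr)
        if err == chunk.ErrChunkNotFound {
            // not stored locally; callers typically fall back to network retrieval
            return nil, err
        }
        return ch, err
    }

ModeGetRequest also schedules the asynchronous gc index update behind the updateGCSem semaphore, which is why the metrics added in this hunk wrap that path separately.
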
diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go
index 28a70ee0c..217fa5d2d 100644
--- a/swarm/storage/localstore/mode_get_test.go
+++ b/swarm/storage/localstore/mode_get_test.go
@@ -18,8 +18,11 @@ package localstore
import (
"bytes"
+ "context"
"testing"
"time"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
)
// TestModeGetRequest validates ModeGetRequest index values on the provided DB.
@@ -32,15 +35,13 @@ func TestModeGetRequest(t *testing.T) {
return uploadTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- requester := db.NewGetter(ModeGetRequest)
-
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
@@ -52,22 +53,22 @@ func TestModeGetRequest(t *testing.T) {
})()
t.Run("get unsynced", func(t *testing.T) {
- got, err := requester.Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
@@ -75,30 +76,30 @@ func TestModeGetRequest(t *testing.T) {
})
// set chunk to synced state
- err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
t.Run("first get", func(t *testing.T) {
- got, err := requester.Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, uploadTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp))
- t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, uploadTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -111,24 +112,24 @@ func TestModeGetRequest(t *testing.T) {
return accessTimestamp
})()
- got, err := requester.Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, accessTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, accessTimestamp))
- t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, accessTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -146,27 +147,27 @@ func TestModeGetSync(t *testing.T) {
return uploadTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- got, err := db.NewGetter(ModeGetSync).Get(chunk.Address())
+ got, err := db.Get(context.Background(), chunk.ModeGetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- if !bytes.Equal(got.Address(), chunk.Address()) {
- t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ if !bytes.Equal(got.Address(), ch.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
diff --git a/swarm/storage/localstore/mode_has.go b/swarm/storage/localstore/mode_has.go
index 90feaceef..a70ee31b2 100644
--- a/swarm/storage/localstore/mode_has.go
+++ b/swarm/storage/localstore/mode_has.go
@@ -17,23 +17,23 @@
package localstore
import (
+ "context"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
)
-// Hasser provides Has method to retrieve Chunks
-// from database.
-type Hasser struct {
- db *DB
-}
+// Has returns true if the chunk is stored in the database.
+func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
+ metricName := "localstore.Has"
-// NewHasser returns a new Hasser on database.
-func (db *DB) NewHasser() *Hasser {
- return &Hasser{
- db: db,
- }
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// Has returns true if the chunk is stored in database.
-func (h *Hasser) Has(addr chunk.Address) (bool, error) {
- return h.db.retrievalDataIndex.Has(addressToItem(addr))
+ has, err := db.retrievalDataIndex.Has(addressToItem(addr))
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+ }
+ return has, err
}
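
A companion sketch for the flattened Has call, in the same hypothetical example package as the Get sketch above; haveLocally is an invented name, db.Has is the method defined in this hunk.

    // haveLocally reports whether a chunk is already stored, using db.Has
    // directly now that the Hasser type and NewHasser constructor are gone.
    func haveLocally(ctx context.Context, db *localstore.DB, addr chunk.Address) bool {
        has, err := db.Has(ctx, addr)
        if err != nil {
            // for this sketch, treat lookup errors as "not present"
            return false
        }
        return has
    }
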
diff --git a/swarm/storage/localstore/mode_has_test.go b/swarm/storage/localstore/mode_has_test.go
index 332616ca2..043b21a2b 100644
--- a/swarm/storage/localstore/mode_has_test.go
+++ b/swarm/storage/localstore/mode_has_test.go
@@ -17,7 +17,10 @@
package localstore
import (
+ "context"
"testing"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
)
// TestHas validates that Has is returning true for
@@ -26,16 +29,14 @@ func TestHas(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- hasser := db.NewHasser()
-
- has, err := hasser.Has(chunk.Address())
+ has, err := db.Has(context.Background(), ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -45,7 +46,7 @@ func TestHas(t *testing.T) {
missingChunk := generateTestRandomChunk()
- has, err = hasser.Has(missingChunk.Address())
+ has, err = db.Has(context.Background(), missingChunk.Address())
if err != nil {
t.Fatal(err)
}
diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go
index 1599ca8e3..a8e355ad0 100644
--- a/swarm/storage/localstore/mode_put.go
+++ b/swarm/storage/localstore/mode_put.go
@@ -17,44 +17,31 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModePut enumerates different Putter modes.
-type ModePut int
-
-// Putter modes.
-const (
- // ModePutRequest: when a chunk is received as a result of retrieve request and delivery
- ModePutRequest ModePut = iota
- // ModePutSync: when a chunk is received via syncing
- ModePutSync
- // ModePutUpload: when a chunk is created by local upload
- ModePutUpload
-)
+// Put stores the Chunk to the database and, depending
+// on the Putter mode, updates the required indexes.
+// Put is required to implement chunk.Store
+// interface.
+func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
+ metricName := fmt.Sprintf("localstore.Put.%s", mode)
-// Putter provides Put method to store Chunks
-// to database.
-type Putter struct {
- db *DB
- mode ModePut
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// NewPutter returns a new Putter on database
-// with a specific Mode.
-func (db *DB) NewPutter(mode ModePut) *Putter {
- return &Putter{
- mode: mode,
- db: db,
+ exists, err = db.put(mode, chunkToItem(ch))
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
-}
-
-// Put stores the Chunk to database and depending
-// on the Putter mode, it updates required indexes.
-func (p *Putter) Put(ch chunk.Chunk) (err error) {
- return p.db.put(p.mode, chunkToItem(ch))
+ return exists, err
}
// put stores Item to database and updates other
@@ -62,7 +49,7 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) {
// of this function for the same address in parallel.
// Item fields Address and Data must not be
// set to their nil values.
-func (db *DB) put(mode ModePut, item shed.Item) (err error) {
+func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
@@ -76,7 +63,7 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
var triggerPushFeed bool // signal push feed subscriptions to iterate
switch mode {
- case ModePutRequest:
+ case chunk.ModePutRequest:
// put to indexes: retrieve, gc; it does not enter the syncpool
// check if the chunk already is in the database
@@ -84,20 +71,25 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
+ exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
+ exists = false
// no chunk accesses
default:
- return err
+ return false, err
}
i, err = db.retrievalDataIndex.Get(item)
switch err {
case nil:
+ exists = true
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
case leveldb.ErrNotFound:
// no chunk accesses
+ exists = false
default:
- return err
+ return false, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
@@ -107,6 +99,12 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
+ if item.BinID == 0 {
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ }
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
@@ -117,36 +115,56 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
db.retrievalDataIndex.PutInBatch(batch, item)
- case ModePutUpload:
+ case chunk.ModePutUpload:
// put to indexes: retrieve, push, pull
- item.StoreTimestamp = now()
- db.retrievalDataIndex.PutInBatch(batch, item)
- db.pullIndex.PutInBatch(batch, item)
- triggerPullFeed = true
- db.pushIndex.PutInBatch(batch, item)
- triggerPushFeed = true
+ exists, err = db.retrievalDataIndex.Has(item)
+ if err != nil {
+ return false, err
+ }
+ if !exists {
+ item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ db.pushIndex.PutInBatch(batch, item)
+ triggerPushFeed = true
+ }
- case ModePutSync:
+ case chunk.ModePutSync:
// put to indexes: retrieve, pull
- item.StoreTimestamp = now()
- db.retrievalDataIndex.PutInBatch(batch, item)
- db.pullIndex.PutInBatch(batch, item)
- triggerPullFeed = true
+ exists, err = db.retrievalDataIndex.Has(item)
+ if err != nil {
+ return exists, err
+ }
+ if !exists {
+ item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ }
default:
- return ErrInvalidMode
+ return false, ErrInvalidMode
}
err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
- return err
+ return false, err
}
err = db.shed.WriteBatch(batch)
if err != nil {
- return err
+ return false, err
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
@@ -154,5 +172,5 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
if triggerPushFeed {
db.triggerPushSubscriptions()
}
- return nil
+ return exists, nil
}
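
The reworked Put reports whether the chunk was already stored, which makes duplicate uploads cheap to detect. A sketch in the same hypothetical example package; storeUploaded is an invented helper, while db.Put and chunk.ModePutUpload come from the hunk above.

    // storeUploaded writes a locally uploaded chunk and reports whether it
    // was new to this node. A repeated Put of the same chunk returns
    // exists=true and, per the hunk above, leaves the retrieval, push and
    // pull indexes untouched.
    func storeUploaded(ctx context.Context, db *localstore.DB, ch chunk.Chunk) (isNew bool, err error) {
        exists, err := db.Put(ctx, chunk.ModePutUpload, ch)
        if err != nil {
            return false, err
        }
        return !exists, nil
    }

On a first store the chunk is also assigned a BinID from db.binIDs, incremented per proximity order bin, and that ID is what the pull and gc indexes key on after this change.
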
diff --git a/swarm/storage/localstore/mode_put_test.go b/swarm/storage/localstore/mode_put_test.go
index 8ecae1d2e..5376aa8b3 100644
--- a/swarm/storage/localstore/mode_put_test.go
+++ b/swarm/storage/localstore/mode_put_test.go
@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
+ "context"
"fmt"
"sync"
"testing"
@@ -31,9 +32,7 @@ func TestModePutRequest(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- putter := db.NewPutter(ModePutRequest)
-
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
// keep the record when the chunk is stored
var storeTimestamp int64
@@ -46,12 +45,12 @@ func TestModePutRequest(t *testing.T) {
storeTimestamp = wantTimestamp
- err := putter.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutRequest, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -64,12 +63,12 @@ func TestModePutRequest(t *testing.T) {
return wantTimestamp
})()
- err := putter.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutRequest, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, storeTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, storeTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -87,16 +86,16 @@ func TestModePutSync(t *testing.T) {
return wantTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutSync).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0))
- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
}
// TestModePutUpload validates ModePutUpload index values on the provided DB.
@@ -109,18 +108,18 @@ func TestModePutUpload(t *testing.T) {
return wantTimestamp
})()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0))
- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
- t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("push index", newPushIndexTest(db, ch, wantTimestamp, nil))
}
// TestModePutUpload_parallel uploads chunks in parallel
@@ -140,14 +139,13 @@ func TestModePutUpload_parallel(t *testing.T) {
// start uploader workers
for i := 0; i < workerCount; i++ {
go func(i int) {
- uploader := db.NewPutter(ModePutUpload)
for {
select {
- case chunk, ok := <-chunkChan:
+ case ch, ok := <-chunkChan:
if !ok {
return
}
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
select {
case errChan <- err:
case <-doneChan:
@@ -188,21 +186,85 @@ func TestModePutUpload_parallel(t *testing.T) {
}
// get every chunk and validate its data
- getter := db.NewGetter(ModeGetRequest)
-
chunksMu.Lock()
defer chunksMu.Unlock()
- for _, chunk := range chunks {
- got, err := getter.Get(chunk.Address())
+ for _, ch := range chunks {
+ got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
- if !bytes.Equal(got.Data(), chunk.Data()) {
- t.Fatalf("got chunk %s data %x, want %x", chunk.Address().Hex(), got.Data(), chunk.Data())
+ if !bytes.Equal(got.Data(), ch.Data()) {
+ t.Fatalf("got chunk %s data %x, want %x", ch.Address().Hex(), got.Data(), ch.Data())
}
}
}
+// TestModePut_sameChunk puts the same chunk multiple times
+// and validates that all relevant indexes have only one item
+// in them.
+func TestModePut_sameChunk(t *testing.T) {
+ ch := generateTestRandomChunk()
+
+ for _, tc := range []struct {
+ name string
+ mode chunk.ModePut
+ pullIndex bool
+ pushIndex bool
+ }{
+ {
+ name: "ModePutRequest",
+ mode: chunk.ModePutRequest,
+ pullIndex: false,
+ pushIndex: false,
+ },
+ {
+ name: "ModePutUpload",
+ mode: chunk.ModePutUpload,
+ pullIndex: true,
+ pushIndex: true,
+ },
+ {
+ name: "ModePutSync",
+ mode: chunk.ModePutSync,
+ pullIndex: true,
+ pushIndex: false,
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ for i := 0; i < 10; i++ {
+ exists, err := db.Put(context.Background(), tc.mode, ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ switch exists {
+ case false:
+ if i != 0 {
+ t.Fatal("should not exist only on first Put")
+ }
+ case true:
+ if i == 0 {
+ t.Fatal("should exist on all cases other than the first one")
+ }
+ }
+
+ count := func(b bool) (c int) {
+ if b {
+ return 1
+ }
+ return 0
+ }
+
+ newItemsCountTest(db.retrievalDataIndex, 1)(t)
+ newItemsCountTest(db.pullIndex, count(tc.pullIndex))(t)
+ newItemsCountTest(db.pushIndex, count(tc.pushIndex))(t)
+ }
+ })
+ }
+}
+
// BenchmarkPutUpload runs a series of benchmarks that upload
// a specific number of chunks in parallel.
//
@@ -270,7 +332,6 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int)
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
chunks := make([]chunk.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = generateTestRandomChunk()
@@ -286,7 +347,8 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int)
go func(i int) {
defer func() { <-sem }()
- errs <- uploader.Put(chunks[i])
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
+ errs <- err
}(i)
}
}()
diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go
index 83fcbea52..14b48a22e 100644
--- a/swarm/storage/localstore/mode_set.go
+++ b/swarm/storage/localstore/mode_set.go
@@ -17,51 +17,37 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModeSet enumerates different Setter modes.
-type ModeSet int
-
-// Setter modes.
-const (
- // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
- ModeSetAccess ModeSet = iota
- // ModeSetSync: when push sync receipt is received
- ModeSetSync
- // modeSetRemove: when GC-d
- // unexported as no external packages should remove chunks from database
- modeSetRemove
-)
+// Set updates database indexes for a specific
+// chunk represented by the address.
+// Set is required to implement chunk.Store
+// interface.
+func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
+ metricName := fmt.Sprintf("localstore.Set.%s", mode)
-// Setter sets the state of a particular
-// Chunk in database by changing indexes.
-type Setter struct {
- db *DB
- mode ModeSet
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// NewSetter returns a new Setter on database
-// with a specific Mode.
-func (db *DB) NewSetter(mode ModeSet) *Setter {
- return &Setter{
- mode: mode,
- db: db,
+ err = db.set(mode, addr)
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
-}
-
-// Set updates database indexes for a specific
-// chunk represented by the address.
-func (s *Setter) Set(addr chunk.Address) (err error) {
- return s.db.set(s.mode, addr)
+ return err
}
// set updates database indexes for a specific
// chunk represented by the address.
// It acquires lockAddr to protect two calls
// of this function for the same address in parallel.
-func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
+func (db *DB) set(mode chunk.ModeSet, addr chunk.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
@@ -76,7 +62,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
item := addressToItem(addr)
switch mode {
- case ModeSetAccess:
+ case chunk.ModeSetAccess:
// add to pull, insert to gc
// need to get access timestamp here as it is not
@@ -87,9 +73,14 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
switch err {
case nil:
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
case leveldb.ErrNotFound:
db.pushIndex.DeleteInBatch(batch, item)
item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.Inc(uint64(db.po(item.Address)))
+ if err != nil {
+ return err
+ }
default:
return err
}
@@ -112,7 +103,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++
- case ModeSetSync:
+ case chunk.ModeSetSync:
// delete from push, insert to gc
// need to get access timestamp here as it is not
@@ -131,6 +122,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
return err
}
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
i, err = db.retrievalAccessIndex.Get(item)
switch err {
@@ -149,7 +141,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++
- case modeSetRemove:
+ case chunk.ModeSetRemove:
// delete from retrieve, pull, gc
// need to get access timestamp here as it is not
@@ -169,6 +161,7 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
return err
}
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
db.retrievalDataIndex.DeleteInBatch(batch, item)
db.retrievalAccessIndex.DeleteInBatch(batch, item)
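
For symmetry with the Put and Get sketches, a short example of the new Set entry point for push sync receipts, again in the same hypothetical example package; markSynced is an invented name, db.Set and chunk.ModeSetSync are from this hunk.

    // markSynced is called when a push sync receipt arrives; per the hunk
    // above it removes the chunk from the push index and adds it to the gc
    // index, carrying over the stored BinID.
    func markSynced(ctx context.Context, db *localstore.DB, addr chunk.Address) error {
        return db.Set(ctx, chunk.ModeSetSync, addr)
    }
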
diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go
index 674aaabec..9ba62cd20 100644
--- a/swarm/storage/localstore/mode_set_test.go
+++ b/swarm/storage/localstore/mode_set_test.go
@@ -17,9 +17,11 @@
package localstore
import (
+ "context"
"testing"
"time"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/syndtr/goleveldb/leveldb"
)
@@ -28,23 +30,23 @@ func TestModeSetAccess(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
- err := db.NewSetter(ModeSetAccess).Set(chunk.Address())
+ err := db.Set(context.Background(), chunk.ModeSetAccess, ch.Address())
if err != nil {
t.Fatal(err)
}
- t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 1))
- t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -56,28 +58,28 @@ func TestModeSetSync(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
- t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp))
- t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, leveldb.ErrNotFound))
+ t.Run("push index", newPushIndexTest(db, ch, wantTimestamp, leveldb.ErrNotFound))
- t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+ t.Run("gc index", newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
@@ -89,40 +91,39 @@ func TestModeSetRemove(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := db.NewPutter(ModePutUpload).Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- err = db.NewSetter(modeSetRemove).Set(chunk.Address())
+ err = db.Set(context.Background(), chunk.ModeSetRemove, ch.Address())
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", func(t *testing.T) {
wantErr := leveldb.ErrNotFound
- _, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ _, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
}
t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
// access index should not be set
- _, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ _, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
}
t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
})
- t.Run("pull index", newPullIndexTest(db, chunk, 0, leveldb.ErrNotFound))
+ t.Run("pull index", newPullIndexTest(db, ch, 0, leveldb.ErrNotFound))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
-
}
diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go
index b08790124..4ca2e32e6 100644
--- a/swarm/storage/localstore/retrieval_index_test.go
+++ b/swarm/storage/localstore/retrieval_index_test.go
@@ -17,6 +17,7 @@
package localstore
import (
+ "context"
"strconv"
"testing"
@@ -61,17 +62,14 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
- requester := db.NewGetter(ModeGetRequest)
addrs := make([]chunk.Address, count)
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ ch := generateTestRandomChunk()
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
b.Fatal(err)
}
- addrs[i] = chunk.Address()
+ addrs[i] = ch.Address()
}
// set update gc test hook to signal when
// update gc goroutine is done by sending to
@@ -85,12 +83,12 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StartTimer()
for i := 0; i < count; i++ {
- err := syncer.Set(addrs[i])
+ err := db.Set(context.Background(), chunk.ModeSetSync, addrs[i])
if err != nil {
b.Fatal(err)
}
- _, err = requester.Get(addrs[i])
+ _, err = db.Get(context.Background(), chunk.ModeGetRequest, addrs[i])
if err != nil {
b.Fatal(err)
}
@@ -133,7 +131,6 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
chunks := make([]chunk.Chunk, count)
for i := 0; i < count; i++ {
chunk := generateTestRandomChunk()
@@ -142,7 +139,7 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StartTimer()
for i := 0; i < count; i++ {
- err := uploader.Put(chunks[i])
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
if err != nil {
b.Fatal(err)
}
diff --git a/swarm/storage/localstore/schema.go b/swarm/storage/localstore/schema.go
new file mode 100644
index 000000000..538c75d1f
--- /dev/null
+++ b/swarm/storage/localstore/schema.go
@@ -0,0 +1,52 @@
+package localstore
+
+import (
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+// The DB schema we want to use. The actual/current DB schema might differ
+// until migrations are run.
+const CurrentDbSchema = DbSchemaSanctuary
+
+// There was a time when we had no schema at all.
+const DbSchemaNone = ""
+
+// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
+const DbSchemaPurity = "purity"
+
+// "halloween" is here because we had a screw in the garbage collector index.
+// Because of that we had to rebuild the GC index to get rid of erroneous
+// entries and that takes a long time. This schema is used for bookkeeping,
+// so rebuild index will run just once.
+const DbSchemaHalloween = "halloween"
+
+const DbSchemaSanctuary = "sanctuary"
+
+// IsLegacyDatabase returns true if a legacy database is present in the datadir.
+func IsLegacyDatabase(datadir string) bool {
+
+ var (
+ legacyDbSchemaKey = []byte{8}
+ )
+
+ db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128})
+ if err != nil {
+ log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err)
+ return false
+ }
+ defer db.Close()
+
+ data, err := db.Get(legacyDbSchemaKey, nil)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ // if we haven't found anything under the legacy db schema key, we are not on a legacy database
+ return false
+ }
+
+ log.Error("got an unexpected error fetching legacy name from the database", "err", err)
+ }
+ log.Trace("checking if database scheme is legacy", "schema name", string(data))
+ return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity
+}
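
The new schema.go is mainly consumed when deciding whether an on-disk store predates the "sanctuary" schema. A hedged sketch in the same hypothetical example package, assuming the standard library "log" package is imported there; openOrMigrate and its policy are invented, while IsLegacyDatabase and New are the exported functions from this file and the localstore constructor.

    // openOrMigrate checks whether the data directory still holds a
    // pre-sanctuary ("purity" or "halloween") store before opening it.
    func openOrMigrate(datadir string, baseKey []byte) (*localstore.DB, error) {
        if localstore.IsLegacyDatabase(datadir) {
            // a real caller would run a data migration here; this sketch
            // only logs the condition and proceeds
            log.Printf("legacy ldbstore detected in %s, migration required", datadir)
        }
        return localstore.New(datadir, baseKey, nil)
    }
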
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index 0b96102e3..dd07add53 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -17,28 +17,34 @@
package localstore
import (
- "bytes"
"context"
"errors"
- "fmt"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
-// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
-// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be
-// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
+// is not 0, the iteration will start from the first item stored after that id. If until is not 0,
+// only chunks stored up to this id will be sent to the channel, and the returned channel will be
+// closed. The since-until interval is open on the since side and closed on the until side: (since,until] <=> [since+1,until]. Returned stop
// function will terminate current and further iterations without errors, and also close the returned channel.
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false.
-func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
- chunkDescriptors := make(chan ChunkDescriptor)
+func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
+ metricName := "localstore.SubscribePull"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
+ chunkDescriptors := make(chan chunk.Descriptor)
trigger := make(chan struct{}, 1)
db.pullTriggersMu.Lock()
@@ -59,18 +65,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
var errStopSubscription = errors.New("stop subscription")
go func() {
- // close the returned ChunkDescriptor channel at the end to
+ defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
+ // close the returned chunk.Descriptor channel at the end to
// signal that the subscription is done
defer close(chunkDescriptors)
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
- if since != nil {
+ if since > 0 {
sinceItem = &shed.Item{
- Address: since.Address,
- StoreTimestamp: since.StoreTimestamp,
+ Address: db.addressInBin(bin),
+ BinID: since,
}
}
+ first := true // first iteration flag for SkipStartFromItem
for {
select {
case <-trigger:
@@ -78,17 +86,23 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
// - last index Item is reached
// - subscription stop is called
// - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+ sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))
+
+ iterStart := time.Now()
+ var count int
err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
select {
- case chunkDescriptors <- ChunkDescriptor{
- Address: item.Address,
- StoreTimestamp: item.StoreTimestamp,
+ case chunkDescriptors <- chunk.Descriptor{
+ Address: item.Address,
+ BinID: item.BinID,
}:
+ count++
// until chunk descriptor is sent
// break the iteration
- if until != nil &&
- (item.StoreTimestamp >= until.StoreTimestamp ||
- bytes.Equal(item.Address, until.Address)) {
+ if until > 0 && item.BinID >= until {
return true, errStopSubscription
}
// set next iteration start item
@@ -109,19 +123,34 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
}, &shed.IterateOptions{
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
- // iterator call, skip it in this one
- SkipStartFromItem: true,
+ // iterator call, skip it in this one, but not the item with
+ // the provided since bin id as it should be sent to a channel
+ SkipStartFromItem: !first,
Prefix: []byte{bin},
})
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
if err != nil {
if err == errStopSubscription {
// stop subscription without any errors
// if until is reached
return
}
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
return
}
+ first = false
case <-stopChan:
// terminate the subscription
// on stop
@@ -159,35 +188,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
return chunkDescriptors, stop
}
-// LastPullSubscriptionChunk returns ChunkDescriptor of the latest Chunk
+// LastPullSubscriptionBinID returns chunk bin id of the latest Chunk
// in pull syncing index for a provided bin. If there are no chunks in
-// that bin, chunk.ErrChunkNotFound is returned.
-func (db *DB) LastPullSubscriptionChunk(bin uint8) (c *ChunkDescriptor, err error) {
+// that bin, a zero value is returned.
+func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
+ metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)
+
item, err := db.pullIndex.Last([]byte{bin})
if err != nil {
if err == leveldb.ErrNotFound {
- return nil, chunk.ErrChunkNotFound
+ return 0, nil
}
- return nil, err
+ return 0, err
}
- return &ChunkDescriptor{
- Address: item.Address,
- StoreTimestamp: item.StoreTimestamp,
- }, nil
-}
-
-// ChunkDescriptor holds information required for Pull syncing. This struct
-// is provided by subscribing to pull index.
-type ChunkDescriptor struct {
- Address chunk.Address
- StoreTimestamp int64
-}
-
-func (c *ChunkDescriptor) String() string {
- if c == nil {
- return "none"
- }
- return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
+ return item.BinID, nil
}
// triggerPullSubscriptions is used internally for starting iterations
@@ -209,3 +223,12 @@ func (db *DB) triggerPullSubscriptions(bin uint8) {
}
}
}
+
+// addressInBin returns an address that is in a specific
+// proximity order bin from database base key.
+func (db *DB) addressInBin(bin uint8) (addr chunk.Address) {
+ addr = append([]byte(nil), db.baseKey...)
+ b := bin / 8
+ addr[b] = addr[b] ^ (1 << (7 - bin%8))
+ return addr
+}
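
Finally, a consumption sketch for the bin-ID based pull subscription API, in the same hypothetical example package; drainBin is an invented helper, while SubscribePull, LastPullSubscriptionBinID and chunk.Descriptor are defined in this hunk. It reads one proximity order bin from just after a known bin ID up to the current head, relying on the (since, until] semantics documented above.

    // drainBin collects addresses stored in the given bin after the since
    // bin ID, up to and including the bin's current last ID. The channel is
    // closed by SubscribePull once until is reached.
    func drainBin(ctx context.Context, db *localstore.DB, bin uint8, since uint64) ([]chunk.Address, error) {
        until, err := db.LastPullSubscriptionBinID(bin)
        if err != nil {
            return nil, err
        }
        if until <= since {
            // nothing stored after since; until == 0 means the bin is empty
            return nil, nil
        }
        descriptors, stop := db.SubscribePull(ctx, bin, since, until)
        defer stop()

        var addrs []chunk.Address
        for d := range descriptors {
            // chunk.Descriptor carries the address and its per-bin ID
            addrs = append(addrs, d.Address)
        }
        return addrs, nil
    }

If the chunk holding the until ID has been garbage collected in the meantime it will never be sent, so the channel only closes when the context is done; real callers should therefore pass a context with a deadline, as the tests below do.
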
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index d5ddae02b..bf364ed44 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/shed"
)
// TestDB_SubscribePull uploads some chunks before and after
@@ -35,15 +36,13 @@ func TestDB_SubscribePull(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
// prepopulate database with some chunks
// before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -54,22 +53,22 @@ func TestDB_SubscribePull(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after some short time
// to ensure that subscription will include them
// in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
@@ -82,15 +81,13 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
// prepopulate database with some chunks
// before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -106,23 +103,23 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
// that all of them will write every address error to errChan
for j := 0; j < subsCount; j++ {
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
}
// upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after some short time
// to ensure that subscription will include them
// in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)
checkErrChan(ctx, t, errChan, wantedChunksCount*subsCount)
}
@@ -135,61 +132,52 @@ func TestDB_SubscribePull_since(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (first map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ first = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
bin := db.po(ch.Address())
- if _, ok := addrs[bin]; !ok {
- addrs[bin] = make([]chunk.Address, 0)
- }
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
+
if wanted {
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]chunk.Address, 0)
+ }
addrs[bin] = append(addrs[bin], ch.Address())
wantedChunksCount++
- }
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
-
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
+ if _, ok := first[bin]; !ok {
+ first[bin] = binIDCounter[bin]
+ }
}
}
- return last
+ return first
}
// prepopulate database with some chunks
// before the subscription
- last := uploadRandomChunks(30, false)
+ uploadRandomChunks(30, false)
- uploadRandomChunks(25, true)
+ first := uploadRandomChunks(25, true)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -200,21 +188,18 @@ func TestDB_SubscribePull_since(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := last[bin]; ok {
- since = &c
+ since, ok := first[bin]
+ if !ok {
+ continue
}
- ch, stop := db.SubscribePull(ctx, bin, since, nil)
+ ch, stop := db.SubscribePull(ctx, bin, since, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
- // upload some chunks just after subscribe
- uploadRandomChunks(15, true)
-
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
@@ -226,30 +211,22 @@ func TestDB_SubscribePull_until(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -264,14 +241,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
wantedChunksCount++
}
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
return last
}
@@ -295,11 +269,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
if !ok {
continue
}
- ch, stop := db.SubscribePull(ctx, bin, nil, &until)
+ ch, stop := db.SubscribePull(ctx, bin, 0, until)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
@@ -316,30 +290,22 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -354,14 +320,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
wantedChunksCount++
}
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
return last
}
@@ -387,9 +350,10 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := upload1[bin]; ok {
- since = &c
+ since, ok := upload1[bin]
+ if ok {
+ // start from the next uploaded chunk
+ since++
}
until, ok := upload2[bin]
if !ok {
@@ -397,11 +361,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
// skip this bin from testing
continue
}
- ch, stop := db.SubscribePull(ctx, bin, since, &until)
+ ch, stop := db.SubscribePull(ctx, bin, since, until)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
@@ -412,14 +376,14 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
// uploadRandomChunksBin uploads random chunks to database and adds them to
// the map of addresses per bin.
-func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
+func uploadRandomChunksBin(t *testing.T, db *DB, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
addrsMu.Lock()
defer addrsMu.Unlock()
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -434,10 +398,10 @@ func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uin
}
}
-// readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and
-// sends error to errChan, even if it is nil, to count the number of ChunkDescriptors
+// readPullSubscriptionBin is a helper function that reads all chunk.Descriptors from a channel and
+// sends error to errChan, even if it is nil, to count the number of chunk.Descriptors
// returned by the channel.
-func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
+func readPullSubscriptionBin(ctx context.Context, db *DB, bin uint8, ch <-chan chunk.Descriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
var i int // address index
for {
select {
@@ -450,9 +414,20 @@ func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDesc
if i+1 > len(addrs[bin]) {
err = fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin)
} else {
- want := addrs[bin][i]
- if !bytes.Equal(got.Address, want) {
- err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want)
+ addr := addrs[bin][i]
+ if !bytes.Equal(got.Address, addr) {
+ err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got.Address.Hex(), addr.Hex())
+ } else {
+ want, err := db.retrievalDataIndex.Get(shed.Item{
+ Address: addr,
+ })
+ if err != nil {
+ err = fmt.Errorf("got chunk (bin id %v in bin %v) from retrieval index %s: %v", i, bin, addrs[bin][i].Hex(), err)
+ } else {
+ if got.BinID != want.BinID {
+ err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got, want)
+ }
+ }
}
}
addrsMu.Unlock()
@@ -486,27 +461,19 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC
}
}
-// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk
+// TestDB_LastPullSubscriptionBinID validates that LastPullSubscriptionBinID
// is returning the last bin id for proximity order bins by
// doing a few rounds of chunk uploads.
-func TestDB_LastPullSubscriptionChunk(t *testing.T) {
+func TestDB_LastPullSubscriptionBinID(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- last := make(map[uint8]ChunkDescriptor)
+ last := make(map[uint8]uint64)
// do a few rounds of uploads and check if
// last pull subscription bin id is correct
@@ -516,7 +483,7 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -528,32 +495,42 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
}
addrs[bin] = append(addrs[bin], ch.Address())
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.RLock()
+ binIDCounter[bin]++
+ binIDCounterMu.RUnlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
// check
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
want, ok := last[bin]
- got, err := db.LastPullSubscriptionChunk(bin)
+ got, err := db.LastPullSubscriptionBinID(bin)
if ok {
if err != nil {
t.Errorf("got unexpected error value %v", err)
}
- if !bytes.Equal(got.Address, want.Address) {
- t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex())
- }
- } else {
- if err != chunk.ErrChunkNotFound {
- t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound)
- }
}
+ if got != want {
+ t.Errorf("got last bin id %v, want %v", got, want)
+ }
+ }
+ }
+}
+
+// TestAddressInBin validates that function addressInBin
+// returns a valid address for every proximity order bin.
+func TestAddressInBin(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ for po := uint8(0); po < chunk.MaxPO; po++ {
+ addr := db.addressInBin(po)
+
+ got := db.po(addr)
+
+ if got != uint8(po) {
+ t.Errorf("got po %v, want %v", got, po)
}
}
}
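For orientation, a minimal sketch (not part of the diff) of how a caller might consume the reworked pull subscription, now that since/until are plain uint64 bin IDs rather than *ChunkDescriptor values. The drainBin helper is hypothetical, and until == 0 meaning "no upper bound" is an assumption:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// drainBin is a hypothetical helper: since and until are now plain uint64 bin
// IDs, and the channel carries chunk.Descriptor items with a BinID field.
func drainBin(db *localstore.DB, bin uint8) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Resume after the last bin id already seen for this proximity order bin.
	since, err := db.LastPullSubscriptionBinID(bin)
	if err != nil {
		return err
	}

	// until == 0 is assumed here to mean "no upper bound".
	ch, stop := db.SubscribePull(ctx, bin, since+1, 0)
	defer stop()

	for d := range ch {
		fmt.Printf("bin %d: bin id %d address %s\n", bin, d.BinID, d.Address.Hex())
	}
	return nil
}

func main() {}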
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6f..f2463af2a 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -19,10 +19,15 @@ package localstore
import (
"context"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
)
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
@@ -30,6 +35,9 @@ import (
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
+ metricName := "localstore.SubscribePush"
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
chunks := make(chan chunk.Chunk)
trigger := make(chan struct{}, 1)
@@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
var stopChanOnce sync.Once
go func() {
+ defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
@@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// - last index Item is reached
// - subscription stop is called
// - context is done
+ metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+ ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+
+ iterStart := time.Now()
+ var count int
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// get chunk data
dataItem, err := db.retrievalDataIndex.Get(item)
@@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
@@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// iterator call, skip it in this one
SkipStartFromItem: true,
})
+
+ totalTimeMetric(metricName+".iter", iterStart)
+
+ sp.FinishWithOptions(opentracing.FinishOptions{
+ LogRecords: []opentracing.LogRecord{
+ {
+ Timestamp: time.Now(),
+ Fields: []olog.Field{olog.Int("count", count)},
+ },
+ },
+ })
+
if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore push subscription iteration", "err", err)
return
}
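For context, a minimal sketch (not from the diff) of a SubscribePush consumer; the metrics counters and tracing span added above live inside the subscription loop, so the calling pattern stays unchanged. The pushLoop name is an assumption:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

// pushLoop drains the push syncing subscription until the context is done or
// the subscription channel is closed.
func pushLoop(ctx context.Context, db *localstore.DB) {
	chunks, stop := db.SubscribePush(ctx)
	defer stop()

	for ch := range chunks {
		// Each value is a chunk.Chunk read back from the retrieval data index.
		fmt.Printf("push-sync %s (%d bytes)\n", ch.Address().Hex(), len(ch.Data()))
	}
}

func main() {}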
diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go
index 30fb98eb2..6124a534b 100644
--- a/swarm/storage/localstore/subscription_push_test.go
+++ b/swarm/storage/localstore/subscription_push_test.go
@@ -34,8 +34,6 @@ func TestDB_SubscribePush(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
chunks := make([]chunk.Chunk, 0)
var chunksMu sync.Mutex
@@ -44,14 +42,14 @@ func TestDB_SubscribePush(t *testing.T) {
defer chunksMu.Unlock()
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- chunks = append(chunks, chunk)
+ chunks = append(chunks, ch)
}
}
@@ -122,8 +120,6 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make([]chunk.Address, 0)
var addrsMu sync.Mutex
@@ -132,14 +128,14 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
defer addrsMu.Unlock()
for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
+ ch := generateTestRandomChunk()
- err := uploader.Put(chunk)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
- addrs = append(addrs, chunk.Address())
+ addrs = append(addrs, ch.Address())
}
}
diff --git a/swarm/storage/localstore_test.go b/swarm/storage/localstore_test.go
deleted file mode 100644
index fcadcefa0..000000000
--- a/swarm/storage/localstore_test.go
+++ /dev/null
@@ -1,244 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package storage
-
-import (
- "context"
- "io/ioutil"
- "os"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/swarm/chunk"
-)
-
-var (
- hashfunc = MakeHashFunc(DefaultHash)
-)
-
-// tests that the content address validator correctly checks the data
-// tests that feed update chunks are passed through content address validator
-// the test checking the resource update validator internal correctness is found in storage/feeds/handler_test.go
-func TestValidator(t *testing.T) {
- // set up localstore
- datadir, err := ioutil.TempDir("", "storage-testvalidator")
- if err != nil {
- t.Fatal(err)
- }
- defer os.RemoveAll(datadir)
-
- params := NewDefaultLocalStoreParams()
- params.Init(datadir)
- store, err := NewLocalStore(params, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- // check puts with no validators, both succeed
- chunks := GenerateRandomChunks(259, 2)
- goodChunk := chunks[0]
- badChunk := chunks[1]
- copy(badChunk.Data(), goodChunk.Data())
-
- errs := putChunks(store, goodChunk, badChunk)
- if errs[0] != nil {
- t.Fatalf("expected no error on good content address chunk in spite of no validation, but got: %s", err)
- }
- if errs[1] != nil {
- t.Fatalf("expected no error on bad content address chunk in spite of no validation, but got: %s", err)
- }
-
- // add content address validator and check puts
- // bad should fail, good should pass
- store.Validators = append(store.Validators, NewContentAddressValidator(hashfunc))
- chunks = GenerateRandomChunks(chunk.DefaultSize, 2)
- goodChunk = chunks[0]
- badChunk = chunks[1]
- copy(badChunk.Data(), goodChunk.Data())
-
- errs = putChunks(store, goodChunk, badChunk)
- if errs[0] != nil {
- t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err)
- }
- if errs[1] == nil {
- t.Fatal("expected error on bad content address chunk with content address validator only, but got nil")
- }
-
- // append a validator that always denies
- // bad should fail, good should pass,
- var negV boolTestValidator
- store.Validators = append(store.Validators, negV)
-
- chunks = GenerateRandomChunks(chunk.DefaultSize, 2)
- goodChunk = chunks[0]
- badChunk = chunks[1]
- copy(badChunk.Data(), goodChunk.Data())
-
- errs = putChunks(store, goodChunk, badChunk)
- if errs[0] != nil {
- t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err)
- }
- if errs[1] == nil {
- t.Fatal("expected error on bad content address chunk with content address validator only, but got nil")
- }
-
- // append a validator that always approves
- // all shall pass
- var posV boolTestValidator = true
- store.Validators = append(store.Validators, posV)
-
- chunks = GenerateRandomChunks(chunk.DefaultSize, 2)
- goodChunk = chunks[0]
- badChunk = chunks[1]
- copy(badChunk.Data(), goodChunk.Data())
-
- errs = putChunks(store, goodChunk, badChunk)
- if errs[0] != nil {
- t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err)
- }
- if errs[1] != nil {
- t.Fatalf("expected no error on bad content address chunk in spite of no validation, but got: %s", err)
- }
-
-}
-
-type boolTestValidator bool
-
-func (self boolTestValidator) Validate(chunk Chunk) bool {
- return bool(self)
-}
-
-// putChunks adds chunks to localstore
-// It waits for receive on the stored channel
-// It logs but does not fail on delivery error
-func putChunks(store *LocalStore, chunks ...Chunk) []error {
- i := 0
- f := func(n int64) Chunk {
- chunk := chunks[i]
- i++
- return chunk
- }
- _, errs := put(store, len(chunks), f)
- return errs
-}
-
-func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs []error) {
- for i := int64(0); i < int64(n); i++ {
- chunk := f(chunk.DefaultSize)
- err := store.Put(context.TODO(), chunk)
- errs = append(errs, err)
- hs = append(hs, chunk.Address())
- }
- return hs, errs
-}
-
-// TestGetFrequentlyAccessedChunkWontGetGarbageCollected tests that the most
-// frequently accessed chunk is not garbage collected from LDBStore, i.e.,
-// from disk when we are at the capacity and garbage collector runs. For that
-// we start putting random chunks into the DB while continuously accessing the
-// chunk we care about then check if we can still retrieve it from disk.
-func TestGetFrequentlyAccessedChunkWontGetGarbageCollected(t *testing.T) {
- ldbCap := defaultGCRatio
- store, cleanup := setupLocalStore(t, ldbCap)
- defer cleanup()
-
- var chunks []Chunk
- for i := 0; i < ldbCap; i++ {
- chunks = append(chunks, GenerateRandomChunk(chunk.DefaultSize))
- }
-
- mostAccessed := chunks[0].Address()
- for _, chunk := range chunks {
- if err := store.Put(context.Background(), chunk); err != nil {
- t.Fatal(err)
- }
-
- if _, err := store.Get(context.Background(), mostAccessed); err != nil {
- t.Fatal(err)
- }
- // Add time for MarkAccessed() to be able to finish in a separate Goroutine
- time.Sleep(1 * time.Millisecond)
- }
-
- store.DbStore.collectGarbage()
- if _, err := store.DbStore.Get(context.Background(), mostAccessed); err != nil {
- t.Logf("most frequntly accessed chunk not found on disk (key: %v)", mostAccessed)
- t.Fatal(err)
- }
-
-}
-
-func setupLocalStore(t *testing.T, ldbCap int) (ls *LocalStore, cleanup func()) {
- t.Helper()
-
- var err error
- datadir, err := ioutil.TempDir("", "storage")
- if err != nil {
- t.Fatal(err)
- }
-
- params := &LocalStoreParams{
- StoreParams: NewStoreParams(uint64(ldbCap), uint(ldbCap), nil, nil),
- }
- params.Init(datadir)
-
- store, err := NewLocalStore(params, nil)
- if err != nil {
- _ = os.RemoveAll(datadir)
- t.Fatal(err)
- }
-
- cleanup = func() {
- store.Close()
- _ = os.RemoveAll(datadir)
- }
-
- return store, cleanup
-}
-
-func TestHas(t *testing.T) {
- ldbCap := defaultGCRatio
- store, cleanup := setupLocalStore(t, ldbCap)
- defer cleanup()
-
- nonStoredAddr := GenerateRandomChunk(128).Address()
-
- has := store.Has(context.Background(), nonStoredAddr)
- if has {
- t.Fatal("Expected Has() to return false, but returned true!")
- }
-
- storeChunks := GenerateRandomChunks(128, 3)
- for _, ch := range storeChunks {
- err := store.Put(context.Background(), ch)
- if err != nil {
- t.Fatalf("Expected store to store chunk, but it failed: %v", err)
- }
-
- has := store.Has(context.Background(), ch.Address())
- if !has {
- t.Fatal("Expected Has() to return true, but returned false!")
- }
- }
-
- //let's be paranoid and test again that the non-existent chunk returns false
- has = store.Has(context.Background(), nonStoredAddr)
- if has {
- t.Fatal("Expected Has() to return false, but returned true!")
- }
-
-}
diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go
deleted file mode 100644
index 611ac3bc5..000000000
--- a/swarm/storage/memstore.go
+++ /dev/null
@@ -1,92 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-// memory storage layer for the package blockhash
-
-package storage
-
-import (
- "context"
-
- lru "github.com/hashicorp/golang-lru"
-)
-
-type MemStore struct {
- cache *lru.Cache
- disabled bool
-}
-
-//NewMemStore is instantiating a MemStore cache keeping all frequently requested
-//chunks in the `cache` LRU cache.
-func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) {
- if params.CacheCapacity == 0 {
- return &MemStore{
- disabled: true,
- }
- }
-
- c, err := lru.New(int(params.CacheCapacity))
- if err != nil {
- panic(err)
- }
-
- return &MemStore{
- cache: c,
- }
-}
-
-// Has needed to implement SyncChunkStore
-func (m *MemStore) Has(_ context.Context, addr Address) bool {
- return m.cache.Contains(addr)
-}
-
-func (m *MemStore) Get(_ context.Context, addr Address) (Chunk, error) {
- if m.disabled {
- return nil, ErrChunkNotFound
- }
-
- c, ok := m.cache.Get(string(addr))
- if !ok {
- return nil, ErrChunkNotFound
- }
- return c.(Chunk), nil
-}
-
-func (m *MemStore) Put(_ context.Context, c Chunk) error {
- if m.disabled {
- return nil
- }
-
- m.cache.Add(string(c.Address()), c)
- return nil
-}
-
-func (m *MemStore) setCapacity(n int) {
- if n <= 0 {
- m.disabled = true
- } else {
- c, err := lru.New(n)
- if err != nil {
- panic(err)
- }
-
- *m = MemStore{
- cache: c,
- }
- }
-}
-
-func (s *MemStore) Close() {}
diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go
deleted file mode 100644
index 8aaf486a7..000000000
--- a/swarm/storage/memstore_test.go
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package storage
-
-import (
- "context"
- "testing"
-
- "github.com/ethereum/go-ethereum/swarm/log"
-)
-
-func newTestMemStore() *MemStore {
- storeparams := NewDefaultStoreParams()
- return NewMemStore(storeparams, nil)
-}
-
-func testMemStoreRandom(n int, t *testing.T) {
- m := newTestMemStore()
- defer m.Close()
- testStoreRandom(m, n, t)
-}
-
-func testMemStoreCorrect(n int, t *testing.T) {
- m := newTestMemStore()
- defer m.Close()
- testStoreCorrect(m, n, t)
-}
-
-func TestMemStoreRandom_1(t *testing.T) {
- testMemStoreRandom(1, t)
-}
-
-func TestMemStoreCorrect_1(t *testing.T) {
- testMemStoreCorrect(1, t)
-}
-
-func TestMemStoreRandom_1k(t *testing.T) {
- testMemStoreRandom(1000, t)
-}
-
-func TestMemStoreCorrect_1k(t *testing.T) {
- testMemStoreCorrect(100, t)
-}
-
-func TestMemStoreNotFound(t *testing.T) {
- m := newTestMemStore()
- defer m.Close()
-
- _, err := m.Get(context.TODO(), ZeroAddr)
- if err != ErrChunkNotFound {
- t.Errorf("Expected ErrChunkNotFound, got %v", err)
- }
-}
-
-func benchmarkMemStorePut(n int, b *testing.B) {
- m := newTestMemStore()
- defer m.Close()
- benchmarkStorePut(m, n, b)
-}
-
-func benchmarkMemStoreGet(n int, b *testing.B) {
- m := newTestMemStore()
- defer m.Close()
- benchmarkStoreGet(m, n, b)
-}
-
-func BenchmarkMemStorePut_500(b *testing.B) {
- benchmarkMemStorePut(500, b)
-}
-
-func BenchmarkMemStoreGet_500(b *testing.B) {
- benchmarkMemStoreGet(500, b)
-}
-
-func TestMemStoreAndLDBStore(t *testing.T) {
- ldb, cleanup := newLDBStore(t)
- ldb.setCapacity(4000)
- defer cleanup()
-
- cacheCap := 200
- memStore := NewMemStore(NewStoreParams(4000, 200, nil, nil), nil)
-
- tests := []struct {
- n int // number of chunks to push to memStore
- chunkSize int64 // size of chunk (by default in Swarm - 4096)
- }{
- {
- n: 1,
- chunkSize: 4096,
- },
- {
- n: 101,
- chunkSize: 4096,
- },
- {
- n: 501,
- chunkSize: 4096,
- },
- {
- n: 1100,
- chunkSize: 4096,
- },
- }
-
- for i, tt := range tests {
- log.Info("running test", "idx", i, "tt", tt)
- var chunks []Chunk
-
- for i := 0; i < tt.n; i++ {
- c := GenerateRandomChunk(tt.chunkSize)
- chunks = append(chunks, c)
- }
-
- for i := 0; i < tt.n; i++ {
- err := ldb.Put(context.TODO(), chunks[i])
- if err != nil {
- t.Fatal(err)
- }
- err = memStore.Put(context.TODO(), chunks[i])
- if err != nil {
- t.Fatal(err)
- }
-
- if got := memStore.cache.Len(); got > cacheCap {
- t.Fatalf("expected to get cache capacity less than %v, but got %v", cacheCap, got)
- }
-
- }
-
- for i := 0; i < tt.n; i++ {
- _, err := memStore.Get(context.TODO(), chunks[i].Address())
- if err != nil {
- if err == ErrChunkNotFound {
- _, err := ldb.Get(context.TODO(), chunks[i].Address())
- if err != nil {
- t.Fatalf("couldn't get chunk %v from ldb, got error: %v", i, err)
- }
- } else {
- t.Fatalf("got error from memstore: %v", err)
- }
- }
- }
- }
-}
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index 7741b8f7b..b675384ce 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/opentracing/opentracing-go"
@@ -49,8 +50,8 @@ type NetFetcher interface {
// fetchers are unique to a chunk and are stored in fetchers LRU memory cache
// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address
type NetStore struct {
+ chunk.Store
mu sync.Mutex
- store SyncChunkStore
fetchers *lru.Cache
NewNetFetcherFunc NewNetFetcherFunc
closeC chan struct{}
@@ -60,13 +61,13 @@ var fetcherTimeout = 2 * time.Minute // timeout to cancel the fetcher even if re
// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a
// constructor function that can create a fetch function for a specific chunk address.
-func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) {
+func NewNetStore(store chunk.Store, nnf NewNetFetcherFunc) (*NetStore, error) {
fetchers, err := lru.New(defaultChunkRequestsCacheCapacity)
if err != nil {
return nil, err
}
return &NetStore{
- store: store,
+ Store: store,
fetchers: fetchers,
NewNetFetcherFunc: nnf,
closeC: make(chan struct{}),
@@ -75,14 +76,14 @@ func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error)
// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in
// the fetchers cache
-func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
+func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool, error) {
n.mu.Lock()
defer n.mu.Unlock()
// put the chunk to the store, there should be no error
- err := n.store.Put(ctx, ch)
+ exists, err := n.Store.Put(ctx, mode, ch)
if err != nil {
- return err
+ return exists, err
}
// if chunk is now put in the store, check if there was an active fetcher and call deliver on it
@@ -92,15 +93,15 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
log.Trace("n.getFetcher deliver", "ref", ch.Address())
f.deliver(ctx, ch)
}
- return nil
+ return exists, nil
}
// Get retrieves the chunk from the NetStore DPA synchronously.
// It calls NetStore.get, and if the chunk is not in local Storage
// it calls fetch with the request, which blocks until the chunk
// arrived or context is done
-func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) {
- chunk, fetch, err := n.get(rctx, ref)
+func (n *NetStore) Get(rctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, error) {
+ chunk, fetch, err := n.get(rctx, mode, ref)
if err != nil {
return nil, err
}
@@ -118,18 +119,10 @@ func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) {
return fetch(rctx)
}
-func (n *NetStore) BinIndex(po uint8) uint64 {
- return n.store.BinIndex(po)
-}
-
-func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
- return n.store.Iterator(from, to, po, f)
-}
-
// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function,
// which returns after the chunk is available or the context is done
func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error {
- chunk, fetch, _ := n.get(ctx, ref)
+ chunk, fetch, _ := n.get(ctx, chunk.ModeGetRequest, ref)
if chunk != nil {
return nil
}
@@ -140,9 +133,8 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont
}
// Close chunk store
-func (n *NetStore) Close() {
+func (n *NetStore) Close() (err error) {
close(n.closeC)
- n.store.Close()
wg := sync.WaitGroup{}
for _, key := range n.fetchers.Keys() {
@@ -162,6 +154,8 @@ func (n *NetStore) Close() {
}
}
wg.Wait()
+
+ return n.Store.Close()
}
// get attempts at retrieving the chunk from LocalStore
@@ -172,11 +166,11 @@ func (n *NetStore) Close() {
// or all fetcher contexts are done.
// It returns a chunk, a fetcher function and an error
// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk.
-func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) {
+func (n *NetStore) get(ctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, func(context.Context) (Chunk, error), error) {
n.mu.Lock()
defer n.mu.Unlock()
- chunk, err := n.store.Get(ctx, ref)
+ chunk, err := n.Store.Get(ctx, mode, ref)
if err != nil {
// TODO: Fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped.
if err != ErrChunkNotFound && err != leveldb.ErrNotFound {
@@ -192,13 +186,6 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co
return chunk, nil, nil
}
-// Has is the storage layer entry point to query the underlying
-// database to return if it has a chunk or not.
-// Called from the DebugAPI
-func (n *NetStore) Has(ctx context.Context, ref Address) bool {
- return n.store.Has(ctx, ref)
-}
-
// getOrCreateFetcher attempts to retrieve an existing fetcher;
// if none exists, creates one and saves it in the fetchers cache
// caller must hold the lock
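To illustrate the reshaped NetStore surface, a sketch (not part of the diff): Put now takes a chunk.ModePut and reports whether the chunk already existed, Get takes a chunk.ModeGet, and Has, Set and the pull-subscription methods are promoted from the embedded chunk.Store instead of being wrapped by hand. The storeAndFetch helper is hypothetical:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// storeAndFetch puts a chunk with an explicit put mode and reads it back with
// an explicit get mode (hypothetical helper).
func storeAndFetch(ctx context.Context, ns *storage.NetStore, ch storage.Chunk) error {
	exists, err := ns.Put(ctx, chunk.ModePutRequest, ch)
	if err != nil {
		return err
	}
	fmt.Println("already stored:", exists)

	got, err := ns.Get(ctx, chunk.ModeGetRequest, ch.Address())
	if err != nil {
		return err
	}
	fmt.Println("retrieved:", got.Address().Hex())
	return nil
}

func main() {}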
diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go
index 653877625..dc0727987 100644
--- a/swarm/storage/netstore_test.go
+++ b/swarm/storage/netstore_test.go
@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "os"
"sync"
"testing"
"time"
@@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9")
@@ -76,45 +78,43 @@ func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Addre
return m.fetcher
}
-func mustNewNetStore(t *testing.T) *NetStore {
- netStore, _ := mustNewNetStoreWithFetcher(t)
- return netStore
-}
-
-func mustNewNetStoreWithFetcher(t *testing.T) (*NetStore, *mockNetFetcher) {
+func newTestNetStore(t *testing.T) (netStore *NetStore, fetcher *mockNetFetcher, cleanup func()) {
t.Helper()
- datadir, err := ioutil.TempDir("", "netstore")
+ dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
t.Fatal(err)
}
- naddr := make([]byte, 32)
- params := NewDefaultLocalStoreParams()
- params.Init(datadir)
- params.BaseKey = naddr
- localStore, err := NewTestLocalStoreForAddr(params)
+ localStore, err := localstore.New(dir, make([]byte, 32), nil)
if err != nil {
+ os.RemoveAll(dir)
t.Fatal(err)
}
+ cleanup = func() {
+ localStore.Close()
+ os.RemoveAll(dir)
+ }
- fetcher := &mockNetFetcher{}
+ fetcher = new(mockNetFetcher)
mockNetFetchFuncFactory := &mockNetFetchFuncFactory{
fetcher: fetcher,
}
- netStore, err := NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher)
+ netStore, err = NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher)
if err != nil {
+ cleanup()
t.Fatal(err)
}
- return netStore, fetcher
+ return netStore, fetcher, cleanup
}
// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put.
// After the Put there should be no active fetchers, and the context created for the fetcher should
// be cancelled.
func TestNetStoreGetAndPut(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -126,12 +126,12 @@ func TestNetStoreGetAndPut(t *testing.T) {
time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
// check if netStore created a fetcher in the Get call for the unavailable chunk
- if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
putErrC <- errors.New("Expected netStore to use a fetcher for the Get call")
return
}
- err := netStore.Put(ctx, chunk)
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
if err != nil {
putErrC <- fmt.Errorf("Expected no err got %v", err)
return
@@ -141,7 +141,7 @@ func TestNetStoreGetAndPut(t *testing.T) {
}()
close(c)
- recChunk, err := netStore.Get(ctx, chunk.Address()) // this is blocked until the Put above is done
+ recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) // this is blocked until the Put above is done
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
@@ -150,7 +150,7 @@ func TestNetStoreGetAndPut(t *testing.T) {
t.Fatal(err)
}
// the retrieved chunk should be the same as what we Put
- if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) {
+ if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) {
t.Fatalf("Different chunk received than what was put")
}
// the chunk is already available locally, so there should be no active fetchers waiting for it
@@ -172,26 +172,27 @@ func TestNetStoreGetAndPut(t *testing.T) {
// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore,
// there is no need to create fetchers.
func TestNetStoreGetAfterPut(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// First we Put the chunk, so the chunk will be available locally
- err := netStore.Put(ctx, chunk)
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
// Get should retrieve the chunk from LocalStore, without creating fetcher
- recChunk, err := netStore.Get(ctx, chunk.Address())
+ recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
// the retrieved chunk should be the same as what we Put
- if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) {
+ if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) {
t.Fatalf("Different chunk received than what was put")
}
// no fetcher offer or request should be created for a locally available chunk
@@ -207,9 +208,10 @@ func TestNetStoreGetAfterPut(t *testing.T) {
// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout
func TestNetStoreGetTimeout(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
@@ -221,7 +223,7 @@ func TestNetStoreGetTimeout(t *testing.T) {
time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
// check if netStore created a fetcher in the Get call for the unavailable chunk
- if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call")
return
}
@@ -232,7 +234,7 @@ func TestNetStoreGetTimeout(t *testing.T) {
close(c)
// We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will
// be a timeout
- _, err := netStore.Get(ctx, chunk.Address())
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
// Check if the timeout happened
if err != context.DeadlineExceeded {
@@ -259,9 +261,10 @@ func TestNetStoreGetTimeout(t *testing.T) {
// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks
// the errors
func TestNetStoreGetCancel(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@@ -271,7 +274,7 @@ func TestNetStoreGetCancel(t *testing.T) {
<-c // wait for the Get to be called
time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
// check if netStore created a fetcher in the Get call for the unavailable chunk
- if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call")
return
}
@@ -283,7 +286,7 @@ func TestNetStoreGetCancel(t *testing.T) {
close(c)
// We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery
- _, err := netStore.Get(ctx, chunk.Address())
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
if err := <-fetcherErrC; err != nil {
t.Fatal(err)
@@ -311,9 +314,10 @@ func TestNetStoreGetCancel(t *testing.T) {
// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher
// for the chunk retrieval
func TestNetStoreMultipleGetAndPut(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -327,7 +331,7 @@ func TestNetStoreMultipleGetAndPut(t *testing.T) {
putErrC <- errors.New("Expected netStore to use one fetcher for all Get calls")
return
}
- err := netStore.Put(ctx, chunk)
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
if err != nil {
putErrC <- fmt.Errorf("Expected no err got %v", err)
return
@@ -340,11 +344,11 @@ func TestNetStoreMultipleGetAndPut(t *testing.T) {
errC := make(chan error)
for i := 0; i < count; i++ {
go func() {
- recChunk, err := netStore.Get(ctx, chunk.Address())
+ recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
if err != nil {
errC <- fmt.Errorf("Expected no err got %v", err)
}
- if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) {
+ if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) {
errC <- errors.New("Different chunk received than what was put")
}
errC <- nil
@@ -385,7 +389,8 @@ func TestNetStoreMultipleGetAndPut(t *testing.T) {
// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout
func TestNetStoreFetchFuncTimeout(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
chunk := GenerateRandomChunk(chunk.DefaultSize)
@@ -424,21 +429,22 @@ func TestNetStoreFetchFuncTimeout(t *testing.T) {
// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk
func TestNetStoreFetchFuncAfterPut(t *testing.T) {
- netStore := mustNewNetStore(t)
+ netStore, _, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// We deliver the created chunk with a Put
- err := netStore.Put(ctx, chunk)
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
// FetchFunc should return nil, because the chunk is available locally, no need to fetch it
- wait := netStore.FetchFunc(ctx, chunk.Address())
+ wait := netStore.FetchFunc(ctx, ch.Address())
if wait != nil {
t.Fatal("Expected wait to be nil")
}
@@ -451,16 +457,17 @@ func TestNetStoreFetchFuncAfterPut(t *testing.T) {
// TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk
func TestNetStoreGetCallsRequest(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx := context.WithValue(context.Background(), "hopcount", uint8(5))
ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel()
// We call Get for an unavailable chunk; it will time out because the chunk is not delivered
- _, err := netStore.Get(ctx, chunk.Address())
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
if err != context.DeadlineExceeded {
t.Fatalf("Expected context.DeadlineExceeded err got %v", err)
@@ -479,9 +486,10 @@ func TestNetStoreGetCallsRequest(t *testing.T) {
// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk
// in case of a source peer provided in the context.
func TestNetStoreGetCallsOffer(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
// If a source peer is added to the context, NetStore will handle it as an offer
ctx := context.WithValue(context.Background(), "source", sourcePeerID.String())
@@ -489,7 +497,7 @@ func TestNetStoreGetCallsOffer(t *testing.T) {
defer cancel()
// We call Get for an unavailable chunk; it will time out because the chunk is not delivered
- _, err := netStore.Get(ctx, chunk.Address())
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
if err != context.DeadlineExceeded {
t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err)
@@ -513,8 +521,8 @@ func TestNetStoreGetCallsOffer(t *testing.T) {
// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context.
// There is no Put call, so the Get calls time out
func TestNetStoreFetcherCountPeers(t *testing.T) {
-
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
addr := randomAddr()
peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()}
@@ -529,7 +537,7 @@ func TestNetStoreFetcherCountPeers(t *testing.T) {
peer := peers[i]
go func() {
ctx := context.WithValue(ctx, "peer", peer)
- _, err := netStore.Get(ctx, addr)
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, addr)
errC <- err
}()
}
@@ -565,21 +573,22 @@ func TestNetStoreFetcherCountPeers(t *testing.T) {
// and checks there is still exactly one fetcher for one chunk. After the chunk is delivered, it checks
// if the fetcher is closed.
func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
- chunk := GenerateRandomChunk(chunk.DefaultSize)
+ ch := GenerateRandomChunk(chunk.DefaultSize)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// FetchFunc should return a non-nil wait function, because the chunk is not available
- wait := netStore.FetchFunc(ctx, chunk.Address())
+ wait := netStore.FetchFunc(ctx, ch.Address())
if wait == nil {
t.Fatal("Expected wait function to be not nil")
}
// There should be exactly one fetcher for the chunk
- if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
}
@@ -596,12 +605,12 @@ func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// there should be still only one fetcher, because all wait calls are for the same chunk
- if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
t.Fatal("Expected netStore to have one fetcher for the requested chunk")
}
// Deliver the chunk with a Put
- err := netStore.Put(ctx, chunk)
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
@@ -630,7 +639,8 @@ func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) {
// TestNetStoreFetcherLifeCycleWithTimeout is similar to TestNetStoreFetchFuncCalledMultipleTimes,
// the only difference is that we don't deliver the chunk, just wait for timeout
func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) {
- netStore, fetcher := mustNewNetStoreWithFetcher(t)
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
chunk := GenerateRandomChunk(chunk.DefaultSize)
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 281bbe9fe..9b0d5397b 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -96,12 +96,12 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get
When splitting, data is given as a SectionReader, and the key is a hashSize-long byte slice (Address); the root hash of the entire content will fill this once processing finishes.
New chunks to store are stored using the putter which the caller provides.
*/
-func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
- return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx)
+func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) {
+ return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize), tag).Split(ctx)
}
-func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
- return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx)
+func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) {
+ return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize), tag).Append(ctx)
}
// Entry to create a tree node
@@ -142,6 +142,7 @@ type PyramidChunker struct {
putter Putter
getter Getter
key Address
+ tag *chunk.Tag
workerCount int64
workerLock sync.RWMutex
jobC chan *chunkJob
@@ -152,7 +153,7 @@ type PyramidChunker struct {
chunkLevel [][]*TreeEntry
}
-func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) {
+func NewPyramidSplitter(params *PyramidSplitterParams, tag *chunk.Tag) (pc *PyramidChunker) {
pc = &PyramidChunker{}
pc.reader = params.reader
pc.hashSize = params.hashSize
@@ -161,6 +162,7 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) {
pc.putter = params.putter
pc.getter = params.getter
pc.key = params.addr
+ pc.tag = tag
pc.workerCount = 0
pc.jobC = make(chan *chunkJob, 2*ChunkProcessors)
pc.wg = &sync.WaitGroup{}
@@ -273,6 +275,7 @@ func (pc *PyramidChunker) processor(ctx context.Context, id int64) {
return
}
pc.processChunk(ctx, id, job)
+ pc.tag.Inc(chunk.StateSplit)
case <-pc.quitC:
return
}
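A hedged sketch (not part of the diff) of the new chunker entry point: the caller threads a *chunk.Tag through PyramidSplit and the chunker increments the tag's split counter for every processed chunk. The NewTag argument order (uid, name, total) and the Tag.Get accessor for the StateSplit counter are assumptions here:

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// splitWithTag splits data with the pyramid chunker while tracking progress
// on a tag (hypothetical helper).
func splitWithTag(ctx context.Context, putter storage.Putter, getter storage.Getter, data []byte) (storage.Address, error) {
	tag := chunk.NewTag(0, "upload-1", 0) // uid, name, total: assumed argument order
	addr, wait, err := storage.PyramidSplit(ctx, bytes.NewReader(data), putter, getter, tag)
	if err != nil {
		return nil, err
	}
	if err := wait(ctx); err != nil {
		return nil, err
	}
	fmt.Println("chunks split so far:", tag.Get(chunk.StateSplit)) // Tag.Get is assumed
	return addr, nil
}

func main() {}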
diff --git a/swarm/storage/schema.go b/swarm/storage/schema.go
deleted file mode 100644
index 91847ca0f..000000000
--- a/swarm/storage/schema.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package storage
-
-// The DB schema we want to use. The actual/current DB schema might differ
-// until migrations are run.
-const CurrentDbSchema = DbSchemaHalloween
-
-// There was a time when we had no schema at all.
-const DbSchemaNone = ""
-
-// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
-const DbSchemaPurity = "purity"
-
-// "halloween" is here because we had a screw in the garbage collector index.
-// Because of that we had to rebuild the GC index to get rid of erroneous
-// entries and that takes a long time. This schema is used for bookkeeping,
-// so rebuild index will run just once.
-const DbSchemaHalloween = "halloween"
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index 2f39685b4..d1d47dbe8 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -178,9 +178,7 @@ func (c ChunkData) Size() uint64 {
return binary.LittleEndian.Uint64(c[:8])
}
-type ChunkValidator interface {
- Validate(chunk Chunk) bool
-}
+type ChunkValidator = chunk.Validator
// Provides method for validation of content address in chunks
// Holds the corresponding hasher to create the address
@@ -211,20 +209,7 @@ func (v *ContentAddressValidator) Validate(ch Chunk) bool {
return bytes.Equal(hash, ch.Address())
}
-type ChunkStore interface {
- Put(ctx context.Context, ch Chunk) (err error)
- Get(rctx context.Context, ref Address) (ch Chunk, err error)
- Has(rctx context.Context, ref Address) bool
- Close()
-}
-
-// SyncChunkStore is a ChunkStore which supports syncing
-type SyncChunkStore interface {
- ChunkStore
- BinIndex(po uint8) uint64
- Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error
- FetchFunc(ctx context.Context, ref Address) func(context.Context) error
-}
+type ChunkStore = chunk.Store
// FakeChunkStore doesn't store anything, just implements the ChunkStore interface
// It can be used to inject into a hasherStore if you don't want to actually store data just do the
@@ -233,20 +218,33 @@ type FakeChunkStore struct {
}
// Put doesn't store anything it is just here to implement ChunkStore
-func (f *FakeChunkStore) Put(_ context.Context, ch Chunk) error {
- return nil
+func (f *FakeChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
+ return false, nil
}
// Has doesn't do anything it is just here to implement ChunkStore
-func (f *FakeChunkStore) Has(_ context.Context, ref Address) bool {
- panic("FakeChunkStore doesn't support HasChunk")
+func (f *FakeChunkStore) Has(_ context.Context, ref Address) (bool, error) {
+ panic("FakeChunkStore doesn't support Has")
}
// Get doesn't store anything it is just here to implement ChunkStore
-func (f *FakeChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
+func (f *FakeChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
panic("FakeChunkStore doesn't support Get")
}
+func (f *FakeChunkStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
+ panic("FakeChunkStore doesn't support Set")
+}
+
+func (f *FakeChunkStore) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
+ panic("FakeChunkStore doesn't support LastPullSubscriptionBinID")
+}
+
+func (f *FakeChunkStore) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
+ panic("FakeChunkStore doesn't support SubscribePull")
+}
+
// Close doesn't store anything it is just here to implement ChunkStore
-func (f *FakeChunkStore) Close() {
+func (f *FakeChunkStore) Close() error {
+ return nil
}
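Because ChunkStore is now a plain alias of chunk.Store, a compile-time assertion is enough to keep FakeChunkStore in step with that interface as it grows (Set, SubscribePull, LastPullSubscriptionBinID, and so on); the same holds for ChunkValidator and chunk.Validator. A minimal sketch, not part of the diff, assuming chunk.Store contains exactly the methods FakeChunkStore implements above:

package main

import (
	"github.com/ethereum/go-ethereum/swarm/chunk"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// This assertion fails to compile as soon as FakeChunkStore stops satisfying
// any method of chunk.Store (alias storage.ChunkStore).
var _ chunk.Store = (*storage.FakeChunkStore)(nil)

func main() {}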