aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJanoš Guljaš <janos@users.noreply.github.com>2019-02-08 01:40:26 +0800
committerRafael Matias <rafael@skyle.net>2019-02-19 20:06:14 +0800
commit333b1bfb6c95cf5453404f6117252aefff2c190f (patch)
tree8f773911c28a59a28e56fd5683ded88283d970b7
parentd1ace4f344616fb6fa8643872c1f9cac89f8549e (diff)
downloadgo-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.tar
go-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.tar.gz
go-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.tar.bz2
go-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.tar.lz
go-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.tar.xz
go-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.tar.zst
go-tangerine-333b1bfb6c95cf5453404f6117252aefff2c190f.zip
swarm/storage/localstore: new localstore package (#19015)
(cherry picked from commit 4f3d22f06c546f36487b33dfb6b5cb4df3ecf073)
-rw-r--r--swarm/storage/localstore/doc.go56
-rw-r--r--swarm/storage/localstore/gc.go302
-rw-r--r--swarm/storage/localstore/gc_test.go358
-rw-r--r--swarm/storage/localstore/index_test.go227
-rw-r--r--swarm/storage/localstore/localstore.go431
-rw-r--r--swarm/storage/localstore/localstore_test.go520
-rw-r--r--swarm/storage/localstore/mode_get.go154
-rw-r--r--swarm/storage/localstore/mode_get_test.go237
-rw-r--r--swarm/storage/localstore/mode_put.go160
-rw-r--r--swarm/storage/localstore/mode_put_test.go300
-rw-r--r--swarm/storage/localstore/mode_set.go205
-rw-r--r--swarm/storage/localstore/mode_set_test.go128
-rw-r--r--swarm/storage/localstore/retrieval_index_test.go150
-rw-r--r--swarm/storage/localstore/subscription_pull.go193
-rw-r--r--swarm/storage/localstore/subscription_pull_test.go478
-rw-r--r--swarm/storage/localstore/subscription_push.go145
-rw-r--r--swarm/storage/localstore/subscription_push_test.go200
17 files changed, 4244 insertions, 0 deletions
diff --git a/swarm/storage/localstore/doc.go b/swarm/storage/localstore/doc.go
new file mode 100644
index 000000000..98f6fc40a
--- /dev/null
+++ b/swarm/storage/localstore/doc.go
@@ -0,0 +1,56 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+Package localstore provides disk storage layer for Swarm Chunk persistence.
+It uses swarm/shed abstractions on top of github.com/syndtr/goleveldb LevelDB
+implementation.
+
+The main type is DB which manages the storage by providing methods to
+access and add Chunks and to manage their status.
+
+Modes are abstractions that do specific changes to Chunks. There are three
+mode types:
+
+ - ModeGet, for Chunk access
+ - ModePut, for adding Chunks to the database
+ - ModeSet, for changing Chunk statuses
+
+Every mode type has a corresponding type (Getter, Putter and Setter)
+that provides adequate method to perform the opperation and that type
+should be injected into localstore consumers instead the whole DB.
+This provides more clear insight which operations consumer is performing
+on the database.
+
+Getters, Putters and Setters accept different get, put and set modes
+to perform different actions. For example, ModeGet has two different
+variables ModeGetRequest and ModeGetSync and two different Getters
+can be constructed with them that are used when the chunk is requested
+or when the chunk is synced as this two events are differently changing
+the database.
+
+Subscription methods are implemented for a specific purpose of
+continuous iterations over Chunks that should be provided to
+Push and Pull syncing.
+
+DB implements an internal garbage collector that removes only synced
+Chunks from the database based on their most recent access time.
+
+Internally, DB stores Chunk data and any required information, such as
+store and access timestamps in different shed indexes that can be
+iterated on by garbage collector or subscriptions.
+*/
+package localstore
diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go
new file mode 100644
index 000000000..7718d1e58
--- /dev/null
+++ b/swarm/storage/localstore/gc.go
@@ -0,0 +1,302 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+Counting number of items in garbage collection index
+
+The number of items in garbage collection index is not the same as the number of
+chunks in retrieval index (total number of stored chunks). Chunk can be garbage
+collected only when it is set to a synced state by ModSetSync, and only then can
+be counted into garbage collection size, which determines whether a number of
+chunk should be removed from the storage by the garbage collection. This opens a
+possibility that the storage size exceeds the limit if files are locally
+uploaded and the node is not connected to other nodes or there is a problem with
+syncing.
+
+Tracking of garbage collection size (gcSize) is focused on performance. Key
+points:
+
+ 1. counting the number of key/value pairs in LevelDB takes around 0.7s for 1e6
+ on a very fast ssd (unacceptable long time in reality)
+ 2. locking leveldb batch writes with a global mutex (serial batch writes) is
+ not acceptable, we should use locking per chunk address
+
+Because of point 1. we cannot count the number of items in garbage collection
+index in New constructor as it could last very long for realistic scenarios
+where limit is 5e6 and nodes are running on slower hdd disks or cloud providers
+with low IOPS.
+
+Point 2. is a performance optimization to allow parallel batch writes with
+getters, putters and setters. Every single batch that they create contain only
+information related to a single chunk, no relations with other chunks or shared
+statistical data (like gcSize). This approach avoids race conditions on writing
+batches in parallel, but creates a problem of synchronizing statistical data
+values like gcSize. With global mutex lock, any data could be written by any
+batch, but would not use utilize the full potential of leveldb parallel writes.
+
+To mitigate this two problems, the implementation of counting and persisting
+gcSize is split into two parts. One is the in-memory value (gcSize) that is fast
+to read and write with a dedicated mutex (gcSizeMu) if the batch which adds or
+removes items from garbage collection index is successful. The second part is
+the reliable persistence of this value to leveldb database, as storedGCSize
+field. This database field is saved by writeGCSizeWorker and writeGCSize
+functions when in-memory gcSize variable is changed, but no too often to avoid
+very frequent database writes. This database writes are triggered by
+writeGCSizeTrigger when a call is made to function incGCSize. Trigger ensures
+that no database writes are done only when gcSize is changed (contrary to a
+simpler periodic writes or checks). A backoff of 10s in writeGCSizeWorker
+ensures that no frequent batch writes are made. Saving the storedGCSize on
+database Close function ensures that in-memory gcSize is persisted when database
+is closed.
+
+This persistence must be resilient to failures like panics. For this purpose, a
+collection of hashes that are added to the garbage collection index, but still
+not persisted to storedGCSize, must be tracked to count them in when DB is
+constructed again with New function after the failure (swarm node restarts). On
+every batch write that adds a new item to garbage collection index, the same
+hash is added to gcUncountedHashesIndex. This ensures that there is a persisted
+information which hashes were added to the garbage collection index. But, when
+the storedGCSize is saved by writeGCSize function, this values are removed in
+the same batch in which storedGCSize is changed to ensure consistency. When the
+panic happen, or database Close method is not saved. The database storage
+contains all information to reliably and efficiently get the correct number of
+items in garbage collection index. This is performed in the New function when
+all hashes in gcUncountedHashesIndex are counted, added to the storedGCSize and
+saved to the disk before the database is constructed again. Index
+gcUncountedHashesIndex is acting as dirty bit for recovery that provides
+information what needs to be corrected. With a simple dirty bit, the whole
+garbage collection index should me counted on recovery instead only the items in
+gcUncountedHashesIndex. Because of the triggering mechanizm of writeGCSizeWorker
+and relatively short backoff time, the number of hashes in
+gcUncountedHashesIndex should be low and it should take a very short time to
+recover from the previous failure. If there was no failure and
+gcUncountedHashesIndex is empty, which is the usual case, New function will take
+the minimal time to return.
+*/
+
+package localstore
+
+import (
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+var (
+ // gcTargetRatio defines the target number of items
+ // in garbage collection index that will not be removed
+ // on garbage collection. The target number of items
+ // is calculated by gcTarget function. This value must be
+ // in range (0,1]. For example, with 0.9 value,
+ // garbage collection will leave 90% of defined capacity
+ // in database after its run. This prevents frequent
+ // garbage collection runs.
+ gcTargetRatio = 0.9
+ // gcBatchSize limits the number of chunks in a single
+ // leveldb batch on garbage collection.
+ gcBatchSize int64 = 1000
+)
+
+// collectGarbageWorker is a long running function that waits for
+// collectGarbageTrigger channel to signal a garbage collection
+// run. GC run iterates on gcIndex and removes older items
+// form retrieval and other indexes.
+func (db *DB) collectGarbageWorker() {
+ for {
+ select {
+ case <-db.collectGarbageTrigger:
+ // run a single collect garbage run and
+ // if done is false, gcBatchSize is reached and
+ // another collect garbage run is needed
+ collectedCount, done, err := db.collectGarbage()
+ if err != nil {
+ log.Error("localstore collect garbage", "err", err)
+ }
+ // check if another gc run is needed
+ if !done {
+ db.triggerGarbageCollection()
+ }
+
+ if testHookCollectGarbage != nil {
+ testHookCollectGarbage(collectedCount)
+ }
+ case <-db.close:
+ return
+ }
+ }
+}
+
+// collectGarbage removes chunks from retrieval and other
+// indexes if maximal number of chunks in database is reached.
+// This function returns the number of removed chunks. If done
+// is false, another call to this function is needed to collect
+// the rest of the garbage as the batch size limit is reached.
+// This function is called in collectGarbageWorker.
+func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) {
+ batch := new(leveldb.Batch)
+ target := db.gcTarget()
+
+ done = true
+ err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ // protect parallel updates
+ unlock, err := db.lockAddr(item.Address)
+ if err != nil {
+ return false, err
+ }
+ defer unlock()
+
+ gcSize := db.getGCSize()
+ if gcSize-collectedCount <= target {
+ return true, nil
+ }
+ // delete from retrieve, pull, gc
+ db.retrievalDataIndex.DeleteInBatch(batch, item)
+ db.retrievalAccessIndex.DeleteInBatch(batch, item)
+ db.pullIndex.DeleteInBatch(batch, item)
+ db.gcIndex.DeleteInBatch(batch, item)
+ collectedCount++
+ if collectedCount >= gcBatchSize {
+ // bach size limit reached,
+ // another gc run is needed
+ done = false
+ return true, nil
+ }
+ return false, nil
+ }, nil)
+ if err != nil {
+ return 0, false, err
+ }
+
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return 0, false, err
+ }
+ // batch is written, decrement gcSize
+ db.incGCSize(-collectedCount)
+ return collectedCount, done, nil
+}
+
+// gcTrigger retruns the absolute value for garbage collection
+// target value, calculated from db.capacity and gcTargetRatio.
+func (db *DB) gcTarget() (target int64) {
+ return int64(float64(db.capacity) * gcTargetRatio)
+}
+
+// incGCSize increments gcSize by the provided number.
+// If count is negative, it will decrement gcSize.
+func (db *DB) incGCSize(count int64) {
+ if count == 0 {
+ return
+ }
+
+ db.gcSizeMu.Lock()
+ new := db.gcSize + count
+ db.gcSize = new
+ db.gcSizeMu.Unlock()
+
+ select {
+ case db.writeGCSizeTrigger <- struct{}{}:
+ default:
+ }
+ if new >= db.capacity {
+ db.triggerGarbageCollection()
+ }
+}
+
+// getGCSize returns gcSize value by locking it
+// with gcSizeMu mutex.
+func (db *DB) getGCSize() (count int64) {
+ db.gcSizeMu.RLock()
+ count = db.gcSize
+ db.gcSizeMu.RUnlock()
+ return count
+}
+
+// triggerGarbageCollection signals collectGarbageWorker
+// to call collectGarbage.
+func (db *DB) triggerGarbageCollection() {
+ select {
+ case db.collectGarbageTrigger <- struct{}{}:
+ case <-db.close:
+ default:
+ }
+}
+
+// writeGCSizeWorker writes gcSize on trigger event
+// and waits writeGCSizeDelay after each write.
+// It implements a linear backoff with delay of
+// writeGCSizeDelay duration to avoid very frequent
+// database operations.
+func (db *DB) writeGCSizeWorker() {
+ for {
+ select {
+ case <-db.writeGCSizeTrigger:
+ err := db.writeGCSize(db.getGCSize())
+ if err != nil {
+ log.Error("localstore write gc size", "err", err)
+ }
+ // Wait some time before writing gc size in the next
+ // iteration. This prevents frequent I/O operations.
+ select {
+ case <-time.After(10 * time.Second):
+ case <-db.close:
+ return
+ }
+ case <-db.close:
+ return
+ }
+ }
+}
+
+// writeGCSize stores the number of items in gcIndex.
+// It removes all hashes from gcUncountedHashesIndex
+// not to include them on the next DB initialization
+// (New function) when gcSize is counted.
+func (db *DB) writeGCSize(gcSize int64) (err error) {
+ const maxBatchSize = 1000
+
+ batch := new(leveldb.Batch)
+ db.storedGCSize.PutInBatch(batch, uint64(gcSize))
+ batchSize := 1
+
+ // use only one iterator as it acquires its snapshot
+ // not to remove hashes from index that are added
+ // after stored gc size is written
+ err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
+ batchSize++
+ if batchSize >= maxBatchSize {
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return false, err
+ }
+ batch.Reset()
+ batchSize = 0
+ }
+ return false, nil
+ }, nil)
+ if err != nil {
+ return err
+ }
+ return db.shed.WriteBatch(batch)
+}
+
+// testHookCollectGarbage is a hook that can provide
+// information when a garbage collection run is done
+// and how many items it removed.
+var testHookCollectGarbage func(collectedCount int64)
diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go
new file mode 100644
index 000000000..eb039a554
--- /dev/null
+++ b/swarm/storage/localstore/gc_test.go
@@ -0,0 +1,358 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TestDB_collectGarbageWorker tests garbage collection runs
+// by uploading and syncing a number of chunks.
+func TestDB_collectGarbageWorker(t *testing.T) {
+ testDB_collectGarbageWorker(t)
+}
+
+// TestDB_collectGarbageWorker_multipleBatches tests garbage
+// collection runs by uploading and syncing a number of
+// chunks by having multiple smaller batches.
+func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
+ // lower the maximal number of chunks in a single
+ // gc batch to ensure multiple batches.
+ defer func(s int64) { gcBatchSize = s }(gcBatchSize)
+ gcBatchSize = 2
+
+ testDB_collectGarbageWorker(t)
+}
+
+// testDB_collectGarbageWorker is a helper test function to test
+// garbage collection runs by uploading and syncing a number of chunks.
+func testDB_collectGarbageWorker(t *testing.T) {
+ chunkCount := 150
+
+ testHookCollectGarbageChan := make(chan int64)
+ defer setTestHookCollectGarbage(func(collectedCount int64) {
+ testHookCollectGarbageChan <- collectedCount
+ })()
+
+ db, cleanupFunc := newTestDB(t, &Options{
+ Capacity: 100,
+ })
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+ syncer := db.NewSetter(ModeSetSync)
+
+ addrs := make([]storage.Address, 0)
+
+ // upload random chunks
+ for i := 0; i < chunkCount; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = syncer.Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ addrs = append(addrs, chunk.Address())
+ }
+
+ gcTarget := db.gcTarget()
+
+ for {
+ select {
+ case <-testHookCollectGarbageChan:
+ case <-time.After(10 * time.Second):
+ t.Error("collect garbage timeout")
+ }
+ gcSize := db.getGCSize()
+ if gcSize == gcTarget {
+ break
+ }
+ }
+
+ t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+
+ // the first synced chunk should be removed
+ t.Run("get the first synced chunk", func(t *testing.T) {
+ _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ if err != storage.ErrChunkNotFound {
+ t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound)
+ }
+ })
+
+ // last synced chunk should not be removed
+ t.Run("get most recent synced chunk", func(t *testing.T) {
+ _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+
+ // cleanup: drain the last testHookCollectGarbageChan
+ // element before calling deferred functions not to block
+ // collectGarbageWorker loop, preventing the race in
+ // setting testHookCollectGarbage function
+ select {
+ case <-testHookCollectGarbageChan:
+ default:
+ }
+}
+
+// TestDB_collectGarbageWorker_withRequests is a helper test function
+// to test garbage collection runs by uploading, syncing and
+// requesting a number of chunks.
+func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, &Options{
+ Capacity: 100,
+ })
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+ syncer := db.NewSetter(ModeSetSync)
+
+ testHookCollectGarbageChan := make(chan int64)
+ defer setTestHookCollectGarbage(func(collectedCount int64) {
+ testHookCollectGarbageChan <- collectedCount
+ })()
+
+ addrs := make([]storage.Address, 0)
+
+ // upload random chunks just up to the capacity
+ for i := 0; i < int(db.capacity)-1; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = syncer.Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ addrs = append(addrs, chunk.Address())
+ }
+
+ // request the latest synced chunk
+ // to prioritize it in the gc index
+ // not to be collected
+ _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // upload and sync another chunk to trigger
+ // garbage collection
+ chunk := generateRandomChunk()
+ err = uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = syncer.Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ addrs = append(addrs, chunk.Address())
+
+ // wait for garbage collection
+
+ gcTarget := db.gcTarget()
+
+ var totalCollectedCount int64
+ for {
+ select {
+ case c := <-testHookCollectGarbageChan:
+ totalCollectedCount += c
+ case <-time.After(10 * time.Second):
+ t.Error("collect garbage timeout")
+ }
+ gcSize := db.getGCSize()
+ if gcSize == gcTarget {
+ break
+ }
+ }
+
+ wantTotalCollectedCount := int64(len(addrs)) - gcTarget
+ if totalCollectedCount != wantTotalCollectedCount {
+ t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount)
+ }
+
+ t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+
+ // requested chunk should not be removed
+ t.Run("get requested chunk", func(t *testing.T) {
+ _, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+
+ // the second synced chunk should be removed
+ t.Run("get gc-ed chunk", func(t *testing.T) {
+ _, err := db.NewGetter(ModeGetRequest).Get(addrs[1])
+ if err != storage.ErrChunkNotFound {
+ t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound)
+ }
+ })
+
+ // last synced chunk should not be removed
+ t.Run("get most recent synced chunk", func(t *testing.T) {
+ _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+}
+
+// TestDB_gcSize checks if gcSize has a correct value after
+// database is initialized with existing data.
+func TestDB_gcSize(t *testing.T) {
+ dir, err := ioutil.TempDir("", "localstore-stored-gc-size")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dir)
+ baseKey := make([]byte, 32)
+ if _, err := rand.Read(baseKey); err != nil {
+ t.Fatal(err)
+ }
+ db, err := New(dir, baseKey, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ uploader := db.NewPutter(ModePutUpload)
+ syncer := db.NewSetter(ModeSetSync)
+
+ count := 100
+
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = syncer.Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // DB.Close writes gc size to disk, so
+ // Instead calling Close, simulate database shutdown
+ // without it.
+ close(db.close)
+ db.updateGCWG.Wait()
+ err = db.shed.Close()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ db, err = New(dir, baseKey, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("gc index size", newIndexGCSizeTest(db))
+
+ t.Run("gc uncounted hashes index count", newItemsCountTest(db.gcUncountedHashesIndex, 0))
+}
+
+// setTestHookCollectGarbage sets testHookCollectGarbage and
+// returns a function that will reset it to the
+// value before the change.
+func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) {
+ current := testHookCollectGarbage
+ reset = func() { testHookCollectGarbage = current }
+ testHookCollectGarbage = h
+ return reset
+}
+
+// TestSetTestHookCollectGarbage tests if setTestHookCollectGarbage changes
+// testHookCollectGarbage function correctly and if its reset function
+// resets the original function.
+func TestSetTestHookCollectGarbage(t *testing.T) {
+ // Set the current function after the test finishes.
+ defer func(h func(collectedCount int64)) { testHookCollectGarbage = h }(testHookCollectGarbage)
+
+ // expected value for the unchanged function
+ original := 1
+ // expected value for the changed function
+ changed := 2
+
+ // this variable will be set with two different functions
+ var got int
+
+ // define the original (unchanged) functions
+ testHookCollectGarbage = func(_ int64) {
+ got = original
+ }
+
+ // set got variable
+ testHookCollectGarbage(0)
+
+ // test if got variable is set correctly
+ if got != original {
+ t.Errorf("got hook value %v, want %v", got, original)
+ }
+
+ // set the new function
+ reset := setTestHookCollectGarbage(func(_ int64) {
+ got = changed
+ })
+
+ // set got variable
+ testHookCollectGarbage(0)
+
+ // test if got variable is set correctly to changed value
+ if got != changed {
+ t.Errorf("got hook value %v, want %v", got, changed)
+ }
+
+ // set the function to the original one
+ reset()
+
+ // set got variable
+ testHookCollectGarbage(0)
+
+ // test if got variable is set correctly to original value
+ if got != original {
+ t.Errorf("got hook value %v, want %v", got, original)
+ }
+}
diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go
new file mode 100644
index 000000000..d9abf440f
--- /dev/null
+++ b/swarm/storage/localstore/index_test.go
@@ -0,0 +1,227 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "math/rand"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TestDB_pullIndex validates the ordering of keys in pull index.
+// Pull index key contains PO prefix which is calculated from
+// DB base key and chunk address. This is not an Item field
+// which are checked in Mode tests.
+// This test uploads chunks, sorts them in expected order and
+// validates that pull index iterator will iterate it the same
+// order.
+func TestDB_pullIndex(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ chunkCount := 50
+
+ chunks := make([]testIndexChunk, chunkCount)
+
+ // upload random chunks
+ for i := 0; i < chunkCount; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ chunks[i] = testIndexChunk{
+ Chunk: chunk,
+ // this timestamp is not the same as in
+ // the index, but given that uploads
+ // are sequential and that only ordering
+ // of events matter, this information is
+ // sufficient
+ storeTimestamp: now(),
+ }
+ }
+
+ testItemsOrder(t, db.pullIndex, chunks, func(i, j int) (less bool) {
+ poi := storage.Proximity(db.baseKey, chunks[i].Address())
+ poj := storage.Proximity(db.baseKey, chunks[j].Address())
+ if poi < poj {
+ return true
+ }
+ if poi > poj {
+ return false
+ }
+ if chunks[i].storeTimestamp < chunks[j].storeTimestamp {
+ return true
+ }
+ if chunks[i].storeTimestamp > chunks[j].storeTimestamp {
+ return false
+ }
+ return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
+ })
+}
+
+// TestDB_gcIndex validates garbage collection index by uploading
+// a chunk with and performing operations using synced, access and
+// request modes.
+func TestDB_gcIndex(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ chunkCount := 50
+
+ chunks := make([]testIndexChunk, chunkCount)
+
+ // upload random chunks
+ for i := 0; i < chunkCount; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ chunks[i] = testIndexChunk{
+ Chunk: chunk,
+ }
+ }
+
+ // check if all chunks are stored
+ newItemsCountTest(db.pullIndex, chunkCount)(t)
+
+ // check that chunks are not collectable for garbage
+ newItemsCountTest(db.gcIndex, 0)(t)
+
+ // set update gc test hook to signal when
+ // update gc goroutine is done by sending to
+ // testHookUpdateGCChan channel, which is
+ // used to wait for indexes change verifications
+ testHookUpdateGCChan := make(chan struct{})
+ defer setTestHookUpdateGC(func() {
+ testHookUpdateGCChan <- struct{}{}
+ })()
+
+ t.Run("request unsynced", func(t *testing.T) {
+ chunk := chunks[1]
+
+ _, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+
+ // the chunk is not synced
+ // should not be in the garbace collection index
+ newItemsCountTest(db.gcIndex, 0)(t)
+
+ newIndexGCSizeTest(db)(t)
+ })
+
+ t.Run("sync one chunk", func(t *testing.T) {
+ chunk := chunks[0]
+
+ err := db.NewSetter(ModeSetSync).Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // the chunk is synced and should be in gc index
+ newItemsCountTest(db.gcIndex, 1)(t)
+
+ newIndexGCSizeTest(db)(t)
+ })
+
+ t.Run("sync all chunks", func(t *testing.T) {
+ setter := db.NewSetter(ModeSetSync)
+
+ for i := range chunks {
+ err := setter.Set(chunks[i].Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ testItemsOrder(t, db.gcIndex, chunks, nil)
+
+ newIndexGCSizeTest(db)(t)
+ })
+
+ t.Run("request one chunk", func(t *testing.T) {
+ i := 6
+
+ _, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+
+ // move the chunk to the end of the expected gc
+ c := chunks[i]
+ chunks = append(chunks[:i], chunks[i+1:]...)
+ chunks = append(chunks, c)
+
+ testItemsOrder(t, db.gcIndex, chunks, nil)
+
+ newIndexGCSizeTest(db)(t)
+ })
+
+ t.Run("random chunk request", func(t *testing.T) {
+ requester := db.NewGetter(ModeGetRequest)
+
+ rand.Shuffle(len(chunks), func(i, j int) {
+ chunks[i], chunks[j] = chunks[j], chunks[i]
+ })
+
+ for _, chunk := range chunks {
+ _, err := requester.Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+ }
+
+ testItemsOrder(t, db.gcIndex, chunks, nil)
+
+ newIndexGCSizeTest(db)(t)
+ })
+
+ t.Run("remove one chunk", func(t *testing.T) {
+ i := 3
+
+ err := db.NewSetter(modeSetRemove).Set(chunks[i].Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // remove the chunk from the expected chunks in gc index
+ chunks = append(chunks[:i], chunks[i+1:]...)
+
+ testItemsOrder(t, db.gcIndex, chunks, nil)
+
+ newIndexGCSizeTest(db)(t)
+ })
+}
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
new file mode 100644
index 000000000..7a9fb54f5
--- /dev/null
+++ b/swarm/storage/localstore/localstore.go
@@ -0,0 +1,431 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "encoding/binary"
+ "encoding/hex"
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
+)
+
+var (
+ // ErrInvalidMode is retuned when an unknown Mode
+ // is provided to the function.
+ ErrInvalidMode = errors.New("invalid mode")
+ // ErrAddressLockTimeout is returned when the same chunk
+ // is updated in parallel and one of the updates
+ // takes longer then the configured timeout duration.
+ ErrAddressLockTimeout = errors.New("address lock timeout")
+)
+
+var (
+ // Default value for Capacity DB option.
+ defaultCapacity int64 = 5000000
+ // Limit the number of goroutines created by Getters
+ // that call updateGC function. Value 0 sets no limit.
+ maxParallelUpdateGC = 1000
+)
+
+// DB is the local store implementation and holds
+// database related objects.
+type DB struct {
+ shed *shed.DB
+
+ // schema name of loaded data
+ schemaName shed.StringField
+ // field that stores number of intems in gc index
+ storedGCSize shed.Uint64Field
+
+ // retrieval indexes
+ retrievalDataIndex shed.Index
+ retrievalAccessIndex shed.Index
+ // push syncing index
+ pushIndex shed.Index
+ // push syncing subscriptions triggers
+ pushTriggers []chan struct{}
+ pushTriggersMu sync.RWMutex
+
+ // pull syncing index
+ pullIndex shed.Index
+ // pull syncing subscriptions triggers per bin
+ pullTriggers map[uint8][]chan struct{}
+ pullTriggersMu sync.RWMutex
+
+ // garbage collection index
+ gcIndex shed.Index
+ // index that stores hashes that are not
+ // counted in and saved to storedGCSize
+ gcUncountedHashesIndex shed.Index
+
+ // number of elements in garbage collection index
+ // it must be always read by getGCSize and
+ // set with incGCSize which are locking gcSizeMu
+ gcSize int64
+ gcSizeMu sync.RWMutex
+ // garbage collection is triggered when gcSize exceeds
+ // the capacity value
+ capacity int64
+
+ // triggers garbage collection event loop
+ collectGarbageTrigger chan struct{}
+ // triggers write gc size event loop
+ writeGCSizeTrigger chan struct{}
+
+ // a buffered channel acting as a semaphore
+ // to limit the maximal number of goroutines
+ // created by Getters to call updateGC function
+ updateGCSem chan struct{}
+ // a wait group to ensure all updateGC goroutines
+ // are done before closing the database
+ updateGCWG sync.WaitGroup
+
+ baseKey []byte
+
+ addressLocks sync.Map
+
+ // this channel is closed when close function is called
+ // to terminate other goroutines
+ close chan struct{}
+}
+
+// Options struct holds optional parameters for configuring DB.
+type Options struct {
+ // MockStore is a mock node store that is used to store
+ // chunk data in a central store. It can be used to reduce
+ // total storage space requirements in testing large number
+ // of swarm nodes with chunk data deduplication provided by
+ // the mock global store.
+ MockStore *mock.NodeStore
+ // Capacity is a limit that triggers garbage collection when
+ // number of items in gcIndex equals or exceeds it.
+ Capacity int64
+ // MetricsPrefix defines a prefix for metrics names.
+ MetricsPrefix string
+}
+
+// New returns a new DB. All fields and indexes are initialized
+// and possible conflicts with schema from existing database is checked.
+// One goroutine for writing batches is created.
+func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
+ if o == nil {
+ o = new(Options)
+ }
+ db = &DB{
+ capacity: o.Capacity,
+ baseKey: baseKey,
+ // channels collectGarbageTrigger and writeGCSizeTrigger
+ // need to be buffered with the size of 1
+ // to signal another event if it
+ // is triggered during already running function
+ collectGarbageTrigger: make(chan struct{}, 1),
+ writeGCSizeTrigger: make(chan struct{}, 1),
+ close: make(chan struct{}),
+ }
+ if db.capacity <= 0 {
+ db.capacity = defaultCapacity
+ }
+ if maxParallelUpdateGC > 0 {
+ db.updateGCSem = make(chan struct{}, maxParallelUpdateGC)
+ }
+
+ db.shed, err = shed.NewDB(path, o.MetricsPrefix)
+ if err != nil {
+ return nil, err
+ }
+ // Identify current storage schema by arbitrary name.
+ db.schemaName, err = db.shed.NewStringField("schema-name")
+ if err != nil {
+ return nil, err
+ }
+ // Persist gc size.
+ db.storedGCSize, err = db.shed.NewUint64Field("gc-size")
+ if err != nil {
+ return nil, err
+ }
+ // Functions for retrieval data index.
+ var (
+ encodeValueFunc func(fields shed.Item) (value []byte, err error)
+ decodeValueFunc func(keyItem shed.Item, value []byte) (e shed.Item, err error)
+ )
+ if o.MockStore != nil {
+ encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ err = o.MockStore.Put(fields.Address, fields.Data)
+ if err != nil {
+ return nil, err
+ }
+ return b, nil
+ }
+ decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
+ e.Data, err = o.MockStore.Get(keyItem.Address)
+ return e, err
+ }
+ } else {
+ encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ value = append(b, fields.Data...)
+ return value, nil
+ }
+ decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
+ e.Data = value[8:]
+ return e, nil
+ }
+ }
+ // Index storing actual chunk address, data and store timestamp.
+ db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
+ return fields.Address, nil
+ },
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
+ e.Address = key
+ return e, nil
+ },
+ EncodeValue: encodeValueFunc,
+ DecodeValue: decodeValueFunc,
+ })
+ if err != nil {
+ return nil, err
+ }
+ // Index storing access timestamp for a particular address.
+ // It is needed in order to update gc index keys for iteration order.
+ db.retrievalAccessIndex, err = db.shed.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
+ return fields.Address, nil
+ },
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
+ e.Address = key
+ return e, nil
+ },
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
+ return b, nil
+ },
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
+ return e, nil
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ // pull index allows history and live syncing per po bin
+ db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
+ key = make([]byte, 41)
+ key[0] = db.po(fields.Address)
+ binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp))
+ copy(key[9:], fields.Address[:])
+ return key, nil
+ },
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
+ e.Address = key[9:]
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9]))
+ return e, nil
+ },
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
+ return nil, nil
+ },
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ return e, nil
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ // create a pull syncing triggers used by SubscribePull function
+ db.pullTriggers = make(map[uint8][]chan struct{})
+ // push index contains as yet unsynced chunks
+ db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
+ key = make([]byte, 40)
+ binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
+ copy(key[8:], fields.Address[:])
+ return key, nil
+ },
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
+ e.Address = key[8:]
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
+ return e, nil
+ },
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
+ return nil, nil
+ },
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ return e, nil
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ // create a push syncing triggers used by SubscribePush function
+ db.pushTriggers = make([]chan struct{}, 0)
+ // gc index for removable chunk ordered by ascending last access time
+ db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
+ b := make([]byte, 16, 16+len(fields.Address))
+ binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
+ key = append(b, fields.Address...)
+ return key, nil
+ },
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
+ e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
+ e.Address = key[16:]
+ return e, nil
+ },
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
+ return nil, nil
+ },
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ return e, nil
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ // gc uncounted hashes index keeps hashes that are in gc index
+ // but not counted in and saved to storedGCSize
+ db.gcUncountedHashesIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
+ return fields.Address, nil
+ },
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
+ e.Address = key
+ return e, nil
+ },
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
+ return nil, nil
+ },
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ return e, nil
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // count number of elements in garbage collection index
+ gcSize, err := db.storedGCSize.Get()
+ if err != nil {
+ return nil, err
+ }
+ // get number of uncounted hashes
+ gcUncountedSize, err := db.gcUncountedHashesIndex.Count()
+ if err != nil {
+ return nil, err
+ }
+ gcSize += uint64(gcUncountedSize)
+ // remove uncounted hashes from the index and
+ // save the total gcSize after uncounted hashes are removed
+ err = db.writeGCSize(int64(gcSize))
+ if err != nil {
+ return nil, err
+ }
+ db.incGCSize(int64(gcSize))
+
+ // start worker to write gc size
+ go db.writeGCSizeWorker()
+ // start garbage collection worker
+ go db.collectGarbageWorker()
+ return db, nil
+}
+
+// Close closes the underlying database.
+func (db *DB) Close() (err error) {
+ close(db.close)
+ db.updateGCWG.Wait()
+ if err := db.writeGCSize(db.getGCSize()); err != nil {
+ log.Error("localstore: write gc size", "err", err)
+ }
+ return db.shed.Close()
+}
+
+// po computes the proximity order between the address
+// and database base key.
+func (db *DB) po(addr storage.Address) (bin uint8) {
+ return uint8(storage.Proximity(db.baseKey, addr))
+}
+
+var (
+ // Maximal time for lockAddr to wait until it
+ // returns error.
+ addressLockTimeout = 3 * time.Second
+ // duration between two lock checks in lockAddr.
+ addressLockCheckDelay = 30 * time.Microsecond
+)
+
+// lockAddr sets the lock on a particular address
+// using addressLocks sync.Map and returns unlock function.
+// If the address is locked this function will check it
+// in a for loop for addressLockTimeout time, after which
+// it will return ErrAddressLockTimeout error.
+func (db *DB) lockAddr(addr storage.Address) (unlock func(), err error) {
+ start := time.Now()
+ lockKey := hex.EncodeToString(addr)
+ for {
+ _, loaded := db.addressLocks.LoadOrStore(lockKey, struct{}{})
+ if !loaded {
+ break
+ }
+ time.Sleep(addressLockCheckDelay)
+ if time.Since(start) > addressLockTimeout {
+ return nil, ErrAddressLockTimeout
+ }
+ }
+ return func() { db.addressLocks.Delete(lockKey) }, nil
+}
+
+// chunkToItem creates new Item with data provided by the Chunk.
+func chunkToItem(ch storage.Chunk) shed.Item {
+ return shed.Item{
+ Address: ch.Address(),
+ Data: ch.Data(),
+ }
+}
+
+// addressToItem creates new Item with a provided address.
+func addressToItem(addr storage.Address) shed.Item {
+ return shed.Item{
+ Address: addr,
+ }
+}
+
+// now is a helper function that returns a current unix timestamp
+// in UTC timezone.
+// It is set in the init function for usage in production, and
+// optionally overridden in tests for data validation.
+var now func() int64
+
+func init() {
+ // set the now function
+ now = func() (t int64) {
+ return time.Now().UTC().UnixNano()
+ }
+}
diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go
new file mode 100644
index 000000000..c7309d3cd
--- /dev/null
+++ b/swarm/storage/localstore/localstore_test.go
@@ -0,0 +1,520 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "sort"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// TestDB validates if the chunk can be uploaded and
+// correctly retrieved.
+func TestDB(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ got, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !bytes.Equal(got.Address(), chunk.Address()) {
+ t.Errorf("got address %x, want %x", got.Address(), chunk.Address())
+ }
+ if !bytes.Equal(got.Data(), chunk.Data()) {
+ t.Errorf("got data %x, want %x", got.Data(), chunk.Data())
+ }
+}
+
+// TestDB_updateGCSem tests maxParallelUpdateGC limit.
+// This test temporary sets the limit to a low number,
+// makes updateGC function execution time longer by
+// setting a custom testHookUpdateGC function with a sleep
+// and a count current and maximal number of goroutines.
+func TestDB_updateGCSem(t *testing.T) {
+ updateGCSleep := time.Second
+ var count int
+ var max int
+ var mu sync.Mutex
+ defer setTestHookUpdateGC(func() {
+ mu.Lock()
+ // add to the count of current goroutines
+ count++
+ if count > max {
+ // set maximal detected numbers of goroutines
+ max = count
+ }
+ mu.Unlock()
+
+ // wait for some time to ensure multiple parallel goroutines
+ time.Sleep(updateGCSleep)
+
+ mu.Lock()
+ count--
+ mu.Unlock()
+ })()
+
+ defer func(m int) { maxParallelUpdateGC = m }(maxParallelUpdateGC)
+ maxParallelUpdateGC = 3
+
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ getter := db.NewGetter(ModeGetRequest)
+
+ // get more chunks then maxParallelUpdateGC
+ // in time shorter then updateGCSleep
+ for i := 0; i < 5; i++ {
+ _, err = getter.Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ if max != maxParallelUpdateGC {
+ t.Errorf("got max %v, want %v", max, maxParallelUpdateGC)
+ }
+}
+
+// BenchmarkNew measures the time that New function
+// needs to initialize and count the number of key/value
+// pairs in GC index.
+// This benchmark generates a number of chunks, uploads them,
+// sets them to synced state for them to enter the GC index,
+// and measures the execution time of New function by creating
+// new databases with the same data directory.
+//
+// This benchmark takes significant amount of time.
+//
+// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show
+// that New function executes around 1s for database with 1M chunks.
+//
+// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkNew -v -timeout 20m
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
+// BenchmarkNew/1000-8 200 11672414 ns/op 9570960 B/op 10008 allocs/op
+// BenchmarkNew/10000-8 100 14890609 ns/op 10490118 B/op 7759 allocs/op
+// BenchmarkNew/100000-8 20 58334080 ns/op 17763157 B/op 22978 allocs/op
+// BenchmarkNew/1000000-8 2 748595153 ns/op 45297404 B/op 253242 allocs/op
+// PASS
+func BenchmarkNew(b *testing.B) {
+ if testing.Short() {
+ b.Skip("skipping benchmark in short mode")
+ }
+ for _, count := range []int{
+ 1000,
+ 10000,
+ 100000,
+ 1000000,
+ } {
+ b.Run(strconv.Itoa(count), func(b *testing.B) {
+ dir, err := ioutil.TempDir("", "localstore-new-benchmark")
+ if err != nil {
+ b.Fatal(err)
+ }
+ defer os.RemoveAll(dir)
+ baseKey := make([]byte, 32)
+ if _, err := rand.Read(baseKey); err != nil {
+ b.Fatal(err)
+ }
+ db, err := New(dir, baseKey, nil)
+ if err != nil {
+ b.Fatal(err)
+ }
+ uploader := db.NewPutter(ModePutUpload)
+ syncer := db.NewSetter(ModeSetSync)
+ for i := 0; i < count; i++ {
+ chunk := generateFakeRandomChunk()
+ err := uploader.Put(chunk)
+ if err != nil {
+ b.Fatal(err)
+ }
+ err = syncer.Set(chunk.Address())
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+ err = db.Close()
+ if err != nil {
+ b.Fatal(err)
+ }
+ b.ResetTimer()
+
+ for n := 0; n < b.N; n++ {
+ b.StartTimer()
+ db, err := New(dir, baseKey, nil)
+ b.StopTimer()
+
+ if err != nil {
+ b.Fatal(err)
+ }
+ err = db.Close()
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+ }
+}
+
+// newTestDB is a helper function that constructs a
+// temporary database and returns a cleanup function that must
+// be called to remove the data.
+func newTestDB(t testing.TB, o *Options) (db *DB, cleanupFunc func()) {
+ t.Helper()
+
+ dir, err := ioutil.TempDir("", "localstore-test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ cleanupFunc = func() { os.RemoveAll(dir) }
+ baseKey := make([]byte, 32)
+ if _, err := rand.Read(baseKey); err != nil {
+ t.Fatal(err)
+ }
+ db, err = New(dir, baseKey, o)
+ if err != nil {
+ cleanupFunc()
+ t.Fatal(err)
+ }
+ cleanupFunc = func() {
+ err := db.Close()
+ if err != nil {
+ t.Error(err)
+ }
+ os.RemoveAll(dir)
+ }
+ return db, cleanupFunc
+}
+
+// generateRandomChunk generates a valid Chunk with
+// data size of default chunk size.
+func generateRandomChunk() storage.Chunk {
+ return storage.GenerateRandomChunk(ch.DefaultSize)
+}
+
+func init() {
+ // needed for generateFakeRandomChunk
+ rand.Seed(time.Now().UnixNano())
+}
+
+// generateFakeRandomChunk generates a Chunk that is not
+// valid, but it contains a random key and a random value.
+// This function is faster then storage.GenerateRandomChunk
+// which generates a valid chunk.
+// Some tests in this package do not need valid chunks, just
+// random data, and their execution time can be decreased
+// using this function.
+func generateFakeRandomChunk() storage.Chunk {
+ data := make([]byte, ch.DefaultSize)
+ rand.Read(data)
+ key := make([]byte, 32)
+ rand.Read(key)
+ return storage.NewChunk(key, data)
+}
+
+// TestGenerateFakeRandomChunk validates that
+// generateFakeRandomChunk returns random data by comparing
+// two generated chunks.
+func TestGenerateFakeRandomChunk(t *testing.T) {
+ c1 := generateFakeRandomChunk()
+ c2 := generateFakeRandomChunk()
+ addrLen := len(c1.Address())
+ if addrLen != 32 {
+ t.Errorf("first chunk address length %v, want %v", addrLen, 32)
+ }
+ dataLen := len(c1.Data())
+ if dataLen != ch.DefaultSize {
+ t.Errorf("first chunk data length %v, want %v", dataLen, ch.DefaultSize)
+ }
+ addrLen = len(c2.Address())
+ if addrLen != 32 {
+ t.Errorf("second chunk address length %v, want %v", addrLen, 32)
+ }
+ dataLen = len(c2.Data())
+ if dataLen != ch.DefaultSize {
+ t.Errorf("second chunk data length %v, want %v", dataLen, ch.DefaultSize)
+ }
+ if bytes.Equal(c1.Address(), c2.Address()) {
+ t.Error("fake chunks addresses do not differ")
+ }
+ if bytes.Equal(c1.Data(), c2.Data()) {
+ t.Error("fake chunks data bytes do not differ")
+ }
+}
+
+// newRetrieveIndexesTest returns a test function that validates if the right
+// chunk values are in the retrieval indexes.
+func newRetrieveIndexesTest(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
+
+ // access index should not be set
+ wantErr := leveldb.ErrNotFound
+ item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ if err != wantErr {
+ t.Errorf("got error %v, want %v", err, wantErr)
+ }
+ }
+}
+
+// newRetrieveIndexesTestWithAccess returns a test function that validates if the right
+// chunk values are in the retrieval indexes when access time must be stored.
+func newRetrieveIndexesTestWithAccess(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
+
+ if accessTimestamp > 0 {
+ item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
+ }
+ }
+}
+
+// newPullIndexTest returns a test function that validates if the right
+// chunk values are in the pull index.
+func newPullIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.pullIndex.Get(shed.Item{
+ Address: chunk.Address(),
+ StoreTimestamp: storeTimestamp,
+ })
+ if err != wantError {
+ t.Errorf("got error %v, want %v", err, wantError)
+ }
+ if err == nil {
+ validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
+ }
+ }
+}
+
+// newPushIndexTest returns a test function that validates if the right
+// chunk values are in the push index.
+func newPushIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.pushIndex.Get(shed.Item{
+ Address: chunk.Address(),
+ StoreTimestamp: storeTimestamp,
+ })
+ if err != wantError {
+ t.Errorf("got error %v, want %v", err, wantError)
+ }
+ if err == nil {
+ validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
+ }
+ }
+}
+
+// newGCIndexTest returns a test function that validates if the right
+// chunk values are in the push index.
+func newGCIndexTest(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.gcIndex.Get(shed.Item{
+ Address: chunk.Address(),
+ StoreTimestamp: storeTimestamp,
+ AccessTimestamp: accessTimestamp,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, chunk.Address(), nil, storeTimestamp, accessTimestamp)
+ }
+}
+
+// newItemsCountTest returns a test function that validates if
+// an index contains expected number of key/value pairs.
+func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
+ return func(t *testing.T) {
+ var c int
+ err := i.Iterate(func(item shed.Item) (stop bool, err error) {
+ c++
+ return
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != want {
+ t.Errorf("got %v items in index, want %v", c, want)
+ }
+ }
+}
+
+// newIndexGCSizeTest retruns a test function that validates if DB.gcSize
+// value is the same as the number of items in DB.gcIndex.
+func newIndexGCSizeTest(db *DB) func(t *testing.T) {
+ return func(t *testing.T) {
+ var want int64
+ err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ want++
+ return
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got := db.getGCSize()
+ if got != want {
+ t.Errorf("got gc size %v, want %v", got, want)
+ }
+ }
+}
+
+// testIndexChunk embeds storageChunk with additional data that is stored
+// in database. It is used for index values validations.
+type testIndexChunk struct {
+ storage.Chunk
+ storeTimestamp int64
+}
+
+// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
+// chunks will be sorted with it before validation.
+func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFunc func(i, j int) (less bool)) {
+ newItemsCountTest(i, len(chunks))(t)
+
+ if sortFunc != nil {
+ sort.Slice(chunks, sortFunc)
+ }
+
+ var cursor int
+ err := i.Iterate(func(item shed.Item) (stop bool, err error) {
+ want := chunks[cursor].Address()
+ got := item.Address
+ if !bytes.Equal(got, want) {
+ return true, fmt.Errorf("got address %x at position %v, want %x", got, cursor, want)
+ }
+ cursor++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+// validateItem is a helper function that checks Item values.
+func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64) {
+ t.Helper()
+
+ if !bytes.Equal(item.Address, address) {
+ t.Errorf("got item address %x, want %x", item.Address, address)
+ }
+ if !bytes.Equal(item.Data, data) {
+ t.Errorf("got item data %x, want %x", item.Data, data)
+ }
+ if item.StoreTimestamp != storeTimestamp {
+ t.Errorf("got item store timestamp %v, want %v", item.StoreTimestamp, storeTimestamp)
+ }
+ if item.AccessTimestamp != accessTimestamp {
+ t.Errorf("got item access timestamp %v, want %v", item.AccessTimestamp, accessTimestamp)
+ }
+}
+
+// setNow replaces now function and
+// returns a function that will reset it to the
+// value before the change.
+func setNow(f func() int64) (reset func()) {
+ current := now
+ reset = func() { now = current }
+ now = f
+ return reset
+}
+
+// TestSetNow tests if setNow function changes now function
+// correctly and if its reset function resets the original function.
+func TestSetNow(t *testing.T) {
+ // set the current function after the test finishes
+ defer func(f func() int64) { now = f }(now)
+
+ // expected value for the unchanged function
+ var original int64 = 1
+ // expected value for the changed function
+ var changed int64 = 2
+
+ // define the original (unchanged) functions
+ now = func() int64 {
+ return original
+ }
+
+ // get the time
+ got := now()
+
+ // test if got variable is set correctly
+ if got != original {
+ t.Errorf("got now value %v, want %v", got, original)
+ }
+
+ // set the new function
+ reset := setNow(func() int64 {
+ return changed
+ })
+
+ // get the time
+ got = now()
+
+ // test if got variable is set correctly to changed value
+ if got != changed {
+ t.Errorf("got hook value %v, want %v", got, changed)
+ }
+
+ // set the function to the original one
+ reset()
+
+ // get the time
+ got = now()
+
+ // test if got variable is set correctly to original value
+ if got != original {
+ t.Errorf("got hook value %v, want %v", got, original)
+ }
+}
diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go
new file mode 100644
index 000000000..3a69f6e9d
--- /dev/null
+++ b/swarm/storage/localstore/mode_get.go
@@ -0,0 +1,154 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// ModeGet enumerates different Getter modes.
+type ModeGet int
+
+// Getter modes.
+const (
+ // ModeGetRequest: when accessed for retrieval
+ ModeGetRequest ModeGet = iota
+ // ModeGetSync: when accessed for syncing or proof of custody request
+ ModeGetSync
+)
+
+// Getter provides Get method to retrieve Chunks
+// from database.
+type Getter struct {
+ db *DB
+ mode ModeGet
+}
+
+// NewGetter returns a new Getter on database
+// with a specific Mode.
+func (db *DB) NewGetter(mode ModeGet) *Getter {
+ return &Getter{
+ mode: mode,
+ db: db,
+ }
+}
+
+// Get returns a chunk from the database. If the chunk is
+// not found storage.ErrChunkNotFound will be returned.
+// All required indexes will be updated required by the
+// Getter Mode.
+func (g *Getter) Get(addr storage.Address) (chunk storage.Chunk, err error) {
+ out, err := g.db.get(g.mode, addr)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ return nil, storage.ErrChunkNotFound
+ }
+ return nil, err
+ }
+ return storage.NewChunk(out.Address, out.Data), nil
+}
+
+// get returns Item from the retrieval index
+// and updates other indexes.
+func (db *DB) get(mode ModeGet, addr storage.Address) (out shed.Item, err error) {
+ item := addressToItem(addr)
+
+ out, err = db.retrievalDataIndex.Get(item)
+ if err != nil {
+ return out, err
+ }
+ switch mode {
+ // update the access timestamp and gc index
+ case ModeGetRequest:
+ if db.updateGCSem != nil {
+ // wait before creating new goroutines
+ // if updateGCSem buffer id full
+ db.updateGCSem <- struct{}{}
+ }
+ db.updateGCWG.Add(1)
+ go func() {
+ defer db.updateGCWG.Done()
+ if db.updateGCSem != nil {
+ // free a spot in updateGCSem buffer
+ // for a new goroutine
+ defer func() { <-db.updateGCSem }()
+ }
+ err := db.updateGC(out)
+ if err != nil {
+ log.Error("localstore update gc", "err", err)
+ }
+ // if gc update hook is defined, call it
+ if testHookUpdateGC != nil {
+ testHookUpdateGC()
+ }
+ }()
+
+ // no updates to indexes
+ case ModeGetSync:
+ default:
+ return out, ErrInvalidMode
+ }
+ return out, nil
+}
+
+// updateGC updates garbage collection index for
+// a single item. Provided item is expected to have
+// only Address and Data fields with non zero values,
+// which is ensured by the get function.
+func (db *DB) updateGC(item shed.Item) (err error) {
+ unlock, err := db.lockAddr(item.Address)
+ if err != nil {
+ return err
+ }
+ defer unlock()
+
+ batch := new(leveldb.Batch)
+
+ // update accessTimeStamp in retrieve, gc
+
+ i, err := db.retrievalAccessIndex.Get(item)
+ switch err {
+ case nil:
+ item.AccessTimestamp = i.AccessTimestamp
+ case leveldb.ErrNotFound:
+ // no chunk accesses
+ default:
+ return err
+ }
+ if item.AccessTimestamp == 0 {
+ // chunk is not yet synced
+ // do not add it to the gc index
+ return nil
+ }
+ // delete current entry from the gc index
+ db.gcIndex.DeleteInBatch(batch, item)
+ // update access timestamp
+ item.AccessTimestamp = now()
+ // update retrieve access index
+ db.retrievalAccessIndex.PutInBatch(batch, item)
+ // add new entry to gc index
+ db.gcIndex.PutInBatch(batch, item)
+
+ return db.shed.WriteBatch(batch)
+}
+
+// testHookUpdateGC is a hook that can provide
+// information when a garbage collection index is updated.
+var testHookUpdateGC func()
diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go
new file mode 100644
index 000000000..6615a3b88
--- /dev/null
+++ b/swarm/storage/localstore/mode_get_test.go
@@ -0,0 +1,237 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "testing"
+ "time"
+)
+
+// TestModeGetRequest validates ModeGetRequest index values on the provided DB.
+func TestModeGetRequest(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploadTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return uploadTimestamp
+ })()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ requester := db.NewGetter(ModeGetRequest)
+
+ // set update gc test hook to signal when
+ // update gc goroutine is done by sending to
+ // testHookUpdateGCChan channel, which is
+ // used to wait for garbage colletion index
+ // changes
+ testHookUpdateGCChan := make(chan struct{})
+ defer setTestHookUpdateGC(func() {
+ testHookUpdateGCChan <- struct{}{}
+ })()
+
+ t.Run("get unsynced", func(t *testing.T) {
+ got, err := requester.Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+
+ if !bytes.Equal(got.Address(), chunk.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ }
+
+ if !bytes.Equal(got.Data(), chunk.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+ })
+
+ // set chunk to synced state
+ err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("first get", func(t *testing.T) {
+ got, err := requester.Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+
+ if !bytes.Equal(got.Address(), chunk.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ }
+
+ if !bytes.Equal(got.Data(), chunk.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, uploadTimestamp))
+
+ t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, uploadTimestamp))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+ })
+
+ t.Run("second get", func(t *testing.T) {
+ accessTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return accessTimestamp
+ })()
+
+ got, err := requester.Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+
+ if !bytes.Equal(got.Address(), chunk.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ }
+
+ if !bytes.Equal(got.Data(), chunk.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, accessTimestamp))
+
+ t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, accessTimestamp))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+ })
+}
+
+// TestModeGetSync validates ModeGetSync index values on the provided DB.
+func TestModeGetSync(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploadTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return uploadTimestamp
+ })()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ got, err := db.NewGetter(ModeGetSync).Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !bytes.Equal(got.Address(), chunk.Address()) {
+ t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
+ }
+
+ if !bytes.Equal(got.Data(), chunk.Data()) {
+ t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+}
+
+// setTestHookUpdateGC sets testHookUpdateGC and
+// returns a function that will reset it to the
+// value before the change.
+func setTestHookUpdateGC(h func()) (reset func()) {
+ current := testHookUpdateGC
+ reset = func() { testHookUpdateGC = current }
+ testHookUpdateGC = h
+ return reset
+}
+
+// TestSetTestHookUpdateGC tests if setTestHookUpdateGC changes
+// testHookUpdateGC function correctly and if its reset function
+// resets the original function.
+func TestSetTestHookUpdateGC(t *testing.T) {
+ // Set the current function after the test finishes.
+ defer func(h func()) { testHookUpdateGC = h }(testHookUpdateGC)
+
+ // expected value for the unchanged function
+ original := 1
+ // expected value for the changed function
+ changed := 2
+
+ // this variable will be set with two different functions
+ var got int
+
+ // define the original (unchanged) functions
+ testHookUpdateGC = func() {
+ got = original
+ }
+
+ // set got variable
+ testHookUpdateGC()
+
+ // test if got variable is set correctly
+ if got != original {
+ t.Errorf("got hook value %v, want %v", got, original)
+ }
+
+ // set the new function
+ reset := setTestHookUpdateGC(func() {
+ got = changed
+ })
+
+ // set got variable
+ testHookUpdateGC()
+
+ // test if got variable is set correctly to changed value
+ if got != changed {
+ t.Errorf("got hook value %v, want %v", got, changed)
+ }
+
+ // set the function to the original one
+ reset()
+
+ // set got variable
+ testHookUpdateGC()
+
+ // test if got variable is set correctly to original value
+ if got != original {
+ t.Errorf("got hook value %v, want %v", got, original)
+ }
+}
diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go
new file mode 100644
index 000000000..1a5a3d1b1
--- /dev/null
+++ b/swarm/storage/localstore/mode_put.go
@@ -0,0 +1,160 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// ModePut enumerates different Putter modes.
+type ModePut int
+
+// Putter modes.
+const (
+ // ModePutRequest: when a chunk is received as a result of retrieve request and delivery
+ ModePutRequest ModePut = iota
+ // ModePutSync: when a chunk is received via syncing
+ ModePutSync
+ // ModePutUpload: when a chunk is created by local upload
+ ModePutUpload
+)
+
+// Putter provides Put method to store Chunks
+// to database.
+type Putter struct {
+ db *DB
+ mode ModePut
+}
+
+// NewPutter returns a new Putter on database
+// with a specific Mode.
+func (db *DB) NewPutter(mode ModePut) *Putter {
+ return &Putter{
+ mode: mode,
+ db: db,
+ }
+}
+
+// Put stores the Chunk to database and depending
+// on the Putter mode, it updates required indexes.
+func (p *Putter) Put(ch storage.Chunk) (err error) {
+ return p.db.put(p.mode, chunkToItem(ch))
+}
+
+// put stores Item to database and updates other
+// indexes. It acquires lockAddr to protect two calls
+// of this function for the same address in parallel.
+// Item fields Address and Data must not be
+// with their nil values.
+func (db *DB) put(mode ModePut, item shed.Item) (err error) {
+ // protect parallel updates
+ unlock, err := db.lockAddr(item.Address)
+ if err != nil {
+ return err
+ }
+ defer unlock()
+
+ batch := new(leveldb.Batch)
+
+ // variables that provide information for operations
+ // to be done after write batch function successfully executes
+ var gcSizeChange int64 // number to add or subtract from gcSize
+ var triggerPullFeed bool // signal pull feed subscriptions to iterate
+ var triggerPushFeed bool // signal push feed subscriptions to iterate
+
+ switch mode {
+ case ModePutRequest:
+ // put to indexes: retrieve, gc; it does not enter the syncpool
+
+ // check if the chunk already is in the database
+ // as gc index is updated
+ i, err := db.retrievalAccessIndex.Get(item)
+ switch err {
+ case nil:
+ item.AccessTimestamp = i.AccessTimestamp
+ case leveldb.ErrNotFound:
+ // no chunk accesses
+ default:
+ return err
+ }
+ i, err = db.retrievalDataIndex.Get(item)
+ switch err {
+ case nil:
+ item.StoreTimestamp = i.StoreTimestamp
+ case leveldb.ErrNotFound:
+ // no chunk accesses
+ default:
+ return err
+ }
+ if item.AccessTimestamp != 0 {
+ // delete current entry from the gc index
+ db.gcIndex.DeleteInBatch(batch, item)
+ gcSizeChange--
+ }
+ if item.StoreTimestamp == 0 {
+ item.StoreTimestamp = now()
+ }
+ // update access timestamp
+ item.AccessTimestamp = now()
+ // update retrieve access index
+ db.retrievalAccessIndex.PutInBatch(batch, item)
+ // add new entry to gc index
+ db.gcIndex.PutInBatch(batch, item)
+ db.gcUncountedHashesIndex.PutInBatch(batch, item)
+ gcSizeChange++
+
+ db.retrievalDataIndex.PutInBatch(batch, item)
+
+ case ModePutUpload:
+ // put to indexes: retrieve, push, pull
+
+ item.StoreTimestamp = now()
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ db.pushIndex.PutInBatch(batch, item)
+ triggerPushFeed = true
+
+ case ModePutSync:
+ // put to indexes: retrieve, pull
+
+ item.StoreTimestamp = now()
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+
+ default:
+ return ErrInvalidMode
+ }
+
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return err
+ }
+ if gcSizeChange != 0 {
+ db.incGCSize(gcSizeChange)
+ }
+ if triggerPullFeed {
+ db.triggerPullSubscriptions(db.po(item.Address))
+ }
+ if triggerPushFeed {
+ db.triggerPushSubscriptions()
+ }
+ return nil
+}
diff --git a/swarm/storage/localstore/mode_put_test.go b/swarm/storage/localstore/mode_put_test.go
new file mode 100644
index 000000000..ffe6a4cb4
--- /dev/null
+++ b/swarm/storage/localstore/mode_put_test.go
@@ -0,0 +1,300 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TestModePutRequest validates ModePutRequest index values on the provided DB.
+func TestModePutRequest(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ putter := db.NewPutter(ModePutRequest)
+
+ chunk := generateRandomChunk()
+
+ // keep the record when the chunk is stored
+ var storeTimestamp int64
+
+ t.Run("first put", func(t *testing.T) {
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ storeTimestamp = wantTimestamp
+
+ err := putter.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+ })
+
+ t.Run("second put", func(t *testing.T) {
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ err := putter.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, storeTimestamp, wantTimestamp))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+ })
+}
+
+// TestModePutSync validates ModePutSync index values on the provided DB.
+func TestModePutSync(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutSync).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
+
+ t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+}
+
+// TestModePutUpload validates ModePutUpload index values on the provided DB.
+func TestModePutUpload(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
+
+ t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+
+ t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, nil))
+}
+
+// TestModePutUpload_parallel uploads chunks in parallel
+// and validates if all chunks can be retrieved with correct data.
+func TestModePutUpload_parallel(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ chunkCount := 1000
+ workerCount := 100
+
+ chunkChan := make(chan storage.Chunk)
+ errChan := make(chan error)
+ doneChan := make(chan struct{})
+ defer close(doneChan)
+
+ // start uploader workers
+ for i := 0; i < workerCount; i++ {
+ go func(i int) {
+ uploader := db.NewPutter(ModePutUpload)
+ for {
+ select {
+ case chunk, ok := <-chunkChan:
+ if !ok {
+ return
+ }
+ err := uploader.Put(chunk)
+ select {
+ case errChan <- err:
+ case <-doneChan:
+ }
+ case <-doneChan:
+ return
+ }
+ }
+ }(i)
+ }
+
+ chunks := make([]storage.Chunk, 0)
+ var chunksMu sync.Mutex
+
+ // send chunks to workers
+ go func() {
+ for i := 0; i < chunkCount; i++ {
+ chunk := generateRandomChunk()
+ select {
+ case chunkChan <- chunk:
+ case <-doneChan:
+ return
+ }
+ chunksMu.Lock()
+ chunks = append(chunks, chunk)
+ chunksMu.Unlock()
+ }
+
+ close(chunkChan)
+ }()
+
+ // validate every error from workers
+ for i := 0; i < chunkCount; i++ {
+ err := <-errChan
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // get every chunk and validate its data
+ getter := db.NewGetter(ModeGetRequest)
+
+ chunksMu.Lock()
+ defer chunksMu.Unlock()
+ for _, chunk := range chunks {
+ got, err := getter.Get(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got.Data(), chunk.Data()) {
+ t.Fatalf("got chunk %s data %x, want %x", chunk.Address().Hex(), got.Data(), chunk.Data())
+ }
+ }
+}
+
+// BenchmarkPutUpload runs a series of benchmarks that upload
+// a specific number of chunks in parallel.
+//
+// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014)
+//
+// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkPutUpload -v
+//
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
+// BenchmarkPutUpload/count_100_parallel_1-8 300 5107704 ns/op 2081461 B/op 2374 allocs/op
+// BenchmarkPutUpload/count_100_parallel_2-8 300 5411742 ns/op 2081608 B/op 2364 allocs/op
+// BenchmarkPutUpload/count_100_parallel_4-8 500 3704964 ns/op 2081696 B/op 2324 allocs/op
+// BenchmarkPutUpload/count_100_parallel_8-8 500 2932663 ns/op 2082594 B/op 2295 allocs/op
+// BenchmarkPutUpload/count_100_parallel_16-8 500 3117157 ns/op 2085438 B/op 2282 allocs/op
+// BenchmarkPutUpload/count_100_parallel_32-8 500 3449122 ns/op 2089721 B/op 2286 allocs/op
+// BenchmarkPutUpload/count_1000_parallel_1-8 20 79784470 ns/op 25211240 B/op 23225 allocs/op
+// BenchmarkPutUpload/count_1000_parallel_2-8 20 75422164 ns/op 25210730 B/op 23187 allocs/op
+// BenchmarkPutUpload/count_1000_parallel_4-8 20 70698378 ns/op 25206522 B/op 22692 allocs/op
+// BenchmarkPutUpload/count_1000_parallel_8-8 20 71285528 ns/op 25213436 B/op 22345 allocs/op
+// BenchmarkPutUpload/count_1000_parallel_16-8 20 71301826 ns/op 25205040 B/op 22090 allocs/op
+// BenchmarkPutUpload/count_1000_parallel_32-8 30 57713506 ns/op 25219781 B/op 21848 allocs/op
+// BenchmarkPutUpload/count_10000_parallel_1-8 2 656719345 ns/op 216792908 B/op 248940 allocs/op
+// BenchmarkPutUpload/count_10000_parallel_2-8 2 646301962 ns/op 216730800 B/op 248270 allocs/op
+// BenchmarkPutUpload/count_10000_parallel_4-8 2 532784228 ns/op 216667080 B/op 241910 allocs/op
+// BenchmarkPutUpload/count_10000_parallel_8-8 3 494290188 ns/op 216297749 B/op 236247 allocs/op
+// BenchmarkPutUpload/count_10000_parallel_16-8 3 483485315 ns/op 216060384 B/op 231090 allocs/op
+// BenchmarkPutUpload/count_10000_parallel_32-8 3 434461294 ns/op 215371280 B/op 224800 allocs/op
+// BenchmarkPutUpload/count_100000_parallel_1-8 1 22767894338 ns/op 2331372088 B/op 4049876 allocs/op
+// BenchmarkPutUpload/count_100000_parallel_2-8 1 25347872677 ns/op 2344140160 B/op 4106763 allocs/op
+// BenchmarkPutUpload/count_100000_parallel_4-8 1 23580460174 ns/op 2338582576 B/op 4027452 allocs/op
+// BenchmarkPutUpload/count_100000_parallel_8-8 1 22197559193 ns/op 2321803496 B/op 3877553 allocs/op
+// BenchmarkPutUpload/count_100000_parallel_16-8 1 22527046476 ns/op 2327854800 B/op 3885455 allocs/op
+// BenchmarkPutUpload/count_100000_parallel_32-8 1 21332243613 ns/op 2299654568 B/op 3697181 allocs/op
+// PASS
+func BenchmarkPutUpload(b *testing.B) {
+ for _, count := range []int{
+ 100,
+ 1000,
+ 10000,
+ 100000,
+ } {
+ for _, maxParallelUploads := range []int{
+ 1,
+ 2,
+ 4,
+ 8,
+ 16,
+ 32,
+ } {
+ name := fmt.Sprintf("count %v parallel %v", count, maxParallelUploads)
+ b.Run(name, func(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ benchmarkPutUpload(b, nil, count, maxParallelUploads)
+ }
+ })
+ }
+ }
+}
+
+// benchmarkPutUpload runs a benchmark by uploading a specific number
+// of chunks with specified max parallel uploads.
+func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) {
+ b.StopTimer()
+ db, cleanupFunc := newTestDB(b, o)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+ chunks := make([]storage.Chunk, count)
+ for i := 0; i < count; i++ {
+ chunks[i] = generateFakeRandomChunk()
+ }
+ errs := make(chan error)
+ b.StartTimer()
+
+ go func() {
+ sem := make(chan struct{}, maxParallelUploads)
+ for i := 0; i < count; i++ {
+ sem <- struct{}{}
+
+ go func(i int) {
+ defer func() { <-sem }()
+
+ errs <- uploader.Put(chunks[i])
+ }(i)
+ }
+ }()
+
+ for i := 0; i < count; i++ {
+ err := <-errs
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go
new file mode 100644
index 000000000..a522f4447
--- /dev/null
+++ b/swarm/storage/localstore/mode_set.go
@@ -0,0 +1,205 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// ModeSet enumerates different Setter modes.
+type ModeSet int
+
+// Setter modes.
+const (
+ // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
+ ModeSetAccess ModeSet = iota
+ // ModeSetSync: when push sync receipt is received
+ ModeSetSync
+ // modeSetRemove: when GC-d
+ // unexported as no external packages should remove chunks from database
+ modeSetRemove
+)
+
+// Setter sets the state of a particular
+// Chunk in database by changing indexes.
+type Setter struct {
+ db *DB
+ mode ModeSet
+}
+
+// NewSetter returns a new Setter on database
+// with a specific Mode.
+func (db *DB) NewSetter(mode ModeSet) *Setter {
+ return &Setter{
+ mode: mode,
+ db: db,
+ }
+}
+
+// Set updates database indexes for a specific
+// chunk represented by the address.
+func (s *Setter) Set(addr storage.Address) (err error) {
+ return s.db.set(s.mode, addr)
+}
+
+// set updates database indexes for a specific
+// chunk represented by the address.
+// It acquires lockAddr to protect two calls
+// of this function for the same address in parallel.
+func (db *DB) set(mode ModeSet, addr storage.Address) (err error) {
+ // protect parallel updates
+ unlock, err := db.lockAddr(addr)
+ if err != nil {
+ return err
+ }
+ defer unlock()
+
+ batch := new(leveldb.Batch)
+
+ // variables that provide information for operations
+ // to be done after write batch function successfully executes
+ var gcSizeChange int64 // number to add or subtract from gcSize
+ var triggerPullFeed bool // signal pull feed subscriptions to iterate
+
+ item := addressToItem(addr)
+
+ switch mode {
+ case ModeSetAccess:
+ // add to pull, insert to gc
+
+ // need to get access timestamp here as it is not
+ // provided by the access function, and it is not
+ // a property of a chunk provided to Accessor.Put.
+
+ i, err := db.retrievalDataIndex.Get(item)
+ switch err {
+ case nil:
+ item.StoreTimestamp = i.StoreTimestamp
+ case leveldb.ErrNotFound:
+ db.pushIndex.DeleteInBatch(batch, item)
+ item.StoreTimestamp = now()
+ default:
+ return err
+ }
+
+ i, err = db.retrievalAccessIndex.Get(item)
+ switch err {
+ case nil:
+ item.AccessTimestamp = i.AccessTimestamp
+ db.gcIndex.DeleteInBatch(batch, item)
+ gcSizeChange--
+ case leveldb.ErrNotFound:
+ // the chunk is not accessed before
+ default:
+ return err
+ }
+ item.AccessTimestamp = now()
+ db.retrievalAccessIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ db.gcIndex.PutInBatch(batch, item)
+ db.gcUncountedHashesIndex.PutInBatch(batch, item)
+ gcSizeChange++
+
+ case ModeSetSync:
+ // delete from push, insert to gc
+
+ // need to get access timestamp here as it is not
+ // provided by the access function, and it is not
+ // a property of a chunk provided to Accessor.Put.
+ i, err := db.retrievalDataIndex.Get(item)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ // chunk is not found,
+ // no need to update gc index
+ // just delete from the push index
+ // if it is there
+ db.pushIndex.DeleteInBatch(batch, item)
+ return nil
+ }
+ return err
+ }
+ item.StoreTimestamp = i.StoreTimestamp
+
+ i, err = db.retrievalAccessIndex.Get(item)
+ switch err {
+ case nil:
+ item.AccessTimestamp = i.AccessTimestamp
+ db.gcIndex.DeleteInBatch(batch, item)
+ gcSizeChange--
+ case leveldb.ErrNotFound:
+ // the chunk is not accessed before
+ default:
+ return err
+ }
+ item.AccessTimestamp = now()
+ db.retrievalAccessIndex.PutInBatch(batch, item)
+ db.pushIndex.DeleteInBatch(batch, item)
+ db.gcIndex.PutInBatch(batch, item)
+ db.gcUncountedHashesIndex.PutInBatch(batch, item)
+ gcSizeChange++
+
+ case modeSetRemove:
+ // delete from retrieve, pull, gc
+
+ // need to get access timestamp here as it is not
+ // provided by the access function, and it is not
+ // a property of a chunk provided to Accessor.Put.
+
+ i, err := db.retrievalAccessIndex.Get(item)
+ switch err {
+ case nil:
+ item.AccessTimestamp = i.AccessTimestamp
+ case leveldb.ErrNotFound:
+ default:
+ return err
+ }
+ i, err = db.retrievalDataIndex.Get(item)
+ if err != nil {
+ return err
+ }
+ item.StoreTimestamp = i.StoreTimestamp
+
+ db.retrievalDataIndex.DeleteInBatch(batch, item)
+ db.retrievalAccessIndex.DeleteInBatch(batch, item)
+ db.pullIndex.DeleteInBatch(batch, item)
+ db.gcIndex.DeleteInBatch(batch, item)
+ db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
+ // a check is needed for decrementing gcSize
+ // as delete is not reporting if the key/value pair
+ // is deleted or not
+ if _, err := db.gcIndex.Get(item); err == nil {
+ gcSizeChange = -1
+ }
+
+ default:
+ return ErrInvalidMode
+ }
+
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return err
+ }
+ if gcSizeChange != 0 {
+ db.incGCSize(gcSizeChange)
+ }
+ if triggerPullFeed {
+ db.triggerPullSubscriptions(db.po(item.Address))
+ }
+ return nil
+}
diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go
new file mode 100644
index 000000000..94cd0a3e2
--- /dev/null
+++ b/swarm/storage/localstore/mode_set_test.go
@@ -0,0 +1,128 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "testing"
+ "time"
+
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// TestModeSetAccess validates ModeSetAccess index values on the provided DB.
+func TestModeSetAccess(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ chunk := generateRandomChunk()
+
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ err := db.NewSetter(ModeSetAccess).Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
+
+ t.Run("pull index count", newItemsCountTest(db.pullIndex, 1))
+
+ t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+}
+
+// TestModeSetSync validates ModeSetSync index values on the provided DB.
+func TestModeSetSync(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ chunk := generateRandomChunk()
+
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = db.NewSetter(ModeSetSync).Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
+
+ t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, leveldb.ErrNotFound))
+
+ t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+}
+
+// TestModeSetRemove validates ModeSetRemove index values on the provided DB.
+func TestModeSetRemove(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ chunk := generateRandomChunk()
+
+ err := db.NewPutter(ModePutUpload).Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = db.NewSetter(modeSetRemove).Set(chunk.Address())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", func(t *testing.T) {
+ wantErr := leveldb.ErrNotFound
+ _, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ if err != wantErr {
+ t.Errorf("got error %v, want %v", err, wantErr)
+ }
+ t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
+
+ // access index should not be set
+ _, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ if err != wantErr {
+ t.Errorf("got error %v, want %v", err, wantErr)
+ }
+ t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
+ })
+
+ t.Run("pull index", newPullIndexTest(db, chunk, 0, leveldb.ErrNotFound))
+
+ t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
+
+ t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
+
+ t.Run("gc size", newIndexGCSizeTest(db))
+
+}
diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go
new file mode 100644
index 000000000..9f5b452c5
--- /dev/null
+++ b/swarm/storage/localstore/retrieval_index_test.go
@@ -0,0 +1,150 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "strconv"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// BenchmarkRetrievalIndexes uploads a number of chunks in order to measure
+// total time of updating their retrieval indexes by setting them
+// to synced state and requesting them.
+//
+// This benchmark takes significant amount of time.
+//
+// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show
+// that two separated indexes perform better.
+//
+// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkRetrievalIndexes -v
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
+// BenchmarkRetrievalIndexes/1000-8 20 75556686 ns/op 19033493 B/op 84500 allocs/op
+// BenchmarkRetrievalIndexes/10000-8 1 1079084922 ns/op 382792064 B/op 1429644 allocs/op
+// BenchmarkRetrievalIndexes/100000-8 1 16891305737 ns/op 2629165304 B/op 12465019 allocs/op
+// PASS
+func BenchmarkRetrievalIndexes(b *testing.B) {
+ for _, count := range []int{
+ 1000,
+ 10000,
+ 100000,
+ } {
+ b.Run(strconv.Itoa(count)+"-split", func(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ benchmarkRetrievalIndexes(b, nil, count)
+ }
+ })
+ }
+}
+
+// benchmarkRetrievalIndexes is used in BenchmarkRetrievalIndexes
+// to do benchmarks with a specific number of chunks and different
+// database options.
+func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
+ b.StopTimer()
+ db, cleanupFunc := newTestDB(b, o)
+ defer cleanupFunc()
+ uploader := db.NewPutter(ModePutUpload)
+ syncer := db.NewSetter(ModeSetSync)
+ requester := db.NewGetter(ModeGetRequest)
+ addrs := make([]storage.Address, count)
+ for i := 0; i < count; i++ {
+ chunk := generateFakeRandomChunk()
+ err := uploader.Put(chunk)
+ if err != nil {
+ b.Fatal(err)
+ }
+ addrs[i] = chunk.Address()
+ }
+ // set update gc test hook to signal when
+ // update gc goroutine is done by sending to
+ // testHookUpdateGCChan channel, which is
+ // used to wait for gc index updates to be
+ // included in the benchmark time
+ testHookUpdateGCChan := make(chan struct{})
+ defer setTestHookUpdateGC(func() {
+ testHookUpdateGCChan <- struct{}{}
+ })()
+ b.StartTimer()
+
+ for i := 0; i < count; i++ {
+ err := syncer.Set(addrs[i])
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ _, err = requester.Get(addrs[i])
+ if err != nil {
+ b.Fatal(err)
+ }
+ // wait for update gc goroutine to be done
+ <-testHookUpdateGCChan
+ }
+}
+
+// BenchmarkUpload compares uploading speed for different
+// retrieval indexes and various number of chunks.
+//
+// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014).
+//
+// go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkUpload -v
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
+// BenchmarkUpload/1000-8 20 59437463 ns/op 25205193 B/op 23208 allocs/op
+// BenchmarkUpload/10000-8 2 580646362 ns/op 216532932 B/op 248090 allocs/op
+// BenchmarkUpload/100000-8 1 22373390892 ns/op 2323055312 B/op 3995903 allocs/op
+// PASS
+func BenchmarkUpload(b *testing.B) {
+ for _, count := range []int{
+ 1000,
+ 10000,
+ 100000,
+ } {
+ b.Run(strconv.Itoa(count), func(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ benchmarkUpload(b, nil, count)
+ }
+ })
+ }
+}
+
+// benchmarkUpload is used in BenchmarkUpload
+// to do benchmarks with a specific number of chunks and different
+// database options.
+func benchmarkUpload(b *testing.B, o *Options, count int) {
+ b.StopTimer()
+ db, cleanupFunc := newTestDB(b, o)
+ defer cleanupFunc()
+ uploader := db.NewPutter(ModePutUpload)
+ chunks := make([]storage.Chunk, count)
+ for i := 0; i < count; i++ {
+ chunk := generateFakeRandomChunk()
+ chunks[i] = chunk
+ }
+ b.StartTimer()
+
+ for i := 0; i < count; i++ {
+ err := uploader.Put(chunks[i])
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
new file mode 100644
index 000000000..a18f0915d
--- /dev/null
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -0,0 +1,193 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
+// Pull syncing index can be only subscribed to a particular proximity order bin. If since
+// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
+// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be
+// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
+// function will terminate current and further iterations without errors, and also close the returned channel.
+// Make sure that you check the second returned parameter from the channel to stop iteration when its value
+// is false.
+func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
+ chunkDescriptors := make(chan ChunkDescriptor)
+ trigger := make(chan struct{}, 1)
+
+ db.pullTriggersMu.Lock()
+ if _, ok := db.pullTriggers[bin]; !ok {
+ db.pullTriggers[bin] = make([]chan struct{}, 0)
+ }
+ db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
+ db.pullTriggersMu.Unlock()
+
+ // send signal for the initial iteration
+ trigger <- struct{}{}
+
+ stopChan := make(chan struct{})
+ var stopChanOnce sync.Once
+
+ // used to provide information from the iterator to
+ // stop subscription when until chunk descriptor is reached
+ var errStopSubscription = errors.New("stop subscription")
+
+ go func() {
+ // close the returned ChunkDescriptor channel at the end to
+ // signal that the subscription is done
+ defer close(chunkDescriptors)
+ // sinceItem is the Item from which the next iteration
+ // should start. The first iteration starts from the first Item.
+ var sinceItem *shed.Item
+ if since != nil {
+ sinceItem = &shed.Item{
+ Address: since.Address,
+ StoreTimestamp: since.StoreTimestamp,
+ }
+ }
+ for {
+ select {
+ case <-trigger:
+ // iterate until:
+ // - last index Item is reached
+ // - subscription stop is called
+ // - context is done
+ err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ select {
+ case chunkDescriptors <- ChunkDescriptor{
+ Address: item.Address,
+ StoreTimestamp: item.StoreTimestamp,
+ }:
+ // until chunk descriptor is sent
+ // break the iteration
+ if until != nil &&
+ (item.StoreTimestamp >= until.StoreTimestamp ||
+ bytes.Equal(item.Address, until.Address)) {
+ return true, errStopSubscription
+ }
+ // set next iteration start item
+ // when its chunk is successfully sent to channel
+ sinceItem = &item
+ return false, nil
+ case <-stopChan:
+ // gracefully stop the iteration
+ // on stop
+ return true, nil
+ case <-db.close:
+ // gracefully stop the iteration
+ // on database close
+ return true, nil
+ case <-ctx.Done():
+ return true, ctx.Err()
+ }
+ }, &shed.IterateOptions{
+ StartFrom: sinceItem,
+ // sinceItem was sent as the last Address in the previous
+ // iterator call, skip it in this one
+ SkipStartFromItem: true,
+ Prefix: []byte{bin},
+ })
+ if err != nil {
+ if err == errStopSubscription {
+ // stop subscription without any errors
+ // if until is reached
+ return
+ }
+ log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
+ return
+ }
+ case <-stopChan:
+ // terminate the subscription
+ // on stop
+ return
+ case <-db.close:
+ // terminate the subscription
+ // on database close
+ return
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err != nil {
+ log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
+ }
+ return
+ }
+ }
+ }()
+
+ stop = func() {
+ stopChanOnce.Do(func() {
+ close(stopChan)
+ })
+
+ db.pullTriggersMu.Lock()
+ defer db.pullTriggersMu.Unlock()
+
+ for i, t := range db.pullTriggers[bin] {
+ if t == trigger {
+ db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
+ break
+ }
+ }
+ }
+
+ return chunkDescriptors, stop
+}
+
+// ChunkDescriptor holds information required for Pull syncing. This struct
+// is provided by subscribing to pull index.
+type ChunkDescriptor struct {
+ Address storage.Address
+ StoreTimestamp int64
+}
+
+func (c *ChunkDescriptor) String() string {
+ if c == nil {
+ return "none"
+ }
+ return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
+}
+
+// triggerPullSubscriptions is used internally for starting iterations
+// on Pull subscriptions for a particular bin. When new item with address
+// that is in particular bin for DB's baseKey is added to pull index
+// this function should be called.
+func (db *DB) triggerPullSubscriptions(bin uint8) {
+ db.pullTriggersMu.RLock()
+ triggers, ok := db.pullTriggers[bin]
+ db.pullTriggersMu.RUnlock()
+ if !ok {
+ return
+ }
+
+ for _, t := range triggers {
+ select {
+ case t <- struct{}{}:
+ default:
+ }
+ }
+}
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
new file mode 100644
index 000000000..5c99e0dec
--- /dev/null
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -0,0 +1,478 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TestDB_SubscribePull uploads some chunks before and after
+// pull syncing subscription is created and validates if
+// all addresses are received in the right order
+// for expected proximity order bins.
+func TestDB_SubscribePull(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]storage.Address)
+ var addrsMu sync.Mutex
+ var wantedChunksCount int
+
+ // prepopulate database with some chunks
+ // before the subscription
+ uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ {
+ ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ }
+
+ // upload some chunks just after subscribe
+ uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+
+ time.Sleep(200 * time.Millisecond)
+
+ // upload some chunks after some short time
+ // to ensure that subscription will include them
+ // in a dynamic environment
+ uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+
+ checkErrChan(ctx, t, errChan, wantedChunksCount)
+}
+
+// TestDB_SubscribePull_multiple uploads chunks before and after
+// multiple pull syncing subscriptions are created and
+// validates if all addresses are received in the right order
+// for expected proximity order bins.
+func TestDB_SubscribePull_multiple(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]storage.Address)
+ var addrsMu sync.Mutex
+ var wantedChunksCount int
+
+ // prepopulate database with some chunks
+ // before the subscription
+ uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ subsCount := 10
+
+ // start a number of subscriptions
+ // that all of them will write every address error to errChan
+ for j := 0; j < subsCount; j++ {
+ for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ {
+ ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ }
+ }
+
+ // upload some chunks just after subscribe
+ uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+
+ time.Sleep(200 * time.Millisecond)
+
+ // upload some chunks after some short time
+ // to ensure that subscription will include them
+ // in a dynamic environment
+ uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+
+ checkErrChan(ctx, t, errChan, wantedChunksCount*subsCount)
+}
+
+// TestDB_SubscribePull_since uploads chunks before and after
+// pull syncing subscriptions are created with a since argument
+// and validates if all expected addresses are received in the
+// right order for expected proximity order bins.
+func TestDB_SubscribePull_since(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]storage.Address)
+ var addrsMu sync.Mutex
+ var wantedChunksCount int
+
+ lastTimestamp := time.Now().UTC().UnixNano()
+ var lastTimestampMu sync.RWMutex
+ defer setNow(func() (t int64) {
+ lastTimestampMu.Lock()
+ defer lastTimestampMu.Unlock()
+ lastTimestamp++
+ return lastTimestamp
+ })()
+
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ last = make(map[uint8]ChunkDescriptor)
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ bin := db.po(chunk.Address())
+
+ addrsMu.Lock()
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]storage.Address, 0)
+ }
+ if wanted {
+ addrs[bin] = append(addrs[bin], chunk.Address())
+ wantedChunksCount++
+ }
+ addrsMu.Unlock()
+
+ lastTimestampMu.RLock()
+ storeTimestamp := lastTimestamp
+ lastTimestampMu.RUnlock()
+
+ last[bin] = ChunkDescriptor{
+ Address: chunk.Address(),
+ StoreTimestamp: storeTimestamp,
+ }
+ }
+ return last
+ }
+
+ // prepopulate database with some chunks
+ // before the subscription
+ last := uploadRandomChunks(30, false)
+
+ uploadRandomChunks(25, true)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ {
+ var since *ChunkDescriptor
+ if c, ok := last[bin]; ok {
+ since = &c
+ }
+ ch, stop := db.SubscribePull(ctx, bin, since, nil)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+
+ }
+
+ // upload some chunks just after subscribe
+ uploadRandomChunks(15, true)
+
+ checkErrChan(ctx, t, errChan, wantedChunksCount)
+}
+
+// TestDB_SubscribePull_until uploads chunks before and after
+// pull syncing subscriptions are created with an until argument
+// and validates if all expected addresses are received in the
+// right order for expected proximity order bins.
+func TestDB_SubscribePull_until(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]storage.Address)
+ var addrsMu sync.Mutex
+ var wantedChunksCount int
+
+ lastTimestamp := time.Now().UTC().UnixNano()
+ var lastTimestampMu sync.RWMutex
+ defer setNow(func() (t int64) {
+ lastTimestampMu.Lock()
+ defer lastTimestampMu.Unlock()
+ lastTimestamp++
+ return lastTimestamp
+ })()
+
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ last = make(map[uint8]ChunkDescriptor)
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ bin := db.po(chunk.Address())
+
+ addrsMu.Lock()
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]storage.Address, 0)
+ }
+ if wanted {
+ addrs[bin] = append(addrs[bin], chunk.Address())
+ wantedChunksCount++
+ }
+ addrsMu.Unlock()
+
+ lastTimestampMu.RLock()
+ storeTimestamp := lastTimestamp
+ lastTimestampMu.RUnlock()
+
+ last[bin] = ChunkDescriptor{
+ Address: chunk.Address(),
+ StoreTimestamp: storeTimestamp,
+ }
+ }
+ return last
+ }
+
+ // prepopulate database with some chunks
+ // before the subscription
+ last := uploadRandomChunks(30, true)
+
+ uploadRandomChunks(25, false)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ {
+ until, ok := last[bin]
+ if !ok {
+ continue
+ }
+ ch, stop := db.SubscribePull(ctx, bin, nil, &until)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ }
+
+ // upload some chunks just after subscribe
+ uploadRandomChunks(15, false)
+
+ checkErrChan(ctx, t, errChan, wantedChunksCount)
+}
+
+// TestDB_SubscribePull_sinceAndUntil uploads chunks before and
+// after pull syncing subscriptions are created with since
+// and until arguments, and validates if all expected addresses
+// are received in the right order for expected proximity order bins.
+func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]storage.Address)
+ var addrsMu sync.Mutex
+ var wantedChunksCount int
+
+ lastTimestamp := time.Now().UTC().UnixNano()
+ var lastTimestampMu sync.RWMutex
+ defer setNow(func() (t int64) {
+ lastTimestampMu.Lock()
+ defer lastTimestampMu.Unlock()
+ lastTimestamp++
+ return lastTimestamp
+ })()
+
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ last = make(map[uint8]ChunkDescriptor)
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ bin := db.po(chunk.Address())
+
+ addrsMu.Lock()
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]storage.Address, 0)
+ }
+ if wanted {
+ addrs[bin] = append(addrs[bin], chunk.Address())
+ wantedChunksCount++
+ }
+ addrsMu.Unlock()
+
+ lastTimestampMu.RLock()
+ storeTimestamp := lastTimestamp
+ lastTimestampMu.RUnlock()
+
+ last[bin] = ChunkDescriptor{
+ Address: chunk.Address(),
+ StoreTimestamp: storeTimestamp,
+ }
+ }
+ return last
+ }
+
+ // all chunks from upload1 are not expected
+ // as upload1 chunk is used as since for subscriptions
+ upload1 := uploadRandomChunks(100, false)
+
+ // all chunks from upload2 are expected
+ // as upload2 chunk is used as until for subscriptions
+ upload2 := uploadRandomChunks(100, true)
+
+ // upload some chunks before subscribe but after
+ // wanted chunks
+ uploadRandomChunks(8, false)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ {
+ var since *ChunkDescriptor
+ if c, ok := upload1[bin]; ok {
+ since = &c
+ }
+ until, ok := upload2[bin]
+ if !ok {
+ // no chunks un this bin uploaded in the upload2
+ // skip this bin from testing
+ continue
+ }
+ ch, stop := db.SubscribePull(ctx, bin, since, &until)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ }
+
+ // upload some chunks just after subscribe
+ uploadRandomChunks(15, false)
+
+ checkErrChan(ctx, t, errChan, wantedChunksCount)
+}
+
+// uploadRandomChunksBin uploads random chunks to database and adds them to
+// the map of addresses ber bin.
+func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ addrsMu.Lock()
+ bin := db.po(chunk.Address())
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]storage.Address, 0)
+ }
+ addrs[bin] = append(addrs[bin], chunk.Address())
+ addrsMu.Unlock()
+
+ *wantedChunksCount++
+ }
+}
+
+// readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and
+// sends error to errChan, even if it is nil, to count the number of ChunkDescriptors
+// returned by the channel.
+func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, errChan chan error) {
+ var i int // address index
+ for {
+ select {
+ case got, ok := <-ch:
+ if !ok {
+ return
+ }
+ addrsMu.Lock()
+ if i+1 > len(addrs[bin]) {
+ errChan <- fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin)
+ }
+ want := addrs[bin][i]
+ addrsMu.Unlock()
+ var err error
+ if !bytes.Equal(got.Address, want) {
+ err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want)
+ }
+ i++
+ // send one and only one error per received address
+ errChan <- err
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// checkErrChan expects the number of wantedChunksCount errors from errChan
+// and calls t.Error for the ones that are not nil.
+func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedChunksCount int) {
+ t.Helper()
+
+ for i := 0; i < wantedChunksCount; i++ {
+ select {
+ case err := <-errChan:
+ if err != nil {
+ t.Error(err)
+ }
+ case <-ctx.Done():
+ t.Fatal(ctx.Err())
+ }
+ }
+}
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
new file mode 100644
index 000000000..b13f29399
--- /dev/null
+++ b/swarm/storage/localstore/subscription_push.go
@@ -0,0 +1,145 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "context"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
+// Returned stop function will terminate current and further iterations, and also it will close
+// the returned channel without any errors. Make sure that you check the second returned parameter
+// from the channel to stop iteration when its value is false.
+func (db *DB) SubscribePush(ctx context.Context) (c <-chan storage.Chunk, stop func()) {
+ chunks := make(chan storage.Chunk)
+ trigger := make(chan struct{}, 1)
+
+ db.pushTriggersMu.Lock()
+ db.pushTriggers = append(db.pushTriggers, trigger)
+ db.pushTriggersMu.Unlock()
+
+ // send signal for the initial iteration
+ trigger <- struct{}{}
+
+ stopChan := make(chan struct{})
+ var stopChanOnce sync.Once
+
+ go func() {
+ // close the returned chunkInfo channel at the end to
+ // signal that the subscription is done
+ defer close(chunks)
+ // sinceItem is the Item from which the next iteration
+ // should start. The first iteration starts from the first Item.
+ var sinceItem *shed.Item
+ for {
+ select {
+ case <-trigger:
+ // iterate until:
+ // - last index Item is reached
+ // - subscription stop is called
+ // - context is done
+ err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+ // get chunk data
+ dataItem, err := db.retrievalDataIndex.Get(item)
+ if err != nil {
+ return true, err
+ }
+
+ select {
+ case chunks <- storage.NewChunk(dataItem.Address, dataItem.Data):
+ // set next iteration start item
+ // when its chunk is successfully sent to channel
+ sinceItem = &item
+ return false, nil
+ case <-stopChan:
+ // gracefully stop the iteration
+ // on stop
+ return true, nil
+ case <-db.close:
+ // gracefully stop the iteration
+ // on database close
+ return true, nil
+ case <-ctx.Done():
+ return true, ctx.Err()
+ }
+ }, &shed.IterateOptions{
+ StartFrom: sinceItem,
+ // sinceItem was sent as the last Address in the previous
+ // iterator call, skip it in this one
+ SkipStartFromItem: true,
+ })
+ if err != nil {
+ log.Error("localstore push subscription iteration", "err", err)
+ return
+ }
+ case <-stopChan:
+ // terminate the subscription
+ // on stop
+ return
+ case <-db.close:
+ // terminate the subscription
+ // on database close
+ return
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err != nil {
+ log.Error("localstore push subscription", "err", err)
+ }
+ return
+ }
+ }
+ }()
+
+ stop = func() {
+ stopChanOnce.Do(func() {
+ close(stopChan)
+ })
+
+ db.pushTriggersMu.Lock()
+ defer db.pushTriggersMu.Unlock()
+
+ for i, t := range db.pushTriggers {
+ if t == trigger {
+ db.pushTriggers = append(db.pushTriggers[:i], db.pushTriggers[i+1:]...)
+ break
+ }
+ }
+ }
+
+ return chunks, stop
+}
+
+// triggerPushSubscriptions is used internally for starting iterations
+// on Push subscriptions. Whenever new item is added to the push index,
+// this function should be called.
+func (db *DB) triggerPushSubscriptions() {
+ db.pushTriggersMu.RLock()
+ triggers := db.pushTriggers
+ db.pushTriggersMu.RUnlock()
+
+ for _, t := range triggers {
+ select {
+ case t <- struct{}{}:
+ default:
+ }
+ }
+}
diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go
new file mode 100644
index 000000000..73e7c25f7
--- /dev/null
+++ b/swarm/storage/localstore/subscription_push_test.go
@@ -0,0 +1,200 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package localstore
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TestDB_SubscribePush uploads some chunks before and after
+// push syncing subscription is created and validates if
+// all addresses are received in the right order.
+func TestDB_SubscribePush(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ chunks := make([]storage.Chunk, 0)
+ var chunksMu sync.Mutex
+
+ uploadRandomChunks := func(count int) {
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ chunksMu.Lock()
+ chunks = append(chunks, chunk)
+ chunksMu.Unlock()
+ }
+ }
+
+ // prepopulate database with some chunks
+ // before the subscription
+ uploadRandomChunks(10)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ ch, stop := db.SubscribePush(ctx)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go func() {
+ var i int // address index
+ for {
+ select {
+ case got, ok := <-ch:
+ if !ok {
+ return
+ }
+ chunksMu.Lock()
+ want := chunks[i]
+ chunksMu.Unlock()
+ var err error
+ if !bytes.Equal(got.Data(), want.Data()) {
+ err = fmt.Errorf("got chunk %v data %x, want %x", i, got.Data(), want.Data())
+ }
+ if !bytes.Equal(got.Address(), want.Address()) {
+ err = fmt.Errorf("got chunk %v address %s, want %s", i, got.Address().Hex(), want.Address().Hex())
+ }
+ i++
+ // send one and only one error per received address
+ errChan <- err
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ // upload some chunks just after subscribe
+ uploadRandomChunks(5)
+
+ time.Sleep(200 * time.Millisecond)
+
+ // upload some chunks after some short time
+ // to ensure that subscription will include them
+ // in a dynamic environment
+ uploadRandomChunks(3)
+
+ checkErrChan(ctx, t, errChan, len(chunks))
+}
+
+// TestDB_SubscribePush_multiple uploads chunks before and after
+// multiple push syncing subscriptions are created and
+// validates if all addresses are received in the right order.
+func TestDB_SubscribePush_multiple(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make([]storage.Address, 0)
+ var addrsMu sync.Mutex
+
+ uploadRandomChunks := func(count int) {
+ for i := 0; i < count; i++ {
+ chunk := generateRandomChunk()
+
+ err := uploader.Put(chunk)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ addrsMu.Lock()
+ addrs = append(addrs, chunk.Address())
+ addrsMu.Unlock()
+ }
+ }
+
+ // prepopulate database with some chunks
+ // before the subscription
+ uploadRandomChunks(10)
+
+ // set a timeout on subscription
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // collect all errors from validating addresses, even nil ones
+ // to validate the number of addresses received by the subscription
+ errChan := make(chan error)
+
+ subsCount := 10
+
+ // start a number of subscriptions
+ // that all of them will write every addresses error to errChan
+ for j := 0; j < subsCount; j++ {
+ ch, stop := db.SubscribePush(ctx)
+ defer stop()
+
+ // receive and validate addresses from the subscription
+ go func(j int) {
+ var i int // address index
+ for {
+ select {
+ case got, ok := <-ch:
+ if !ok {
+ return
+ }
+ addrsMu.Lock()
+ want := addrs[i]
+ addrsMu.Unlock()
+ var err error
+ if !bytes.Equal(got.Address(), want) {
+ err = fmt.Errorf("got chunk %v address on subscription %v %s, want %s", i, j, got, want)
+ }
+ i++
+ // send one and only one error per received address
+ errChan <- err
+ case <-ctx.Done():
+ return
+ }
+ }
+ }(j)
+ }
+
+ // upload some chunks just after subscribe
+ uploadRandomChunks(5)
+
+ time.Sleep(200 * time.Millisecond)
+
+ // upload some chunks after some short time
+ // to ensure that subscription will include them
+ // in a dynamic environment
+ uploadRandomChunks(3)
+
+ // number of addresses received by all subscriptions
+ wantedChunksCount := len(addrs) * subsCount
+
+ checkErrChan(ctx, t, errChan, wantedChunksCount)
+}