author     Janoš Guljaš <janos@users.noreply.github.com>  2019-03-09 07:06:39 +0800
committer  Viktor Trón <viktor.tron@gmail.com>            2019-03-09 07:06:39 +0800
commit     9a58a9b91a9bb60be2768aea451576135fa7ebe0 (patch)
tree       5feb8bd2dea06ea82dd7079a54dce7828a39010a /swarm
parent     f82185a4a186835e69dbbf2ed6117d116757da34 (diff)
swarm/storage/localstore: global batch write lock (#19245)
* swarm/storage/localstore: most basic database
* swarm/storage/localstore: fix typos and comments
* swarm/shed: add uint64 field Dec and DecInBatch methods
* swarm/storage/localstore: decrement size counter on ModeRemoval update
* swarm/storage/localstore: unexport modeAccess and modeRemoval
* swarm/storage/localstore: add WithRetrievalCompositeIndex
* swarm/storage/localstore: add TestModeSyncing
* swarm/storage/localstore: fix test name
* swarm/storage/localstore: add TestModeUpload
* swarm/storage/localstore: add TestModeRequest
* swarm/storage/localstore: add TestModeSynced
* swarm/storage/localstore: add TestModeAccess
* swarm/storage/localstore: add TestModeRemoval
* swarm/storage/localstore: add mock store option for chunk data
* swarm/storage/localstore: add TestDB_pullIndex
* swarm/storage/localstore: add TestDB_gcIndex
* swarm/storage/localstore: change how batches are written
* swarm/storage/localstore: add updateOnAccess function
* swarm/storage/localhost: add DB.gcSize
* swarm/storage/localstore: update comments
* swarm/storage/localstore: add BenchmarkNew
* swarm/storage/localstore: add retrieval tests benchmarks
* swarm/storage/localstore: accessors redesign
* swarm/storage/localstore: add semaphore for updateGC goroutine
* swarm/storage/localstore: implement basic garbage collection
* swarm/storage/localstore: optimize collectGarbage
* swarm/storage/localstore: add more garbage collection tests cases
* swarm/shed, swarm/storage/localstore: rename IndexItem to Item
* swarm/shed: add Index.CountFrom
* swarm/storage/localstore: persist gcSize
* swarm/storage/localstore: remove composite retrieval index
* swarm/shed: IterateWithPrefix and IterateWithPrefixFrom Index functions
* swarm/storage/localstore: writeGCSize function with leveldb batch
* swarm/storage/localstore: unexport modeSetRemove
* swarm/storage/localstore: update writeGCSizeWorker comment
* swarm/storage/localstore: add triggerGarbageCollection function
* swarm/storage/localstore: call writeGCSize on DB Close
* swarm/storage/localstore: additional comment in writeGCSizeWorker
* swarm/storage/localstore: add MetricsPrefix option
* swarm/storage/localstore: fix a typo
* swamr/shed: only one Index Iterate function
* swarm/storage/localstore: use shed Iterate function
* swarm/shed: pass a new byte slice copy to index decode functions
* swarm/storage/localstore: implement feed subscriptions
* swarm/storage/localstore: add more subscriptions tests
* swarm/storage/localsore: add parallel upload test
* swarm/storage/localstore: use storage.MaxPO in subscription tests
* swarm/storage/localstore: subscription of addresses instead chunks
* swarm/storage/localstore: lock item address in collectGarbage iterator
* swarm/storage/localstore: fix TestSubscribePull to include MaxPO
* swarm/storage/localstore: improve subscriptions
* swarm/storage/localstore: add TestDB_SubscribePull_sinceAndUntil test
* swarm/storage/localstore: adjust pull sync tests
* swarm/storage/localstore: remove writeGCSizeDelay and use literal
* swarm/storage/localstore: adjust subscriptions tests delays and comments
* swarm/storage/localstore: add godoc package overview
* swarm/storage/localstore: fix a typo
* swarm/storage/localstore: update package overview
* swarm/storage/localstore: remove repeated index change
* swarm/storage/localstore: rename ChunkInfo to ChunkDescriptor
* swarm/storage/localstore: add comment in collectGarbageWorker
* swarm/storage/localstore: replace atomics with mutexes for gcSize and tests
* swarm/storage/localstore: protect addrs map in pull subs tests
* swarm/storage/localstore: protect slices in push subs test
* swarm/storage/localstore: protect chunks in TestModePutUpload_parallel
* swarm/storage/localstore: fix a race in TestDB_updateGCSem defers
* swarm/storage/localstore: remove parallel flag from tests
* swarm/storage/localstore: fix a race in testDB_collectGarbageWorker
* swarm/storage/localstore: remove unused code
* swarm/storage/localstore: add more context to pull sub log messages
* swarm/storage/localstore: BenchmarkPutUpload and global lock option
* swarm/storage/localstore: pre-generate chunks in BenchmarkPutUpload
* swarm/storage/localstore: correct useGlobalLock in collectGarbage
* swarm/storage/localstore: fix typos and update comments
* swarm/storage/localstore: update writeGCSize comment
* swarm/storage/localstore: global batch write lock
* swarm/storage/localstore: remove global lock option
* swarm/storage/localstore: simplify DB.Close
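Taken together, the squashed commits replace two mechanisms at once: the per-address locks that serialized updates chunk by chunk, and the in-memory gcSize counter with its writeGCSizeWorker and gcUncountedHashesIndex recovery machinery. In their place, a single batchMu mutex serializes all batch writes, and the gc size becomes a persisted shed.Uint64Field updated inside the same leveldb batch as the index changes. A minimal sketch of that pattern, assuming a plain goleveldb store (the store type and gcSizeKey below are illustrative stand-ins, not the localstore API):

package sketch

import (
	"encoding/binary"
	"sync"

	"github.com/syndtr/goleveldb/leveldb"
)

// store sketches the post-change locking scheme: one mutex serializes all
// batch writes, and the gc size lives in the database itself, updated in
// the same batch as the index changes so the two can never drift apart.
type store struct {
	batchMu sync.Mutex // global batch write lock, replaces per-address locks
	ldb     *leveldb.DB
}

var gcSizeKey = []byte("gc-size")

func (s *store) gcSize() (uint64, error) {
	v, err := s.ldb.Get(gcSizeKey, nil)
	if err == leveldb.ErrNotFound {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint64(v), nil
}

// addToGC stores one chunk entry and the incremented gc size atomically.
func (s *store) addToGC(key, value []byte) error {
	s.batchMu.Lock()
	defer s.batchMu.Unlock()

	size, err := s.gcSize()
	if err != nil {
		return err
	}
	batch := new(leveldb.Batch)
	batch.Put(key, value)

	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, size+1)
	batch.Put(gcSizeKey, buf)

	// one atomic write: either the entry and the new counter both land,
	// or neither does, so no recovery bookkeeping is needed on restart
	return s.ldb.Write(batch, nil)
}

Because the counter and the index entries commit in one atomic write, a crash can never leave them out of sync, which is why the dirty-bit gcUncountedHashesIndex removed in this diff becomes unnecessary.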
Diffstat (limited to 'swarm')
-rw-r--r--  swarm/storage/localstore/gc.go              | 217
-rw-r--r--  swarm/storage/localstore/gc_test.go         |  39
-rw-r--r--  swarm/storage/localstore/localstore.go      | 126
-rw-r--r--  swarm/storage/localstore/localstore_test.go |  91
-rw-r--r--  swarm/storage/localstore/mode_get.go        |   7
-rw-r--r--  swarm/storage/localstore/mode_put.go        |  16
-rw-r--r--  swarm/storage/localstore/mode_set.go        |  18
7 files changed, 96 insertions, 418 deletions
diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go
index ebaba2d8f..84c4f596d 100644
--- a/swarm/storage/localstore/gc.go
+++ b/swarm/storage/localstore/gc.go
@@ -14,84 +14,9 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-/*
-Counting number of items in garbage collection index
-
-The number of items in garbage collection index is not the same as the number of
-chunks in retrieval index (total number of stored chunks). Chunk can be garbage
-collected only when it is set to a synced state by ModSetSync, and only then can
-be counted into garbage collection size, which determines whether a number of
-chunk should be removed from the storage by the garbage collection. This opens a
-possibility that the storage size exceeds the limit if files are locally
-uploaded and the node is not connected to other nodes or there is a problem with
-syncing.
-
-Tracking of garbage collection size (gcSize) is focused on performance. Key
-points:
-
- 1. counting the number of key/value pairs in LevelDB takes around 0.7s for 1e6
- on a very fast ssd (unacceptable long time in reality)
- 2. locking leveldb batch writes with a global mutex (serial batch writes) is
- not acceptable, we should use locking per chunk address
-
-Because of point 1. we cannot count the number of items in garbage collection
-index in New constructor as it could last very long for realistic scenarios
-where limit is 5e6 and nodes are running on slower hdd disks or cloud providers
-with low IOPS.
-
-Point 2. is a performance optimization to allow parallel batch writes with
-getters, putters and setters. Every single batch that they create contain only
-information related to a single chunk, no relations with other chunks or shared
-statistical data (like gcSize). This approach avoids race conditions on writing
-batches in parallel, but creates a problem of synchronizing statistical data
-values like gcSize. With global mutex lock, any data could be written by any
-batch, but would not use utilize the full potential of leveldb parallel writes.
-
-To mitigate this two problems, the implementation of counting and persisting
-gcSize is split into two parts. One is the in-memory value (gcSize) that is fast
-to read and write with a dedicated mutex (gcSizeMu) if the batch which adds or
-removes items from garbage collection index is successful. The second part is
-the reliable persistence of this value to leveldb database, as storedGCSize
-field. This database field is saved by writeGCSizeWorker and writeGCSize
-functions when in-memory gcSize variable is changed, but no too often to avoid
-very frequent database writes. This database writes are triggered by
-writeGCSizeTrigger when a call is made to function incGCSize. Trigger ensures
-that no database writes are done only when gcSize is changed (contrary to a
-simpler periodic writes or checks). A backoff of 10s in writeGCSizeWorker
-ensures that no frequent batch writes are made. Saving the storedGCSize on
-database Close function ensures that in-memory gcSize is persisted when database
-is closed.
-
-This persistence must be resilient to failures like panics. For this purpose, a
-collection of hashes that are added to the garbage collection index, but still
-not persisted to storedGCSize, must be tracked to count them in when DB is
-constructed again with New function after the failure (swarm node restarts). On
-every batch write that adds a new item to garbage collection index, the same
-hash is added to gcUncountedHashesIndex. This ensures that there is a persisted
-information which hashes were added to the garbage collection index. But, when
-the storedGCSize is saved by writeGCSize function, this values are removed in
-the same batch in which storedGCSize is changed to ensure consistency. When the
-panic happen, or database Close method is not saved. The database storage
-contains all information to reliably and efficiently get the correct number of
-items in garbage collection index. This is performed in the New function when
-all hashes in gcUncountedHashesIndex are counted, added to the storedGCSize and
-saved to the disk before the database is constructed again. Index
-gcUncountedHashesIndex is acting as dirty bit for recovery that provides
-information what needs to be corrected. With a simple dirty bit, the whole
-garbage collection index should me counted on recovery instead only the items in
-gcUncountedHashesIndex. Because of the triggering mechanizm of writeGCSizeWorker
-and relatively short backoff time, the number of hashes in
-gcUncountedHashesIndex should be low and it should take a very short time to
-recover from the previous failure. If there was no failure and
-gcUncountedHashesIndex is empty, which is the usual case, New function will take
-the minimal time to return.
-*/
-
package localstore
import (
- "time"
-
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
@@ -109,7 +34,7 @@ var (
gcTargetRatio = 0.9
// gcBatchSize limits the number of chunks in a single
// leveldb batch on garbage collection.
- gcBatchSize int64 = 1000
+ gcBatchSize uint64 = 1000
)
// collectGarbageWorker is a long running function that waits for
@@ -149,20 +74,21 @@ func (db *DB) collectGarbageWorker() {
// is false, another call to this function is needed to collect
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
-func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) {
+func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
batch := new(leveldb.Batch)
target := db.gcTarget()
+ // protect database from changing indexes and gcSize
+ db.batchMu.Lock()
+ defer db.batchMu.Unlock()
+
+ gcSize, err := db.gcSize.Get()
+ if err != nil {
+ return 0, true, err
+ }
+
done = true
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
- // protect parallel updates
- unlock, err := db.lockAddr(item.Address)
- if err != nil {
- return false, err
- }
- defer unlock()
-
- gcSize := db.getGCSize()
if gcSize-collectedCount <= target {
return true, nil
}
@@ -184,49 +110,19 @@ func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) {
return 0, false, err
}
+ db.gcSize.PutInBatch(batch, gcSize-collectedCount)
+
err = db.shed.WriteBatch(batch)
if err != nil {
return 0, false, err
}
- // batch is written, decrement gcSize
- db.incGCSize(-collectedCount)
return collectedCount, done, nil
}
// gcTarget returns the absolute value for garbage collection
// target value, calculated from db.capacity and gcTargetRatio.
-func (db *DB) gcTarget() (target int64) {
- return int64(float64(db.capacity) * gcTargetRatio)
-}
-
-// incGCSize increments gcSize by the provided number.
-// If count is negative, it will decrement gcSize.
-func (db *DB) incGCSize(count int64) {
- if count == 0 {
- return
- }
-
- db.gcSizeMu.Lock()
- new := db.gcSize + count
- db.gcSize = new
- db.gcSizeMu.Unlock()
-
- select {
- case db.writeGCSizeTrigger <- struct{}{}:
- default:
- }
- if new >= db.capacity {
- db.triggerGarbageCollection()
- }
-}
-
-// getGCSize returns gcSize value by locking it
-// with gcSizeMu mutex.
-func (db *DB) getGCSize() (count int64) {
- db.gcSizeMu.RLock()
- count = db.gcSize
- db.gcSizeMu.RUnlock()
- return count
+func (db *DB) gcTarget() (target uint64) {
+ return uint64(float64(db.capacity) * gcTargetRatio)
}
// triggerGarbageCollection signals collectGarbageWorker
@@ -239,68 +135,41 @@ func (db *DB) triggerGarbageCollection() {
}
}
-// writeGCSizeWorker writes gcSize on trigger event
-// and waits writeGCSizeDelay after each write.
-// It implements a linear backoff with delay of
-// writeGCSizeDelay duration to avoid very frequent
-// database operations.
-func (db *DB) writeGCSizeWorker() {
- defer close(db.writeGCSizeWorkerDone)
+// incGCSizeInBatch changes gcSize field value
+// by change which can be negative. This function
+// must be called under batchMu lock.
+func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
+ if change == 0 {
+ return nil
+ }
+ gcSize, err := db.gcSize.Get()
+ if err != nil {
+ return err
+ }
- for {
- select {
- case <-db.writeGCSizeTrigger:
- err := db.writeGCSize(db.getGCSize())
- if err != nil {
- log.Error("localstore write gc size", "err", err)
- }
- // Wait some time before writing gc size in the next
- // iteration. This prevents frequent I/O operations.
- select {
- case <-time.After(10 * time.Second):
- case <-db.close:
- return
- }
- case <-db.close:
- return
+ var new uint64
+ if change > 0 {
+ new = gcSize + uint64(change)
+ } else {
+ // 'change' is an int64 and is negative
+ // a conversion is needed with correct sign
+ c := uint64(-change)
+ if c > gcSize {
+ // protect uint64 underflow
+ return nil
}
+ new = gcSize - c
}
-}
-
-// writeGCSize stores the number of items in gcIndex.
-// It removes all hashes from gcUncountedHashesIndex
-// not to include them on the next DB initialization
-// (New function) when gcSize is counted.
-func (db *DB) writeGCSize(gcSize int64) (err error) {
- const maxBatchSize = 1000
-
- batch := new(leveldb.Batch)
- db.storedGCSize.PutInBatch(batch, uint64(gcSize))
- batchSize := 1
+ db.gcSize.PutInBatch(batch, new)
- // use only one iterator as it acquires its snapshot
- // not to remove hashes from index that are added
- // after stored gc size is written
- err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) {
- db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
- batchSize++
- if batchSize >= maxBatchSize {
- err = db.shed.WriteBatch(batch)
- if err != nil {
- return false, err
- }
- batch.Reset()
- batchSize = 0
- }
- return false, nil
- }, nil)
- if err != nil {
- return err
+ // trigger garbage collection if we reached the capacity
+ if new >= db.capacity {
+ db.triggerGarbageCollection()
}
- return db.shed.WriteBatch(batch)
+ return nil
}
// testHookCollectGarbage is a hook that can provide
// information when a garbage collection run is done
// and how many items it removed.
-var testHookCollectGarbage func(collectedCount int64)
+var testHookCollectGarbage func(collectedCount uint64)
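The only delicate arithmetic in the new incGCSizeInBatch above is applying a signed change to an unsigned counter. A standalone helper mirroring that logic, hypothetical and for illustration only:

// applyGCSizeChange mirrors the arithmetic in incGCSizeInBatch: a signed
// change is applied to an unsigned counter, and a negative change larger
// than the current value leaves the counter untouched instead of letting
// uint64 subtraction wrap around to a huge number.
func applyGCSizeChange(size uint64, change int64) uint64 {
	if change >= 0 {
		return size + uint64(change)
	}
	c := uint64(-change) // flip the sign before the unsigned subtraction
	if c > size {
		// would underflow; like incGCSizeInBatch, keep the stored
		// value as-is rather than wrapping around
		return size
	}
	return size - c
}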
diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go
index 8ed34384d..081e0af80 100644
--- a/swarm/storage/localstore/gc_test.go
+++ b/swarm/storage/localstore/gc_test.go
@@ -38,7 +38,7 @@ func TestDB_collectGarbageWorker(t *testing.T) {
func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
// lower the maximal number of chunks in a single
// gc batch to ensure multiple batches.
- defer func(s int64) { gcBatchSize = s }(gcBatchSize)
+ defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2
testDB_collectGarbageWorker(t)
@@ -54,8 +54,8 @@ func testDB_collectGarbageWorker(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{
Capacity: 100,
})
- testHookCollectGarbageChan := make(chan int64)
- defer setTestHookCollectGarbage(func(collectedCount int64) {
+ testHookCollectGarbageChan := make(chan uint64)
+ defer setTestHookCollectGarbage(func(collectedCount uint64) {
select {
case testHookCollectGarbageChan <- collectedCount:
case <-db.close:
@@ -93,7 +93,10 @@ func testDB_collectGarbageWorker(t *testing.T) {
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
- gcSize := db.getGCSize()
+ gcSize, err := db.gcSize.Get()
+ if err != nil {
+ t.Fatal(err)
+ }
if gcSize == gcTarget {
break
}
@@ -134,8 +137,8 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
- testHookCollectGarbageChan := make(chan int64)
- defer setTestHookCollectGarbage(func(collectedCount int64) {
+ testHookCollectGarbageChan := make(chan uint64)
+ defer setTestHookCollectGarbage(func(collectedCount uint64) {
testHookCollectGarbageChan <- collectedCount
})()
@@ -202,7 +205,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
gcTarget := db.gcTarget()
- var totalCollectedCount int64
+ var totalCollectedCount uint64
for {
select {
case c := <-testHookCollectGarbageChan:
@@ -210,13 +213,16 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
- gcSize := db.getGCSize()
+ gcSize, err := db.gcSize.Get()
+ if err != nil {
+ t.Fatal(err)
+ }
if gcSize == gcTarget {
break
}
}
- wantTotalCollectedCount := int64(len(addrs)) - gcTarget
+ wantTotalCollectedCount := uint64(len(addrs)) - gcTarget
if totalCollectedCount != wantTotalCollectedCount {
t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount)
}
@@ -288,10 +294,7 @@ func TestDB_gcSize(t *testing.T) {
}
}
- // DB.Close writes gc size to disk, so
- // Instead calling Close, close the database
- // without it.
- if err := db.closeWithOptions(false); err != nil {
+ if err := db.Close(); err != nil {
t.Fatal(err)
}
@@ -302,14 +305,12 @@ func TestDB_gcSize(t *testing.T) {
defer db.Close()
t.Run("gc index size", newIndexGCSizeTest(db))
-
- t.Run("gc uncounted hashes index count", newItemsCountTest(db.gcUncountedHashesIndex, 0))
}
// setTestHookCollectGarbage sets testHookCollectGarbage and
// returns a function that will reset it to the
// value before the change.
-func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) {
+func setTestHookCollectGarbage(h func(collectedCount uint64)) (reset func()) {
current := testHookCollectGarbage
reset = func() { testHookCollectGarbage = current }
testHookCollectGarbage = h
@@ -321,7 +322,7 @@ func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) {
// resets the original function.
func TestSetTestHookCollectGarbage(t *testing.T) {
// Set the current function after the test finishes.
- defer func(h func(collectedCount int64)) { testHookCollectGarbage = h }(testHookCollectGarbage)
+ defer func(h func(collectedCount uint64)) { testHookCollectGarbage = h }(testHookCollectGarbage)
// expected value for the unchanged function
original := 1
@@ -332,7 +333,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
var got int
// define the original (unchanged) functions
- testHookCollectGarbage = func(_ int64) {
+ testHookCollectGarbage = func(_ uint64) {
got = original
}
@@ -345,7 +346,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
}
// set the new function
- reset := setTestHookCollectGarbage(func(_ int64) {
+ reset := setTestHookCollectGarbage(func(_ uint64) {
got = changed
})
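The test changes also lean on the hook-swap idiom that setTestHookCollectGarbage implements: capture the current package-level hook, install a replacement, and return a closure that restores the original, so tests can write `defer setTestHookCollectGarbage(...)()` and never leak hooks into each other. The shape of that idiom, reduced to a sketch with a stand-in hook variable:

// testHook stands in for any package-level test hook variable.
var testHook func(collectedCount uint64)

// setTestHook installs h and returns a function that undoes it,
// intended to be used as: defer setTestHook(myHook)()
func setTestHook(h func(collectedCount uint64)) (reset func()) {
	current := testHook // capture whatever was installed before
	reset = func() { testHook = current }
	testHook = h
	return reset
}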
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
index 35044115b..98d4c7881 100644
--- a/swarm/storage/localstore/localstore.go
+++ b/swarm/storage/localstore/localstore.go
@@ -18,7 +18,6 @@ package localstore
import (
"encoding/binary"
- "encoding/hex"
"errors"
"sync"
"time"
@@ -41,7 +40,7 @@ var (
var (
// Default value for Capacity DB option.
- defaultCapacity int64 = 5000000
+ defaultCapacity uint64 = 5000000
// Limit the number of goroutines created by Getters
// that call updateGC function. Value 0 sets no limit.
maxParallelUpdateGC = 1000
@@ -54,8 +53,6 @@ type DB struct {
// schema name of loaded data
schemaName shed.StringField
- // field that stores number of intems in gc index
- storedGCSize shed.Uint64Field
// retrieval indexes
retrievalDataIndex shed.Index
@@ -74,23 +71,16 @@ type DB struct {
// garbage collection index
gcIndex shed.Index
- // index that stores hashes that are not
- // counted in and saved to storedGCSize
- gcUncountedHashesIndex shed.Index
- // number of elements in garbage collection index
- // it must be always read by getGCSize and
- // set with incGCSize which are locking gcSizeMu
- gcSize int64
- gcSizeMu sync.RWMutex
+ // field that stores number of items in gc index
+ gcSize shed.Uint64Field
+
// garbage collection is triggered when gcSize exceeds
// the capacity value
- capacity int64
+ capacity uint64
// triggers garbage collection event loop
collectGarbageTrigger chan struct{}
- // triggers write gc size event loop
- writeGCSizeTrigger chan struct{}
// a buffered channel acting as a semaphore
// to limit the maximal number of goroutines
@@ -102,7 +92,7 @@ type DB struct {
baseKey []byte
- addressLocks sync.Map
+ batchMu sync.Mutex
// this channel is closed when close function is called
// to terminate other goroutines
@@ -112,7 +102,6 @@ type DB struct {
// garbage collection and gc size write workers
// are done
collectGarbageWorkerDone chan struct{}
- writeGCSizeWorkerDone chan struct{}
}
// Options struct holds optional parameters for configuring DB.
@@ -125,7 +114,7 @@ type Options struct {
MockStore *mock.NodeStore
// Capacity is a limit that triggers garbage collection when
// number of items in gcIndex equals or exceeds it.
- Capacity int64
+ Capacity uint64
// MetricsPrefix defines a prefix for metrics names.
MetricsPrefix string
}
@@ -140,15 +129,13 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
db = &DB{
capacity: o.Capacity,
baseKey: baseKey,
- // channels collectGarbageTrigger and writeGCSizeTrigger
- // need to be buffered with the size of 1
+ // channel collectGarbageTrigger
+ // needs to be buffered with the size of 1
// to signal another event if it
// is triggered during already running function
collectGarbageTrigger: make(chan struct{}, 1),
- writeGCSizeTrigger: make(chan struct{}, 1),
close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}),
- writeGCSizeWorkerDone: make(chan struct{}),
}
if db.capacity <= 0 {
db.capacity = defaultCapacity
@@ -167,7 +154,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return nil, err
}
// Persist gc size.
- db.storedGCSize, err = db.shed.NewUint64Field("gc-size")
+ db.gcSize, err = db.shed.NewUint64Field("gc-size")
if err != nil {
return nil, err
}
@@ -318,48 +305,6 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if err != nil {
return nil, err
}
- // gc uncounted hashes index keeps hashes that are in gc index
- // but not counted in and saved to storedGCSize
- db.gcUncountedHashesIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{
- EncodeKey: func(fields shed.Item) (key []byte, err error) {
- return fields.Address, nil
- },
- DecodeKey: func(key []byte) (e shed.Item, err error) {
- e.Address = key
- return e, nil
- },
- EncodeValue: func(fields shed.Item) (value []byte, err error) {
- return nil, nil
- },
- DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- return e, nil
- },
- })
- if err != nil {
- return nil, err
- }
-
- // count number of elements in garbage collection index
- gcSize, err := db.storedGCSize.Get()
- if err != nil {
- return nil, err
- }
- // get number of uncounted hashes
- gcUncountedSize, err := db.gcUncountedHashesIndex.Count()
- if err != nil {
- return nil, err
- }
- gcSize += uint64(gcUncountedSize)
- // remove uncounted hashes from the index and
- // save the total gcSize after uncounted hashes are removed
- err = db.writeGCSize(int64(gcSize))
- if err != nil {
- return nil, err
- }
- db.incGCSize(int64(gcSize))
-
- // start worker to write gc size
- go db.writeGCSizeWorker()
// start garbage collection worker
go db.collectGarbageWorker()
return db, nil
@@ -367,34 +312,16 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// Close closes the underlying database.
func (db *DB) Close() (err error) {
- return db.closeWithOptions(true)
-}
-
-// closeWithOptions provides a more control which part of closing
-// is done for tests.
-func (db *DB) closeWithOptions(writeGCSize bool) (err error) {
close(db.close)
db.updateGCWG.Wait()
- // wait for gc worker and gc size write workers to
+ // wait for gc worker to
// return before closing the shed
- timeout := time.After(5 * time.Second)
select {
case <-db.collectGarbageWorkerDone:
- case <-timeout:
+ case <-time.After(5 * time.Second):
log.Error("localstore: collect garbage worker did not return after db close")
}
- select {
- case <-db.writeGCSizeWorkerDone:
- case <-timeout:
- log.Error("localstore: write gc size worker did not return after db close")
- }
-
- if writeGCSize {
- if err := db.writeGCSize(db.getGCSize()); err != nil {
- log.Error("localstore: write gc size", "err", err)
- }
- }
return db.shed.Close()
}
@@ -404,35 +331,6 @@ func (db *DB) po(addr chunk.Address) (bin uint8) {
return uint8(chunk.Proximity(db.baseKey, addr))
}
-var (
- // Maximal time for lockAddr to wait until it
- // returns error.
- addressLockTimeout = 3 * time.Second
- // duration between two lock checks in lockAddr.
- addressLockCheckDelay = 30 * time.Microsecond
-)
-
-// lockAddr sets the lock on a particular address
-// using addressLocks sync.Map and returns unlock function.
-// If the address is locked this function will check it
-// in a for loop for addressLockTimeout time, after which
-// it will return ErrAddressLockTimeout error.
-func (db *DB) lockAddr(addr chunk.Address) (unlock func(), err error) {
- start := time.Now()
- lockKey := hex.EncodeToString(addr)
- for {
- _, loaded := db.addressLocks.LoadOrStore(lockKey, struct{}{})
- if !loaded {
- break
- }
- time.Sleep(addressLockCheckDelay)
- if time.Since(start) > addressLockTimeout {
- return nil, ErrAddressLockTimeout
- }
- }
- return func() { db.addressLocks.Delete(lockKey) }, nil
-}
-
// chunkToItem creates new Item with data provided by the Chunk.
func chunkToItem(ch chunk.Chunk) shed.Item {
return shed.Item{
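One detail this diff keeps from the old design is the trigger channel: as the constructor comment notes, collectGarbageTrigger must be buffered with size 1 so that a trigger arriving during a running collection still schedules exactly one more run. A self-contained sketch of that coalescing-trigger idiom (the worker type and names are illustrative, not the localstore API):

package sketch

// worker runs fn once per pending trigger; notify calls that arrive
// while fn is running collapse into a single queued signal.
type worker struct {
	trigger chan struct{}
	quit    chan struct{}
}

func newWorker(fn func()) *worker {
	w := &worker{
		trigger: make(chan struct{}, 1), // buffer of 1 holds one pending run
		quit:    make(chan struct{}),
	}
	go func() {
		for {
			select {
			case <-w.trigger:
				fn()
			case <-w.quit:
				return
			}
		}
	}()
	return w
}

// notify never blocks: if a run is already pending, the signal is dropped,
// mirroring the select/default send in triggerGarbageCollection.
func (w *worker) notify() {
	select {
	case w.trigger <- struct{}{}:
	default:
	}
}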
diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go
index d10624173..42e762587 100644
--- a/swarm/storage/localstore/localstore_test.go
+++ b/swarm/storage/localstore/localstore_test.go
@@ -24,7 +24,6 @@ import (
"os"
"runtime"
"sort"
- "strconv"
"sync"
"testing"
"time"
@@ -137,89 +136,6 @@ func TestDB_updateGCSem(t *testing.T) {
}
}
-// BenchmarkNew measures the time that New function
-// needs to initialize and count the number of key/value
-// pairs in GC index.
-// This benchmark generates a number of chunks, uploads them,
-// sets them to synced state for them to enter the GC index,
-// and measures the execution time of New function by creating
-// new databases with the same data directory.
-//
-// This benchmark takes significant amount of time.
-//
-// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show
-// that New function executes around 1s for database with 1M chunks.
-//
-// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkNew -v -timeout 20m
-// goos: darwin
-// goarch: amd64
-// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
-// BenchmarkNew/1000-8 200 11672414 ns/op 9570960 B/op 10008 allocs/op
-// BenchmarkNew/10000-8 100 14890609 ns/op 10490118 B/op 7759 allocs/op
-// BenchmarkNew/100000-8 20 58334080 ns/op 17763157 B/op 22978 allocs/op
-// BenchmarkNew/1000000-8 2 748595153 ns/op 45297404 B/op 253242 allocs/op
-// PASS
-func BenchmarkNew(b *testing.B) {
- if testing.Short() {
- b.Skip("skipping benchmark in short mode")
- }
- for _, count := range []int{
- 1000,
- 10000,
- 100000,
- 1000000,
- } {
- b.Run(strconv.Itoa(count), func(b *testing.B) {
- dir, err := ioutil.TempDir("", "localstore-new-benchmark")
- if err != nil {
- b.Fatal(err)
- }
- defer os.RemoveAll(dir)
- baseKey := make([]byte, 32)
- if _, err := rand.Read(baseKey); err != nil {
- b.Fatal(err)
- }
- db, err := New(dir, baseKey, nil)
- if err != nil {
- b.Fatal(err)
- }
- defer db.Close()
- uploader := db.NewPutter(ModePutUpload)
- syncer := db.NewSetter(ModeSetSync)
- for i := 0; i < count; i++ {
- chunk := generateTestRandomChunk()
- err := uploader.Put(chunk)
- if err != nil {
- b.Fatal(err)
- }
- err = syncer.Set(chunk.Address())
- if err != nil {
- b.Fatal(err)
- }
- }
- err = db.Close()
- if err != nil {
- b.Fatal(err)
- }
- b.ResetTimer()
-
- for n := 0; n < b.N; n++ {
- b.StartTimer()
- db, err := New(dir, baseKey, nil)
- b.StopTimer()
-
- if err != nil {
- b.Fatal(err)
- }
- err = db.Close()
- if err != nil {
- b.Fatal(err)
- }
- }
- })
- }
-}
-
// newTestDB is a helper function that constructs a
// temporary database and returns a cleanup function that must
// be called to remove the data.
@@ -411,7 +327,7 @@ func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
// value is the same as the number of items in DB.gcIndex.
func newIndexGCSizeTest(db *DB) func(t *testing.T) {
return func(t *testing.T) {
- var want int64
+ var want uint64
err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
want++
return
@@ -419,7 +335,10 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got := db.getGCSize()
+ got, err := db.gcSize.Get()
+ if err != nil {
+ t.Fatal(err)
+ }
if got != want {
t.Errorf("got gc size %v, want %v", got, want)
}
diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go
index 9640cd27e..a6353e141 100644
--- a/swarm/storage/localstore/mode_get.go
+++ b/swarm/storage/localstore/mode_get.go
@@ -113,11 +113,8 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
// only Address and Data fields with non zero values,
// which is ensured by the get function.
func (db *DB) updateGC(item shed.Item) (err error) {
- unlock, err := db.lockAddr(item.Address)
- if err != nil {
- return err
- }
- defer unlock()
+ db.batchMu.Lock()
+ defer db.batchMu.Unlock()
batch := new(leveldb.Batch)
diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go
index 81df43535..1599ca8e3 100644
--- a/swarm/storage/localstore/mode_put.go
+++ b/swarm/storage/localstore/mode_put.go
@@ -64,11 +64,8 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) {
// with their nil values.
func (db *DB) put(mode ModePut, item shed.Item) (err error) {
// protect parallel updates
- unlock, err := db.lockAddr(item.Address)
- if err != nil {
- return err
- }
- defer unlock()
+ db.batchMu.Lock()
+ defer db.batchMu.Unlock()
batch := new(leveldb.Batch)
@@ -116,7 +113,6 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
db.retrievalAccessIndex.PutInBatch(batch, item)
// add new entry to gc index
db.gcIndex.PutInBatch(batch, item)
- db.gcUncountedHashesIndex.PutInBatch(batch, item)
gcSizeChange++
db.retrievalDataIndex.PutInBatch(batch, item)
@@ -143,12 +139,14 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
return ErrInvalidMode
}
- err = db.shed.WriteBatch(batch)
+ err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
return err
}
- if gcSizeChange != 0 {
- db.incGCSize(gcSizeChange)
+
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return err
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go
index a7c9875fe..83fcbea52 100644
--- a/swarm/storage/localstore/mode_set.go
+++ b/swarm/storage/localstore/mode_set.go
@@ -63,11 +63,8 @@ func (s *Setter) Set(addr chunk.Address) (err error) {
// of this function for the same address in parallel.
func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
// protect parallel updates
- unlock, err := db.lockAddr(addr)
- if err != nil {
- return err
- }
- defer unlock()
+ db.batchMu.Lock()
+ defer db.batchMu.Unlock()
batch := new(leveldb.Batch)
@@ -113,7 +110,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.pullIndex.PutInBatch(batch, item)
triggerPullFeed = true
db.gcIndex.PutInBatch(batch, item)
- db.gcUncountedHashesIndex.PutInBatch(batch, item)
gcSizeChange++
case ModeSetSync:
@@ -151,7 +147,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.retrievalAccessIndex.PutInBatch(batch, item)
db.pushIndex.DeleteInBatch(batch, item)
db.gcIndex.PutInBatch(batch, item)
- db.gcUncountedHashesIndex.PutInBatch(batch, item)
gcSizeChange++
case modeSetRemove:
@@ -179,7 +174,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
db.retrievalAccessIndex.DeleteInBatch(batch, item)
db.pullIndex.DeleteInBatch(batch, item)
db.gcIndex.DeleteInBatch(batch, item)
- db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
// a check is needed for decrementing gcSize
// as delete is not reporting if the key/value pair
// is deleted or not
@@ -191,12 +185,14 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
return ErrInvalidMode
}
- err = db.shed.WriteBatch(batch)
+ err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
return err
}
- if gcSizeChange != 0 {
- db.incGCSize(gcSizeChange)
+
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return err
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
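With the address locks gone, put, set, and updateGC all share the same critical-section shape: lock batchMu, stage every index change for the chunk into one batch, fold the gc size delta into the same batch via incGCSizeInBatch, and commit with a single WriteBatch. Condensed into a sketch (the db type and the stubbed incGCSizeInBatch are illustrative stand-ins):

package sketch

import (
	"sync"

	"github.com/syndtr/goleveldb/leveldb"
)

type db struct {
	batchMu sync.Mutex
	ldb     *leveldb.DB
}

// update is the shared shape of the new write path: the caller stages its
// index puts/deletes into the batch and reports the resulting gc size
// change; everything then commits in one atomic write under one lock.
func (d *db) update(stage func(b *leveldb.Batch) (gcSizeChange int64, err error)) error {
	d.batchMu.Lock()
	defer d.batchMu.Unlock()

	batch := new(leveldb.Batch)
	change, err := stage(batch)
	if err != nil {
		return err
	}
	if err := d.incGCSizeInBatch(batch, change); err != nil {
		return err
	}
	return d.ldb.Write(batch, nil)
}

// incGCSizeInBatch stands in for the real method shown in gc.go above:
// read the stored counter, apply the signed change with an underflow
// guard, and queue the new value in the same batch. Body elided here.
func (d *db) incGCSizeInBatch(batch *leveldb.Batch, change int64) error {
	return nil
}

Serializing every write on one mutex gives up leveldb's parallel batch writes for simplicity; the bullet list in the commit message suggests this trade-off was measured (BenchmarkPutUpload with a global lock option) before the per-address locks were removed entirely.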