aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore/mode_put.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/localstore/mode_put.go')
-rw-r--r--swarm/storage/localstore/mode_put.go118
1 files changed, 68 insertions, 50 deletions
diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go
index 1599ca8e3..a8e355ad0 100644
--- a/swarm/storage/localstore/mode_put.go
+++ b/swarm/storage/localstore/mode_put.go
@@ -17,44 +17,31 @@
package localstore
import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
-// ModePut enumerates different Putter modes.
-type ModePut int
-
-// Putter modes.
-const (
- // ModePutRequest: when a chunk is received as a result of retrieve request and delivery
- ModePutRequest ModePut = iota
- // ModePutSync: when a chunk is received via syncing
- ModePutSync
- // ModePutUpload: when a chunk is created by local upload
- ModePutUpload
-)
+// Put stores the Chunk to database and depending
+// on the Putter mode, it updates required indexes.
+// Put is required to implement chunk.Store
+// interface.
+func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
+ metricName := fmt.Sprintf("localstore.Put.%s", mode)
-// Putter provides Put method to store Chunks
-// to database.
-type Putter struct {
- db *DB
- mode ModePut
-}
+ metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+ defer totalTimeMetric(metricName, time.Now())
-// NewPutter returns a new Putter on database
-// with a specific Mode.
-func (db *DB) NewPutter(mode ModePut) *Putter {
- return &Putter{
- mode: mode,
- db: db,
+ exists, err = db.put(mode, chunkToItem(ch))
+ if err != nil {
+ metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
-}
-
-// Put stores the Chunk to database and depending
-// on the Putter mode, it updates required indexes.
-func (p *Putter) Put(ch chunk.Chunk) (err error) {
- return p.db.put(p.mode, chunkToItem(ch))
+ return exists, err
}
// put stores Item to database and updates other
@@ -62,7 +49,7 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) {
// of this function for the same address in parallel.
// Item fields Address and Data must not be
// with their nil values.
-func (db *DB) put(mode ModePut, item shed.Item) (err error) {
+func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
@@ -76,7 +63,7 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
var triggerPushFeed bool // signal push feed subscriptions to iterate
switch mode {
- case ModePutRequest:
+ case chunk.ModePutRequest:
// put to indexes: retrieve, gc; it does not enter the syncpool
// check if the chunk already is in the database
@@ -84,20 +71,25 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
+ exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
+ exists = false
// no chunk accesses
default:
- return err
+ return false, err
}
i, err = db.retrievalDataIndex.Get(item)
switch err {
case nil:
+ exists = true
item.StoreTimestamp = i.StoreTimestamp
+ item.BinID = i.BinID
case leveldb.ErrNotFound:
// no chunk accesses
+ exists = false
default:
- return err
+ return false, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
@@ -107,6 +99,12 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
+ if item.BinID == 0 {
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ }
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
@@ -117,36 +115,56 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
db.retrievalDataIndex.PutInBatch(batch, item)
- case ModePutUpload:
+ case chunk.ModePutUpload:
// put to indexes: retrieve, push, pull
- item.StoreTimestamp = now()
- db.retrievalDataIndex.PutInBatch(batch, item)
- db.pullIndex.PutInBatch(batch, item)
- triggerPullFeed = true
- db.pushIndex.PutInBatch(batch, item)
- triggerPushFeed = true
+ exists, err = db.retrievalDataIndex.Has(item)
+ if err != nil {
+ return false, err
+ }
+ if !exists {
+ item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ db.pushIndex.PutInBatch(batch, item)
+ triggerPushFeed = true
+ }
- case ModePutSync:
+ case chunk.ModePutSync:
// put to indexes: retrieve, pull
- item.StoreTimestamp = now()
- db.retrievalDataIndex.PutInBatch(batch, item)
- db.pullIndex.PutInBatch(batch, item)
- triggerPullFeed = true
+ exists, err = db.retrievalDataIndex.Has(item)
+ if err != nil {
+ return exists, err
+ }
+ if !exists {
+ item.StoreTimestamp = now()
+ item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
+ if err != nil {
+ return false, err
+ }
+ db.retrievalDataIndex.PutInBatch(batch, item)
+ db.pullIndex.PutInBatch(batch, item)
+ triggerPullFeed = true
+ }
default:
- return ErrInvalidMode
+ return false, ErrInvalidMode
}
err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
- return err
+ return false, err
}
err = db.shed.WriteBatch(batch)
if err != nil {
- return err
+ return false, err
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
@@ -154,5 +172,5 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
if triggerPushFeed {
db.triggerPushSubscriptions()
}
- return nil
+ return exists, nil
}