Diffstat (limited to 'swarm/storage/localstore/localstore.go')
-rw-r--r--  swarm/storage/localstore/localstore.go | 80 ++++++++++++++++++++------
 1 file changed, 60 insertions(+), 20 deletions(-)
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
index 98d4c7881..3b0bd8a93 100644
--- a/swarm/storage/localstore/localstore.go
+++ b/swarm/storage/localstore/localstore.go
@@ -23,11 +23,15 @@ import (
"time"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
)
+// DB implements chunk.Store.
+var _ chunk.Store = &DB{}
+
var (
// ErrInvalidMode is returned when an unknown Mode
// is provided to the function.
@@ -69,6 +73,10 @@ type DB struct {
pullTriggers map[uint8][]chan struct{}
pullTriggersMu sync.RWMutex
+ // binIDs stores the latest chunk serial ID for every
+ // proximity order bin
+ binIDs shed.Uint64Vector
+
// garbage collection index
gcIndex shed.Index
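The new binIDs vector replaces store timestamps as the ordering primitive: every stored chunk receives a monotonically increasing serial number within its proximity order bin. A self-contained sketch of that bookkeeping; the map below is only an in-memory stand-in for shed.Uint64Vector, which persists the counters in LevelDB:

```go
package main

import "fmt"

// binIDs is an in-memory stand-in for shed.Uint64Vector:
// one monotonically increasing counter per proximity order bin.
type binIDs map[uint8]uint64

// inc assigns the next serial ID in the given bin, mirroring how
// every stored chunk gets a per-bin BinID.
func (b binIDs) inc(po uint8) uint64 {
	b[po]++
	return b[po]
}

func main() {
	ids := make(binIDs)
	fmt.Println(ids.inc(3)) // 1: first chunk in bin 3
	fmt.Println(ids.inc(3)) // 2: second chunk in bin 3
	fmt.Println(ids.inc(7)) // 1: bins are counted independently
}
```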
@@ -124,7 +132,10 @@ type Options struct {
// One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if o == nil {
- o = new(Options)
+ // default options
+ o = &Options{
+ Capacity: 5000000,
+ }
}
db = &DB{
capacity: o.Capacity,
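Callers that pass nil options now get a usable default capacity instead of a zero value, so garbage collection has a sane threshold out of the box. A usage sketch; the path and the 32-byte base key are placeholders:

```go
package main

import (
	"log"

	"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)

func main() {
	// Passing nil Options now implies Capacity: 5000000.
	// The path and zeroed base key here are placeholders; baseKey is
	// normally the node's overlay address.
	db, err := localstore.New("/tmp/localstore-example", make([]byte, 32), nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```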
@@ -148,11 +159,23 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if err != nil {
return nil, err
}
+
// Identify current storage schema by arbitrary name.
db.schemaName, err = db.shed.NewStringField("schema-name")
if err != nil {
return nil, err
}
+ schemaName, err := db.schemaName.Get()
+ if err != nil {
+ return nil, err
+ }
+ if schemaName == "" {
+ // initial new localstore run
+ err := db.schemaName.Put(DbSchemaSanctuary)
+ if err != nil {
+ return nil, err
+ }
+ }
// Persist gc size.
db.gcSize, err = db.shed.NewUint64Field("gc-size")
if err != nil {
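The schema name is stamped once on a fresh database so that later versions can detect which on-disk layout they are opening. A hypothetical sketch of how a future version might branch on it; only DbSchemaSanctuary exists in this change, and the unsupported-schema branch is illustrative:

```go
// ensureSchema stamps a fresh database and rejects unknown schemas.
// This is a hypothetical sketch around the shed string field used above;
// a real migration would replace the error in the default branch.
func ensureSchema(field shed.StringField) error {
	name, err := field.Get()
	if err != nil {
		return err
	}
	switch name {
	case "":
		// fresh database: record the current schema
		return field.Put(DbSchemaSanctuary)
	case DbSchemaSanctuary:
		// current schema, nothing to do
		return nil
	default:
		return fmt.Errorf("unsupported schema: %q", name)
	}
}
```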
@@ -165,8 +188,9 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
)
if o.MockStore != nil {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, 8)
- binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ b := make([]byte, 16)
+ binary.BigEndian.PutUint64(b[:8], fields.BinID)
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
err = o.MockStore.Put(fields.Address, fields.Data)
if err != nil {
return nil, err
@@ -174,25 +198,28 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return b, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+ e.BinID = binary.BigEndian.Uint64(value[:8])
e.Data, err = o.MockStore.Get(keyItem.Address)
return e, err
}
} else {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, 8)
- binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+ b := make([]byte, 16)
+ binary.BigEndian.PutUint64(b[:8], fields.BinID)
+ binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
- e.Data = value[8:]
+ e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+ e.BinID = binary.BigEndian.Uint64(value[:8])
+ e.Data = value[16:]
return e, nil
}
}
- // Index storing actual chunk address, data and store timestamp.
- db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
+ // Index storing actual chunk address, data and bin id.
+ db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
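In both variants the stored value now begins with BinID (8 bytes, big-endian) followed by StoreTimestamp (8 bytes), with the chunk data appended in the non-mock case. A self-contained round-trip sketch of the non-mock encoding:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encode mirrors the new value layout: BinID | StoreTimestamp | Data.
func encode(binID uint64, storeTimestamp int64, data []byte) []byte {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[:8], binID)
	binary.BigEndian.PutUint64(b[8:16], uint64(storeTimestamp))
	return append(b, data...)
}

// decode recovers the fields from an encoded value.
func decode(value []byte) (binID uint64, storeTimestamp int64, data []byte) {
	binID = binary.BigEndian.Uint64(value[:8])
	storeTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
	return binID, storeTimestamp, value[16:]
}

func main() {
	v := encode(42, 1554000000000000000, []byte("chunk data"))
	id, ts, data := decode(v)
	fmt.Println(id, ts, bytes.Equal(data, []byte("chunk data"))) // 42 ... true
}
```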
@@ -230,33 +257,37 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return nil, err
}
// pull index allows history and live syncing per po bin
- db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 41)
key[0] = db.po(fields.Address)
- binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp))
- copy(key[9:], fields.Address[:])
+ binary.BigEndian.PutUint64(key[1:9], fields.BinID)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
- e.Address = key[9:]
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9]))
+ e.BinID = binary.BigEndian.Uint64(key[1:9])
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
- return nil, nil
+ return fields.Address, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ e.Address = value
return e, nil
},
})
if err != nil {
return nil, err
}
+ // create a vector for bin IDs
+ db.binIDs, err = db.shed.NewUint64Vector("bin-ids")
+ if err != nil {
+ return nil, err
+ }
// create pull syncing triggers used by the SubscribePull function
db.pullTriggers = make(map[uint8][]chan struct{})
// push index contains as yet unsynced chunks
- db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
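The pull index key is now a 1-byte proximity order followed by the 8-byte big-endian BinID, with the chunk address moved into the value. Note that the encode function above still allocates 41 bytes, a leftover from the old PO|StoredTimestamp|Hash layout, although only the first 9 bytes are populated. A self-contained sketch of the 9 meaningful bytes:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodePullKey mirrors the PO|BinID key layout of the pull index.
func encodePullKey(po uint8, binID uint64) []byte {
	key := make([]byte, 9)
	key[0] = po
	binary.BigEndian.PutUint64(key[1:9], binID)
	return key
}

// decodePullKey recovers the proximity order and bin ID from a key.
func decodePullKey(key []byte) (po uint8, binID uint64) {
	return key[0], binary.BigEndian.Uint64(key[1:9])
}

func main() {
	key := encodePullKey(5, 1234)
	po, binID := decodePullKey(key)
	fmt.Println(po, binID) // 5 1234
}
```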
@@ -281,17 +312,17 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// create push syncing triggers used by the SubscribePush function
db.pushTriggers = make([]chan struct{}, 0)
// gc index for removable chunks ordered by ascending last access time
- db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{
+ db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
- binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
+ binary.BigEndian.PutUint64(b[8:16], fields.BinID)
key = append(b, fields.Address...)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
+ e.BinID = binary.BigEndian.Uint64(key[8:16])
e.Address = key[16:]
return e, nil
},
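Because the gc key begins with a big-endian access timestamp, LevelDB's lexicographic iteration order is also ascending last-access order, so the collector can evict from the front of the index. A self-contained sketch demonstrating that ordering property:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// gcKey mirrors the AccessTimestamp|BinID|Hash layout of the gc index key.
func gcKey(accessTimestamp int64, binID uint64, addr []byte) []byte {
	b := make([]byte, 16, 16+len(addr))
	binary.BigEndian.PutUint64(b[:8], uint64(accessTimestamp))
	binary.BigEndian.PutUint64(b[8:16], binID)
	return append(b, addr...)
}

func main() {
	older := gcKey(1000, 7, []byte{0xaa})
	newer := gcKey(2000, 1, []byte{0x01})
	// Big-endian timestamps make byte order equal numeric order, so
	// iteration visits the least recently accessed chunk first.
	fmt.Println(bytes.Compare(older, newer) < 0) // true
}
```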
@@ -358,3 +389,12 @@ func init() {
return time.Now().UTC().UnixNano()
}
}
+
+// totalTimeMetric logs the time elapsed between the provided start time and
+// the moment the function is called, and sends a resetting timer metric with
+// the provided name appended with ".total-time".
+func totalTimeMetric(name string, start time.Time) {
+ totalTime := time.Since(start)
+ log.Trace(name+" total time", "time", totalTime)
+ metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
+}
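
A typical call site defers the helper at the top of a timed operation. A hypothetical example; the method and the "localstore.get" metric name are illustrative, not part of this change:

```go
// Hypothetical call site; "localstore.get" is an illustrative metric name.
func (db *DB) exampleGet(addr chunk.Address) (c chunk.Chunk, err error) {
	defer totalTimeMetric("localstore.get", time.Now())
	// ... timed retrieval logic would go here ...
	return c, err
}
```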