aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/shed
diff options
context:
space:
mode:
authorJanoš Guljaš <janos@users.noreply.github.com>2019-01-07 20:20:11 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2019-01-07 20:20:11 +0800
commit356c49fa7ec88632f839226c9b0f1cf172ec6f9d (patch)
tree88e4b73e9b8be2b91268a7f66988de5d67557ebe /swarm/shed
parent428eabe28d1077356b16f25828d78d8693a766bb (diff)
downloaddexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.tar
dexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.tar.gz
dexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.tar.bz2
dexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.tar.lz
dexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.tar.xz
dexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.tar.zst
dexon-356c49fa7ec88632f839226c9b0f1cf172ec6f9d.zip
swarm: Shed Index and Uint64Field additions (#18398)
Diffstat (limited to 'swarm/shed')
-rw-r--r--swarm/shed/db.go2
-rw-r--r--swarm/shed/example_store_test.go40
-rw-r--r--swarm/shed/field_uint64.go38
-rw-r--r--swarm/shed/field_uint64_test.go106
-rw-r--r--swarm/shed/index.go156
-rw-r--r--swarm/shed/index_test.go459
6 files changed, 671 insertions, 130 deletions
diff --git a/swarm/shed/db.go b/swarm/shed/db.go
index 7377e12d2..d4e5d1b23 100644
--- a/swarm/shed/db.go
+++ b/swarm/shed/db.go
@@ -18,7 +18,7 @@
// more complex operations on storage data organized in fields and indexes.
//
// Only type which holds logical information about swarm storage chunks data
-// and metadata is IndexItem. This part is not generalized mostly for
+// and metadata is Item. This part is not generalized mostly for
// performance reasons.
package shed
diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go
index 908a1e446..9a83855e7 100644
--- a/swarm/shed/example_store_test.go
+++ b/swarm/shed/example_store_test.go
@@ -71,20 +71,20 @@ func New(path string) (s *Store, err error) {
}
// Index storing actual chunk address, data and store timestamp.
s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
- EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
- DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key
return e, nil
},
- EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
},
- DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:]
return e, nil
@@ -96,19 +96,19 @@ func New(path string) (s *Store, err error) {
// Index storing access timestamp for a particular address.
// It is needed in order to update gc index keys for iteration order.
s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
- EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
- DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key
return e, nil
},
- EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
return b, nil
},
- DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
return e, nil
},
@@ -118,23 +118,23 @@ func New(path string) (s *Store, err error) {
}
// Index with keys ordered by access timestamp for garbage collection prioritization.
s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
- EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
+ EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
key = append(b, fields.Address...)
return key, nil
},
- DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
+ DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
e.Address = key[16:]
return e, nil
},
- EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
+ EncodeValue: func(fields shed.Item) (value []byte, err error) {
return nil, nil
},
- DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
+ DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
return e, nil
},
})
@@ -146,7 +146,7 @@ func New(path string) (s *Store, err error) {
// Put stores the chunk and sets it store timestamp.
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
- return s.retrievalIndex.Put(shed.IndexItem{
+ return s.retrievalIndex.Put(shed.Item{
Address: ch.Address(),
Data: ch.Data(),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -161,7 +161,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
batch := new(leveldb.Batch)
// Get the chunk data and storage timestamp.
- item, err := s.retrievalIndex.Get(shed.IndexItem{
+ item, err := s.retrievalIndex.Get(shed.Item{
Address: addr,
})
if err != nil {
@@ -172,13 +172,13 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
}
// Get the chunk access timestamp.
- accessItem, err := s.accessIndex.Get(shed.IndexItem{
+ accessItem, err := s.accessIndex.Get(shed.Item{
Address: addr,
})
switch err {
case nil:
// Remove gc index entry if access timestamp is found.
- err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{
+ err = s.gcIndex.DeleteInBatch(batch, shed.Item{
Address: item.Address,
StoreTimestamp: accessItem.AccessTimestamp,
AccessTimestamp: item.StoreTimestamp,
@@ -197,7 +197,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
accessTimestamp := time.Now().UTC().UnixNano()
// Put new access timestamp in access index.
- err = s.accessIndex.PutInBatch(batch, shed.IndexItem{
+ err = s.accessIndex.PutInBatch(batch, shed.Item{
Address: addr,
AccessTimestamp: accessTimestamp,
})
@@ -206,7 +206,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
}
// Put new access timestamp in gc index.
- err = s.gcIndex.PutInBatch(batch, shed.IndexItem{
+ err = s.gcIndex.PutInBatch(batch, shed.Item{
Address: item.Address,
AccessTimestamp: accessTimestamp,
StoreTimestamp: item.StoreTimestamp,
@@ -244,7 +244,7 @@ func (s *Store) CollectGarbage() (err error) {
// New batch for a new cg round.
trash := new(leveldb.Batch)
// Iterate through all index items and break when needed.
- err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) {
+ err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// Remove the chunk.
err = s.retrievalIndex.DeleteInBatch(trash, item)
if err != nil {
@@ -265,7 +265,7 @@ func (s *Store) CollectGarbage() (err error) {
return true, nil
}
return false, nil
- })
+ }, nil)
if err != nil {
return err
}
diff --git a/swarm/shed/field_uint64.go b/swarm/shed/field_uint64.go
index 80e0069ae..0417583ac 100644
--- a/swarm/shed/field_uint64.go
+++ b/swarm/shed/field_uint64.go
@@ -99,6 +99,44 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
return val, nil
}
+// Dec decrements a uint64 value in the database.
+// This operation is not goroutine save.
+// The field is protected from overflow to a negative value.
+func (f Uint64Field) Dec() (val uint64, err error) {
+ val, err = f.Get()
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ val = 0
+ } else {
+ return 0, err
+ }
+ }
+ if val != 0 {
+ val--
+ }
+ return val, f.Put(val)
+}
+
+// DecInBatch decrements a uint64 value in the batch
+// by retreiving a value from the database, not the same batch.
+// This operation is not goroutine save.
+// The field is protected from overflow to a negative value.
+func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error) {
+ val, err = f.Get()
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ val = 0
+ } else {
+ return 0, err
+ }
+ }
+ if val != 0 {
+ val--
+ }
+ f.PutInBatch(batch, val)
+ return val, nil
+}
+
// encode transforms uint64 to 8 byte long
// slice in big endian encoding.
func encodeUint64(val uint64) (b []byte) {
diff --git a/swarm/shed/field_uint64_test.go b/swarm/shed/field_uint64_test.go
index 69ade71ba..9462b56dd 100644
--- a/swarm/shed/field_uint64_test.go
+++ b/swarm/shed/field_uint64_test.go
@@ -192,3 +192,109 @@ func TestUint64Field_IncInBatch(t *testing.T) {
t.Errorf("got uint64 %v, want %v", got, want)
}
}
+
+// TestUint64Field_Dec validates Dec operation
+// of the Uint64Field.
+func TestUint64Field_Dec(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ counter, err := db.NewUint64Field("counter")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // test overflow protection
+ var want uint64
+ got, err := counter.Dec()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+
+ want = 32
+ err = counter.Put(want)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want = 31
+ got, err = counter.Dec()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+}
+
+// TestUint64Field_DecInBatch validates DecInBatch operation
+// of the Uint64Field.
+func TestUint64Field_DecInBatch(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ counter, err := db.NewUint64Field("counter")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ batch := new(leveldb.Batch)
+ var want uint64
+ got, err := counter.DecInBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err = counter.Get()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+
+ batch2 := new(leveldb.Batch)
+ want = 42
+ counter.PutInBatch(batch2, want)
+ err = db.WriteBatch(batch2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err = counter.Get()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+
+ batch3 := new(leveldb.Batch)
+ want = 41
+ got, err = counter.DecInBatch(batch3)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+ err = db.WriteBatch(batch3)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err = counter.Get()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got != want {
+ t.Errorf("got uint64 %v, want %v", got, want)
+ }
+}
diff --git a/swarm/shed/index.go b/swarm/shed/index.go
index ba803e3c2..df88b1b62 100644
--- a/swarm/shed/index.go
+++ b/swarm/shed/index.go
@@ -17,22 +17,24 @@
package shed
import (
+ "bytes"
+
"github.com/syndtr/goleveldb/leveldb"
)
-// IndexItem holds fields relevant to Swarm Chunk data and metadata.
+// Item holds fields relevant to Swarm Chunk data and metadata.
// All information required for swarm storage and operations
// on that storage must be defined here.
// This structure is logically connected to swarm storage,
// the only part of this package that is not generalized,
// mostly for performance reasons.
//
-// IndexItem is a type that is used for retrieving, storing and encoding
+// Item is a type that is used for retrieving, storing and encoding
// chunk data and metadata. It is passed as an argument to Index encoding
// functions, get function and put function.
// But it is also returned with additional data from get function call
// and as the argument in iterator function definition.
-type IndexItem struct {
+type Item struct {
Address []byte
Data []byte
AccessTimestamp int64
@@ -43,9 +45,9 @@ type IndexItem struct {
}
// Merge is a helper method to construct a new
-// IndexItem by filling up fields with default values
-// of a particular IndexItem with values from another one.
-func (i IndexItem) Merge(i2 IndexItem) (new IndexItem) {
+// Item by filling up fields with default values
+// of a particular Item with values from another one.
+func (i Item) Merge(i2 Item) (new Item) {
if i.Address == nil {
i.Address = i2.Address
}
@@ -67,26 +69,26 @@ func (i IndexItem) Merge(i2 IndexItem) (new IndexItem) {
// Index represents a set of LevelDB key value pairs that have common
// prefix. It holds functions for encoding and decoding keys and values
// to provide transparent actions on saved data which inclide:
-// - getting a particular IndexItem
-// - saving a particular IndexItem
+// - getting a particular Item
+// - saving a particular Item
// - iterating over a sorted LevelDB keys
// It implements IndexIteratorInterface interface.
type Index struct {
db *DB
prefix []byte
- encodeKeyFunc func(fields IndexItem) (key []byte, err error)
- decodeKeyFunc func(key []byte) (e IndexItem, err error)
- encodeValueFunc func(fields IndexItem) (value []byte, err error)
- decodeValueFunc func(value []byte) (e IndexItem, err error)
+ encodeKeyFunc func(fields Item) (key []byte, err error)
+ decodeKeyFunc func(key []byte) (e Item, err error)
+ encodeValueFunc func(fields Item) (value []byte, err error)
+ decodeValueFunc func(keyFields Item, value []byte) (e Item, err error)
}
// IndexFuncs structure defines functions for encoding and decoding
// LevelDB keys and values for a specific index.
type IndexFuncs struct {
- EncodeKey func(fields IndexItem) (key []byte, err error)
- DecodeKey func(key []byte) (e IndexItem, err error)
- EncodeValue func(fields IndexItem) (value []byte, err error)
- DecodeValue func(value []byte) (e IndexItem, err error)
+ EncodeKey func(fields Item) (key []byte, err error)
+ DecodeKey func(key []byte) (e Item, err error)
+ EncodeValue func(fields Item) (value []byte, err error)
+ DecodeValue func(keyFields Item, value []byte) (e Item, err error)
}
// NewIndex returns a new Index instance with defined name and
@@ -105,7 +107,7 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
// by appending the provided index id byte.
// This is needed to avoid collisions between keys of different
// indexes as all index ids are unique.
- encodeKeyFunc: func(e IndexItem) (key []byte, err error) {
+ encodeKeyFunc: func(e Item) (key []byte, err error) {
key, err = funcs.EncodeKey(e)
if err != nil {
return nil, err
@@ -115,7 +117,7 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
// This function reverses the encodeKeyFunc constructed key
// to transparently work with index keys without their index ids.
// It assumes that index keys are prefixed with only one byte.
- decodeKeyFunc: func(key []byte) (e IndexItem, err error) {
+ decodeKeyFunc: func(key []byte) (e Item, err error) {
return funcs.DecodeKey(key[1:])
},
encodeValueFunc: funcs.EncodeValue,
@@ -123,10 +125,10 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
}, nil
}
-// Get accepts key fields represented as IndexItem to retrieve a
+// Get accepts key fields represented as Item to retrieve a
// value from the index and return maximum available information
-// from the index represented as another IndexItem.
-func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) {
+// from the index represented as another Item.
+func (f Index) Get(keyFields Item) (out Item, err error) {
key, err := f.encodeKeyFunc(keyFields)
if err != nil {
return out, err
@@ -135,16 +137,16 @@ func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) {
if err != nil {
return out, err
}
- out, err = f.decodeValueFunc(value)
+ out, err = f.decodeValueFunc(keyFields, value)
if err != nil {
return out, err
}
return out.Merge(keyFields), nil
}
-// Put accepts IndexItem to encode information from it
+// Put accepts Item to encode information from it
// and save it to the database.
-func (f Index) Put(i IndexItem) (err error) {
+func (f Index) Put(i Item) (err error) {
key, err := f.encodeKeyFunc(i)
if err != nil {
return err
@@ -159,7 +161,7 @@ func (f Index) Put(i IndexItem) (err error) {
// PutInBatch is the same as Put method, but it just
// saves the key/value pair to the batch instead
// directly to the database.
-func (f Index) PutInBatch(batch *leveldb.Batch, i IndexItem) (err error) {
+func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error) {
key, err := f.encodeKeyFunc(i)
if err != nil {
return err
@@ -172,9 +174,9 @@ func (f Index) PutInBatch(batch *leveldb.Batch, i IndexItem) (err error) {
return nil
}
-// Delete accepts IndexItem to remove a key/value pair
+// Delete accepts Item to remove a key/value pair
// from the database based on its fields.
-func (f Index) Delete(keyFields IndexItem) (err error) {
+func (f Index) Delete(keyFields Item) (err error) {
key, err := f.encodeKeyFunc(keyFields)
if err != nil {
return err
@@ -184,7 +186,7 @@ func (f Index) Delete(keyFields IndexItem) (err error) {
// DeleteInBatch is the same as Delete just the operation
// is performed on the batch instead on the database.
-func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields IndexItem) (err error) {
+func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields Item) (err error) {
key, err := f.encodeKeyFunc(keyFields)
if err != nil {
return err
@@ -193,32 +195,71 @@ func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields IndexItem) (err err
return nil
}
-// IndexIterFunc is a callback on every IndexItem that is decoded
+// IndexIterFunc is a callback on every Item that is decoded
// by iterating on an Index keys.
// By returning a true for stop variable, iteration will
// stop, and by returning the error, that error will be
// propagated to the called iterator method on Index.
-type IndexIterFunc func(item IndexItem) (stop bool, err error)
+type IndexIterFunc func(item Item) (stop bool, err error)
+
+// IterateOptions defines optional parameters for Iterate function.
+type IterateOptions struct {
+ // StartFrom is the Item to start the iteration from.
+ StartFrom *Item
+ // If SkipStartFromItem is true, StartFrom item will not
+ // be iterated on.
+ SkipStartFromItem bool
+ // Iterate over items which keys have a common prefix.
+ Prefix []byte
+}
-// IterateAll iterates over all keys of the Index.
-func (f Index) IterateAll(fn IndexIterFunc) (err error) {
+// Iterate function iterates over keys of the Index.
+// If IterateOptions is nil, the iterations is over all keys.
+func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
+ if options == nil {
+ options = new(IterateOptions)
+ }
+ // construct a prefix with Index prefix and optional common key prefix
+ prefix := append(f.prefix, options.Prefix...)
+ // start from the prefix
+ startKey := prefix
+ if options.StartFrom != nil {
+ // start from the provided StartFrom Item key value
+ startKey, err = f.encodeKeyFunc(*options.StartFrom)
+ if err != nil {
+ return err
+ }
+ }
it := f.db.NewIterator()
defer it.Release()
- for ok := it.Seek(f.prefix); ok; ok = it.Next() {
+ // move the cursor to the start key
+ ok := it.Seek(startKey)
+ if !ok {
+ // stop iterator if seek has failed
+ return it.Error()
+ }
+ if options.SkipStartFromItem && bytes.Equal(startKey, it.Key()) {
+ // skip the start from Item if it is the first key
+ // and it is explicitly configured to skip it
+ ok = it.Next()
+ }
+ for ; ok; ok = it.Next() {
key := it.Key()
- if key[0] != f.prefix[0] {
+ if !bytes.HasPrefix(key, prefix) {
break
}
- keyIndexItem, err := f.decodeKeyFunc(key)
+ // create a copy of key byte slice not to share leveldb underlaying slice array
+ keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...))
if err != nil {
return err
}
- valueIndexItem, err := f.decodeValueFunc(it.Value())
+ // create a copy of value byte slice not to share leveldb underlaying slice array
+ valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...))
if err != nil {
return err
}
- stop, err := fn(keyIndexItem.Merge(valueIndexItem))
+ stop, err := fn(keyItem.Merge(valueItem))
if err != nil {
return err
}
@@ -229,12 +270,27 @@ func (f Index) IterateAll(fn IndexIterFunc) (err error) {
return it.Error()
}
-// IterateFrom iterates over Index keys starting from the key
-// encoded from the provided IndexItem.
-func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
+// Count returns the number of items in index.
+func (f Index) Count() (count int, err error) {
+ it := f.db.NewIterator()
+ defer it.Release()
+
+ for ok := it.Seek(f.prefix); ok; ok = it.Next() {
+ key := it.Key()
+ if key[0] != f.prefix[0] {
+ break
+ }
+ count++
+ }
+ return count, it.Error()
+}
+
+// CountFrom returns the number of items in index keys
+// starting from the key encoded from the provided Item.
+func (f Index) CountFrom(start Item) (count int, err error) {
startKey, err := f.encodeKeyFunc(start)
if err != nil {
- return err
+ return 0, err
}
it := f.db.NewIterator()
defer it.Release()
@@ -244,21 +300,7 @@ func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
if key[0] != f.prefix[0] {
break
}
- keyIndexItem, err := f.decodeKeyFunc(key)
- if err != nil {
- return err
- }
- valueIndexItem, err := f.decodeValueFunc(it.Value())
- if err != nil {
- return err
- }
- stop, err := fn(keyIndexItem.Merge(valueIndexItem))
- if err != nil {
- return err
- }
- if stop {
- break
- }
+ count++
}
- return it.Error()
+ return count, it.Error()
}
diff --git a/swarm/shed/index_test.go b/swarm/shed/index_test.go
index ba82216df..97d7c91f4 100644
--- a/swarm/shed/index_test.go
+++ b/swarm/shed/index_test.go
@@ -29,20 +29,20 @@ import (
// Index functions for the index that is used in tests in this file.
var retrievalIndexFuncs = IndexFuncs{
- EncodeKey: func(fields IndexItem) (key []byte, err error) {
+ EncodeKey: func(fields Item) (key []byte, err error) {
return fields.Address, nil
},
- DecodeKey: func(key []byte) (e IndexItem, err error) {
+ DecodeKey: func(key []byte) (e Item, err error) {
e.Address = key
return e, nil
},
- EncodeValue: func(fields IndexItem) (value []byte, err error) {
+ EncodeValue: func(fields Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
},
- DecodeValue: func(value []byte) (e IndexItem, err error) {
+ DecodeValue: func(keyItem Item, value []byte) (e Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:]
return e, nil
@@ -60,7 +60,7 @@ func TestIndex(t *testing.T) {
}
t.Run("put", func(t *testing.T) {
- want := IndexItem{
+ want := Item{
Address: []byte("put-hash"),
Data: []byte("DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -70,16 +70,16 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: want.Address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
t.Run("overwrite", func(t *testing.T) {
- want := IndexItem{
+ want := Item{
Address: []byte("put-hash"),
Data: []byte("New DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -89,18 +89,18 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: want.Address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
})
})
t.Run("put in batch", func(t *testing.T) {
- want := IndexItem{
+ want := Item{
Address: []byte("put-in-batch-hash"),
Data: []byte("DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -112,16 +112,16 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: want.Address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
t.Run("overwrite", func(t *testing.T) {
- want := IndexItem{
+ want := Item{
Address: []byte("put-in-batch-hash"),
Data: []byte("New DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -133,13 +133,13 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: want.Address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
})
})
@@ -150,13 +150,13 @@ func TestIndex(t *testing.T) {
address := []byte("put-in-batch-twice-hash")
// put the first item
- index.PutInBatch(batch, IndexItem{
+ index.PutInBatch(batch, Item{
Address: address,
Data: []byte("DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
})
- want := IndexItem{
+ want := Item{
Address: address,
Data: []byte("New DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -168,17 +168,17 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
})
t.Run("delete", func(t *testing.T) {
- want := IndexItem{
+ want := Item{
Address: []byte("delete-hash"),
Data: []byte("DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -188,15 +188,15 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: want.Address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
- err = index.Delete(IndexItem{
+ err = index.Delete(Item{
Address: want.Address,
})
if err != nil {
@@ -204,7 +204,7 @@ func TestIndex(t *testing.T) {
}
wantErr := leveldb.ErrNotFound
- got, err = index.Get(IndexItem{
+ got, err = index.Get(Item{
Address: want.Address,
})
if err != wantErr {
@@ -213,7 +213,7 @@ func TestIndex(t *testing.T) {
})
t.Run("delete in batch", func(t *testing.T) {
- want := IndexItem{
+ want := Item{
Address: []byte("delete-in-batch-hash"),
Data: []byte("DATA"),
StoreTimestamp: time.Now().UTC().UnixNano(),
@@ -223,16 +223,16 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- got, err := index.Get(IndexItem{
+ got, err := index.Get(Item{
Address: want.Address,
})
if err != nil {
t.Fatal(err)
}
- checkIndexItem(t, got, want)
+ checkItem(t, got, want)
batch := new(leveldb.Batch)
- index.DeleteInBatch(batch, IndexItem{
+ index.DeleteInBatch(batch, Item{
Address: want.Address,
})
err = db.WriteBatch(batch)
@@ -241,7 +241,7 @@ func TestIndex(t *testing.T) {
}
wantErr := leveldb.ErrNotFound
- got, err = index.Get(IndexItem{
+ got, err = index.Get(Item{
Address: want.Address,
})
if err != wantErr {
@@ -250,8 +250,9 @@ func TestIndex(t *testing.T) {
})
}
-// TestIndex_iterate validates index iterator functions for correctness.
-func TestIndex_iterate(t *testing.T) {
+// TestIndex_Iterate validates index Iterate
+// functions for correctness.
+func TestIndex_Iterate(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()
@@ -260,7 +261,7 @@ func TestIndex_iterate(t *testing.T) {
t.Fatal(err)
}
- items := []IndexItem{
+ items := []Item{
{
Address: []byte("iterate-hash-01"),
Data: []byte("data80"),
@@ -290,7 +291,7 @@ func TestIndex_iterate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- item04 := IndexItem{
+ item04 := Item{
Address: []byte("iterate-hash-04"),
Data: []byte("data0"),
}
@@ -306,31 +307,53 @@ func TestIndex_iterate(t *testing.T) {
t.Run("all", func(t *testing.T) {
var i int
- err := index.IterateAll(func(item IndexItem) (stop bool, err error) {
+ err := index.Iterate(func(item Item) (stop bool, err error) {
if i > len(items)-1 {
return true, fmt.Errorf("got unexpected index item: %#v", item)
}
want := items[i]
- checkIndexItem(t, item, want)
+ checkItem(t, item, want)
i++
return false, nil
- })
+ }, nil)
if err != nil {
t.Fatal(err)
}
})
- t.Run("from", func(t *testing.T) {
+ t.Run("start from", func(t *testing.T) {
startIndex := 2
i := startIndex
- err := index.IterateFrom(items[startIndex], func(item IndexItem) (stop bool, err error) {
+ err := index.Iterate(func(item Item) (stop bool, err error) {
if i > len(items)-1 {
return true, fmt.Errorf("got unexpected index item: %#v", item)
}
want := items[i]
- checkIndexItem(t, item, want)
+ checkItem(t, item, want)
i++
return false, nil
+ }, &IterateOptions{
+ StartFrom: &items[startIndex],
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+
+ t.Run("skip start from", func(t *testing.T) {
+ startIndex := 2
+ i := startIndex + 1
+ err := index.Iterate(func(item Item) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", item)
+ }
+ want := items[i]
+ checkItem(t, item, want)
+ i++
+ return false, nil
+ }, &IterateOptions{
+ StartFrom: &items[startIndex],
+ SkipStartFromItem: true,
})
if err != nil {
t.Fatal(err)
@@ -341,19 +364,19 @@ func TestIndex_iterate(t *testing.T) {
var i int
stopIndex := 3
var count int
- err := index.IterateAll(func(item IndexItem) (stop bool, err error) {
+ err := index.Iterate(func(item Item) (stop bool, err error) {
if i > len(items)-1 {
return true, fmt.Errorf("got unexpected index item: %#v", item)
}
want := items[i]
- checkIndexItem(t, item, want)
+ checkItem(t, item, want)
count++
if i == stopIndex {
return true, nil
}
i++
return false, nil
- })
+ }, nil)
if err != nil {
t.Fatal(err)
}
@@ -369,46 +392,378 @@ func TestIndex_iterate(t *testing.T) {
t.Fatal(err)
}
- secondIndexItem := IndexItem{
+ secondItem := Item{
Address: []byte("iterate-hash-10"),
Data: []byte("data-second"),
}
- err = secondIndex.Put(secondIndexItem)
+ err = secondIndex.Put(secondItem)
if err != nil {
t.Fatal(err)
}
var i int
- err = index.IterateAll(func(item IndexItem) (stop bool, err error) {
+ err = index.Iterate(func(item Item) (stop bool, err error) {
if i > len(items)-1 {
return true, fmt.Errorf("got unexpected index item: %#v", item)
}
want := items[i]
- checkIndexItem(t, item, want)
+ checkItem(t, item, want)
i++
return false, nil
- })
+ }, nil)
if err != nil {
t.Fatal(err)
}
i = 0
- err = secondIndex.IterateAll(func(item IndexItem) (stop bool, err error) {
+ err = secondIndex.Iterate(func(item Item) (stop bool, err error) {
if i > 1 {
return true, fmt.Errorf("got unexpected index item: %#v", item)
}
- checkIndexItem(t, item, secondIndexItem)
+ checkItem(t, item, secondItem)
+ i++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+}
+
+// TestIndex_Iterate_withPrefix validates index Iterate
+// function for correctness.
+func TestIndex_Iterate_withPrefix(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ index, err := db.NewIndex("retrieval", retrievalIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ allItems := []Item{
+ {Address: []byte("want-hash-00"), Data: []byte("data80")},
+ {Address: []byte("skip-hash-01"), Data: []byte("data81")},
+ {Address: []byte("skip-hash-02"), Data: []byte("data82")},
+ {Address: []byte("skip-hash-03"), Data: []byte("data83")},
+ {Address: []byte("want-hash-04"), Data: []byte("data84")},
+ {Address: []byte("want-hash-05"), Data: []byte("data85")},
+ {Address: []byte("want-hash-06"), Data: []byte("data86")},
+ {Address: []byte("want-hash-07"), Data: []byte("data87")},
+ {Address: []byte("want-hash-08"), Data: []byte("data88")},
+ {Address: []byte("want-hash-09"), Data: []byte("data89")},
+ {Address: []byte("skip-hash-10"), Data: []byte("data90")},
+ }
+ batch := new(leveldb.Batch)
+ for _, i := range allItems {
+ index.PutInBatch(batch, i)
+ }
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ prefix := []byte("want")
+
+ items := make([]Item, 0)
+ for _, item := range allItems {
+ if bytes.HasPrefix(item.Address, prefix) {
+ items = append(items, item)
+ }
+ }
+ sort.SliceStable(items, func(i, j int) bool {
+ return bytes.Compare(items[i].Address, items[j].Address) < 0
+ })
+
+ t.Run("with prefix", func(t *testing.T) {
+ var i int
+ err := index.Iterate(func(item Item) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", item)
+ }
+ want := items[i]
+ checkItem(t, item, want)
i++
return false, nil
+ }, &IterateOptions{
+ Prefix: prefix,
})
if err != nil {
t.Fatal(err)
}
+ if i != len(items) {
+ t.Errorf("got %v items, want %v", i, len(items))
+ }
+ })
+
+ t.Run("with prefix and start from", func(t *testing.T) {
+ startIndex := 2
+ var count int
+ i := startIndex
+ err := index.Iterate(func(item Item) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", item)
+ }
+ want := items[i]
+ checkItem(t, item, want)
+ i++
+ count++
+ return false, nil
+ }, &IterateOptions{
+ StartFrom: &items[startIndex],
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantCount := len(items) - startIndex
+ if count != wantCount {
+ t.Errorf("got %v items, want %v", count, wantCount)
+ }
+ })
+
+ t.Run("with prefix and skip start from", func(t *testing.T) {
+ startIndex := 2
+ var count int
+ i := startIndex + 1
+ err := index.Iterate(func(item Item) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", item)
+ }
+ want := items[i]
+ checkItem(t, item, want)
+ i++
+ count++
+ return false, nil
+ }, &IterateOptions{
+ StartFrom: &items[startIndex],
+ SkipStartFromItem: true,
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantCount := len(items) - startIndex - 1
+ if count != wantCount {
+ t.Errorf("got %v items, want %v", count, wantCount)
+ }
+ })
+
+ t.Run("stop", func(t *testing.T) {
+ var i int
+ stopIndex := 3
+ var count int
+ err := index.Iterate(func(item Item) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", item)
+ }
+ want := items[i]
+ checkItem(t, item, want)
+ count++
+ if i == stopIndex {
+ return true, nil
+ }
+ i++
+ return false, nil
+ }, &IterateOptions{
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantItemsCount := stopIndex + 1
+ if count != wantItemsCount {
+ t.Errorf("got %v items, expected %v", count, wantItemsCount)
+ }
+ })
+
+ t.Run("no overflow", func(t *testing.T) {
+ secondIndex, err := db.NewIndex("second-index", retrievalIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ secondItem := Item{
+ Address: []byte("iterate-hash-10"),
+ Data: []byte("data-second"),
+ }
+ err = secondIndex.Put(secondItem)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var i int
+ err = index.Iterate(func(item Item) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", item)
+ }
+ want := items[i]
+ checkItem(t, item, want)
+ i++
+ return false, nil
+ }, &IterateOptions{
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if i != len(items) {
+ t.Errorf("got %v items, want %v", i, len(items))
+ }
+ })
+}
+
+// TestIndex_count tests if Index.Count and Index.CountFrom
+// returns the correct number of items.
+func TestIndex_count(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ index, err := db.NewIndex("retrieval", retrievalIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ items := []Item{
+ {
+ Address: []byte("iterate-hash-01"),
+ Data: []byte("data80"),
+ },
+ {
+ Address: []byte("iterate-hash-02"),
+ Data: []byte("data84"),
+ },
+ {
+ Address: []byte("iterate-hash-03"),
+ Data: []byte("data22"),
+ },
+ {
+ Address: []byte("iterate-hash-04"),
+ Data: []byte("data41"),
+ },
+ {
+ Address: []byte("iterate-hash-05"),
+ Data: []byte("data1"),
+ },
+ }
+ batch := new(leveldb.Batch)
+ for _, i := range items {
+ index.PutInBatch(batch, i)
+ }
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("Count", func(t *testing.T) {
+ got, err := index.Count()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want := len(items)
+ if got != want {
+ t.Errorf("got %v items count, want %v", got, want)
+ }
+ })
+
+ t.Run("CountFrom", func(t *testing.T) {
+ got, err := index.CountFrom(Item{
+ Address: items[1].Address,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want := len(items) - 1
+ if got != want {
+ t.Errorf("got %v items count, want %v", got, want)
+ }
+ })
+
+ // update the index with another item
+ t.Run("add item", func(t *testing.T) {
+ item04 := Item{
+ Address: []byte("iterate-hash-06"),
+ Data: []byte("data0"),
+ }
+ err = index.Put(item04)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ count := len(items) + 1
+
+ t.Run("Count", func(t *testing.T) {
+ got, err := index.Count()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want := count
+ if got != want {
+ t.Errorf("got %v items count, want %v", got, want)
+ }
+ })
+
+ t.Run("CountFrom", func(t *testing.T) {
+ got, err := index.CountFrom(Item{
+ Address: items[1].Address,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want := count - 1
+ if got != want {
+ t.Errorf("got %v items count, want %v", got, want)
+ }
+ })
+ })
+
+ // delete some items
+ t.Run("delete items", func(t *testing.T) {
+ deleteCount := 3
+
+ for _, item := range items[:deleteCount] {
+ err := index.Delete(item)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ count := len(items) + 1 - deleteCount
+
+ t.Run("Count", func(t *testing.T) {
+ got, err := index.Count()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want := count
+ if got != want {
+ t.Errorf("got %v items count, want %v", got, want)
+ }
+ })
+
+ t.Run("CountFrom", func(t *testing.T) {
+ got, err := index.CountFrom(Item{
+ Address: items[deleteCount+1].Address,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ want := count - 1
+ if got != want {
+ t.Errorf("got %v items count, want %v", got, want)
+ }
+ })
})
}
-// checkIndexItem is a test helper function that compares if two Index items are the same.
-func checkIndexItem(t *testing.T, got, want IndexItem) {
+// checkItem is a test helper function that compares if two Index items are the same.
+func checkItem(t *testing.T, got, want Item) {
t.Helper()
if !bytes.Equal(got.Address, want.Address) {