aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJanoš Guljaš <janos@users.noreply.github.com>2019-03-02 15:44:22 +0800
committerViktor Trón <viktor.tron@gmail.com>2019-03-02 15:44:22 +0800
commitb797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b (patch)
treece2cb19a39b9973a730713139dfdfad3bf1a7832
parent729bf365b5f17325be9107b63b233da54100eec6 (diff)
downloadgo-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.gz
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.bz2
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.lz
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.xz
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.zst
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.zip
swarm/shed, swarm/storage/localstore: add LastPullSubscriptionChunk (#19190)
* swarm/shed, swarm/storage/localstore: add LastPullSubscriptionChunk * swarm/shed: fix comments * swarm/shed: fix TestIncByteSlice test * swarm/storage/localstore: fix TestDB_LastPullSubscriptionChunk
-rw-r--r--swarm/shed/db_test.go8
-rw-r--r--swarm/shed/index.go99
-rw-r--r--swarm/shed/index_test.go146
-rw-r--r--swarm/storage/localstore/subscription_pull.go18
-rw-r--r--swarm/storage/localstore/subscription_pull_test.go72
5 files changed, 328 insertions, 15 deletions
diff --git a/swarm/shed/db_test.go b/swarm/shed/db_test.go
index 65fdac4a6..4e8276f74 100644
--- a/swarm/shed/db_test.go
+++ b/swarm/shed/db_test.go
@@ -100,11 +100,13 @@ func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) {
if err != nil {
t.Fatal(err)
}
- cleanupFunc = func() { os.RemoveAll(dir) }
db, err = NewDB(dir, "")
if err != nil {
- cleanupFunc()
+ os.RemoveAll(dir)
t.Fatal(err)
}
- return db, cleanupFunc
+ return db, func() {
+ db.Close()
+ os.RemoveAll(dir)
+ }
}
diff --git a/swarm/shed/index.go b/swarm/shed/index.go
index df88b1b62..d02bf1a00 100644
--- a/swarm/shed/index.go
+++ b/swarm/shed/index.go
@@ -20,6 +20,7 @@ import (
"bytes"
"github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
)
// Item holds fields relevant to Swarm Chunk data and metadata.
@@ -245,21 +246,14 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
ok = it.Next()
}
for ; ok; ok = it.Next() {
- key := it.Key()
- if !bytes.HasPrefix(key, prefix) {
- break
- }
- // create a copy of key byte slice not to share leveldb underlaying slice array
- keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...))
- if err != nil {
- return err
- }
- // create a copy of value byte slice not to share leveldb underlaying slice array
- valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...))
+ item, err := f.itemFromIterator(it, prefix)
if err != nil {
+ if err == leveldb.ErrNotFound {
+ break
+ }
return err
}
- stop, err := fn(keyItem.Merge(valueItem))
+ stop, err := fn(item)
if err != nil {
return err
}
@@ -270,6 +264,87 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
return it.Error()
}
+// First returns the first item in the Index which encoded key starts with a prefix.
+// If the prefix is nil, the first element of the whole index is returned.
+// If Index has no elements, a leveldb.ErrNotFound error is returned.
+func (f Index) First(prefix []byte) (i Item, err error) {
+ it := f.db.NewIterator()
+ defer it.Release()
+
+ totalPrefix := append(f.prefix, prefix...)
+ it.Seek(totalPrefix)
+
+ return f.itemFromIterator(it, totalPrefix)
+}
+
+// itemFromIterator returns the Item from the current iterator position.
+// If the complete encoded key does not start with totalPrefix,
+// leveldb.ErrNotFound is returned. Value for totalPrefix must start with
+// Index prefix.
+func (f Index) itemFromIterator(it iterator.Iterator, totalPrefix []byte) (i Item, err error) {
+ key := it.Key()
+ if !bytes.HasPrefix(key, totalPrefix) {
+ return i, leveldb.ErrNotFound
+ }
+ // create a copy of key byte slice not to share leveldb underlaying slice array
+ keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...))
+ if err != nil {
+ return i, err
+ }
+ // create a copy of value byte slice not to share leveldb underlaying slice array
+ valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...))
+ if err != nil {
+ return i, err
+ }
+ return keyItem.Merge(valueItem), it.Error()
+}
+
+// Last returns the last item in the Index which encoded key starts with a prefix.
+// If the prefix is nil, the last element of the whole index is returned.
+// If Index has no elements, a leveldb.ErrNotFound error is returned.
+func (f Index) Last(prefix []byte) (i Item, err error) {
+ it := f.db.NewIterator()
+ defer it.Release()
+
+ // get the next prefix in line
+ // since leveldb iterator Seek seeks to the
+ // next key if the key that it seeks to is not found
+ // and by getting the previous key, the last one for the
+ // actual prefix is found
+ nextPrefix := incByteSlice(prefix)
+ l := len(prefix)
+
+ if l > 0 && nextPrefix != nil {
+ it.Seek(append(f.prefix, nextPrefix...))
+ it.Prev()
+ } else {
+ it.Last()
+ }
+
+ totalPrefix := append(f.prefix, prefix...)
+ return f.itemFromIterator(it, totalPrefix)
+}
+
+// incByteSlice returns the byte slice of the same size
+// of the provided one that is by one incremented in its
+// total value. If all bytes in provided slice are equal
+// to 255 a nil slice would be returned indicating that
+// increment can not happen for the same length.
+func incByteSlice(b []byte) (next []byte) {
+ l := len(b)
+ next = make([]byte, l)
+ copy(next, b)
+ for i := l - 1; i >= 0; i-- {
+ if b[i] == 255 {
+ next[i] = 0
+ } else {
+ next[i] = b[i] + 1
+ return next
+ }
+ }
+ return nil
+}
+
// Count returns the number of items in index.
func (f Index) Count() (count int, err error) {
it := f.db.NewIterator()
diff --git a/swarm/shed/index_test.go b/swarm/shed/index_test.go
index 97d7c91f4..489f001bd 100644
--- a/swarm/shed/index_test.go
+++ b/swarm/shed/index_test.go
@@ -779,3 +779,149 @@ func checkItem(t *testing.T, got, want Item) {
t.Errorf("got access timestamp %v, expected %v", got.AccessTimestamp, want.AccessTimestamp)
}
}
+
+// TestIndex_firstAndLast validates that index First and Last methods
+// are returning expected results based on the provided prefix.
+func TestIndex_firstAndLast(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ index, err := db.NewIndex("retrieval", retrievalIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ addrs := [][]byte{
+ {0, 0, 0, 0, 0},
+ {0, 1},
+ {0, 1, 0, 0, 0},
+ {0, 1, 0, 0, 1},
+ {0, 1, 0, 0, 2},
+ {0, 2, 0, 0, 1},
+ {0, 4, 0, 0, 0},
+ {0, 10, 0, 0, 10},
+ {0, 10, 0, 0, 11},
+ {0, 10, 0, 0, 20},
+ {1, 32, 255, 0, 1},
+ {1, 32, 255, 0, 2},
+ {1, 32, 255, 0, 3},
+ {255, 255, 255, 255, 32},
+ {255, 255, 255, 255, 64},
+ {255, 255, 255, 255, 255},
+ }
+
+ // ensure that the addresses are sorted for
+ // validation of nil prefix
+ sort.Slice(addrs, func(i, j int) (less bool) {
+ return bytes.Compare(addrs[i], addrs[j]) == -1
+ })
+
+ batch := new(leveldb.Batch)
+ for _, addr := range addrs {
+ index.PutInBatch(batch, Item{
+ Address: addr,
+ })
+ }
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for _, tc := range []struct {
+ prefix []byte
+ first []byte
+ last []byte
+ err error
+ }{
+ {
+ prefix: nil,
+ first: addrs[0],
+ last: addrs[len(addrs)-1],
+ },
+ {
+ prefix: []byte{0, 0},
+ first: []byte{0, 0, 0, 0, 0},
+ last: []byte{0, 0, 0, 0, 0},
+ },
+ {
+ prefix: []byte{0},
+ first: []byte{0, 0, 0, 0, 0},
+ last: []byte{0, 10, 0, 0, 20},
+ },
+ {
+ prefix: []byte{0, 1},
+ first: []byte{0, 1},
+ last: []byte{0, 1, 0, 0, 2},
+ },
+ {
+ prefix: []byte{0, 10},
+ first: []byte{0, 10, 0, 0, 10},
+ last: []byte{0, 10, 0, 0, 20},
+ },
+ {
+ prefix: []byte{1, 32, 255},
+ first: []byte{1, 32, 255, 0, 1},
+ last: []byte{1, 32, 255, 0, 3},
+ },
+ {
+ prefix: []byte{255},
+ first: []byte{255, 255, 255, 255, 32},
+ last: []byte{255, 255, 255, 255, 255},
+ },
+ {
+ prefix: []byte{255, 255, 255, 255, 255},
+ first: []byte{255, 255, 255, 255, 255},
+ last: []byte{255, 255, 255, 255, 255},
+ },
+ {
+ prefix: []byte{0, 3},
+ err: leveldb.ErrNotFound,
+ },
+ {
+ prefix: []byte{222},
+ err: leveldb.ErrNotFound,
+ },
+ } {
+ got, err := index.Last(tc.prefix)
+ if tc.err != err {
+ t.Errorf("got error %v for Last with prefix %v, want %v", err, tc.prefix, tc.err)
+ } else {
+ if !bytes.Equal(got.Address, tc.last) {
+ t.Errorf("got %v for Last with prefix %v, want %v", got.Address, tc.prefix, tc.last)
+ }
+ }
+
+ got, err = index.First(tc.prefix)
+ if tc.err != err {
+ t.Errorf("got error %v for First with prefix %v, want %v", err, tc.prefix, tc.err)
+ } else {
+ if !bytes.Equal(got.Address, tc.first) {
+ t.Errorf("got %v for First with prefix %v, want %v", got.Address, tc.prefix, tc.first)
+ }
+ }
+ }
+}
+
+// TestIncByteSlice validates returned values of incByteSlice function.
+func TestIncByteSlice(t *testing.T) {
+ for _, tc := range []struct {
+ b []byte
+ want []byte
+ }{
+ {b: nil, want: nil},
+ {b: []byte{}, want: nil},
+ {b: []byte{0}, want: []byte{1}},
+ {b: []byte{42}, want: []byte{43}},
+ {b: []byte{255}, want: nil},
+ {b: []byte{0, 0}, want: []byte{0, 1}},
+ {b: []byte{1, 0}, want: []byte{1, 1}},
+ {b: []byte{1, 255}, want: []byte{2, 0}},
+ {b: []byte{255, 255}, want: nil},
+ {b: []byte{32, 0, 255}, want: []byte{32, 1, 0}},
+ } {
+ got := incByteSlice(tc.b)
+ if !bytes.Equal(got, tc.want) {
+ t.Errorf("got %v, want %v", got, tc.want)
+ }
+ }
+}
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index 0830eee70..0b96102e3 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/syndtr/goleveldb/leveldb"
)
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
@@ -158,6 +159,23 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
return chunkDescriptors, stop
}
+// LastPullSubscriptionChunk returns ChunkDescriptor of the latest Chunk
+// in pull syncing index for a provided bin. If there are no chunks in
+// that bin, chunk.ErrChunkNotFound is returned.
+func (db *DB) LastPullSubscriptionChunk(bin uint8) (c *ChunkDescriptor, err error) {
+ item, err := db.pullIndex.Last([]byte{bin})
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ return nil, chunk.ErrChunkNotFound
+ }
+ return nil, err
+ }
+ return &ChunkDescriptor{
+ Address: item.Address,
+ StoreTimestamp: item.StoreTimestamp,
+ }, nil
+}
+
// ChunkDescriptor holds information required for Pull syncing. This struct
// is provided by subscribing to pull index.
type ChunkDescriptor struct {
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index 130f0c9fe..d5ddae02b 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -485,3 +485,75 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC
}
}
}
+
+// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk
+// is returning the last chunk descriptor for proximity order bins by
+// doing a few rounds of chunk uploads.
+func TestDB_LastPullSubscriptionChunk(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]chunk.Address)
+
+ lastTimestamp := time.Now().UTC().UnixNano()
+ var lastTimestampMu sync.RWMutex
+ defer setNow(func() (t int64) {
+ lastTimestampMu.Lock()
+ defer lastTimestampMu.Unlock()
+ lastTimestamp++
+ return lastTimestamp
+ })()
+
+ last := make(map[uint8]ChunkDescriptor)
+
+ // do a few rounds of uploads and check if
+ // last pull subscription chunk is correct
+ for _, count := range []int{1, 3, 10, 11, 100, 120} {
+
+ // upload
+ for i := 0; i < count; i++ {
+ ch := generateTestRandomChunk()
+
+ err := uploader.Put(ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ bin := db.po(ch.Address())
+
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]chunk.Address, 0)
+ }
+ addrs[bin] = append(addrs[bin], ch.Address())
+
+ lastTimestampMu.RLock()
+ storeTimestamp := lastTimestamp
+ lastTimestampMu.RUnlock()
+
+ last[bin] = ChunkDescriptor{
+ Address: ch.Address(),
+ StoreTimestamp: storeTimestamp,
+ }
+ }
+
+ // check
+ for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
+ want, ok := last[bin]
+ got, err := db.LastPullSubscriptionChunk(bin)
+ if ok {
+ if err != nil {
+ t.Errorf("got unexpected error value %v", err)
+ }
+ if !bytes.Equal(got.Address, want.Address) {
+ t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex())
+ }
+ } else {
+ if err != chunk.ErrChunkNotFound {
+ t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound)
+ }
+ }
+ }
+ }
+}