aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore
diff options
context:
space:
mode:
authorJanoš Guljaš <janos@users.noreply.github.com>2019-03-02 15:44:22 +0800
committerViktor Trón <viktor.tron@gmail.com>2019-03-02 15:44:22 +0800
commitb797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b (patch)
treece2cb19a39b9973a730713139dfdfad3bf1a7832 /swarm/storage/localstore
parent729bf365b5f17325be9107b63b233da54100eec6 (diff)
downloadgo-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.gz
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.bz2
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.lz
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.xz
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.tar.zst
go-tangerine-b797dd07d2f2cf0868d4fe79e120d5cf0b8fdc0b.zip
swarm/shed, swarm/storage/localstore: add LastPullSubscriptionChunk (#19190)
* swarm/shed, swarm/storage/localstore: add LastPullSubscriptionChunk * swarm/shed: fix comments * swarm/shed: fix TestIncByteSlice test * swarm/storage/localstore: fix TestDB_LastPullSubscriptionChunk
Diffstat (limited to 'swarm/storage/localstore')
-rw-r--r--swarm/storage/localstore/subscription_pull.go18
-rw-r--r--swarm/storage/localstore/subscription_pull_test.go72
2 files changed, 90 insertions, 0 deletions
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index 0830eee70..0b96102e3 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/syndtr/goleveldb/leveldb"
)
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
@@ -158,6 +159,23 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD
return chunkDescriptors, stop
}
+// LastPullSubscriptionChunk returns ChunkDescriptor of the latest Chunk
+// in pull syncing index for a provided bin. If there are no chunks in
+// that bin, chunk.ErrChunkNotFound is returned.
+func (db *DB) LastPullSubscriptionChunk(bin uint8) (c *ChunkDescriptor, err error) {
+ item, err := db.pullIndex.Last([]byte{bin})
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ return nil, chunk.ErrChunkNotFound
+ }
+ return nil, err
+ }
+ return &ChunkDescriptor{
+ Address: item.Address,
+ StoreTimestamp: item.StoreTimestamp,
+ }, nil
+}
+
// ChunkDescriptor holds information required for Pull syncing. This struct
// is provided by subscribing to pull index.
type ChunkDescriptor struct {
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index 130f0c9fe..d5ddae02b 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -485,3 +485,75 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC
}
}
}
+
+// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk
+// is returning the last chunk descriptor for proximity order bins by
+// doing a few rounds of chunk uploads.
+func TestDB_LastPullSubscriptionChunk(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ uploader := db.NewPutter(ModePutUpload)
+
+ addrs := make(map[uint8][]chunk.Address)
+
+ lastTimestamp := time.Now().UTC().UnixNano()
+ var lastTimestampMu sync.RWMutex
+ defer setNow(func() (t int64) {
+ lastTimestampMu.Lock()
+ defer lastTimestampMu.Unlock()
+ lastTimestamp++
+ return lastTimestamp
+ })()
+
+ last := make(map[uint8]ChunkDescriptor)
+
+ // do a few rounds of uploads and check if
+ // last pull subscription chunk is correct
+ for _, count := range []int{1, 3, 10, 11, 100, 120} {
+
+ // upload
+ for i := 0; i < count; i++ {
+ ch := generateTestRandomChunk()
+
+ err := uploader.Put(ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ bin := db.po(ch.Address())
+
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]chunk.Address, 0)
+ }
+ addrs[bin] = append(addrs[bin], ch.Address())
+
+ lastTimestampMu.RLock()
+ storeTimestamp := lastTimestamp
+ lastTimestampMu.RUnlock()
+
+ last[bin] = ChunkDescriptor{
+ Address: ch.Address(),
+ StoreTimestamp: storeTimestamp,
+ }
+ }
+
+ // check
+ for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
+ want, ok := last[bin]
+ got, err := db.LastPullSubscriptionChunk(bin)
+ if ok {
+ if err != nil {
+ t.Errorf("got unexpected error value %v", err)
+ }
+ if !bytes.Equal(got.Address, want.Address) {
+ t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex())
+ }
+ } else {
+ if err != chunk.ErrChunkNotFound {
+ t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound)
+ }
+ }
+ }
+ }
+}