diff options
Diffstat (limited to 'swarm/storage/localstore')
-rw-r--r-- | swarm/storage/localstore/subscription_pull.go | 18 | ||||
-rw-r--r-- | swarm/storage/localstore/subscription_pull_test.go | 72 |
2 files changed, 90 insertions, 0 deletions
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go index 0830eee70..0b96102e3 100644 --- a/swarm/storage/localstore/subscription_pull.go +++ b/swarm/storage/localstore/subscription_pull.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/syndtr/goleveldb/leveldb" ) // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index. @@ -158,6 +159,23 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD return chunkDescriptors, stop } +// LastPullSubscriptionChunk returns ChunkDescriptor of the latest Chunk +// in pull syncing index for a provided bin. If there are no chunks in +// that bin, chunk.ErrChunkNotFound is returned. +func (db *DB) LastPullSubscriptionChunk(bin uint8) (c *ChunkDescriptor, err error) { + item, err := db.pullIndex.Last([]byte{bin}) + if err != nil { + if err == leveldb.ErrNotFound { + return nil, chunk.ErrChunkNotFound + } + return nil, err + } + return &ChunkDescriptor{ + Address: item.Address, + StoreTimestamp: item.StoreTimestamp, + }, nil +} + // ChunkDescriptor holds information required for Pull syncing. This struct // is provided by subscribing to pull index. type ChunkDescriptor struct { diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go index 130f0c9fe..d5ddae02b 100644 --- a/swarm/storage/localstore/subscription_pull_test.go +++ b/swarm/storage/localstore/subscription_pull_test.go @@ -485,3 +485,75 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC } } } + +// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk +// is returning the last chunk descriptor for proximity order bins by +// doing a few rounds of chunk uploads. +func TestDB_LastPullSubscriptionChunk(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make(map[uint8][]chunk.Address) + + lastTimestamp := time.Now().UTC().UnixNano() + var lastTimestampMu sync.RWMutex + defer setNow(func() (t int64) { + lastTimestampMu.Lock() + defer lastTimestampMu.Unlock() + lastTimestamp++ + return lastTimestamp + })() + + last := make(map[uint8]ChunkDescriptor) + + // do a few rounds of uploads and check if + // last pull subscription chunk is correct + for _, count := range []int{1, 3, 10, 11, 100, 120} { + + // upload + for i := 0; i < count; i++ { + ch := generateTestRandomChunk() + + err := uploader.Put(ch) + if err != nil { + t.Fatal(err) + } + + bin := db.po(ch.Address()) + + if _, ok := addrs[bin]; !ok { + addrs[bin] = make([]chunk.Address, 0) + } + addrs[bin] = append(addrs[bin], ch.Address()) + + lastTimestampMu.RLock() + storeTimestamp := lastTimestamp + lastTimestampMu.RUnlock() + + last[bin] = ChunkDescriptor{ + Address: ch.Address(), + StoreTimestamp: storeTimestamp, + } + } + + // check + for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ { + want, ok := last[bin] + got, err := db.LastPullSubscriptionChunk(bin) + if ok { + if err != nil { + t.Errorf("got unexpected error value %v", err) + } + if !bytes.Equal(got.Address, want.Address) { + t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex()) + } + } else { + if err != chunk.ErrChunkNotFound { + t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound) + } + } + } + } +} |