author    Péter Szilágyi <peterke@gmail.com>  2019-05-10 19:09:01 +0800
committer GitHub <noreply@github.com>  2019-05-10 19:09:01 +0800
commit    494f5d448a1685d5de4cb1524b863cd1fc9a13b0 (patch)
tree      4db9d1afe4910c888f3488cd93e8537501d88314 /swarm/storage/localstore/subscription_pull_test.go
parent    c94d582aa781b26412ba7d570f6707d193303a02 (diff)
parent    9b1543c282f39d452f611eeee0307bdf828e8bc2 (diff)
Merge pull request #19550 from ethersphere/swarm-rather-stable
swarm v0.4-rc1
Diffstat (limited to 'swarm/storage/localstore/subscription_pull_test.go')
-rw-r--r--  swarm/storage/localstore/subscription_pull_test.go | 243
1 file changed, 110 insertions(+), 133 deletions(-)
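This merge switches pull-sync subscription cursors from timestamp-carrying ChunkDescriptor values to monotonically increasing per-bin IDs: SubscribePull now takes plain uint64 since and until bounds (0 meaning unbounded) instead of *ChunkDescriptor pointers, subscriptions deliver chunk.Descriptor values carrying a BinID, and LastPullSubscriptionChunk is renamed to LastPullSubscriptionBinID. A sketch of the call shape after this change, as the tests below exercise it (illustrative only, not code from this file):

	// Subscribe to every proximity order bin with no bounds (0, 0),
	// then read chunk.Descriptor values carrying Address and BinID.
	for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
		ch, stop := db.SubscribePull(ctx, bin, 0, 0)
		defer stop()
		go func(bin uint8, ch <-chan chunk.Descriptor) {
			for d := range ch {
				_ = d.Address // chunk address
				_ = d.BinID   // monotonic per-bin id
			}
		}(bin, ch)
	}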
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index d5ddae02b..bf364ed44 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/shed"
)
// TestDB_SubscribePull uploads some chunks before and after
@@ -35,15 +36,13 @@ func TestDB_SubscribePull(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
// prepopulate database with some chunks
// before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -54,22 +53,22 @@ func TestDB_SubscribePull(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after a short time
// to ensure that the subscription will include them
// in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
@@ -82,15 +81,13 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
// prepopulate database with some chunks
// before the subscription
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -106,23 +103,23 @@ func TestDB_SubscribePull_multiple(t *testing.T) {
// that all of them will write an error for every address to errChan
for j := 0; j < subsCount; j++ {
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- ch, stop := db.SubscribePull(ctx, bin, nil, nil)
+ ch, stop := db.SubscribePull(ctx, bin, 0, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
}
// upload some chunks just after subscribe
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after a short time
// to ensure that the subscription will include them
// in a dynamic environment
- uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3)
+ uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 3)
checkErrChan(ctx, t, errChan, wantedChunksCount*subsCount)
}
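checkErrChan, called by the tests above and below, is not itself touched by this diff; only its signature appears in a later hunk header. A plausible minimal body matching the call sites (a sketch, not necessarily the file's exact code):

	func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedChunksCount int) {
		t.Helper()
		// expect exactly one (possibly nil) error per validated address
		for i := 0; i < wantedChunksCount; i++ {
			select {
			case err := <-errChan:
				if err != nil {
					t.Fatal(err)
				}
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			}
		}
	}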
@@ -135,61 +132,52 @@ func TestDB_SubscribePull_since(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (first map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ first = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
bin := db.po(ch.Address())
- if _, ok := addrs[bin]; !ok {
- addrs[bin] = make([]chunk.Address, 0)
- }
+ binIDCounterMu.Lock()
+ binIDCounter[bin]++
+ binIDCounterMu.Unlock()
+
if wanted {
+ if _, ok := addrs[bin]; !ok {
+ addrs[bin] = make([]chunk.Address, 0)
+ }
addrs[bin] = append(addrs[bin], ch.Address())
wantedChunksCount++
- }
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
-
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
+ if _, ok := first[bin]; !ok {
+ first[bin] = binIDCounter[bin]
+ }
}
}
- return last
+ return first
}
// prepopulate database with some chunks
// before the subscription
- last := uploadRandomChunks(30, false)
+ uploadRandomChunks(30, false)
- uploadRandomChunks(25, true)
+ first := uploadRandomChunks(25, true)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -200,21 +188,18 @@ func TestDB_SubscribePull_since(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := last[bin]; ok {
- since = &c
+ since, ok := first[bin]
+ if !ok {
+ continue
}
- ch, stop := db.SubscribePull(ctx, bin, since, nil)
+ ch, stop := db.SubscribePull(ctx, bin, since, 0)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
- // upload some chunks just after subscribe
- uploadRandomChunks(15, true)
-
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
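As TestDB_SubscribePull_since above and the _until and _sinceAndUntil tests below exercise them, the since and until arguments bound the delivered bin IDs as an inclusive window, with 0 meaning no bound. A sketch of that predicate, inferred from how the tests pick since and until (hypothetical helper, not the package's own code):

	// inWindow reports whether a bin id falls inside the subscription
	// window [since, until]; a zero bound means "no bound".
	func inWindow(binID, since, until uint64) bool {
		if since != 0 && binID < since {
			return false
		}
		if until != 0 && binID > until {
			return false
		}
		return true
	}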
@@ -226,30 +211,22 @@ func TestDB_SubscribePull_until(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -264,14 +241,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
wantedChunksCount++
}
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.Lock()
+ binIDCounter[bin]++
+ binIDCounterMu.Unlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
return last
}
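All of these tests track the IDs they expect with the same guarded per-bin counter (binIDCounter plus its mutex). Factored into a small helper for illustration (hypothetical type, not part of the package), the bookkeeping looks like this:

	// binIDs hands out the next uint64 for a proximity order bin,
	// mirroring the binIDCounter/binIDCounterMu pattern in these tests.
	type binIDs struct {
		mu sync.Mutex
		n  map[uint8]uint64
	}

	func (b *binIDs) next(bin uint8) uint64 {
		b.mu.Lock()
		defer b.mu.Unlock()
		if b.n == nil {
			b.n = make(map[uint8]uint64)
		}
		b.n[bin]++
		return b.n[bin]
	}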
@@ -295,11 +269,11 @@ func TestDB_SubscribePull_until(t *testing.T) {
if !ok {
continue
}
- ch, stop := db.SubscribePull(ctx, bin, nil, &until)
+ ch, stop := db.SubscribePull(ctx, bin, 0, until)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
@@ -316,30 +290,22 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
var addrsMu sync.Mutex
var wantedChunksCount int
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) {
+ uploadRandomChunks := func(count int, wanted bool) (last map[uint8]uint64) {
addrsMu.Lock()
defer addrsMu.Unlock()
- last = make(map[uint8]ChunkDescriptor)
+ last = make(map[uint8]uint64)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -354,14 +320,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
wantedChunksCount++
}
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.Lock()
+ binIDCounter[bin]++
+ binIDCounterMu.Unlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
return last
}
@@ -387,9 +350,10 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
errChan := make(chan error)
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
- var since *ChunkDescriptor
- if c, ok := upload1[bin]; ok {
- since = &c
+ since, ok := upload1[bin]
+ if ok {
+ // start from the next uploaded chunk
+ since++
}
until, ok := upload2[bin]
if !ok {
@@ -397,11 +361,11 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
// skip this bin from testing
continue
}
- ch, stop := db.SubscribePull(ctx, bin, since, &until)
+ ch, stop := db.SubscribePull(ctx, bin, since, until)
defer stop()
// receive and validate addresses from the subscription
- go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan)
+ go readPullSubscriptionBin(ctx, db, bin, ch, addrs, &addrsMu, errChan)
}
// upload some chunks just after subscribe
@@ -412,14 +376,14 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
// uploadRandomChunksBin uploads random chunks to database and adds them to
// the map of addresses per bin.
-func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
+func uploadRandomChunksBin(t *testing.T, db *DB, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) {
addrsMu.Lock()
defer addrsMu.Unlock()
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -434,10 +398,10 @@ func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uin
}
}
-// readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and
-// sends error to errChan, even if it is nil, to count the number of ChunkDescriptors
+// readPullSubscriptionBin is a helper function that reads all chunk.Descriptors from a channel and
+// sends an error to errChan, even if it is nil, to count the number of chunk.Descriptors
// returned by the channel.
-func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
+func readPullSubscriptionBin(ctx context.Context, db *DB, bin uint8, ch <-chan chunk.Descriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) {
var i int // address index
for {
select {
@@ -450,9 +414,20 @@ func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDesc
if i+1 > len(addrs[bin]) {
err = fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin)
} else {
- want := addrs[bin][i]
- if !bytes.Equal(got.Address, want) {
- err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want)
+ addr := addrs[bin][i]
+ if !bytes.Equal(got.Address, addr) {
+ err = fmt.Errorf("got chunk address %s at index %v in bin %v, want %s", got.Address.Hex(), i, bin, addr.Hex())
+ } else {
+ // validate the received bin id against the retrieval index entry;
+ // assign to the outer err so the result reaches errChan
+ var want shed.Item
+ want, err = db.retrievalDataIndex.Get(shed.Item{
+ Address: addr,
+ })
+ if err != nil {
+ err = fmt.Errorf("get chunk at index %v in bin %v from retrieval index %s: %v", i, bin, addr.Hex(), err)
+ } else if got.BinID != want.BinID {
+ err = fmt.Errorf("got chunk bin id %v at index %v in bin %v, want %v", got.BinID, i, bin, want.BinID)
+ }
+ }
}
addrsMu.Unlock()
@@ -486,27 +461,19 @@ func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedC
}
}
-// TestDB_LastPullSubscriptionChunk validates that LastPullSubscriptionChunk
-// is returning the last chunk descriptor for proximity order bins by
+// TestDB_LastPullSubscriptionBinID validates that LastPullSubscriptionBinID
+// is returning the last bin id for proximity order bins by
// doing a few rounds of chunk uploads.
-func TestDB_LastPullSubscriptionChunk(t *testing.T) {
+func TestDB_LastPullSubscriptionBinID(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
- uploader := db.NewPutter(ModePutUpload)
-
addrs := make(map[uint8][]chunk.Address)
- lastTimestamp := time.Now().UTC().UnixNano()
- var lastTimestampMu sync.RWMutex
- defer setNow(func() (t int64) {
- lastTimestampMu.Lock()
- defer lastTimestampMu.Unlock()
- lastTimestamp++
- return lastTimestamp
- })()
+ binIDCounter := make(map[uint8]uint64)
+ var binIDCounterMu sync.RWMutex
- last := make(map[uint8]ChunkDescriptor)
+ last := make(map[uint8]uint64)
// do a few rounds of uploads and check if
// last pull subscription chunk is correct
@@ -516,7 +483,7 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
- err := uploader.Put(ch)
+ _, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -528,32 +495,42 @@ func TestDB_LastPullSubscriptionChunk(t *testing.T) {
}
addrs[bin] = append(addrs[bin], ch.Address())
- lastTimestampMu.RLock()
- storeTimestamp := lastTimestamp
- lastTimestampMu.RUnlock()
+ binIDCounterMu.Lock()
+ binIDCounter[bin]++
+ binIDCounterMu.Unlock()
- last[bin] = ChunkDescriptor{
- Address: ch.Address(),
- StoreTimestamp: storeTimestamp,
- }
+ last[bin] = binIDCounter[bin]
}
// check
for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ {
want, ok := last[bin]
- got, err := db.LastPullSubscriptionChunk(bin)
+ got, err := db.LastPullSubscriptionBinID(bin)
if ok {
if err != nil {
t.Errorf("got unexpected error value %v", err)
}
- if !bytes.Equal(got.Address, want.Address) {
- t.Errorf("got last address %s, want %s", got.Address.Hex(), want.Address.Hex())
- }
- } else {
- if err != chunk.ErrChunkNotFound {
- t.Errorf("got unexpected error value %v, want %v", err, chunk.ErrChunkNotFound)
- }
}
+ if got != want {
+ t.Errorf("got last bin id %v, want %v", got, want)
+ }
+ }
+ }
+}
+
+// TestAddressInBin validates that function addressInBin
+// returns a valid address for every proximity order bin.
+func TestAddressInBin(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ for po := uint8(0); po < chunk.MaxPO; po++ {
+ addr := db.addressInBin(po)
+
+ got := db.po(addr)
+
+ if got != po {
+ t.Errorf("got po %v, want %v", got, po)
}
}
}
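TestAddressInBin round-trips addressInBin through db.po, the proximity order of an address relative to the node's base address. In Swarm, proximity order is the number of leading bits two addresses share; a stand-alone sketch of that measure (illustrative, not this package's exact code):

	// proximity returns the number of leading equal bits of a and b,
	// capped at max (chunk.MaxPO plays that role in the tests above).
	func proximity(a, b []byte, max int) int {
		for i := 0; i < len(a) && i < len(b); i++ {
			x := a[i] ^ b[i]
			if x == 0 {
				continue // all eight bits equal, keep scanning
			}
			for j := 0; j < 8; j++ {
				if x&(0x80>>uint(j)) != 0 {
					if po := i*8 + j; po < max {
						return po
					}
					return max
				}
			}
		}
		return max
	}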