aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/delivery_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r--swarm/network/stream/delivery_test.go93
1 files changed, 48 insertions, 45 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 972cc859a..ece54d4ee 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) {
peerID := tester.IDs[0]
- streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true)
+ ctx := context.Background()
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ streamer.delivery.RequestFromPeers(ctx, req)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{
Code: 5,
Msg: &RetrieveRequestMsg{
- Addr: chunk.Addr[:],
+ Addr: chunk.Address()[:],
},
Peer: peerID,
},
@@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
})
hash := storage.Address(hash0[:])
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = hash
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk := storage.NewChunk(hash, hash)
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
hash = storage.Address(hash1[:])
- chunk = storage.NewChunk(hash, nil)
- chunk.SData = hash1[:]
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk = storage.NewChunk(hash, hash1[:])
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
chunkKey := hash0[:]
chunkData := hash1[:]
- chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey)
-
- if !created {
- t.Fatal("chunk already exists")
- }
- select {
- case <-chunk.ReqC:
- t.Fatal("chunk is already received")
- default:
- }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
},
},
p2ptest.Exchange{
- Label: "ChunkDeliveryRequest message",
+ Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
{
Code: 6,
@@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
-
- timeout := time.NewTimer(1 * time.Second)
-
- select {
- case <-timeout.C:
- t.Fatal("timeout receiving chunk")
- case <-chunk.ReqC:
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ // wait for the chunk to get stored
+ storedChunk, err := localStore.Get(ctx, chunkKey)
+ for err != nil {
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
+ default:
+ }
+ storedChunk, err = localStore.Get(ctx, chunkKey)
+ time.Sleep(50 * time.Millisecond)
}
- storedChunk, err := localStore.Get(context.TODO(), chunkKey)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
- if !bytes.Equal(storedChunk.SData, chunkData) {
+ if !bytes.Equal(storedChunk.Data(), chunkData) {
t.Fatal("Retrieved chunk has different data than original")
}
@@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
@@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
@@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
DoSync: true,
SyncUpdateDelay: 0,
})
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)