aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/delivery_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r--swarm/network/stream/delivery_test.go97
1 files changed, 69 insertions, 28 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 949645558..29b4f2f69 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -38,8 +38,13 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
+//Tests initializing a retrieve request
func TestStreamerRetrieveRequest(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
+ regOpts := &RegistryOptions{
+ Retrieval: RetrievalClientOnly,
+ Syncing: SyncingDisabled,
+ }
+ tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -55,10 +60,21 @@ func TestStreamerRetrieveRequest(t *testing.T) {
)
streamer.delivery.RequestFromPeers(ctx, req)
+ stream := NewStream(swarmChunkServerStreamName, "", true)
+
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Expects: []p2ptest.Expect{
- {
+ { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ { //expect a retrieve request message for the given hash
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: hash0[:],
@@ -74,9 +90,12 @@ func TestStreamerRetrieveRequest(t *testing.T) {
}
}
+//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
+//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingDisabled, //do no syncing
})
defer teardown()
if err != nil {
@@ -89,16 +108,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
peer := streamer.getPeer(node.ID())
+ stream := NewStream(swarmChunkServerStreamName, "", true)
+ //simulate pre-subscription to RETRIEVE_REQUEST stream on peer
peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
- Stream: NewStream(swarmChunkServerStreamName, "", true),
+ Stream: stream,
History: nil,
Priority: Top,
})
+ //test the exchange
err = tester.TestExchanges(p2ptest.Exchange{
+ Expects: []p2ptest.Expect{
+ { //first expect a subscription to the RETRIEVE_REQUEST stream
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
- {
+ { //then the actual RETRIEVE_REQUEST....
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: chunk.Address()[:],
@@ -107,7 +141,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
},
},
Expects: []p2ptest.Expect{
- {
+ { //to which the peer responds with offered hashes
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: nil,
@@ -120,7 +154,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
},
})
- expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
+ //should fail with a timeout as the peer we are requesting
+ //the chunk from does not have the chunk
+ expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
if err == nil || err.Error() != expectedError {
t.Fatalf("Expected error %v, got %v", expectedError, err)
}
@@ -130,7 +166,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
@@ -138,6 +175,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
node := tester.Nodes[0]
+
peer := streamer.getPeer(node.ID())
stream := NewStream(swarmChunkServerStreamName, "", true)
@@ -156,6 +194,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
err = tester.TestExchanges(p2ptest.Exchange{
+ Expects: []p2ptest.Expect{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
{
@@ -226,7 +276,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
@@ -241,6 +292,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
node := tester.Nodes[0]
+ //subscribe to custom stream
stream := NewStream("foo", "", true)
err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
if err != nil {
@@ -253,7 +305,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Expects: []p2ptest.Expect{
- {
+ { //first expect subscription to the custom stream...
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
@@ -267,7 +319,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
p2ptest.Exchange{
Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
- {
+ { //...then trigger a chunk delivery for the given chunk from peer in order for
+ //local node to get the chunk delivered
Code: 6,
Msg: &ChunkDeliveryMsg{
Addr: chunkKey,
@@ -342,8 +395,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- DoServeRetrieve: true,
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalEnabled,
})
bucket.Store(bucketKeyRegistry, r)
@@ -408,20 +462,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return err
}
- //each of the nodes (except pivot node) subscribes to the stream of the next node
- for j, node := range nodeIDs[0 : nodes-1] {
- sid := nodeIDs[j+1]
- item, ok := sim.NodeItem(node, bucketKeyRegistry)
- if !ok {
- return fmt.Errorf("No registry")
- }
- registry := item.(*Registry)
- err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top)
- if err != nil {
- return err
- }
- }
-
//get the pivot node's filestore
item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore)
if !ok {
@@ -530,7 +570,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
- DoSync: true,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
})