aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/streamer_test.go
diff options
context:
space:
mode:
authorJanos Guljas <janos@resenje.org>2018-09-24 23:40:22 +0800
committerJanos Guljas <janos@resenje.org>2018-09-24 23:56:00 +0800
commit9e99a0c2b94daa351325688702e2a49e34f60dbf (patch)
treea66c070a9c55eff5dda766cb35b013657eab37df /swarm/network/stream/streamer_test.go
parent1f45ba9bb1c19489a6c8bf9caf100e56dcb79788 (diff)
downloaddexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.tar
dexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.tar.gz
dexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.tar.bz2
dexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.tar.lz
dexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.tar.xz
dexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.tar.zst
dexon-9e99a0c2b94daa351325688702e2a49e34f60dbf.zip
cmd/swarm, swarm: add stream peer servers limit
Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r--swarm/network/stream/streamer_test.go181
1 files changed, 172 insertions, 9 deletions
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 06e96b9a9..4f480b0ac 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -19,6 +19,7 @@ package stream
import (
"bytes"
"context"
+ "strconv"
"testing"
"time"
@@ -27,7 +28,7 @@ import (
)
func TestStreamerSubscribe(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -41,7 +42,7 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -125,7 +126,7 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -218,7 +219,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -285,7 +286,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -351,7 +352,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -395,7 +396,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -460,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -559,7 +560,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -685,3 +686,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
t.Fatal(err)
}
}
+
+// TestMaxPeerServersWithUnsubscribe creates a registry with a limited
+// number of stream servers, and performs a test with subscriptions and
+// unsubscriptions, checking if unsubscriptions will remove streams,
+// leaving place for new streams.
+func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
+ var maxPeerServers = 6
+ tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ MaxPeerServers: maxPeerServers,
+ })
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
+ return newTestServer(t), nil
+ })
+
+ peerID := tester.IDs[0]
+
+ for i := 0; i < maxPeerServers+10; i++ {
+ stream := NewStream("foo", strconv.Itoa(i), true)
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ Priority: Top,
+ },
+ Peer: peerID,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ Stream: stream,
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: make([]byte, HashSize),
+ From: 1,
+ To: 1,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "unsubscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 0,
+ Msg: &UnsubscribeMsg{
+ Stream: stream,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
+// number of stream servers, and performs subscriptions to detect sunscriptions
+// error message exchange.
+func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
+ var maxPeerServers = 6
+ tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ MaxPeerServers: maxPeerServers,
+ })
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
+ return newTestServer(t), nil
+ })
+
+ peerID := tester.IDs[0]
+
+ for i := 0; i < maxPeerServers+10; i++ {
+ stream := NewStream("foo", strconv.Itoa(i), true)
+
+ if i >= maxPeerServers {
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ Priority: Top,
+ },
+ Peer: peerID,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 7,
+ Msg: &SubscribeErrorMsg{
+ Error: ErrMaxPeerServers.Error(),
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ continue
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ Priority: Top,
+ },
+ Peer: peerID,
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ Stream: stream,
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: make([]byte, HashSize),
+ From: 1,
+ To: 1,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}