From 9e99a0c2b94daa351325688702e2a49e34f60dbf Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 24 Sep 2018 17:40:22 +0200 Subject: cmd/swarm, swarm: add stream peer servers limit --- swarm/network/stream/streamer_test.go | 181 ++++++++++++++++++++++++++++++++-- 1 file changed, 172 insertions(+), 9 deletions(-) (limited to 'swarm/network/stream/streamer_test.go') diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 06e96b9a9..4f480b0ac 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -19,6 +19,7 @@ package stream import ( "bytes" "context" + "strconv" "testing" "time" @@ -27,7 +28,7 @@ import ( ) func TestStreamerSubscribe(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -41,7 +42,7 @@ func TestStreamerSubscribe(t *testing.T) { } func TestStreamerRequestSubscription(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -125,7 +126,7 @@ func (self *testServer) Close() { } func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -218,7 +219,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -285,7 +286,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -351,7 +352,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { } func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -395,7 +396,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -460,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -559,7 +560,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -685,3 +686,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { t.Fatal(err) } } + +// TestMaxPeerServersWithUnsubscribe creates a registry with a limited +// number of stream servers, and performs a test with subscriptions and +// unsubscriptions, checking if unsubscriptions will remove streams, +// leaving place for new streams. +func TestMaxPeerServersWithUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + peerID := tester.IDs[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "unsubscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: &UnsubscribeMsg{ + Stream: stream, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} + +// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited +// number of stream servers, and performs subscriptions to detect sunscriptions +// error message exchange. +func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + peerID := tester.IDs[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + if i >= maxPeerServers { + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 7, + Msg: &SubscribeErrorMsg{ + Error: ErrMaxPeerServers.Error(), + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + continue + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} -- cgit v1.2.3