aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/streamer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r--swarm/network/stream/streamer_test.go70
1 files changed, 46 insertions, 24 deletions
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index c2aee61b7..e92ee3783 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -41,10 +41,10 @@ import (
func TestStreamerSubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
@@ -55,10 +55,10 @@ func TestStreamerSubscribe(t *testing.T) {
func TestStreamerRequestSubscription(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
@@ -146,10 +146,10 @@ func (self *testServer) Close() {
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return newTestClient(t), nil
@@ -239,10 +239,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
@@ -306,10 +306,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -372,10 +372,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -416,10 +416,10 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -479,10 +479,10 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -544,10 +544,10 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -643,10 +643,10 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(nil)
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 10), nil
@@ -780,10 +780,10 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -854,10 +854,10 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -940,10 +940,10 @@ func TestHasPriceImplementation(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
@@ -1177,6 +1177,7 @@ starts the simulation, waits for SyncUpdateDelay in order to kick off
stream registration, then tests that there are subscriptions.
*/
func TestGetSubscriptionsRPC(t *testing.T) {
+
// arbitrarily set to 4
nodeCount := 4
// run with more nodes if `longrunning` flag is set
@@ -1188,19 +1189,16 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// holds the msg code for SubscribeMsg
var subscribeMsgCode uint64
var ok bool
- var expectedMsgCount = 0
+ var expectedMsgCount counter
// this channel signalizes that the expected amount of subscriptiosn is done
allSubscriptionsDone := make(chan struct{})
- lock := sync.RWMutex{}
// after the test, we need to reset the subscriptionFunc to the default
defer func() { subscriptionFunc = doRequestSubscription }()
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
- lock.Lock()
- expectedMsgCount++
- lock.Unlock()
+ expectedMsgCount.inc()
doRequestSubscription(r, p, bin, subs)
return true
}
@@ -1290,24 +1288,24 @@ func TestGetSubscriptionsRPC(t *testing.T) {
select {
case <-allSubscriptionsDone:
case <-ctx.Done():
- t.Fatal("Context timed out")
+ return errors.New("Context timed out")
}
- log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount)
+ log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
//now iterate again, this time we call each node via RPC to get its subscriptions
realCount := 0
for _, node := range nodes {
//create rpc client
client, err := node.Client()
if err != nil {
- t.Fatalf("create node 1 rpc client fail: %v", err)
+ return fmt.Errorf("create node 1 rpc client fail: %v", err)
}
//ask it for subscriptions
pstreams := make(map[string][]string)
err = client.Call(&pstreams, "stream_getPeerSubscriptions")
if err != nil {
- t.Fatal(err)
+ return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
}
//length of the subscriptions can not be smaller than number of peers
log.Debug("node subscriptions", "node", node.String())
@@ -1324,8 +1322,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
}
}
// every node is mutually subscribed to each other, so the actual count is half of it
- if realCount/2 != expectedMsgCount {
- return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, expectedMsgCount)
+ emc := expectedMsgCount.count()
+ if realCount/2 != emc {
+ return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
}
return nil
})
@@ -1333,3 +1332,26 @@ func TestGetSubscriptionsRPC(t *testing.T) {
t.Fatal(result.Error)
}
}
+
+// counter is used to concurrently increment
+// and read an integer value.
+type counter struct {
+ v int
+ mu sync.RWMutex
+}
+
+// Increment the counter.
+func (c *counter) inc() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.v++
+}
+
+// Read the counter value.
+func (c *counter) count() int {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ return c.v
+}