Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r--  swarm/network/stream/streamer_test.go | 295 ++++++++++++++++++++++++++++++---
1 file changed, 268 insertions(+), 27 deletions(-)
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index cdaeb92d0..e92ee3783 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -22,23 +22,29 @@ import (
"errors"
"fmt"
"strconv"
+ "strings"
+ "sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/state"
"golang.org/x/crypto/sha3"
)
func TestStreamerSubscribe(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
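The recurring reorder of `defer teardown()` in the hunks above and below is not cosmetic: when `newStreamerTester` fails partway through setup, the teardown it returns may be nil, and a defer registered before the error check would panic during unwinding. A minimal sketch of the pattern, with hypothetical names (`newFixture` is not part of this diff):

package example

import (
	"errors"
	"testing"
)

var errSetup = errors.New("setup failed")

// newFixture stands in for a constructor like newStreamerTester: when it
// fails partway through setup it returns a nil cleanup alongside the error.
func newFixture(fail bool) (cleanup func(), err error) {
	if fail {
		return nil, errSetup
	}
	return func() { /* release resources */ }, nil
}

func TestFixture(t *testing.T) {
	cleanup, err := newFixture(false)
	if err != nil {
		t.Fatal(err) // cleanup may be nil here, so it must not be deferred yet
	}
	defer cleanup() // safe: only reached when err == nil
}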
@@ -48,11 +54,11 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
@@ -139,11 +145,11 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return newTestClient(t), nil
@@ -232,11 +238,11 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
@@ -299,11 +305,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -365,11 +371,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -409,11 +415,11 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -472,11 +478,11 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -537,11 +543,11 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -636,11 +642,11 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 10), nil
@@ -769,15 +775,15 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
// leaving place for new streams.
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -845,13 +851,13 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
// error message exchange.
func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -930,14 +936,14 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
//TestHasPriceImplementation is to check that the Registry has a
//`Price` interface implementation
func TestHasPriceImplementation(t *testing.T) {
- _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ _, r, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
@@ -1105,7 +1111,6 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
}
-
// print some output
for p, subs := range fakeSubscriptions {
log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p))
@@ -1114,3 +1119,239 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
}
+
+// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
+func TestGetSubscriptions(t *testing.T) {
+ // create a number of dummy peers
+ testPeerCount := 8
+ // every peer will have this many dummy servers
+ testServerCount := 4
+ // the peerMap which will store this data for the registry
+ peerMap := make(map[enode.ID]*Peer)
+ // create the registry
+ r := &Registry{}
+ api := NewAPI(r)
+ // call once, at this point should be empty
+ regs := api.GetPeerSubscriptions()
+ if len(regs) != 0 {
+ t.Fatal("Expected subscription count to be 0, but it is not")
+ }
+
+ // now create a number of dummy servers for each node
+ for i := 0; i < testPeerCount; i++ {
+ addr := network.RandomAddr()
+ id := addr.ID()
+ p := &Peer{}
+ p.servers = make(map[Stream]*server)
+ for k := 0; k < testServerCount; k++ {
+ s := Stream{
+ Name: strconv.Itoa(k),
+ Key: "",
+ Live: false,
+ }
+ p.servers[s] = &server{}
+ }
+ peerMap[id] = p
+ }
+ r.peers = peerMap
+
+ // call the subscriptions again
+ regs = api.GetPeerSubscriptions()
+ // count how many (fake) subscriptions there are
+ cnt := 0
+ for _, reg := range regs {
+ for range reg {
+ cnt++
+ }
+ }
+ // check expected value
+ expectedCount := testPeerCount * testServerCount
+ if cnt != expectedCount {
+ t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt)
+ }
+}
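For context, a sketch of what the accessor under test plausibly looks like; the `API.streamer` field, the shape of `Registry.peers`, and `Stream.String()` are assumptions inferred from how this test drives the API, not part of the diff:

// GetPeerSubscriptions returns, per connected peer, the string form of
// every stream served to that peer. Sketch only; locking around
// Registry.peers is elided.
func (api *API) GetPeerSubscriptions() map[string][]string {
	pstreams := make(map[string][]string)
	for id, p := range api.streamer.peers {
		var streams []string
		for s := range p.servers {
			streams = append(streams, s.String())
		}
		pstreams[id.String()] = streams
	}
	return pstreams
}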
+
+/*
+TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
+starts the simulation, waits for SyncUpdateDelay in order to kick off
+stream registration, then tests that there are subscriptions.
+*/
+func TestGetSubscriptionsRPC(t *testing.T) {
+
+ // arbitrarily set to 4
+ nodeCount := 4
+ // run with more nodes if `longrunning` flag is set
+ if *longrunning {
+ nodeCount = 64
+ }
+ // set the syncUpdateDelay for sync registrations to start
+ syncUpdateDelay := 200 * time.Millisecond
+ // holds the msg code for SubscribeMsg
+ var subscribeMsgCode uint64
+ var ok bool
+ var expectedMsgCount counter
+
+ // this channel signals that the expected number of subscriptions has been made
+ allSubscriptionsDone := make(chan struct{})
+ // after the test, we need to reset the subscriptionFunc to the default
+ defer func() { subscriptionFunc = doRequestSubscription }()
+
+ // the subscriptionFunc used for this test: increments the counter and performs the actual subscription
+ subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
+ expectedMsgCount.inc()
+ doRequestSubscription(r, p, bin, subs)
+ return true
+ }
+ // create a standard sim
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // configure so that sync registrations actually happen
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe, // enable sync registrations
+ SyncUpdateDelay: syncUpdateDelay,
+ }, nil)
+
+ // get the SubscribeMsg code
+ subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
+ if !ok {
+ t.Fatal("Message code for SubscribeMsg not found")
+ }
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
+
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancelSimRun()
+
+ // upload a snapshot
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // setup the filter for SubscribeMsg
+ msgs := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
+ )
+
+ // strategy: listen to all SubscribeMsg events; every event resets the wait.
+ // if within `waitDuration` no more messages are received, we assume the
+ // subscription phase has terminated.
+
+ // the loop in this goroutine will either wait for new message events
+ // or time out after `waitDuration`, which signals that we are not
+ // receiving any new subscriptions any more
+ go func() {
+ // scale the wait with node count; for nodeCount < 16 the +1 keeps it at 1s
+ waitDuration := time.Duration(nodeCount/16+1) * time.Second
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case m := <-msgs: // just reset the loop
+ if m.Error != nil {
+ log.Error("stream message", "err", m.Error)
+ continue
+ }
+ log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
+ case <-time.After(waitDuration):
+ // waitDuration passed without new messages; assume no more subscriptions
+ allSubscriptionsDone <- struct{}{}
+ log.Info("All subscriptions received")
+ return
+
+ }
+ }
+ }()
+
+ //run the simulation
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ log.Info("Simulation running")
+ nodes := sim.Net.Nodes
+
+ //wait until all subscriptions are done
+ select {
+ case <-allSubscriptionsDone:
+ case <-ctx.Done():
+ return errors.New("Context timed out")
+ }
+
+ log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
+ //now iterate again, this time we call each node via RPC to get its subscriptions
+ realCount := 0
+ for _, node := range nodes {
+ //create rpc client
+ client, err := node.Client()
+ if err != nil {
+ return fmt.Errorf("create node 1 rpc client fail: %v", err)
+ }
+
+ //ask it for subscriptions
+ pstreams := make(map[string][]string)
+ err = client.Call(&pstreams, "stream_getPeerSubscriptions")
+ if err != nil {
+ return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
+ }
+ // log and count this node's subscriptions as reported over RPC
+ log.Debug("node subscriptions", "node", node.String())
+ for p, ps := range pstreams {
+ log.Debug("... with", "peer", p)
+ for _, s := range ps {
+ log.Debug(".......", "stream", s)
+ // each node also has subscriptions to RETRIEVE_REQUEST streams,
+ // we need to ignore those, we are only counting SYNC streams
+ if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
+ realCount++
+ }
+ }
+ }
+ }
+ // nodes subscribe to each other mutually, so each subscription is counted twice; halve the real count
+ emc := expectedMsgCount.count()
+ if realCount/2 != emc {
+ return fmt.Errorf("real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
+ }
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
+
+// counter is used to concurrently increment
+// and read an integer value.
+type counter struct {
+ v int
+ mu sync.RWMutex
+}
+
+// Increment the counter.
+func (c *counter) inc() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.v++
+}
+
+// Read the counter value.
+func (c *counter) count() int {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ return c.v
+}
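A note on the design choice: the RWMutex version above is easy to read and fast enough for a test. A lock-free equivalent using sync/atomic would be (sketch, not part of the diff):

package example

import "sync/atomic"

// atomicCounter is a lock-free equivalent of the counter type above,
// shown only for comparison.
type atomicCounter struct{ v int64 }

func (c *atomicCounter) inc()       { atomic.AddInt64(&c.v, 1) }
func (c *atomicCounter) count() int { return int(atomic.LoadInt64(&c.v)) }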