diff options
Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r-- | swarm/network/stream/streamer_test.go | 1214 |
1 files changed, 0 insertions, 1214 deletions
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go deleted file mode 100644 index 767112b2b..000000000 --- a/swarm/network/stream/streamer_test.go +++ /dev/null @@ -1,1214 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package stream - -import ( - "bytes" - "context" - "errors" - "fmt" - "os" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - p2ptest "github.com/ethereum/go-ethereum/p2p/testing" - "github.com/ethereum/go-ethereum/swarm/network" - "github.com/ethereum/go-ethereum/swarm/network/simulation" - "github.com/ethereum/go-ethereum/swarm/state" - "github.com/ethereum/go-ethereum/swarm/testutil" - "golang.org/x/crypto/sha3" -) - -func TestStreamerSubscribe(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", true) - err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top) - if err == nil || err.Error() != "stream foo not registered" { - t.Fatalf("Expected error %v, got %v", "stream foo not registered", err) - } -} - -func TestStreamerRequestSubscription(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", false) - err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top) - if err == nil || err.Error() != "stream foo not registered" { - t.Fatalf("Expected error %v, got %v", "stream foo not registered", err) - } -} - -var ( - hash0 = sha3.Sum256([]byte{0}) - hash1 = sha3.Sum256([]byte{1}) - hash2 = sha3.Sum256([]byte{2}) - hashesTmp = append(hash0[:], hash1[:]...) - hashes = append(hashesTmp, hash2[:]...) - corruptHashes = append(hashes[:40]) -) - -type testClient struct { - t string - wait0 chan bool - wait2 chan bool - batchDone chan bool - receivedHashes map[string][]byte -} - -func newTestClient(t string) *testClient { - return &testClient{ - t: t, - wait0: make(chan bool), - wait2: make(chan bool), - batchDone: make(chan bool), - receivedHashes: make(map[string][]byte), - } -} - -func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { - self.receivedHashes[string(hash)] = hash - if bytes.Equal(hash, hash0[:]) { - return func(context.Context) error { - <-self.wait0 - return nil - } - } else if bytes.Equal(hash, hash2[:]) { - return func(context.Context) error { - <-self.wait2 - return nil - } - } - return nil -} - -func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { - close(self.batchDone) - return nil -} - -func (self *testClient) Close() {} - -type testServer struct { - t string - sessionIndex uint64 -} - -func newTestServer(t string, sessionIndex uint64) *testServer { - return &testServer{ - t: t, - sessionIndex: sessionIndex, - } -} - -func (s *testServer) SessionIndex() (uint64, error) { - return s.sessionIndex, nil -} - -func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - return make([]byte, HashSize), from + 1, to + 1, nil, nil -} - -func (self *testServer) GetData(context.Context, []byte) ([]byte, error) { - return nil, nil -} - -func (self *testServer) Close() { -} - -func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { - return newTestClient(t), nil - }) - - node := tester.Nodes[0] - - stream := NewStream("foo", "", true) - err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges( - p2ptest.Exchange{ - Label: "Subscribe message", - Expects: []p2ptest.Expect{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, - // trigger OfferedHashesMsg to actually create the client - p2ptest.Exchange{ - Label: "OfferedHashes message", - Triggers: []p2ptest.Trigger{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: hashes, - From: 5, - To: 8, - Stream: stream, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 2, - Msg: &WantedHashesMsg{ - Stream: stream, - Want: []byte{5}, - From: 9, - To: 0, - }, - Peer: node.ID(), - }, - }, - }, - ) - if err != nil { - t.Fatal(err) - } - - err = streamer.Unsubscribe(node.ID(), stream) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Unsubscribe message", - Expects: []p2ptest.Expect{ - { - Code: 0, - Msg: &UnsubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", false) - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 10), nil - }) - - node := tester.Nodes[0] - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: make([]byte, HashSize), - From: 6, - To: 9, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "unsubscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 0, - Msg: &UnsubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", true) - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 0), nil - }) - - node := tester.Nodes[0] - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: make([]byte, HashSize), - From: 1, - To: 0, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "unsubscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 0, - Msg: &UnsubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 0), nil - }) - - stream := NewStream("bar", "", true) - - node := tester.Nodes[0] - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 7, - Msg: &SubscribeErrorMsg{ - Error: "stream bar not registered", - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", true) - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 10), nil - }) - - node := tester.Nodes[0] - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: NewStream("foo", "", false), - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: make([]byte, HashSize), - From: 6, - To: 9, - }, - Peer: node.ID(), - }, - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - From: 11, - To: 0, - Hashes: make([]byte, HashSize), - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", true) - - var tc *testClient - - streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { - tc = newTestClient(t) - return tc, nil - }) - - node := tester.Nodes[0] - - err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Expects: []p2ptest.Expect{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, - p2ptest.Exchange{ - Label: "Corrupt offered hash message", - Triggers: []p2ptest.Trigger{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: corruptHashes, - From: 5, - To: 8, - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatal(err) - } - - expectedError := errors.New("subprotocol error") - if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil { - t.Fatal(err) - } -} - -func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - stream := NewStream("foo", "", true) - - var tc *testClient - - streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { - tc = newTestClient(t) - return tc, nil - }) - - node := tester.Nodes[0] - - err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Expects: []p2ptest.Expect{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, - p2ptest.Exchange{ - Label: "WantedHashes message", - Triggers: []p2ptest.Trigger{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: hashes, - From: 5, - To: 8, - Stream: stream, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 2, - Msg: &WantedHashesMsg{ - Stream: stream, - Want: []byte{5}, - From: 9, - To: 0, - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatal(err) - } - - if len(tc.receivedHashes) != 3 { - t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes)) - } - - close(tc.wait0) - - timeout := time.NewTimer(100 * time.Millisecond) - defer timeout.Stop() - - select { - case <-tc.batchDone: - t.Fatal("batch done early") - case <-timeout.C: - } - - close(tc.wait2) - - timeout2 := time.NewTimer(10000 * time.Millisecond) - defer timeout2.Stop() - - select { - case <-tc.batchDone: - case <-timeout2.C: - t.Fatal("timeout waiting batchdone call") - } - -} - -func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(nil) - if err != nil { - t.Fatal(err) - } - defer teardown() - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 10), nil - }) - - node := tester.Nodes[0] - - stream := NewStream("foo", "", true) - err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges( - p2ptest.Exchange{ - Label: "RequestSubscription message", - Expects: []p2ptest.Expect{ - { - Code: 8, - Msg: &RequestSubscriptionMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, - p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: NewRange(5, 8), - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: NewStream("foo", "", false), - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: make([]byte, HashSize), - From: 6, - To: 9, - }, - Peer: node.ID(), - }, - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - From: 11, - To: 0, - Hashes: make([]byte, HashSize), - }, - Peer: node.ID(), - }, - }, - }, - ) - if err != nil { - t.Fatal(err) - } - - err = streamer.Quit(node.ID(), stream) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Quit message", - Expects: []p2ptest.Expect{ - { - Code: 9, - Msg: &QuitMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - historyStream := getHistoryStream(stream) - - err = streamer.Quit(node.ID(), historyStream) - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Quit message", - Expects: []p2ptest.Expect{ - { - Code: 9, - Msg: &QuitMsg{ - Stream: historyStream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } -} - -// TestMaxPeerServersWithUnsubscribe creates a registry with a limited -// number of stream servers, and performs a test with subscriptions and -// unsubscriptions, checking if unsubscriptions will remove streams, -// leaving place for new streams. -func TestMaxPeerServersWithUnsubscribe(t *testing.T) { - var maxPeerServers = 6 - tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Syncing: SyncingDisabled, - MaxPeerServers: maxPeerServers, - }) - if err != nil { - t.Fatal(err) - } - defer teardown() - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 0), nil - }) - - node := tester.Nodes[0] - - for i := 0; i < maxPeerServers+10; i++ { - stream := NewStream("foo", strconv.Itoa(i), true) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: make([]byte, HashSize), - From: 1, - To: 0, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "unsubscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 0, - Msg: &UnsubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - } -} - -// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited -// number of stream servers, and performs subscriptions to detect subscriptions -// error message exchange. -func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { - var maxPeerServers = 6 - tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - MaxPeerServers: maxPeerServers, - }) - if err != nil { - t.Fatal(err) - } - defer teardown() - - streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { - return newTestServer(t, 0), nil - }) - - node := tester.Nodes[0] - - for i := 0; i < maxPeerServers+10; i++ { - stream := NewStream("foo", strconv.Itoa(i), true) - - if i >= maxPeerServers { - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 7, - Msg: &SubscribeErrorMsg{ - Error: ErrMaxPeerServers.Error(), - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - continue - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "Subscribe message", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: make([]byte, HashSize), - From: 1, - To: 0, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - } -} - -//TestHasPriceImplementation is to check that the Registry has a -//`Price` interface implementation -func TestHasPriceImplementation(t *testing.T) { - _, r, _, teardown, err := newStreamerTester(&RegistryOptions{ - Syncing: SyncingDisabled, - }) - if err != nil { - t.Fatal(err) - } - defer teardown() - - if r.prices == nil { - t.Fatal("No prices implementation available for the stream protocol") - } - - pricesInstance, ok := r.prices.(*StreamerPrices) - if !ok { - t.Fatal("`Registry` does not have the expected Prices instance") - } - price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{}) - if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() { - t.Fatal("No prices set for chunk delivery msg") - } - - price = pricesInstance.Price(&RetrieveRequestMsg{}) - if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() { - t.Fatal("No prices set for chunk delivery msg") - } -} - -// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function -func TestGetServerSubscriptions(t *testing.T) { - // create an amount of dummy peers - testPeerCount := 8 - // every peer will have this amount of dummy servers - testServerCount := 4 - // the peerMap which will store this data for the registry - peerMap := make(map[enode.ID]*Peer) - // create the registry - r := &Registry{} - api := NewAPI(r) - // call once, at this point should be empty - regs := api.GetPeerServerSubscriptions() - if len(regs) != 0 { - t.Fatal("Expected subscription count to be 0, but it is not") - } - - // now create a number of dummy servers for each node - for i := 0; i < testPeerCount; i++ { - addr := network.RandomAddr() - id := addr.ID() - p := &Peer{} - p.servers = make(map[Stream]*server) - for k := 0; k < testServerCount; k++ { - s := Stream{ - Name: strconv.Itoa(k), - Key: "", - Live: false, - } - p.servers[s] = &server{} - } - peerMap[id] = p - } - r.peers = peerMap - - // call the subscriptions again - regs = api.GetPeerServerSubscriptions() - // count how many (fake) subscriptions there are - cnt := 0 - for _, reg := range regs { - for range reg { - cnt++ - } - } - // check expected value - expectedCount := testPeerCount * testServerCount - if cnt != expectedCount { - t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt) - } -} - -/* -TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, -starts the simulation, waits for SyncUpdateDelay in order to kick off -stream registration, then tests that there are subscriptions. -*/ -func TestGetServerSubscriptionsRPC(t *testing.T) { - - if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { - t.Skip("flaky with -race on Travis") - // Note: related ticket https://github.com/ethersphere/go-ethereum/issues/1234 - } - - // arbitrarily set to 4 - nodeCount := 4 - // set the syncUpdateDelay for sync registrations to start - syncUpdateDelay := 200 * time.Millisecond - // run with more nodes if `longrunning` flag is set - if *longrunning { - nodeCount = 64 - syncUpdateDelay = 10 * time.Second - } - // holds the msg code for SubscribeMsg - var subscribeMsgCode uint64 - var ok bool - var expectedMsgCount counter - - // this channel signalizes that the expected amount of subscriptiosn is done - allSubscriptionsDone := make(chan struct{}) - // after the test, we need to reset the subscriptionFunc to the default - defer func() { subscriptionFunc = doRequestSubscription }() - - // we use this subscriptionFunc for this test: just increases count and calls the actual subscription - subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error { - // syncing starts after syncUpdateDelay and loops after that Duration; we only want to count at the first iteration - // in the first iteration, subs will be empty (no existing subscriptions), thus we can use this check - // this avoids flakyness - expectedMsgCount.inc() - doRequestSubscription(r, id, bin) - return nil - } - // create a standard sim - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) - if err != nil { - return nil, nil, err - } - - // configure so that sync registrations actually happen - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Syncing: SyncingAutoSubscribe, //enable sync registrations - SyncUpdateDelay: syncUpdateDelay, - }, nil) - - // get the SubscribeMsg code - subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{}) - if !ok { - t.Fatal("Message code for SubscribeMsg not found") - } - - cleanup = func() { - r.Close() - clean() - } - - return r, cleanup, nil - }, - }) - defer sim.Close() - - ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancelSimRun() - - // setup the filter for SubscribeMsg - msgs := sim.PeerEvents( - context.Background(), - sim.UpNodeIDs(), - simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode), - ) - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) - if err := sim.UploadSnapshot(ctx, filename); err != nil { - t.Fatal(err) - } - - // strategy: listen to all SubscribeMsg events; after every event we wait - // if after `waitDuration` no more messages are being received, we assume the - // subscription phase has terminated! - - // the loop in this go routine will either wait for new message events - // or times out after 1 second, which signals that we are not receiving - // any new subscriptions any more - go func() { - //for long running sims, waiting 1 sec will not be enough - waitDuration := 1 * time.Second - if *longrunning { - waitDuration = 3 * time.Second - } - for { - select { - case <-ctx.Done(): - return - case m := <-msgs: // just reset the loop - if m.Error != nil { - log.Error("stream message", "err", m.Error) - continue - } - log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID) - case <-time.After(waitDuration): - // one second passed, don't assume more subscriptions - allSubscriptionsDone <- struct{}{} - log.Info("All subscriptions received") - return - - } - } - }() - - //run the simulation - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { - log.Info("Simulation running") - nodes := sim.Net.Nodes - - //wait until all subscriptions are done - select { - case <-allSubscriptionsDone: - case <-ctx.Done(): - return errors.New("Context timed out") - } - - log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count()) - //now iterate again, this time we call each node via RPC to get its subscriptions - realCount := 0 - for _, node := range nodes { - //create rpc client - client, err := node.Client() - if err != nil { - return fmt.Errorf("create node 1 rpc client fail: %v", err) - } - - //ask it for subscriptions - pstreams := make(map[string][]string) - err = client.Call(&pstreams, "stream_getPeerServerSubscriptions") - if err != nil { - return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) - } - //length of the subscriptions can not be smaller than number of peers - log.Debug("node subscriptions", "node", node.String()) - for p, ps := range pstreams { - log.Debug("... with", "peer", p) - for _, s := range ps { - log.Debug(".......", "stream", s) - // each node also has subscriptions to RETRIEVE_REQUEST streams, - // we need to ignore those, we are only counting SYNC streams - if !strings.HasPrefix(s, "RETRIEVE_REQUEST") { - realCount++ - } - } - } - log.Debug("All node streams counted", "realCount", realCount) - } - emc := expectedMsgCount.count() - // after a subscription request, internally a live AND a history stream will be subscribed, - // thus the real count should be half of the actual request subscriptions sent - if realCount/2 != emc { - return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc) - } - return nil - }) - if result.Error != nil { - t.Fatal(result.Error) - } -} - -// counter is used to concurrently increment -// and read an integer value. -type counter struct { - v int - mu sync.RWMutex -} - -// Increment the counter. -func (c *counter) inc() { - c.mu.Lock() - defer c.mu.Unlock() - - c.v++ -} - -// Read the counter value. -func (c *counter) count() int { - c.mu.RLock() - defer c.mu.RUnlock() - - return c.v -} |