aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/streamer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r--swarm/network/stream/streamer_test.go1214
1 files changed, 0 insertions, 1214 deletions
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
deleted file mode 100644
index 767112b2b..000000000
--- a/swarm/network/stream/streamer_test.go
+++ /dev/null
@@ -1,1214 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package stream
-
-import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "os"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
- "github.com/ethereum/go-ethereum/swarm/network"
- "github.com/ethereum/go-ethereum/swarm/network/simulation"
- "github.com/ethereum/go-ethereum/swarm/state"
- "github.com/ethereum/go-ethereum/swarm/testutil"
- "golang.org/x/crypto/sha3"
-)
-
-func TestStreamerSubscribe(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", true)
- err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
- if err == nil || err.Error() != "stream foo not registered" {
- t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
- }
-}
-
-func TestStreamerRequestSubscription(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", false)
- err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
- if err == nil || err.Error() != "stream foo not registered" {
- t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
- }
-}
-
-var (
- hash0 = sha3.Sum256([]byte{0})
- hash1 = sha3.Sum256([]byte{1})
- hash2 = sha3.Sum256([]byte{2})
- hashesTmp = append(hash0[:], hash1[:]...)
- hashes = append(hashesTmp, hash2[:]...)
- corruptHashes = append(hashes[:40])
-)
-
-type testClient struct {
- t string
- wait0 chan bool
- wait2 chan bool
- batchDone chan bool
- receivedHashes map[string][]byte
-}
-
-func newTestClient(t string) *testClient {
- return &testClient{
- t: t,
- wait0: make(chan bool),
- wait2: make(chan bool),
- batchDone: make(chan bool),
- receivedHashes: make(map[string][]byte),
- }
-}
-
-func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
- self.receivedHashes[string(hash)] = hash
- if bytes.Equal(hash, hash0[:]) {
- return func(context.Context) error {
- <-self.wait0
- return nil
- }
- } else if bytes.Equal(hash, hash2[:]) {
- return func(context.Context) error {
- <-self.wait2
- return nil
- }
- }
- return nil
-}
-
-func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
- close(self.batchDone)
- return nil
-}
-
-func (self *testClient) Close() {}
-
-type testServer struct {
- t string
- sessionIndex uint64
-}
-
-func newTestServer(t string, sessionIndex uint64) *testServer {
- return &testServer{
- t: t,
- sessionIndex: sessionIndex,
- }
-}
-
-func (s *testServer) SessionIndex() (uint64, error) {
- return s.sessionIndex, nil
-}
-
-func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- return make([]byte, HashSize), from + 1, to + 1, nil, nil
-}
-
-func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
- return nil, nil
-}
-
-func (self *testServer) Close() {
-}
-
-func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
- return newTestClient(t), nil
- })
-
- node := tester.Nodes[0]
-
- stream := NewStream("foo", "", true)
- err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(
- p2ptest.Exchange{
- Label: "Subscribe message",
- Expects: []p2ptest.Expect{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- },
- // trigger OfferedHashesMsg to actually create the client
- p2ptest.Exchange{
- Label: "OfferedHashes message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: hashes,
- From: 5,
- To: 8,
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 2,
- Msg: &WantedHashesMsg{
- Stream: stream,
- Want: []byte{5},
- From: 9,
- To: 0,
- },
- Peer: node.ID(),
- },
- },
- },
- )
- if err != nil {
- t.Fatal(err)
- }
-
- err = streamer.Unsubscribe(node.ID(), stream)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Unsubscribe message",
- Expects: []p2ptest.Expect{
- {
- Code: 0,
- Msg: &UnsubscribeMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", false)
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 10), nil
- })
-
- node := tester.Nodes[0]
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: stream,
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: make([]byte, HashSize),
- From: 6,
- To: 9,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "unsubscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 0,
- Msg: &UnsubscribeMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", true)
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 0), nil
- })
-
- node := tester.Nodes[0]
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: stream,
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: make([]byte, HashSize),
- From: 1,
- To: 0,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "unsubscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 0,
- Msg: &UnsubscribeMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 0), nil
- })
-
- stream := NewStream("bar", "", true)
-
- node := tester.Nodes[0]
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 7,
- Msg: &SubscribeErrorMsg{
- Error: "stream bar not registered",
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", true)
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 10), nil
- })
-
- node := tester.Nodes[0]
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: NewStream("foo", "", false),
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: make([]byte, HashSize),
- From: 6,
- To: 9,
- },
- Peer: node.ID(),
- },
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: stream,
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- From: 11,
- To: 0,
- Hashes: make([]byte, HashSize),
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", true)
-
- var tc *testClient
-
- streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
- tc = newTestClient(t)
- return tc, nil
- })
-
- node := tester.Nodes[0]
-
- err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Expects: []p2ptest.Expect{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- },
- p2ptest.Exchange{
- Label: "Corrupt offered hash message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: corruptHashes,
- From: 5,
- To: 8,
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
- if err != nil {
- t.Fatal(err)
- }
-
- expectedError := errors.New("subprotocol error")
- if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
- t.Fatal(err)
- }
-}
-
-func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- stream := NewStream("foo", "", true)
-
- var tc *testClient
-
- streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
- tc = newTestClient(t)
- return tc, nil
- })
-
- node := tester.Nodes[0]
-
- err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Expects: []p2ptest.Expect{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- },
- p2ptest.Exchange{
- Label: "WantedHashes message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: hashes,
- From: 5,
- To: 8,
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 2,
- Msg: &WantedHashesMsg{
- Stream: stream,
- Want: []byte{5},
- From: 9,
- To: 0,
- },
- Peer: node.ID(),
- },
- },
- })
- if err != nil {
- t.Fatal(err)
- }
-
- if len(tc.receivedHashes) != 3 {
- t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes))
- }
-
- close(tc.wait0)
-
- timeout := time.NewTimer(100 * time.Millisecond)
- defer timeout.Stop()
-
- select {
- case <-tc.batchDone:
- t.Fatal("batch done early")
- case <-timeout.C:
- }
-
- close(tc.wait2)
-
- timeout2 := time.NewTimer(10000 * time.Millisecond)
- defer timeout2.Stop()
-
- select {
- case <-tc.batchDone:
- case <-timeout2.C:
- t.Fatal("timeout waiting batchdone call")
- }
-
-}
-
-func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(nil)
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 10), nil
- })
-
- node := tester.Nodes[0]
-
- stream := NewStream("foo", "", true)
- err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(
- p2ptest.Exchange{
- Label: "RequestSubscription message",
- Expects: []p2ptest.Expect{
- {
- Code: 8,
- Msg: &RequestSubscriptionMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- },
- p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- History: NewRange(5, 8),
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: NewStream("foo", "", false),
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: make([]byte, HashSize),
- From: 6,
- To: 9,
- },
- Peer: node.ID(),
- },
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: stream,
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- From: 11,
- To: 0,
- Hashes: make([]byte, HashSize),
- },
- Peer: node.ID(),
- },
- },
- },
- )
- if err != nil {
- t.Fatal(err)
- }
-
- err = streamer.Quit(node.ID(), stream)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Quit message",
- Expects: []p2ptest.Expect{
- {
- Code: 9,
- Msg: &QuitMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-
- historyStream := getHistoryStream(stream)
-
- err = streamer.Quit(node.ID(), historyStream)
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Quit message",
- Expects: []p2ptest.Expect{
- {
- Code: 9,
- Msg: &QuitMsg{
- Stream: historyStream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-// TestMaxPeerServersWithUnsubscribe creates a registry with a limited
-// number of stream servers, and performs a test with subscriptions and
-// unsubscriptions, checking if unsubscriptions will remove streams,
-// leaving place for new streams.
-func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
- var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
- Syncing: SyncingDisabled,
- MaxPeerServers: maxPeerServers,
- })
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 0), nil
- })
-
- node := tester.Nodes[0]
-
- for i := 0; i < maxPeerServers+10; i++ {
- stream := NewStream("foo", strconv.Itoa(i), true)
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: stream,
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: make([]byte, HashSize),
- From: 1,
- To: 0,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "unsubscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 0,
- Msg: &UnsubscribeMsg{
- Stream: stream,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
- }
-}
-
-// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
-// number of stream servers, and performs subscriptions to detect subscriptions
-// error message exchange.
-func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
- var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
- MaxPeerServers: maxPeerServers,
- })
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t, 0), nil
- })
-
- node := tester.Nodes[0]
-
- for i := 0; i < maxPeerServers+10; i++ {
- stream := NewStream("foo", strconv.Itoa(i), true)
-
- if i >= maxPeerServers {
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 7,
- Msg: &SubscribeErrorMsg{
- Error: ErrMaxPeerServers.Error(),
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
- continue
- }
-
- err = tester.TestExchanges(p2ptest.Exchange{
- Label: "Subscribe message",
- Triggers: []p2ptest.Trigger{
- {
- Code: 4,
- Msg: &SubscribeMsg{
- Stream: stream,
- Priority: Top,
- },
- Peer: node.ID(),
- },
- },
- Expects: []p2ptest.Expect{
- {
- Code: 1,
- Msg: &OfferedHashesMsg{
- Stream: stream,
- HandoverProof: &HandoverProof{
- Handover: &Handover{},
- },
- Hashes: make([]byte, HashSize),
- From: 1,
- To: 0,
- },
- Peer: node.ID(),
- },
- },
- })
-
- if err != nil {
- t.Fatal(err)
- }
- }
-}
-
-//TestHasPriceImplementation is to check that the Registry has a
-//`Price` interface implementation
-func TestHasPriceImplementation(t *testing.T) {
- _, r, _, teardown, err := newStreamerTester(&RegistryOptions{
- Syncing: SyncingDisabled,
- })
- if err != nil {
- t.Fatal(err)
- }
- defer teardown()
-
- if r.prices == nil {
- t.Fatal("No prices implementation available for the stream protocol")
- }
-
- pricesInstance, ok := r.prices.(*StreamerPrices)
- if !ok {
- t.Fatal("`Registry` does not have the expected Prices instance")
- }
- price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{})
- if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() {
- t.Fatal("No prices set for chunk delivery msg")
- }
-
- price = pricesInstance.Price(&RetrieveRequestMsg{})
- if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() {
- t.Fatal("No prices set for chunk delivery msg")
- }
-}
-
-// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function
-func TestGetServerSubscriptions(t *testing.T) {
- // create an amount of dummy peers
- testPeerCount := 8
- // every peer will have this amount of dummy servers
- testServerCount := 4
- // the peerMap which will store this data for the registry
- peerMap := make(map[enode.ID]*Peer)
- // create the registry
- r := &Registry{}
- api := NewAPI(r)
- // call once, at this point should be empty
- regs := api.GetPeerServerSubscriptions()
- if len(regs) != 0 {
- t.Fatal("Expected subscription count to be 0, but it is not")
- }
-
- // now create a number of dummy servers for each node
- for i := 0; i < testPeerCount; i++ {
- addr := network.RandomAddr()
- id := addr.ID()
- p := &Peer{}
- p.servers = make(map[Stream]*server)
- for k := 0; k < testServerCount; k++ {
- s := Stream{
- Name: strconv.Itoa(k),
- Key: "",
- Live: false,
- }
- p.servers[s] = &server{}
- }
- peerMap[id] = p
- }
- r.peers = peerMap
-
- // call the subscriptions again
- regs = api.GetPeerServerSubscriptions()
- // count how many (fake) subscriptions there are
- cnt := 0
- for _, reg := range regs {
- for range reg {
- cnt++
- }
- }
- // check expected value
- expectedCount := testPeerCount * testServerCount
- if cnt != expectedCount {
- t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt)
- }
-}
-
-/*
-TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
-starts the simulation, waits for SyncUpdateDelay in order to kick off
-stream registration, then tests that there are subscriptions.
-*/
-func TestGetServerSubscriptionsRPC(t *testing.T) {
-
- if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" {
- t.Skip("flaky with -race on Travis")
- // Note: related ticket https://github.com/ethersphere/go-ethereum/issues/1234
- }
-
- // arbitrarily set to 4
- nodeCount := 4
- // set the syncUpdateDelay for sync registrations to start
- syncUpdateDelay := 200 * time.Millisecond
- // run with more nodes if `longrunning` flag is set
- if *longrunning {
- nodeCount = 64
- syncUpdateDelay = 10 * time.Second
- }
- // holds the msg code for SubscribeMsg
- var subscribeMsgCode uint64
- var ok bool
- var expectedMsgCount counter
-
- // this channel signalizes that the expected amount of subscriptiosn is done
- allSubscriptionsDone := make(chan struct{})
- // after the test, we need to reset the subscriptionFunc to the default
- defer func() { subscriptionFunc = doRequestSubscription }()
-
- // we use this subscriptionFunc for this test: just increases count and calls the actual subscription
- subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error {
- // syncing starts after syncUpdateDelay and loops after that Duration; we only want to count at the first iteration
- // in the first iteration, subs will be empty (no existing subscriptions), thus we can use this check
- // this avoids flakyness
- expectedMsgCount.inc()
- doRequestSubscription(r, id, bin)
- return nil
- }
- // create a standard sim
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
- if err != nil {
- return nil, nil, err
- }
-
- // configure so that sync registrations actually happen
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Syncing: SyncingAutoSubscribe, //enable sync registrations
- SyncUpdateDelay: syncUpdateDelay,
- }, nil)
-
- // get the SubscribeMsg code
- subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
- if !ok {
- t.Fatal("Message code for SubscribeMsg not found")
- }
-
- cleanup = func() {
- r.Close()
- clean()
- }
-
- return r, cleanup, nil
- },
- })
- defer sim.Close()
-
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
- defer cancelSimRun()
-
- // setup the filter for SubscribeMsg
- msgs := sim.PeerEvents(
- context.Background(),
- sim.UpNodeIDs(),
- simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
- )
-
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
- defer cancel()
- filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
- if err := sim.UploadSnapshot(ctx, filename); err != nil {
- t.Fatal(err)
- }
-
- // strategy: listen to all SubscribeMsg events; after every event we wait
- // if after `waitDuration` no more messages are being received, we assume the
- // subscription phase has terminated!
-
- // the loop in this go routine will either wait for new message events
- // or times out after 1 second, which signals that we are not receiving
- // any new subscriptions any more
- go func() {
- //for long running sims, waiting 1 sec will not be enough
- waitDuration := 1 * time.Second
- if *longrunning {
- waitDuration = 3 * time.Second
- }
- for {
- select {
- case <-ctx.Done():
- return
- case m := <-msgs: // just reset the loop
- if m.Error != nil {
- log.Error("stream message", "err", m.Error)
- continue
- }
- log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
- case <-time.After(waitDuration):
- // one second passed, don't assume more subscriptions
- allSubscriptionsDone <- struct{}{}
- log.Info("All subscriptions received")
- return
-
- }
- }
- }()
-
- //run the simulation
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
- log.Info("Simulation running")
- nodes := sim.Net.Nodes
-
- //wait until all subscriptions are done
- select {
- case <-allSubscriptionsDone:
- case <-ctx.Done():
- return errors.New("Context timed out")
- }
-
- log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
- //now iterate again, this time we call each node via RPC to get its subscriptions
- realCount := 0
- for _, node := range nodes {
- //create rpc client
- client, err := node.Client()
- if err != nil {
- return fmt.Errorf("create node 1 rpc client fail: %v", err)
- }
-
- //ask it for subscriptions
- pstreams := make(map[string][]string)
- err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
- if err != nil {
- return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
- }
- //length of the subscriptions can not be smaller than number of peers
- log.Debug("node subscriptions", "node", node.String())
- for p, ps := range pstreams {
- log.Debug("... with", "peer", p)
- for _, s := range ps {
- log.Debug(".......", "stream", s)
- // each node also has subscriptions to RETRIEVE_REQUEST streams,
- // we need to ignore those, we are only counting SYNC streams
- if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
- realCount++
- }
- }
- }
- log.Debug("All node streams counted", "realCount", realCount)
- }
- emc := expectedMsgCount.count()
- // after a subscription request, internally a live AND a history stream will be subscribed,
- // thus the real count should be half of the actual request subscriptions sent
- if realCount/2 != emc {
- return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
- }
- return nil
- })
- if result.Error != nil {
- t.Fatal(result.Error)
- }
-}
-
-// counter is used to concurrently increment
-// and read an integer value.
-type counter struct {
- v int
- mu sync.RWMutex
-}
-
-// Increment the counter.
-func (c *counter) inc() {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- c.v++
-}
-
-// Read the counter value.
-func (c *counter) count() int {
- c.mu.RLock()
- defer c.mu.RUnlock()
-
- return c.v
-}