aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorViktor TrĂ³n <viktor.tron@gmail.com>2018-09-28 17:04:07 +0800
committerGitHub <noreply@github.com>2018-09-28 17:04:07 +0800
commit0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999 (patch)
treea8ff65462707ec59c340a93b0a456657b2ab46d6
parentd8d86692716158f9d514dcaa06b69f129d43c3d8 (diff)
parent86f68cf04f5166848fd44150363866d7faa72b60 (diff)
downloadgo-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.tar
go-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.tar.gz
go-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.tar.bz2
go-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.tar.lz
go-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.tar.xz
go-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.tar.zst
go-tangerine-0da3b17a112a75b54c8b3e5a2bf65a27a1c8c999.zip
Merge pull request #17747 from ethersphere/max-stream-peer-servers
Add stream peer servers limit
-rw-r--r--cmd/swarm/config.go54
-rw-r--r--cmd/swarm/main.go7
-rw-r--r--swarm/api/config.go70
-rw-r--r--swarm/network/stream/common_test.go4
-rw-r--r--swarm/network/stream/delivery_test.go8
-rw-r--r--swarm/network/stream/messages.go10
-rw-r--r--swarm/network/stream/peer.go11
-rw-r--r--swarm/network/stream/stream.go3
-rw-r--r--swarm/network/stream/streamer_test.go183
-rw-r--r--swarm/swarm.go1
10 files changed, 276 insertions, 75 deletions
diff --git a/cmd/swarm/config.go b/cmd/swarm/config.go
index c2f885d25..9d6fe41a7 100644
--- a/cmd/swarm/config.go
+++ b/cmd/swarm/config.go
@@ -59,27 +59,28 @@ var (
//constants for environment variables
const (
- SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR"
- SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT"
- SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR"
- SWARM_ENV_PORT = "SWARM_PORT"
- SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID"
- SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE"
- SWARM_ENV_SWAP_API = "SWARM_SWAP_API"
- SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE"
- SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY"
- SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE"
- SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK"
- SWARM_ENV_ENS_API = "SWARM_ENS_API"
- SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR"
- SWARM_ENV_CORS = "SWARM_CORS"
- SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES"
- SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE"
- SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH"
- SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY"
- SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
- SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD"
- GETH_ENV_DATADIR = "GETH_DATADIR"
+ SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR"
+ SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT"
+ SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR"
+ SWARM_ENV_PORT = "SWARM_PORT"
+ SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID"
+ SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE"
+ SWARM_ENV_SWAP_API = "SWARM_SWAP_API"
+ SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE"
+ SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY"
+ SWARM_ENV_MAX_STREAM_PEER_SERVERS = "SWARM_ENV_MAX_STREAM_PEER_SERVERS"
+ SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE"
+ SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK"
+ SWARM_ENV_ENS_API = "SWARM_ENS_API"
+ SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR"
+ SWARM_ENV_CORS = "SWARM_CORS"
+ SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES"
+ SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE"
+ SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH"
+ SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY"
+ SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
+ SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD"
+ GETH_ENV_DATADIR = "GETH_DATADIR"
)
// These settings ensure that TOML keys use the same names as Go struct fields.
@@ -207,6 +208,9 @@ func cmdLineOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Con
currentConfig.SyncUpdateDelay = d
}
+ // any value including 0 is acceptable
+ currentConfig.MaxStreamPeerServers = ctx.GlobalInt(SwarmMaxStreamPeerServersFlag.Name)
+
if ctx.GlobalIsSet(SwarmLightNodeEnabled.Name) {
currentConfig.LightNodeEnabled = true
}
@@ -308,6 +312,14 @@ func envVarsOverride(currentConfig *bzzapi.Config) (config *bzzapi.Config) {
}
}
+ if max := os.Getenv(SWARM_ENV_MAX_STREAM_PEER_SERVERS); max != "" {
+ m, err := strconv.Atoi(max)
+ if err != nil {
+ utils.Fatalf("invalid environment variable %s: %v", SWARM_ENV_MAX_STREAM_PEER_SERVERS, err)
+ }
+ currentConfig.MaxStreamPeerServers = m
+ }
+
if lne := os.Getenv(SWARM_ENV_LIGHT_NODE_ENABLE); lne != "" {
if lightnode, err := strconv.ParseBool(lne); err != nil {
currentConfig.LightNodeEnabled = lightnode
diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go
index 4c9ce931e..71d707c2b 100644
--- a/cmd/swarm/main.go
+++ b/cmd/swarm/main.go
@@ -116,6 +116,12 @@ var (
Usage: "Duration for sync subscriptions update after no new peers are added (default 15s)",
EnvVar: SWARM_ENV_SYNC_UPDATE_DELAY,
}
+ SwarmMaxStreamPeerServersFlag = cli.IntFlag{
+ Name: "max-stream-peer-servers",
+ Usage: "Limit of Stream peer servers, 0 denotes unlimited",
+ EnvVar: SWARM_ENV_MAX_STREAM_PEER_SERVERS,
+ Value: 10000, // A very large default value is possible as stream servers have very small memory footprint
+ }
SwarmLightNodeEnabled = cli.BoolFlag{
Name: "lightnode",
Usage: "Enable Swarm LightNode (default false)",
@@ -542,6 +548,7 @@ pv(1) tool to get a progress bar:
SwarmSwapAPIFlag,
SwarmSyncDisabledFlag,
SwarmSyncUpdateDelay,
+ SwarmMaxStreamPeerServersFlag,
SwarmLightNodeEnabled,
SwarmDeliverySkipCheckFlag,
SwarmListenAddrFlag,
diff --git a/swarm/api/config.go b/swarm/api/config.go
index e753890e4..be7385408 100644
--- a/swarm/api/config.go
+++ b/swarm/api/config.go
@@ -50,26 +50,27 @@ type Config struct {
Swap *swap.LocalProfile
Pss *pss.PssParams
//*network.SyncParams
- Contract common.Address
- EnsRoot common.Address
- EnsAPIs []string
- Path string
- ListenAddr string
- Port string
- PublicKey string
- BzzKey string
- NodeID string
- NetworkID uint64
- SwapEnabled bool
- SyncEnabled bool
- SyncingSkipCheck bool
- DeliverySkipCheck bool
- LightNodeEnabled bool
- SyncUpdateDelay time.Duration
- SwapAPI string
- Cors string
- BzzAccount string
- privateKey *ecdsa.PrivateKey
+ Contract common.Address
+ EnsRoot common.Address
+ EnsAPIs []string
+ Path string
+ ListenAddr string
+ Port string
+ PublicKey string
+ BzzKey string
+ NodeID string
+ NetworkID uint64
+ SwapEnabled bool
+ SyncEnabled bool
+ SyncingSkipCheck bool
+ DeliverySkipCheck bool
+ MaxStreamPeerServers int
+ LightNodeEnabled bool
+ SyncUpdateDelay time.Duration
+ SwapAPI string
+ Cors string
+ BzzAccount string
+ privateKey *ecdsa.PrivateKey
}
//create a default config with all parameters to set to defaults
@@ -80,20 +81,21 @@ func NewConfig() (c *Config) {
FileStoreParams: storage.NewFileStoreParams(),
HiveParams: network.NewHiveParams(),
//SyncParams: network.NewDefaultSyncParams(),
- Swap: swap.NewDefaultSwapParams(),
- Pss: pss.NewPssParams(),
- ListenAddr: DefaultHTTPListenAddr,
- Port: DefaultHTTPPort,
- Path: node.DefaultDataDir(),
- EnsAPIs: nil,
- EnsRoot: ens.TestNetAddress,
- NetworkID: network.DefaultNetworkID,
- SwapEnabled: false,
- SyncEnabled: true,
- SyncingSkipCheck: false,
- DeliverySkipCheck: true,
- SyncUpdateDelay: 15 * time.Second,
- SwapAPI: "",
+ Swap: swap.NewDefaultSwapParams(),
+ Pss: pss.NewPssParams(),
+ ListenAddr: DefaultHTTPListenAddr,
+ Port: DefaultHTTPPort,
+ Path: node.DefaultDataDir(),
+ EnsAPIs: nil,
+ EnsRoot: ens.TestNetAddress,
+ NetworkID: network.DefaultNetworkID,
+ SwapEnabled: false,
+ SyncEnabled: true,
+ SyncingSkipCheck: false,
+ MaxStreamPeerServers: 10000,
+ DeliverySkipCheck: true,
+ SyncUpdateDelay: 15 * time.Second,
+ SwapAPI: "",
}
return
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index deb7e9815..72fdb2bd9 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -84,7 +84,7 @@ func createGlobalStore() (string, *mockdb.GlobalStore, error) {
return globalStoreDir, globalStore, nil
}
-func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
+func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
@@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), nil)
+ streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions)
teardown := func() {
streamer.Close()
removeDataDir()
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 9fb90eeba..b021b8771 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -39,7 +39,7 @@ import (
)
func TestStreamerRetrieveRequest(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -75,7 +75,7 @@ func TestStreamerRetrieveRequest(t *testing.T) {
}
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -127,7 +127,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// upstream request server receives a retrieve Request and responds with
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(t)
+ tester, streamer, localStore, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -221,7 +221,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(t)
+ tester, streamer, localStore, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 117f88044..74c785d58 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -84,11 +84,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
defer func() {
if err != nil {
- if e := p.Send(context.TODO(), SubscribeErrorMsg{
+ // The error will be sent as a subscribe error message
+ // and will not be returned as it will prevent any new message
+ // exchange between peers over p2p. Instead, error will be returned
+ // only if there is one from sending subscribe error message.
+ err = p.Send(context.TODO(), SubscribeErrorMsg{
Error: err.Error(),
- }); e != nil {
- log.Error("send stream subscribe error message", "err", err)
- }
+ })
}
}()
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 5fdaa7b87..ef6bbdf70 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -18,6 +18,7 @@ package stream
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
@@ -46,6 +47,10 @@ func (e *notFoundError) Error() string {
return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
}
+// ErrMaxPeerServers will be returned if peer server limit is reached.
+// It will be sent in the SubscribeErrorMsg.
+var ErrMaxPeerServers = errors.New("max peer servers")
+
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*protocols.Peer
@@ -204,6 +209,11 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
if p.servers[s] != nil {
return nil, fmt.Errorf("server %s already registered", s)
}
+
+ if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers {
+ return nil, ErrMaxPeerServers
+ }
+
os := &server{
Server: o,
stream: s,
@@ -346,6 +356,7 @@ func (p *Peer) removeClient(s Stream) error {
return newNotFoundError("client", s)
}
client.close()
+ delete(p.clients, s)
return nil
}
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 65b8dff5a..1eda06c6a 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -60,6 +60,7 @@ type Registry struct {
delivery *Delivery
intervalsStore state.Store
doRetrieve bool
+ maxPeerServers int
}
// RegistryOptions holds optional values for NewRegistry constructor.
@@ -68,6 +69,7 @@ type RegistryOptions struct {
DoSync bool
DoRetrieve bool
SyncUpdateDelay time.Duration
+ MaxPeerServers int // The limit of servers for each peer in registry
}
// NewRegistry is Streamer constructor
@@ -87,6 +89,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
delivery: delivery,
intervalsStore: intervalsStore,
doRetrieve: options.DoRetrieve,
+ maxPeerServers: options.MaxPeerServers,
}
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 0390a7b9b..0bdebefa7 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -20,6 +20,7 @@ import (
"bytes"
"context"
"errors"
+ "strconv"
"testing"
"time"
@@ -28,7 +29,7 @@ import (
)
func TestStreamerSubscribe(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -42,7 +43,7 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -127,7 +128,7 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -220,7 +221,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -287,7 +288,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -353,7 +354,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -397,7 +398,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -462,7 +463,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -527,7 +528,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -626,7 +627,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t)
+ tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -752,3 +753,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
t.Fatal(err)
}
}
+
+// TestMaxPeerServersWithUnsubscribe creates a registry with a limited
+// number of stream servers, and performs a test with subscriptions and
+// unsubscriptions, checking if unsubscriptions will remove streams,
+// leaving place for new streams.
+func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
+ var maxPeerServers = 6
+ tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ MaxPeerServers: maxPeerServers,
+ })
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
+ return newTestServer(t), nil
+ })
+
+ node := tester.Nodes[0]
+
+ for i := 0; i < maxPeerServers+10; i++ {
+ stream := NewStream("foo", strconv.Itoa(i), true)
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ Stream: stream,
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: make([]byte, HashSize),
+ From: 1,
+ To: 1,
+ },
+ Peer: node.ID(),
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "unsubscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 0,
+ Msg: &UnsubscribeMsg{
+ Stream: stream,
+ },
+ Peer: node.ID(),
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
+// number of stream servers, and performs subscriptions to detect subscriptions
+// error message exchange.
+func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
+ var maxPeerServers = 6
+ tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ MaxPeerServers: maxPeerServers,
+ })
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
+ return newTestServer(t), nil
+ })
+
+ node := tester.Nodes[0]
+
+ for i := 0; i < maxPeerServers+10; i++ {
+ stream := NewStream("foo", strconv.Itoa(i), true)
+
+ if i >= maxPeerServers {
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 7,
+ Msg: &SubscribeErrorMsg{
+ Error: ErrMaxPeerServers.Error(),
+ },
+ Peer: node.ID(),
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ continue
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ Expects: []p2ptest.Expect{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ Stream: stream,
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: make([]byte, HashSize),
+ From: 1,
+ To: 1,
+ },
+ Peer: node.ID(),
+ },
+ },
+ })
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
diff --git a/swarm/swarm.go b/swarm/swarm.go
index 8b2661529..0cd56d4eb 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -180,6 +180,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
DoSync: config.SyncEnabled,
DoRetrieve: true,
SyncUpdateDelay: config.SyncUpdateDelay,
+ MaxPeerServers: config.MaxStreamPeerServers,
})
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage