diff options
-rw-r--r-- | cmd/swarm/config.go | 52 | ||||
-rw-r--r-- | cmd/swarm/main.go | 7 | ||||
-rw-r--r-- | swarm/api/config.go | 41 | ||||
-rw-r--r-- | swarm/network/stream/common_test.go | 4 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 8 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 10 | ||||
-rw-r--r-- | swarm/network/stream/peer.go | 11 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 3 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 181 | ||||
-rw-r--r-- | swarm/swarm.go | 1 |
10 files changed, 258 insertions, 60 deletions
diff --git a/cmd/swarm/config.go b/cmd/swarm/config.go index ae4b5816e..22b92d214 100644 --- a/cmd/swarm/config.go +++ b/cmd/swarm/config.go @@ -59,27 +59,28 @@ var ( //constants for environment variables const ( - SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR" - SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT" - SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR" - SWARM_ENV_PORT = "SWARM_PORT" - SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID" - SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE" - SWARM_ENV_SWAP_API = "SWARM_SWAP_API" - SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE" - SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY" - SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE" - SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK" - SWARM_ENV_ENS_API = "SWARM_ENS_API" - SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR" - SWARM_ENV_CORS = "SWARM_CORS" - SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES" - SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE" - SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH" - SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY" - SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY" - SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD" - GETH_ENV_DATADIR = "GETH_DATADIR" + SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR" + SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT" + SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR" + SWARM_ENV_PORT = "SWARM_PORT" + SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID" + SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE" + SWARM_ENV_SWAP_API = "SWARM_SWAP_API" + SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE" + SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY" + SWARM_ENV_MAX_STREAM_PEER_SERVERS = "SWARM_ENV_MAX_STREAM_PEER_SERVERS" + SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE" + SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK" + SWARM_ENV_ENS_API = "SWARM_ENS_API" + SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR" + SWARM_ENV_CORS = "SWARM_CORS" + SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES" + SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE" + SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH" + SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY" + SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY" + SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD" + GETH_ENV_DATADIR = "GETH_DATADIR" ) // These settings ensure that TOML keys use the same names as Go struct fields. @@ -207,6 +208,9 @@ func cmdLineOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Con currentConfig.SyncUpdateDelay = d } + // any value including 0 is acceptable + currentConfig.MaxStreamPeerServers = ctx.GlobalInt(SwarmMaxStreamPeerServersFlag.Name) + if ctx.GlobalIsSet(SwarmLightNodeEnabled.Name) { currentConfig.LightNodeEnabled = true } @@ -304,6 +308,12 @@ func envVarsOverride(currentConfig *bzzapi.Config) (config *bzzapi.Config) { } } + if max := os.Getenv(SWARM_ENV_MAX_STREAM_PEER_SERVERS); max != "" { + if m, err := strconv.Atoi(max); err == nil { + currentConfig.MaxStreamPeerServers = m + } + } + if lne := os.Getenv(SWARM_ENV_LIGHT_NODE_ENABLE); lne != "" { if lightnode, err := strconv.ParseBool(lne); err != nil { currentConfig.LightNodeEnabled = lightnode diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go index c93344c42..8967936b6 100644 --- a/cmd/swarm/main.go +++ b/cmd/swarm/main.go @@ -116,6 +116,12 @@ var ( Usage: "Duration for sync subscriptions update after no new peers are added (default 15s)", EnvVar: SWARM_ENV_SYNC_UPDATE_DELAY, } + SwarmMaxStreamPeerServersFlag = cli.IntFlag{ + Name: "max-stream-peer-servers", + Usage: "Limit of Stream peer servers, 0 denotes unlimited", + EnvVar: SWARM_ENV_MAX_STREAM_PEER_SERVERS, + Value: 10000, // A very large default value is possible as stream servers have very small memory footprint + } SwarmLightNodeEnabled = cli.BoolFlag{ Name: "lightnode", Usage: "Enable Swarm LightNode (default false)", @@ -542,6 +548,7 @@ pv(1) tool to get a progress bar: SwarmSwapAPIFlag, SwarmSyncDisabledFlag, SwarmSyncUpdateDelay, + SwarmMaxStreamPeerServersFlag, SwarmLightNodeEnabled, SwarmDeliverySkipCheckFlag, SwarmListenAddrFlag, diff --git a/swarm/api/config.go b/swarm/api/config.go index baa13105a..8b900249b 100644 --- a/swarm/api/config.go +++ b/swarm/api/config.go @@ -50,26 +50,27 @@ type Config struct { Swap *swap.LocalProfile Pss *pss.PssParams //*network.SyncParams - Contract common.Address - EnsRoot common.Address - EnsAPIs []string - Path string - ListenAddr string - Port string - PublicKey string - BzzKey string - NodeID string - NetworkID uint64 - SwapEnabled bool - SyncEnabled bool - SyncingSkipCheck bool - DeliverySkipCheck bool - LightNodeEnabled bool - SyncUpdateDelay time.Duration - SwapAPI string - Cors string - BzzAccount string - privateKey *ecdsa.PrivateKey + Contract common.Address + EnsRoot common.Address + EnsAPIs []string + Path string + ListenAddr string + Port string + PublicKey string + BzzKey string + NodeID string + NetworkID uint64 + SwapEnabled bool + SyncEnabled bool + SyncingSkipCheck bool + DeliverySkipCheck bool + MaxStreamPeerServers int + LightNodeEnabled bool + SyncUpdateDelay time.Duration + SwapAPI string + Cors string + BzzAccount string + privateKey *ecdsa.PrivateKey } //create a default config with all parameters to set to defaults diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index e0d776e34..24de51158 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -84,7 +84,7 @@ func createGlobalStore() (string, *mockdb.GlobalStore, error) { return globalStoreDir, globalStore, nil } -func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { +func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address to := network.NewKademlia(addr.OAddr, network.NewKadParams()) @@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora delivery := NewDelivery(to, netStore) netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) + streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), registryOptions) teardown := func() { streamer.Close() removeDataDir() diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index ece54d4ee..016a8d15e 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -39,7 +39,7 @@ import ( ) func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -75,7 +75,7 @@ func TestStreamerRetrieveRequest(t *testing.T) { } func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -127,7 +127,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t) + tester, streamer, localStore, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -221,7 +221,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t) + tester, streamer, localStore, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 2e1a81e82..482af25f5 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -84,11 +84,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e defer func() { if err != nil { - if e := p.Send(context.TODO(), SubscribeErrorMsg{ + // The error will be sent as a subscribe error message + // and will not be returned as it will prevent any new message + // exchange between peers over p2p. Instead, error will be returned + // only if there is one from sending subscribe error message. + err = p.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), - }); e != nil { - log.Error("send stream subscribe error message", "err", err) - } + }) } }() diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 1466a7a9c..aeaf7bbfa 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -18,6 +18,7 @@ package stream import ( "context" + "errors" "fmt" "sync" "time" @@ -46,6 +47,10 @@ func (e *notFoundError) Error() string { return fmt.Sprintf("%s not found for stream %q", e.t, e.s) } +// ErrMaxPeerServers will be returned if peer server limit is reached. +// It will be sent in the SubscribeErrorMsg. +var ErrMaxPeerServers = errors.New("max peer servers") + // Peer is the Peer extension for the streaming protocol type Peer struct { *protocols.Peer @@ -204,6 +209,11 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) { if p.servers[s] != nil { return nil, fmt.Errorf("server %s already registered", s) } + + if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers { + return nil, ErrMaxPeerServers + } + os := &server{ Server: o, stream: s, @@ -346,6 +356,7 @@ func (p *Peer) removeClient(s Stream) error { return newNotFoundError("client", s) } client.close() + delete(p.clients, s) return nil } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 319fc62c9..6dcf31165 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -60,6 +60,7 @@ type Registry struct { delivery *Delivery intervalsStore state.Store doRetrieve bool + maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. @@ -68,6 +69,7 @@ type RegistryOptions struct { DoSync bool DoRetrieve bool SyncUpdateDelay time.Duration + MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor @@ -87,6 +89,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore stora delivery: delivery, intervalsStore: intervalsStore, doRetrieve: options.DoRetrieve, + maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 06e96b9a9..4f480b0ac 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -19,6 +19,7 @@ package stream import ( "bytes" "context" + "strconv" "testing" "time" @@ -27,7 +28,7 @@ import ( ) func TestStreamerSubscribe(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -41,7 +42,7 @@ func TestStreamerSubscribe(t *testing.T) { } func TestStreamerRequestSubscription(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -125,7 +126,7 @@ func (self *testServer) Close() { } func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -218,7 +219,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -285,7 +286,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -351,7 +352,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { } func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -395,7 +396,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -460,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -559,7 +560,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -685,3 +686,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { t.Fatal(err) } } + +// TestMaxPeerServersWithUnsubscribe creates a registry with a limited +// number of stream servers, and performs a test with subscriptions and +// unsubscriptions, checking if unsubscriptions will remove streams, +// leaving place for new streams. +func TestMaxPeerServersWithUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + peerID := tester.IDs[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "unsubscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: &UnsubscribeMsg{ + Stream: stream, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} + +// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited +// number of stream servers, and performs subscriptions to detect sunscriptions +// error message exchange. +func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + peerID := tester.IDs[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + if i >= maxPeerServers { + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 7, + Msg: &SubscribeErrorMsg{ + Error: ErrMaxPeerServers.Error(), + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + continue + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} diff --git a/swarm/swarm.go b/swarm/swarm.go index 13aa1125d..5db17d25f 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -186,6 +186,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e DoSync: config.SyncEnabled, DoRetrieve: true, SyncUpdateDelay: config.SyncUpdateDelay, + MaxPeerServers: config.MaxStreamPeerServers, }) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage |