aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go811
1 files changed, 0 insertions, 811 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
deleted file mode 100644
index 9cdf5c04b..000000000
--- a/swarm/network/stream/stream.go
+++ /dev/null
@@ -1,811 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package stream
-
-import (
- "context"
- "fmt"
- "math"
- "reflect"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/protocols"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
- "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
- "github.com/ethereum/go-ethereum/swarm/state"
- "github.com/ethereum/go-ethereum/swarm/storage"
-)
-
-const (
- Low uint8 = iota
- Mid
- High
- Top
- PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
- PriorityQueueCap = 4096 // queue capacity
- HashSize = 32
-)
-
-// Enumerate options for syncing and retrieval
-type SyncingOption int
-
-// Syncing options
-const (
- // Syncing disabled
- SyncingDisabled SyncingOption = iota
- // Register the client and the server but not subscribe
- SyncingRegisterOnly
- // Both client and server funcs are registered, subscribe sent automatically
- SyncingAutoSubscribe
-)
-
-// subscriptionFunc is used to determine what to do in order to perform subscriptions
-// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
-// (see TestRequestPeerSubscriptions in streamer_test.go)
-var subscriptionFunc = doRequestSubscription
-
-// Registry registry for outgoing and incoming streamer constructors
-type Registry struct {
- addr enode.ID
- api *API
- skipCheck bool
- clientMu sync.RWMutex
- serverMu sync.RWMutex
- peersMu sync.RWMutex
- serverFuncs map[string]func(*Peer, string, bool) (Server, error)
- clientFuncs map[string]func(*Peer, string, bool) (Client, error)
- peers map[enode.ID]*Peer
- delivery *Delivery
- intervalsStore state.Store
- maxPeerServers int
- spec *protocols.Spec //this protocol's spec
- balance protocols.Balance //implements protocols.Balance, for accounting
- prices protocols.Prices //implements protocols.Prices, provides prices to accounting
- quit chan struct{} // terminates registry goroutines
- syncMode SyncingOption
- syncUpdateDelay time.Duration
-}
-
-// RegistryOptions holds optional values for NewRegistry constructor.
-type RegistryOptions struct {
- SkipCheck bool
- Syncing SyncingOption // Defines syncing behavior
- SyncUpdateDelay time.Duration
- MaxPeerServers int // The limit of servers for each peer in registry
-}
-
-// NewRegistry is Streamer constructor
-func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
- if options == nil {
- options = &RegistryOptions{}
- }
- if options.SyncUpdateDelay <= 0 {
- options.SyncUpdateDelay = 15 * time.Second
- }
-
- quit := make(chan struct{})
-
- streamer := &Registry{
- addr: localID,
- skipCheck: options.SkipCheck,
- serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
- clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
- peers: make(map[enode.ID]*Peer),
- delivery: delivery,
- intervalsStore: intervalsStore,
- maxPeerServers: options.MaxPeerServers,
- balance: balance,
- quit: quit,
- syncUpdateDelay: options.SyncUpdateDelay,
- syncMode: options.Syncing,
- }
-
- streamer.setupSpec()
-
- streamer.api = NewAPI(streamer)
- delivery.getPeer = streamer.getPeer
-
- // If syncing is not disabled, the syncing functions are registered (both client and server)
- if options.Syncing != SyncingDisabled {
- RegisterSwarmSyncerServer(streamer, netStore)
- RegisterSwarmSyncerClient(streamer, netStore)
- }
-
- return streamer
-}
-
-// This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
-// For simulations to be able to run multiple nodes and not override the hook's balance,
-// we need to construct a spec instance per node instance
-func (r *Registry) setupSpec() {
- // first create the "bare" spec
- r.createSpec()
- // now create the pricing object
- r.createPriceOracle()
- // if balance is nil, this node has been started without swap support (swapEnabled flag is false)
- if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
- // swap is enabled, so setup the hook
- r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
- }
-}
-
-// RegisterClient registers an incoming streamer constructor
-func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
- r.clientMu.Lock()
- defer r.clientMu.Unlock()
-
- r.clientFuncs[stream] = f
-}
-
-// RegisterServer registers an outgoing streamer constructor
-func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error)) {
- r.serverMu.Lock()
- defer r.serverMu.Unlock()
-
- r.serverFuncs[stream] = f
-}
-
-// GetClient accessor for incoming streamer constructors
-func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error) {
- r.clientMu.RLock()
- defer r.clientMu.RUnlock()
-
- f := r.clientFuncs[stream]
- if f == nil {
- return nil, fmt.Errorf("stream %v not registered", stream)
- }
- return f, nil
-}
-
-// GetServer accessor for incoming streamer constructors
-func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error) {
- r.serverMu.RLock()
- defer r.serverMu.RUnlock()
-
- f := r.serverFuncs[stream]
- if f == nil {
- return nil, fmt.Errorf("stream %v not registered", stream)
- }
- return f, nil
-}
-
-func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error {
- // check if the stream is registered
- if _, err := r.GetServerFunc(s.Name); err != nil {
- return err
- }
-
- peer := r.getPeer(peerId)
- if peer == nil {
- return fmt.Errorf("peer not found %v", peerId)
- }
-
- if _, err := peer.getServer(s); err != nil {
- if e, ok := err.(*notFoundError); ok && e.t == "server" {
- // request subscription only if the server for this stream is not created
- log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h)
- return peer.Send(context.TODO(), &RequestSubscriptionMsg{
- Stream: s,
- History: h,
- Priority: prio,
- })
- }
- return err
- }
- log.Trace("RequestSubscription: already subscribed", "peer", peerId, "stream", s, "history", h)
- return nil
-}
-
-// Subscribe initiates the streamer
-func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error {
- // check if the stream is registered
- if _, err := r.GetClientFunc(s.Name); err != nil {
- return err
- }
-
- peer := r.getPeer(peerId)
- if peer == nil {
- return fmt.Errorf("peer not found %v", peerId)
- }
-
- var to uint64
- if !s.Live && h != nil {
- to = h.To
- }
-
- err := peer.setClientParams(s, newClientParams(priority, to))
- if err != nil {
- return err
- }
- if s.Live && h != nil {
- if err := peer.setClientParams(
- getHistoryStream(s),
- newClientParams(getHistoryPriority(priority), h.To),
- ); err != nil {
- return err
- }
- }
-
- msg := &SubscribeMsg{
- Stream: s,
- History: h,
- Priority: priority,
- }
- log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
-
- return peer.Send(context.TODO(), msg)
-}
-
-func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
- peer := r.getPeer(peerId)
- if peer == nil {
- return fmt.Errorf("peer not found %v", peerId)
- }
-
- msg := &UnsubscribeMsg{
- Stream: s,
- }
- log.Debug("Unsubscribe ", "peer", peerId, "stream", s)
-
- if err := peer.Send(context.TODO(), msg); err != nil {
- return err
- }
- return peer.removeClient(s)
-}
-
-// Quit sends the QuitMsg to the peer to remove the
-// stream peer client and terminate the streaming.
-func (r *Registry) Quit(peerId enode.ID, s Stream) error {
- peer := r.getPeer(peerId)
- if peer == nil {
- log.Debug("stream quit: peer not found", "peer", peerId, "stream", s)
- // if the peer is not found, abort the request
- return nil
- }
-
- msg := &QuitMsg{
- Stream: s,
- }
- log.Debug("Quit ", "peer", peerId, "stream", s)
-
- return peer.Send(context.TODO(), msg)
-}
-
-func (r *Registry) Close() error {
- // Stop sending neighborhood depth change and address count
- // change from Kademlia that were initiated in NewRegistry constructor.
- r.delivery.Close()
- close(r.quit)
- return r.intervalsStore.Close()
-}
-
-func (r *Registry) getPeer(peerId enode.ID) *Peer {
- r.peersMu.RLock()
- defer r.peersMu.RUnlock()
-
- return r.peers[peerId]
-}
-
-func (r *Registry) setPeer(peer *Peer) {
- r.peersMu.Lock()
- r.peers[peer.ID()] = peer
- metrics.GetOrRegisterCounter("registry.setpeer", nil).Inc(1)
- metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
- r.peersMu.Unlock()
-}
-
-func (r *Registry) deletePeer(peer *Peer) {
- r.peersMu.Lock()
- delete(r.peers, peer.ID())
- metrics.GetOrRegisterCounter("registry.deletepeer", nil).Inc(1)
- metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
- r.peersMu.Unlock()
-}
-
-func (r *Registry) peersCount() (c int) {
- r.peersMu.Lock()
- c = len(r.peers)
- r.peersMu.Unlock()
- return
-}
-
-// Run protocol run function
-func (r *Registry) Run(p *network.BzzPeer) error {
- sp := NewPeer(p, r)
- r.setPeer(sp)
-
- if r.syncMode == SyncingAutoSubscribe {
- go sp.runUpdateSyncing()
- }
-
- defer r.deletePeer(sp)
- defer close(sp.quit)
- defer sp.close()
-
- return sp.Run(sp.HandleMsg)
-}
-
-// doRequestSubscription sends the actual RequestSubscription to the peer
-func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error {
- log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin)
- // bin is always less then 256 and it is safe to convert it to type uint8
- stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
- err := r.RequestSubscription(id, stream, NewRange(0, 0), High)
- if err != nil {
- log.Debug("Request subscription", "err", err, "peer", id, "stream", stream)
- return err
- }
- return nil
-}
-
-func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- peer := protocols.NewPeer(p, rw, r.spec)
- bp := network.NewBzzPeer(peer)
- np := network.NewPeer(bp, r.delivery.kad)
- r.delivery.kad.On(np)
- defer r.delivery.kad.Off(np)
- return r.Run(bp)
-}
-
-// HandleMsg is the message handler that delegates incoming messages
-func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
- select {
- case <-p.streamer.quit:
- log.Trace("message received after the streamer is closed", "peer", p.ID())
- // return without an error since streamer is closed and
- // no messages should be handled as other subcomponents like
- // storage leveldb may be closed
- return nil
- default:
- }
-
- switch msg := msg.(type) {
-
- case *SubscribeMsg:
- return p.handleSubscribeMsg(ctx, msg)
-
- case *SubscribeErrorMsg:
- return p.handleSubscribeErrorMsg(msg)
-
- case *UnsubscribeMsg:
- return p.handleUnsubscribeMsg(msg)
-
- case *OfferedHashesMsg:
- go func() {
- err := p.handleOfferedHashesMsg(ctx, msg)
- if err != nil {
- log.Error(err.Error())
- p.Drop()
- }
- }()
- return nil
-
- case *TakeoverProofMsg:
- go func() {
- err := p.handleTakeoverProofMsg(ctx, msg)
- if err != nil {
- log.Error(err.Error())
- p.Drop()
- }
- }()
- return nil
-
- case *WantedHashesMsg:
- go func() {
- err := p.handleWantedHashesMsg(ctx, msg)
- if err != nil {
- log.Error(err.Error())
- p.Drop()
- }
- }()
- return nil
-
- case *ChunkDeliveryMsgRetrieval:
- // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
- go func() {
- err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
- if err != nil {
- log.Error(err.Error())
- p.Drop()
- }
- }()
- return nil
-
- case *ChunkDeliveryMsgSyncing:
- // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
- go func() {
- err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
- if err != nil {
- log.Error(err.Error())
- p.Drop()
- }
- }()
- return nil
-
- case *RetrieveRequestMsg:
- go func() {
- err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
- if err != nil {
- log.Error(err.Error())
- p.Drop()
- }
- }()
- return nil
-
- case *RequestSubscriptionMsg:
- return p.handleRequestSubscription(ctx, msg)
-
- case *QuitMsg:
- return p.handleQuitMsg(msg)
-
- default:
- return fmt.Errorf("unknown message type: %T", msg)
- }
-}
-
-type server struct {
- Server
- stream Stream
- priority uint8
- currentBatch []byte
- sessionIndex uint64
-}
-
-// setNextBatch adjusts passed interval based on session index and whether
-// stream is live or history. It calls Server SetNextBatch with adjusted
-// interval and returns batch hashes and their interval.
-func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- if s.stream.Live {
- if from == 0 {
- from = s.sessionIndex
- }
- if to <= from || from >= s.sessionIndex {
- to = math.MaxUint64
- }
- } else {
- if (to < from && to != 0) || from > s.sessionIndex {
- return nil, 0, 0, nil, nil
- }
- if to == 0 || to > s.sessionIndex {
- to = s.sessionIndex
- }
- }
- return s.SetNextBatch(from, to)
-}
-
-// Server interface for outgoing peer Streamer
-type Server interface {
- // SessionIndex is called when a server is initialized
- // to get the current cursor state of the stream data.
- // Based on this index, live and history stream intervals
- // will be adjusted before calling SetNextBatch.
- SessionIndex() (uint64, error)
- SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
- GetData(context.Context, []byte) ([]byte, error)
- Close()
-}
-
-type client struct {
- Client
- stream Stream
- priority uint8
- sessionAt uint64
- to uint64
- next chan error
- quit chan struct{}
-
- intervalsKey string
- intervalsStore state.Store
-}
-
-func peerStreamIntervalsKey(p *Peer, s Stream) string {
- return p.ID().String() + s.String()
-}
-
-func (c *client) AddInterval(start, end uint64) (err error) {
- i := &intervals.Intervals{}
- if err = c.intervalsStore.Get(c.intervalsKey, i); err != nil {
- return err
- }
- i.Add(start, end)
- return c.intervalsStore.Put(c.intervalsKey, i)
-}
-
-func (c *client) NextInterval() (start, end uint64, err error) {
- i := &intervals.Intervals{}
- err = c.intervalsStore.Get(c.intervalsKey, i)
- if err != nil {
- return 0, 0, err
- }
- start, end = i.Next()
- return start, end, nil
-}
-
-// Client interface for incoming peer Streamer
-type Client interface {
- NeedData(context.Context, []byte) func(context.Context) error
- BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
- Close()
-}
-
-func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
- if c.to > 0 && from >= c.to {
- return 0, 0
- }
- if c.stream.Live {
- return from, 0
- } else if from >= c.sessionAt {
- if c.to > 0 {
- return from, c.to
- }
- return from, math.MaxUint64
- }
- nextFrom, nextTo, err := c.NextInterval()
- if err != nil {
- log.Error("next intervals", "stream", c.stream)
- return
- }
- if nextTo > c.to {
- nextTo = c.to
- }
- if nextTo == 0 {
- nextTo = c.sessionAt
- }
- return
-}
-
-func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
- if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
- tp, err := tf()
- if err != nil {
- return err
- }
-
- if err := p.Send(context.TODO(), tp); err != nil {
- return err
- }
- if c.to > 0 && tp.Takeover.End >= c.to {
- return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
- }
- return nil
- }
- return c.AddInterval(req.From, req.To)
-}
-
-func (c *client) close() {
- select {
- case <-c.quit:
- default:
- close(c.quit)
- }
- c.Close()
-}
-
-// clientParams store parameters for the new client
-// between a subscription and initial offered hashes request handling.
-type clientParams struct {
- priority uint8
- to uint64
- // signal when the client is created
- clientCreatedC chan struct{}
-}
-
-func newClientParams(priority uint8, to uint64) *clientParams {
- return &clientParams{
- priority: priority,
- to: to,
- clientCreatedC: make(chan struct{}),
- }
-}
-
-func (c *clientParams) waitClient(ctx context.Context) error {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-c.clientCreatedC:
- return nil
- }
-}
-
-func (c *clientParams) clientCreated() {
- close(c.clientCreatedC)
-}
-
-// GetSpec returns the streamer spec to callers
-// This used to be a global variable but for simulations with
-// multiple nodes its fields (notably the Hook) would be overwritten
-func (r *Registry) GetSpec() *protocols.Spec {
- return r.spec
-}
-
-func (r *Registry) createSpec() {
- // Spec is the spec of the streamer protocol
- var spec = &protocols.Spec{
- Name: "stream",
- Version: 8,
- MaxMsgSize: 10 * 1024 * 1024,
- Messages: []interface{}{
- UnsubscribeMsg{},
- OfferedHashesMsg{},
- WantedHashesMsg{},
- TakeoverProofMsg{},
- SubscribeMsg{},
- RetrieveRequestMsg{},
- ChunkDeliveryMsgRetrieval{},
- SubscribeErrorMsg{},
- RequestSubscriptionMsg{},
- QuitMsg{},
- ChunkDeliveryMsgSyncing{},
- },
- }
- r.spec = spec
-}
-
-// An accountable message needs some meta information attached to it
-// in order to evaluate the correct price
-type StreamerPrices struct {
- priceMatrix map[reflect.Type]*protocols.Price
- registry *Registry
-}
-
-// Price implements the accounting interface and returns the price for a specific message
-func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
- t := reflect.TypeOf(msg).Elem()
- return sp.priceMatrix[t]
-}
-
-// Instead of hardcoding the price, get it
-// through a function - it could be quite complex in the future
-func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
- return uint64(1)
-}
-
-// Instead of hardcoding the price, get it
-// through a function - it could be quite complex in the future
-func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
- return uint64(1)
-}
-
-// createPriceOracle sets up a matrix which can be queried to get
-// the price for a message via the Price method
-func (r *Registry) createPriceOracle() {
- sp := &StreamerPrices{
- registry: r,
- }
- sp.priceMatrix = map[reflect.Type]*protocols.Price{
- reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
- Value: sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
- PerByte: true,
- Payer: protocols.Receiver,
- },
- reflect.TypeOf(RetrieveRequestMsg{}): {
- Value: sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
- PerByte: false,
- Payer: protocols.Sender,
- },
- }
- r.prices = sp
-}
-
-func (r *Registry) Protocols() []p2p.Protocol {
- return []p2p.Protocol{
- {
- Name: r.spec.Name,
- Version: r.spec.Version,
- Length: r.spec.Length(),
- Run: r.runProtocol,
- },
- }
-}
-
-func (r *Registry) APIs() []rpc.API {
- return []rpc.API{
- {
- Namespace: "stream",
- Version: "3.0",
- Service: r.api,
- Public: false,
- },
- }
-}
-
-func (r *Registry) Start(server *p2p.Server) error {
- log.Info("Streamer started")
- return nil
-}
-
-func (r *Registry) Stop() error {
- return nil
-}
-
-type Range struct {
- From, To uint64
-}
-
-func NewRange(from, to uint64) *Range {
- return &Range{
- From: from,
- To: to,
- }
-}
-
-func (r *Range) String() string {
- return fmt.Sprintf("%v-%v", r.From, r.To)
-}
-
-func getHistoryPriority(priority uint8) uint8 {
- if priority == 0 {
- return 0
- }
- return priority - 1
-}
-
-func getHistoryStream(s Stream) Stream {
- return NewStream(s.Name, s.Key, false)
-}
-
-type API struct {
- streamer *Registry
-}
-
-func NewAPI(r *Registry) *API {
- return &API{
- streamer: r,
- }
-}
-
-func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error {
- return api.streamer.Subscribe(peerId, s, history, priority)
-}
-
-func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
- return api.streamer.Unsubscribe(peerId, s)
-}
-
-/*
-GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
-It can be called via RPC.
-It returns a map of node IDs with an array of string representations of Stream objects.
-*/
-func (api *API) GetPeerServerSubscriptions() map[string][]string {
- pstreams := make(map[string][]string)
-
- api.streamer.peersMu.RLock()
- defer api.streamer.peersMu.RUnlock()
-
- for id, p := range api.streamer.peers {
- var streams []string
- //every peer has a map of stream servers
- //every stream server represents a subscription
- p.serverMu.RLock()
- for s := range p.servers {
- //append the string representation of the stream
- //to the list for this peer
- streams = append(streams, s.String())
- }
- p.serverMu.RUnlock()
- //set the array of stream servers to the map
- pstreams[id.String()] = streams
- }
- return pstreams
-}