diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 40 |
1 files changed, 38 insertions, 2 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index fb571c856..cb5912185 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.SendPriority(context.TODO(), msg, priority, "") } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod // nil as base takes the node's base; we need to pass 255 as `EachConn` runs // from deepest bins backwards kad.EachConn(nil, 255, func(p *network.Peer, po int) bool { + // nodes that do not provide stream protocol + // should not be subscribed, e.g. bootnodes + if !p.HasCap("stream") { + return true + } //if the peer's bin is shallower than the kademlia depth, //only the peer's bin should be subscribed if po < kadDepth { @@ -725,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error if err != nil { return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + + if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { @@ -929,3 +935,33 @@ func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, prior func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { return api.streamer.Unsubscribe(peerId, s) } + +/* +GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. +It can be called via RPC. +It returns a map of node IDs with an array of string representations of Stream objects. +*/ +func (api *API) GetPeerSubscriptions() map[string][]string { + //create the empty map + pstreams := make(map[string][]string) + + //iterate all streamer peers + api.streamer.peersMu.RLock() + defer api.streamer.peersMu.RUnlock() + + for id, p := range api.streamer.peers { + var streams []string + //every peer has a map of stream servers + //every stream server represents a subscription + p.serverMu.RLock() + for s := range p.servers { + //append the string representation of the stream + //to the list for this peer + streams = append(streams, s.String()) + } + p.serverMu.RUnlock() + //set the array of stream servers to the map + pstreams[id.String()] = streams + } + return pstreams +} |