aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go40
1 files changed, 38 insertions, 2 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index fb571c856..cb5912185 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority)
+ return peer.SendPriority(context.TODO(), msg, priority, "")
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod
// nil as base takes the node's base; we need to pass 255 as `EachConn` runs
// from deepest bins backwards
kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
+ // nodes that do not provide stream protocol
+ // should not be subscribed, e.g. bootnodes
+ if !p.HasCap("stream") {
+ return true
+ }
//if the peer's bin is shallower than the kademlia depth,
//only the peer's bin should be subscribed
if po < kadDepth {
@@ -725,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
if err != nil {
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+
+ if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
@@ -929,3 +935,33 @@ func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, prior
func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
return api.streamer.Unsubscribe(peerId, s)
}
+
+/*
+GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
+It can be called via RPC.
+It returns a map of node IDs with an array of string representations of Stream objects.
+*/
+func (api *API) GetPeerSubscriptions() map[string][]string {
+ //create the empty map
+ pstreams := make(map[string][]string)
+
+ //iterate all streamer peers
+ api.streamer.peersMu.RLock()
+ defer api.streamer.peersMu.RUnlock()
+
+ for id, p := range api.streamer.peers {
+ var streams []string
+ //every peer has a map of stream servers
+ //every stream server represents a subscription
+ p.serverMu.RLock()
+ for s := range p.servers {
+ //append the string representation of the stream
+ //to the list for this peer
+ streams = append(streams, s.String())
+ }
+ p.serverMu.RUnlock()
+ //set the array of stream servers to the map
+ pstreams[id.String()] = streams
+ }
+ return pstreams
+}