aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go93
1 files changed, 71 insertions, 22 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 2e2c3c418..fb571c856 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
- "github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -73,6 +72,11 @@ const (
RetrievalEnabled
)
+// subscriptionFunc is used to determine what to do in order to perform subscriptions
+// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
+// (see TestRequestPeerSubscriptions in streamer_test.go)
+var subscriptionFunc func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool = doRequestSubscription
+
// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
addr enode.ID
@@ -88,9 +92,9 @@ type Registry struct {
intervalsStore state.Store
autoRetrieval bool // automatically subscribe to retrieve request stream
maxPeerServers int
- balance protocols.Balance // implements protocols.Balance, for accounting
- prices protocols.Prices // implements protocols.Prices, provides prices to accounting
- spec *protocols.Spec // this protocol's spec
+ spec *protocols.Spec //this protocol's spec
+ balance protocols.Balance //implements protocols.Balance, for accounting
+ prices protocols.Prices //implements protocols.Prices, provides prices to accounting
}
// RegistryOptions holds optional values for NewRegistry constructor.
@@ -125,6 +129,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
maxPeerServers: options.MaxPeerServers,
balance: balance,
}
+
streamer.setupSpec()
streamer.api = NewAPI(streamer)
@@ -467,24 +472,8 @@ func (r *Registry) updateSyncing() {
}
r.peersMu.RUnlock()
- // request subscriptions for all nodes and bins
- kad.EachBin(r.addr[:], pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
- log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
-
- // bin is always less then 256 and it is safe to convert it to type uint8
- stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
- if streams, ok := subs[p.ID()]; ok {
- // delete live and history streams from the map, so that it won't be removed with a Quit request
- delete(streams, stream)
- delete(streams, getHistoryStream(stream))
- }
- err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
- if err != nil {
- log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
- return false
- }
- return true
- })
+ // start requesting subscriptions from peers
+ r.requestPeerSubscriptions(kad, subs)
// remove SYNC servers that do not need to be subscribed
for id, streams := range subs {
@@ -505,6 +494,66 @@ func (r *Registry) updateSyncing() {
}
}
+// requestPeerSubscriptions calls on each live peer in the kademlia table
+// and sends a `RequestSubscription` to peers according to their bin
+// and their relationship with kademlia's depth.
+// Also check `TestRequestPeerSubscriptions` in order to understand the
+// expected behavior.
+// The function expects:
+// * the kademlia
+// * a map of subscriptions
+// * the actual function to subscribe
+// (in case of the test, it doesn't do real subscriptions)
+func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) {
+
+ var startPo int
+ var endPo int
+ var ok bool
+
+ // kademlia's depth
+ kadDepth := kad.NeighbourhoodDepth()
+ // request subscriptions for all nodes and bins
+ // nil as base takes the node's base; we need to pass 255 as `EachConn` runs
+ // from deepest bins backwards
+ kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
+ //if the peer's bin is shallower than the kademlia depth,
+ //only the peer's bin should be subscribed
+ if po < kadDepth {
+ startPo = po
+ endPo = po
+ } else {
+ //if the peer's bin is equal or deeper than the kademlia depth,
+ //each bin from the depth up to k.MaxProxDisplay should be subscribed
+ startPo = kadDepth
+ endPo = kad.MaxProxDisplay
+ }
+
+ for bin := startPo; bin <= endPo; bin++ {
+ //do the actual subscription
+ ok = subscriptionFunc(r, p, uint8(bin), subs)
+ }
+ return ok
+ })
+}
+
+// doRequestSubscription sends the actual RequestSubscription to the peer
+func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
+ log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin)
+ // bin is always less then 256 and it is safe to convert it to type uint8
+ stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
+ if streams, ok := subs[p.ID()]; ok {
+ // delete live and history streams from the map, so that it won't be removed with a Quit request
+ delete(streams, stream)
+ delete(streams, getHistoryStream(stream))
+ }
+ err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
+ if err != nil {
+ log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
+ return false
+ }
+ return true
+}
+
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, r.spec)
bp := network.NewBzzPeer(peer)