Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--  swarm/network/stream/stream.go  390
1 file changed, 101 insertions(+), 289 deletions(-)
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 1038e52d0..9cdf5c04b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -18,7 +18,6 @@ package stream
import (
"context"
- "errors"
"fmt"
"math"
"reflect"
@@ -49,7 +48,6 @@ const (
// Enumerate options for syncing and retrieval
type SyncingOption int
-type RetrievalOption int
// Syncing options
const (
@@ -61,17 +59,6 @@ const (
SyncingAutoSubscribe
)
-const (
- // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
- RetrievalDisabled RetrievalOption = iota
- // Only the client side of the retrieve request is registered.
- // (light nodes do not serve retrieve requests)
- // once the client is registered, subscription to retrieve request stream is always sent
- RetrievalClientOnly
- // Both client and server funcs are registered, subscribe sent automatically
- RetrievalEnabled
-)
-
// subscriptionFunc determines what to do in order to perform a subscription.
// Normally it issues real subscriptions to nodes, but tests may substitute
// other behavior (see TestRequestPeerSubscriptions in streamer_test.go)
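Because subscriptionFunc is a plain package-level variable, a test can swap it out to observe subscription requests without any network traffic. A minimal sketch using the post-change signature func(*Registry, enode.ID, uint8) error shown further down in this diff; the recorded slice is hypothetical test scaffolding:

	// inside a test: record requested bins instead of sending RequestSubscriptionMsg
	var recorded []uint8
	subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error {
		recorded = append(recorded, bin) // observe only, no messages sent
		return nil
	}
	defer func() { subscriptionFunc = doRequestSubscription }() // restore the default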
@@ -79,59 +66,58 @@ var subscriptionFunc = doRequestSubscription
// Registry is the registry for outgoing and incoming streamer constructors
type Registry struct {
- addr enode.ID
- api *API
- skipCheck bool
- clientMu sync.RWMutex
- serverMu sync.RWMutex
- peersMu sync.RWMutex
- serverFuncs map[string]func(*Peer, string, bool) (Server, error)
- clientFuncs map[string]func(*Peer, string, bool) (Client, error)
- peers map[enode.ID]*Peer
- delivery *Delivery
- intervalsStore state.Store
- autoRetrieval bool // automatically subscribe to retrieve request stream
- maxPeerServers int
- spec *protocols.Spec //this protocol's spec
- balance protocols.Balance //implements protocols.Balance, for accounting
- prices protocols.Prices //implements protocols.Prices, provides prices to accounting
- quit chan struct{} // terminates registry goroutines
+ addr enode.ID
+ api *API
+ skipCheck bool
+ clientMu sync.RWMutex
+ serverMu sync.RWMutex
+ peersMu sync.RWMutex
+ serverFuncs map[string]func(*Peer, string, bool) (Server, error)
+ clientFuncs map[string]func(*Peer, string, bool) (Client, error)
+ peers map[enode.ID]*Peer
+ delivery *Delivery
+ intervalsStore state.Store
+ maxPeerServers int
+ spec *protocols.Spec //this protocol's spec
+ balance protocols.Balance //implements protocols.Balance, for accounting
+ prices protocols.Prices //implements protocols.Prices, provides prices to accounting
+ quit chan struct{} // terminates registry goroutines
+ syncMode SyncingOption
+ syncUpdateDelay time.Duration
}
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- Syncing SyncingOption // Defines syncing behavior
- Retrieval RetrievalOption // Defines retrieval behavior
+ Syncing SyncingOption // Defines syncing behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
// NewRegistry is the Streamer constructor
-func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
if options == nil {
options = &RegistryOptions{}
}
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
- // check if retrieval has been disabled
- retrieval := options.Retrieval != RetrievalDisabled
quit := make(chan struct{})
streamer := &Registry{
- addr: localID,
- skipCheck: options.SkipCheck,
- serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
- clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
- peers: make(map[enode.ID]*Peer),
- delivery: delivery,
- intervalsStore: intervalsStore,
- autoRetrieval: retrieval,
- maxPeerServers: options.MaxPeerServers,
- balance: balance,
- quit: quit,
+ addr: localID,
+ skipCheck: options.SkipCheck,
+ serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
+ clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
+ peers: make(map[enode.ID]*Peer),
+ delivery: delivery,
+ intervalsStore: intervalsStore,
+ maxPeerServers: options.MaxPeerServers,
+ balance: balance,
+ quit: quit,
+ syncUpdateDelay: options.SyncUpdateDelay,
+ syncMode: options.Syncing,
}
streamer.setupSpec()
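With the retrieval options gone, a caller configures only syncing behavior and passes a *storage.NetStore instead of a SyncChunkStore. A minimal construction sketch, assuming localID, delivery, netStore, and intervalsStore already exist in scope:

	opts := &RegistryOptions{
		Syncing:         SyncingAutoSubscribe, // per-peer sync subscriptions start in Run
		SyncUpdateDelay: 15 * time.Second,     // values <= 0 fall back to 15s anyway
		MaxPeerServers:  10000,                // limit of servers per peer
	}
	// nil balance: accounting disabled
	registry := NewRegistry(localID, delivery, netStore, intervalsStore, opts, nil)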
@@ -139,124 +125,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
- if options.Retrieval == RetrievalEnabled {
- streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
- if !live {
- return nil, errors.New("only live retrieval requests supported")
- }
- return NewSwarmChunkServer(delivery.chunkStore), nil
- })
- }
-
- // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
- if options.Retrieval != RetrievalDisabled {
- streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
- })
- }
-
// If syncing is not disabled, the syncing functions are registered (both client and server)
if options.Syncing != SyncingDisabled {
- RegisterSwarmSyncerServer(streamer, syncChunkStore)
- RegisterSwarmSyncerClient(streamer, syncChunkStore)
- }
-
- // if syncing is set to automatically subscribe to the syncing stream, start the subscription process
- if options.Syncing == SyncingAutoSubscribe {
- // latestIntC function ensures that
- // - receiving from the in chan is not blocked by processing inside the for loop
- // - the latest int value is delivered to the loop after the processing is done
- // In the context of NeighbourhoodDepthC:
- // after the syncing is done updating inside the loop, we do not need to react to
- // intermediate depth changes, only to the latest one
- latestIntC := func(in <-chan int) <-chan int {
- out := make(chan int, 1)
-
- go func() {
- defer close(out)
-
- for {
- select {
- case i, ok := <-in:
- if !ok {
- return
- }
- select {
- case <-out:
- default:
- }
- out <- i
- case <-quit:
- return
- }
- }
- }()
-
- return out
- }
-
- kad := streamer.delivery.kad
- // get notification channels from Kademlia before returning
- // from this function to avoid race with Close method and
- // the goroutine created below
- depthC := latestIntC(kad.NeighbourhoodDepthC())
- addressBookSizeC := latestIntC(kad.AddrCountC())
-
- go func() {
- // wait for the kademlia table to become healthy,
- // but return if the Registry is closed before then
- select {
- case <-time.After(options.SyncUpdateDelay):
- case <-quit:
- return
- }
-
- // initial requests for syncing subscription to peers
- streamer.updateSyncing()
-
- for depth := range depthC {
- log.Debug("Kademlia neighbourhood depth change", "depth", depth)
-
- // Prevent premature sync subscriptions by waiting until no new
- // peers are connecting. Sync streams are updated only after no
- // peers have connected for at least the SyncUpdateDelay period.
- timer := time.NewTimer(options.SyncUpdateDelay)
- // Hard limit to sync update delay, preventing long delays
- // on a very dynamic network
- maxTimer := time.NewTimer(3 * time.Minute)
- loop:
- for {
- select {
- case <-maxTimer.C:
- // force syncing update when a hard timeout is reached
- log.Trace("Sync subscriptions update on hard timeout")
- // request for syncing subscription to new peers
- streamer.updateSyncing()
- break loop
- case <-timer.C:
- // start syncing as no new peers have been added to kademlia
- // for some time
- log.Trace("Sync subscriptions update")
- // request for syncing subscription to new peers
- streamer.updateSyncing()
- break loop
- case size := <-addressBookSizeC:
- log.Trace("Kademlia address book size changed on depth change", "size", size)
- // new peers have been added to kademlia,
- // reset the timer to prevent early sync subscriptions
- if !timer.Stop() {
- <-timer.C
- }
- timer.Reset(options.SyncUpdateDelay)
- case <-quit:
- break loop
- }
- }
- timer.Stop()
- maxTimer.Stop()
- }
- }()
+ RegisterSwarmSyncerServer(streamer, netStore)
+ RegisterSwarmSyncerClient(streamer, netStore)
}
return streamer
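The deleted latestIntC helper is a general "latest value wins" channel adapter: the producer is never blocked, and a slow consumer only ever sees the most recent value. The same pattern as a standalone sketch, with quit passed explicitly instead of captured from the enclosing scope:

	// latestInt drains in and keeps only the most recent value buffered in out,
	// so a slow consumer never blocks the producer and never sees stale backlog.
	func latestInt(in <-chan int, quit <-chan struct{}) <-chan int {
		out := make(chan int, 1)
		go func() {
			defer close(out)
			for {
				select {
				case i, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-out: // discard the stale buffered value, if any
					default:
					}
					out <- i
				case <-quit:
					return
				}
			}
		}()
		return out
	}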
@@ -381,7 +253,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority)
+ return peer.Send(context.TODO(), msg)
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -422,8 +294,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error {
func (r *Registry) Close() error {
// Stop sending neighborhood depth change and address count
// change from Kademlia that were initiated in the NewRegistry constructor.
- r.delivery.kad.CloseNeighbourhoodDepthC()
- r.delivery.kad.CloseAddrCountC()
+ r.delivery.Close()
close(r.quit)
return r.intervalsStore.Close()
}
@@ -438,6 +309,7 @@ func (r *Registry) getPeer(peerId enode.ID) *Peer {
func (r *Registry) setPeer(peer *Peer) {
r.peersMu.Lock()
r.peers[peer.ID()] = peer
+ metrics.GetOrRegisterCounter("registry.setpeer", nil).Inc(1)
metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
r.peersMu.Unlock()
}
@@ -445,6 +317,7 @@ func (r *Registry) setPeer(peer *Peer) {
func (r *Registry) deletePeer(peer *Peer) {
r.peersMu.Lock()
delete(r.peers, peer.ID())
+ metrics.GetOrRegisterCounter("registry.deletepeer", nil).Inc(1)
metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
r.peersMu.Unlock()
}
@@ -458,132 +331,31 @@ func (r *Registry) peersCount() (c int) {
// Run is the protocol run function
func (r *Registry) Run(p *network.BzzPeer) error {
- sp := NewPeer(p.Peer, r)
+ sp := NewPeer(p, r)
r.setPeer(sp)
+
+ if r.syncMode == SyncingAutoSubscribe {
+ go sp.runUpdateSyncing()
+ }
+
defer r.deletePeer(sp)
defer close(sp.quit)
defer sp.close()
- if r.autoRetrieval && !p.LightNode {
- err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
- if err != nil {
- return err
- }
- }
-
return sp.Run(sp.HandleMsg)
}
-// updateSyncing subscribes to SYNC streams by iterating over the
-// kademlia connections and bins. If existing SYNC streams are no
-// longer required after iteration, a Quit request is sent to the
-// appropriate peers.
-func (r *Registry) updateSyncing() {
- kad := r.delivery.kad
- // map of all SYNC streams for all peers
- // used at the end of the function to remove servers
- // that are not needed anymore
- subs := make(map[enode.ID]map[Stream]struct{})
- r.peersMu.RLock()
- for id, peer := range r.peers {
- peer.serverMu.RLock()
- for stream := range peer.servers {
- if stream.Name == "SYNC" {
- if _, ok := subs[id]; !ok {
- subs[id] = make(map[Stream]struct{})
- }
- subs[id][stream] = struct{}{}
- }
- }
- peer.serverMu.RUnlock()
- }
- r.peersMu.RUnlock()
-
- // start requesting subscriptions from peers
- r.requestPeerSubscriptions(kad, subs)
-
- // remove SYNC servers that do not need to be subscribed
- for id, streams := range subs {
- if len(streams) == 0 {
- continue
- }
- peer := r.getPeer(id)
- if peer == nil {
- continue
- }
- for stream := range streams {
- log.Debug("Remove sync server", "peer", id, "stream", stream)
- err := r.Quit(peer.ID(), stream)
- if err != nil && err != p2p.ErrShuttingDown {
- log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream)
- }
- }
- }
-}
-
-// requestPeerSubscriptions iterates over each live peer in the kademlia
-// table and sends a `RequestSubscription` according to the peer's bin
-// and its relationship to the kademlia depth.
-// See `TestRequestPeerSubscriptions` for the expected behavior.
-// The function expects:
-// * the kademlia
-// * a map of subscriptions
-// * the actual function to subscribe
-//   (in tests, it does not perform real subscriptions)
-func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) {
-
- var startPo int
- var endPo int
- var ok bool
-
- // kademlia's depth
- kadDepth := kad.NeighbourhoodDepth()
- // request subscriptions for all nodes and bins
- // nil as base takes the node's base; we need to pass 255 as `EachConn` runs
- // from deepest bins backwards
- kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
- // nodes that do not provide the stream protocol
- // should not be subscribed, e.g. bootnodes
- if !p.HasCap("stream") {
- return true
- }
- // if the peer's bin is shallower than the kademlia depth,
- // only the peer's bin should be subscribed
- if po < kadDepth {
- startPo = po
- endPo = po
- } else {
- // if the peer's bin is equal to or deeper than the kademlia depth,
- // each bin from the depth up to k.MaxProxDisplay should be subscribed
- startPo = kadDepth
- endPo = kad.MaxProxDisplay
- }
-
- for bin := startPo; bin <= endPo; bin++ {
- // do the actual subscription
- ok = subscriptionFunc(r, p, uint8(bin), subs)
- }
- return ok
- })
-}
-
// doRequestSubscription sends the actual RequestSubscription to the peer
-func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
- log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin)
+func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error {
+ log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin)
// bin is always less than 256, so it is safe to convert it to uint8
stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
- if streams, ok := subs[p.ID()]; ok {
- // delete live and history streams from the map, so that they won't be removed with a Quit request
- delete(streams, stream)
- delete(streams, getHistoryStream(stream))
- }
- err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
+ err := r.RequestSubscription(id, stream, NewRange(0, 0), High)
if err != nil {
- log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
- return false
+ log.Debug("Request subscription", "err", err, "peer", id, "stream", stream)
+ return err
}
- return true
+ return nil
}
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
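The removed requestPeerSubscriptions encodes the bin-selection rule: a peer shallower than the kademlia depth gets exactly its own bin, while a peer at or beyond the depth gets every bin from the depth up to MaxProxDisplay. The rule in isolation; syncBins is a hypothetical helper, the logic mirrors the deleted code above:

	// syncBins returns the inclusive range of SYNC bins a node should request
	// from a peer at proximity order po, given the current kademlia depth.
	func syncBins(po, depth, maxProxDisplay int) (start, end int) {
		if po < depth {
			return po, po // shallow peer: only the peer's own bin
		}
		return depth, maxProxDisplay // neighbourhood peer: all bins from depth up
	}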
@@ -619,24 +391,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
return p.handleUnsubscribeMsg(msg)
case *OfferedHashesMsg:
- return p.handleOfferedHashesMsg(ctx, msg)
+ go func() {
+ err := p.handleOfferedHashesMsg(ctx, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *TakeoverProofMsg:
- return p.handleTakeoverProofMsg(ctx, msg)
+ go func() {
+ err := p.handleTakeoverProofMsg(ctx, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *WantedHashesMsg:
- return p.handleWantedHashesMsg(ctx, msg)
+ go func() {
+ err := p.handleWantedHashesMsg(ctx, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *ChunkDeliveryMsgRetrieval:
// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
- return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+ go func() {
+ err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *ChunkDeliveryMsgSyncing:
// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
- return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+ go func() {
+ err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *RetrieveRequestMsg:
- return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
+ go func() {
+ err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
+ if err != nil {
+ log.Error(err.Error())
+ p.Drop()
+ }
+ }()
+ return nil
case *RequestSubscriptionMsg:
return p.handleRequestSubscription(ctx, msg)
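Each of the rewritten cases above follows the same shape: run the blocking handler on its own goroutine so the protocol read loop keeps draining messages, and drop the peer on any handler error instead of returning it. The shape in isolation; dispatch is a hypothetical helper, not part of the diff:

	// dispatch moves a blocking handler off the protocol read loop; any handler
	// error drops the peer rather than propagating to the p2p layer.
	func dispatch(ctx context.Context, p *Peer, handle func(context.Context) error) {
		go func() {
			if err := handle(ctx); err != nil {
				log.Error(err.Error())
				p.Drop()
			}
		}()
	}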
@@ -767,7 +581,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+ if err := p.Send(context.TODO(), tp); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
@@ -969,15 +783,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
}
/*
-GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
+GetPeerServerSubscriptions is an API function that allows querying a peer for the stream subscriptions it has.
It can be called via RPC.
It returns a map from node IDs to arrays of string representations of Stream objects.
*/
-func (api *API) GetPeerSubscriptions() map[string][]string {
- //create the empty map
+func (api *API) GetPeerServerSubscriptions() map[string][]string {
pstreams := make(map[string][]string)
- //iterate all streamer peers
api.streamer.peersMu.RLock()
defer api.streamer.peersMu.RUnlock()
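The renamed method remains reachable over RPC. A client-side sketch; the ws endpoint and the "stream_" namespace prefix are assumptions, not confirmed by this diff:

	package main

	import (
		"fmt"
		"log"

		"github.com/ethereum/go-ethereum/rpc"
	)

	func main() {
		client, err := rpc.Dial("ws://127.0.0.1:8546") // hypothetical endpoint
		if err != nil {
			log.Fatal(err)
		}
		var subs map[string][]string
		// method name derived from the Go name under an assumed "stream" namespace
		if err := client.Call(&subs, "stream_getPeerServerSubscriptions"); err != nil {
			log.Fatal(err)
		}
		for id, streams := range subs {
			fmt.Println(id, streams)
		}
	}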