diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 390 |
1 files changed, 101 insertions, 289 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 1038e52d0..9cdf5c04b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -18,7 +18,6 @@ package stream import ( "context" - "errors" "fmt" "math" "reflect" @@ -49,7 +48,6 @@ const ( // Enumerate options for syncing and retrieval type SyncingOption int -type RetrievalOption int // Syncing options const ( @@ -61,17 +59,6 @@ const ( SyncingAutoSubscribe ) -const ( - // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) - RetrievalDisabled RetrievalOption = iota - // Only the client side of the retrieve request is registered. - // (light nodes do not serve retrieve requests) - // once the client is registered, subscription to retrieve request stream is always sent - RetrievalClientOnly - // Both client and server funcs are registered, subscribe sent automatically - RetrievalEnabled -) - // subscriptionFunc is used to determine what to do in order to perform subscriptions // usually we would start to really subscribe to nodes, but for tests other functionality may be needed // (see TestRequestPeerSubscriptions in streamer_test.go) @@ -79,59 +66,58 @@ var subscriptionFunc = doRequestSubscription // Registry registry for outgoing and incoming streamer constructors type Registry struct { - addr enode.ID - api *API - skipCheck bool - clientMu sync.RWMutex - serverMu sync.RWMutex - peersMu sync.RWMutex - serverFuncs map[string]func(*Peer, string, bool) (Server, error) - clientFuncs map[string]func(*Peer, string, bool) (Client, error) - peers map[enode.ID]*Peer - delivery *Delivery - intervalsStore state.Store - autoRetrieval bool // automatically subscribe to retrieve request stream - maxPeerServers int - spec *protocols.Spec //this protocol's spec - balance protocols.Balance //implements protocols.Balance, for accounting - prices protocols.Prices //implements protocols.Prices, provides prices to accounting - quit chan struct{} // terminates registry goroutines + addr enode.ID + api *API + skipCheck bool + clientMu sync.RWMutex + serverMu sync.RWMutex + peersMu sync.RWMutex + serverFuncs map[string]func(*Peer, string, bool) (Server, error) + clientFuncs map[string]func(*Peer, string, bool) (Client, error) + peers map[enode.ID]*Peer + delivery *Delivery + intervalsStore state.Store + maxPeerServers int + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting + quit chan struct{} // terminates registry goroutines + syncMode SyncingOption + syncUpdateDelay time.Duration } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - Syncing SyncingOption // Defines syncing behavior - Retrieval RetrievalOption // Defines retrieval behavior + Syncing SyncingOption // Defines syncing behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor -func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { if options == nil { options = &RegistryOptions{} } if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } - // check if retrieval has been disabled - retrieval := options.Retrieval != RetrievalDisabled quit := make(chan struct{}) streamer := &Registry{ - addr: localID, - skipCheck: options.SkipCheck, - serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), - clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), - peers: make(map[enode.ID]*Peer), - delivery: delivery, - intervalsStore: intervalsStore, - autoRetrieval: retrieval, - maxPeerServers: options.MaxPeerServers, - balance: balance, - quit: quit, + addr: localID, + skipCheck: options.SkipCheck, + serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), + clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), + peers: make(map[enode.ID]*Peer), + delivery: delivery, + intervalsStore: intervalsStore, + maxPeerServers: options.MaxPeerServers, + balance: balance, + quit: quit, + syncUpdateDelay: options.SyncUpdateDelay, + syncMode: options.Syncing, } streamer.setupSpec() @@ -139,124 +125,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) - if options.Retrieval == RetrievalEnabled { - streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { - if !live { - return nil, errors.New("only live retrieval requests supported") - } - return NewSwarmChunkServer(delivery.chunkStore), nil - }) - } - - // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) - if options.Retrieval != RetrievalDisabled { - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) - } - // If syncing is not disabled, the syncing functions are registered (both client and server) if options.Syncing != SyncingDisabled { - RegisterSwarmSyncerServer(streamer, syncChunkStore) - RegisterSwarmSyncerClient(streamer, syncChunkStore) - } - - // if syncing is set to automatically subscribe to the syncing stream, start the subscription process - if options.Syncing == SyncingAutoSubscribe { - // latestIntC function ensures that - // - receiving from the in chan is not blocked by processing inside the for loop - // - the latest int value is delivered to the loop after the processing is done - // In context of NeighbourhoodDepthC: - // after the syncing is done updating inside the loop, we do not need to update on the intermediate - // depth changes, only to the latest one - latestIntC := func(in <-chan int) <-chan int { - out := make(chan int, 1) - - go func() { - defer close(out) - - for { - select { - case i, ok := <-in: - if !ok { - return - } - select { - case <-out: - default: - } - out <- i - case <-quit: - return - } - } - }() - - return out - } - - kad := streamer.delivery.kad - // get notification channels from Kademlia before returning - // from this function to avoid race with Close method and - // the goroutine created below - depthC := latestIntC(kad.NeighbourhoodDepthC()) - addressBookSizeC := latestIntC(kad.AddrCountC()) - - go func() { - // wait for kademlia table to be healthy - // but return if Registry is closed before - select { - case <-time.After(options.SyncUpdateDelay): - case <-quit: - return - } - - // initial requests for syncing subscription to peers - streamer.updateSyncing() - - for depth := range depthC { - log.Debug("Kademlia neighbourhood depth change", "depth", depth) - - // Prevent too early sync subscriptions by waiting until there are no - // new peers connecting. Sync streams updating will be done after no - // peers are connected for at least SyncUpdateDelay period. - timer := time.NewTimer(options.SyncUpdateDelay) - // Hard limit to sync update delay, preventing long delays - // on a very dynamic network - maxTimer := time.NewTimer(3 * time.Minute) - loop: - for { - select { - case <-maxTimer.C: - // force syncing update when a hard timeout is reached - log.Trace("Sync subscriptions update on hard timeout") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case <-timer.C: - // start syncing as no new peers has been added to kademlia - // for some time - log.Trace("Sync subscriptions update") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case size := <-addressBookSizeC: - log.Trace("Kademlia address book size changed on depth change", "size", size) - // new peers has been added to kademlia, - // reset the timer to prevent early sync subscriptions - if !timer.Stop() { - <-timer.C - } - timer.Reset(options.SyncUpdateDelay) - case <-quit: - break loop - } - } - timer.Stop() - maxTimer.Stop() - } - }() + RegisterSwarmSyncerServer(streamer, netStore) + RegisterSwarmSyncerClient(streamer, netStore) } return streamer @@ -381,7 +253,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.Send(context.TODO(), msg) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -422,8 +294,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { func (r *Registry) Close() error { // Stop sending neighborhood depth change and address count // change from Kademlia that were initiated in NewRegistry constructor. - r.delivery.kad.CloseNeighbourhoodDepthC() - r.delivery.kad.CloseAddrCountC() + r.delivery.Close() close(r.quit) return r.intervalsStore.Close() } @@ -438,6 +309,7 @@ func (r *Registry) getPeer(peerId enode.ID) *Peer { func (r *Registry) setPeer(peer *Peer) { r.peersMu.Lock() r.peers[peer.ID()] = peer + metrics.GetOrRegisterCounter("registry.setpeer", nil).Inc(1) metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } @@ -445,6 +317,7 @@ func (r *Registry) setPeer(peer *Peer) { func (r *Registry) deletePeer(peer *Peer) { r.peersMu.Lock() delete(r.peers, peer.ID()) + metrics.GetOrRegisterCounter("registry.deletepeer", nil).Inc(1) metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } @@ -458,132 +331,31 @@ func (r *Registry) peersCount() (c int) { // Run protocol run function func (r *Registry) Run(p *network.BzzPeer) error { - sp := NewPeer(p.Peer, r) + sp := NewPeer(p, r) r.setPeer(sp) + + if r.syncMode == SyncingAutoSubscribe { + go sp.runUpdateSyncing() + } + defer r.deletePeer(sp) defer close(sp.quit) defer sp.close() - if r.autoRetrieval && !p.LightNode { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - return sp.Run(sp.HandleMsg) } -// updateSyncing subscribes to SYNC streams by iterating over the -// kademlia connections and bins. If there are existing SYNC streams -// and they are no longer required after iteration, request to Quit -// them will be send to appropriate peers. -func (r *Registry) updateSyncing() { - kad := r.delivery.kad - // map of all SYNC streams for all peers - // used at the and of the function to remove servers - // that are not needed anymore - subs := make(map[enode.ID]map[Stream]struct{}) - r.peersMu.RLock() - for id, peer := range r.peers { - peer.serverMu.RLock() - for stream := range peer.servers { - if stream.Name == "SYNC" { - if _, ok := subs[id]; !ok { - subs[id] = make(map[Stream]struct{}) - } - subs[id][stream] = struct{}{} - } - } - peer.serverMu.RUnlock() - } - r.peersMu.RUnlock() - - // start requesting subscriptions from peers - r.requestPeerSubscriptions(kad, subs) - - // remove SYNC servers that do not need to be subscribed - for id, streams := range subs { - if len(streams) == 0 { - continue - } - peer := r.getPeer(id) - if peer == nil { - continue - } - for stream := range streams { - log.Debug("Remove sync server", "peer", id, "stream", stream) - err := r.Quit(peer.ID(), stream) - if err != nil && err != p2p.ErrShuttingDown { - log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream) - } - } - } -} - -// requestPeerSubscriptions calls on each live peer in the kademlia table -// and sends a `RequestSubscription` to peers according to their bin -// and their relationship with kademlia's depth. -// Also check `TestRequestPeerSubscriptions` in order to understand the -// expected behavior. -// The function expects: -// * the kademlia -// * a map of subscriptions -// * the actual function to subscribe -// (in case of the test, it doesn't do real subscriptions) -func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) { - - var startPo int - var endPo int - var ok bool - - // kademlia's depth - kadDepth := kad.NeighbourhoodDepth() - // request subscriptions for all nodes and bins - // nil as base takes the node's base; we need to pass 255 as `EachConn` runs - // from deepest bins backwards - kad.EachConn(nil, 255, func(p *network.Peer, po int) bool { - // nodes that do not provide stream protocol - // should not be subscribed, e.g. bootnodes - if !p.HasCap("stream") { - return true - } - //if the peer's bin is shallower than the kademlia depth, - //only the peer's bin should be subscribed - if po < kadDepth { - startPo = po - endPo = po - } else { - //if the peer's bin is equal or deeper than the kademlia depth, - //each bin from the depth up to k.MaxProxDisplay should be subscribed - startPo = kadDepth - endPo = kad.MaxProxDisplay - } - - for bin := startPo; bin <= endPo; bin++ { - //do the actual subscription - ok = subscriptionFunc(r, p, uint8(bin), subs) - } - return ok - }) -} - // doRequestSubscription sends the actual RequestSubscription to the peer -func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { - log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin) +func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error { + log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin) // bin is always less then 256 and it is safe to convert it to type uint8 stream := NewStream("SYNC", FormatSyncBinKey(bin), true) - if streams, ok := subs[p.ID()]; ok { - // delete live and history streams from the map, so that it won't be removed with a Quit request - delete(streams, stream) - delete(streams, getHistoryStream(stream)) - } - err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High) + err := r.RequestSubscription(id, stream, NewRange(0, 0), High) if err != nil { - log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream) - return false + log.Debug("Request subscription", "err", err, "peer", id, "stream", stream) + return err } - return true + return nil } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { @@ -619,24 +391,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(ctx, msg) + go func() { + err := p.handleOfferedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(ctx, msg) + go func() { + err := p.handleTakeoverProofMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *WantedHashesMsg: - return p.handleWantedHashesMsg(ctx, msg) + go func() { + err := p.handleWantedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *ChunkDeliveryMsgRetrieval: // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *ChunkDeliveryMsgSyncing: // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + go func() { + err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RequestSubscriptionMsg: return p.handleRequestSubscription(ctx, msg) @@ -767,7 +581,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + if err := p.Send(context.TODO(), tp); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { @@ -969,15 +783,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { } /* -GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. +GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects. */ -func (api *API) GetPeerSubscriptions() map[string][]string { - //create the empty map +func (api *API) GetPeerServerSubscriptions() map[string][]string { pstreams := make(map[string][]string) - //iterate all streamer peers api.streamer.peersMu.RLock() defer api.streamer.peersMu.RUnlock() |