Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r--  swarm/network/stream/peer.go | 184
1 file changed, 175 insertions, 9 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 152814bd4..28fd06e4d 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -24,8 +24,10 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/metrics"
-	"github.com/ethereum/go-ethereum/p2p/protocols"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/log"
+	"github.com/ethereum/go-ethereum/swarm/network"
 	pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
 	"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
 	"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -54,7 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers")
 
 // Peer is the Peer extension for the streaming protocol
 type Peer struct {
-	*protocols.Peer
+	*network.BzzPeer
 	streamer *Registry
 	pq       *pq.PriorityQueue
 	serverMu sync.RWMutex
@@ -74,9 +76,9 @@ type WrappedPriorityMsg struct {
 }
 
 // NewPeer is the constructor for Peer
-func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
+func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer {
 	p := &Peer{
-		Peer:     peer,
+		BzzPeer:  peer,
 		pq:       pq.New(int(PriorityQueue), PriorityQueueCap),
 		streamer: streamer,
 		servers:  make(map[Stream]*server),
@@ -90,7 +92,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 		err := p.Send(wmsg.Context, wmsg.Msg)
 		if err != nil {
 			log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
-			p.Drop(err)
+			p.Drop()
 		}
 	})
 
@@ -134,7 +136,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
 	var msg interface{}
 
-	spanName := "send.chunk.delivery"
+	metrics.GetOrRegisterCounter("peer.deliver", nil).Inc(1)
 
 	//we send different types of messages if delivery is for syncing or retrievals,
 	//even if handling and content of the message are the same,
@@ -144,16 +146,13 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
 			Addr:  chunk.Address(),
 			SData: chunk.Data(),
 		}
-		spanName += ".syncing"
 	} else {
 		msg = &ChunkDeliveryMsgRetrieval{
 			Addr:  chunk.Address(),
 			SData: chunk.Data(),
 		}
-		spanName += ".retrieval"
 	}
 
-	ctx = context.WithValue(ctx, "stream_send_tag", nil)
 	return p.SendPriority(ctx, msg, priority)
 }
 
@@ -416,7 +415,174 @@ func (p *Peer) removeClientParams(s Stream) error {
 }
 
 func (p *Peer) close() {
+	p.serverMu.Lock()
+	defer p.serverMu.Unlock()
+
 	for _, s := range p.servers {
 		s.Close()
 	}
+
+	p.servers = nil
+}
+
+// runUpdateSyncing is a long running function that creates the initial
+// syncing subscriptions to the peer and waits for neighbourhood depth change
+// to create new ones or quit existing ones based on the new neighbourhood depth
+// and if peer enters or leaves nearest neighbourhood by using
+// syncSubscriptionsDiff and updateSyncSubscriptions functions.
+func (p *Peer) runUpdateSyncing() {
+	timer := time.NewTimer(p.streamer.syncUpdateDelay)
+	defer timer.Stop()
+
+	select {
+	case <-timer.C:
+	case <-p.streamer.quit:
+		return
+	}
+
+	kad := p.streamer.delivery.kad
+	po := chunk.Proximity(p.BzzAddr.Over(), kad.BaseAddr())
+
+	depth := kad.NeighbourhoodDepth()
+
+	log.Debug("update syncing subscriptions: initial", "peer", p.ID(), "po", po, "depth", depth)
+
+	// initial subscriptions
+	p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay))
+
+	depthChangeSignal, unsubscribeDepthChangeSignal := kad.SubscribeToNeighbourhoodDepthChange()
+	defer unsubscribeDepthChangeSignal()
+
+	prevDepth := depth
+	for {
+		select {
+		case _, ok := <-depthChangeSignal:
+			if !ok {
+				return
+			}
+			// update subscriptions for this peer when depth changes
+			depth := kad.NeighbourhoodDepth()
+			log.Debug("update syncing subscriptions", "peer", p.ID(), "po", po, "depth", depth)
+			p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay))
+			prevDepth = depth
+		case <-p.streamer.quit:
+			return
+		}
+	}
+	log.Debug("update syncing subscriptions: exiting", "peer", p.ID())
+}
+
+// updateSyncSubscriptions accepts two slices of integers, the first one
+// representing proximity order bins for required syncing subscriptions
+// and the second one representing bins for syncing subscriptions that
+// need to be removed. This function sends request for subscription
+// messages and quit messages for provided bins.
+func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) {
+	if p.streamer.getPeer(p.ID()) == nil {
+		log.Debug("update syncing subscriptions", "peer not found", p.ID())
+		return
+	}
+	log.Debug("update syncing subscriptions", "peer", p.ID(), "subscribe", subBins, "quit", quitBins)
+	for _, po := range subBins {
+		p.subscribeSync(po)
+	}
+	for _, po := range quitBins {
+		p.quitSync(po)
+	}
+}
+
+// subscribeSync send the request for syncing subscriptions to the peer
+// using subscriptionFunc. This function is used to request syncing subscriptions
+// when new peer is added to the registry and on neighbourhood depth change.
+func (p *Peer) subscribeSync(po int) {
+	err := subscriptionFunc(p.streamer, p.ID(), uint8(po))
+	if err != nil {
+		log.Error("subscription", "err", err)
+	}
+}
+
+// quitSync sends the quit message for live and history syncing streams to the peer.
+// This function is used in runUpdateSyncing indirectly over updateSyncSubscriptions
+// to remove unneeded syncing subscriptions on neighbourhood depth change.
+func (p *Peer) quitSync(po int) {
+	live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true)
+	history := getHistoryStream(live)
+	err := p.streamer.Quit(p.ID(), live)
+	if err != nil && err != p2p.ErrShuttingDown {
+		log.Error("quit", "err", err, "peer", p.ID(), "stream", live)
+	}
+	err = p.streamer.Quit(p.ID(), history)
+	if err != nil && err != p2p.ErrShuttingDown {
+		log.Error("quit", "err", err, "peer", p.ID(), "stream", history)
+	}
+
+	err = p.removeServer(live)
+	if err != nil {
+		log.Error("remove server", "err", err, "peer", p.ID(), "stream", live)
+	}
+	err = p.removeServer(history)
+	if err != nil {
+		log.Error("remove server", "err", err, "peer", p.ID(), "stream", live)
+	}
+}
+
+// syncSubscriptionsDiff calculates to which proximity order bins a peer
+// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth
+// change from prevDepth to newDepth. Max argument limits the number of
+// proximity order bins. Returned values are slices of integers which represent
+// proximity order bins, the first one to which additional subscriptions need to
+// be requested and the second one which subscriptions need to be quit. Argument
+// prevDepth with value less then 0 represents no previous depth, used for
+// initial syncing subscriptions.
+func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) {
+	newStart, newEnd := syncBins(peerPO, newDepth, max)
+	if prevDepth < 0 {
+		// no previous depth, return the complete range
+		// for subscriptions requests and nothing for quitting
+		return intRange(newStart, newEnd), nil
+	}
+
+	prevStart, prevEnd := syncBins(peerPO, prevDepth, max)
+
+	if newStart < prevStart {
+		subBins = append(subBins, intRange(newStart, prevStart)...)
+	}
+
+	if prevStart < newStart {
+		quitBins = append(quitBins, intRange(prevStart, newStart)...)
+	}
+
+	if newEnd < prevEnd {
+		quitBins = append(quitBins, intRange(newEnd, prevEnd)...)
+	}
+
+	if prevEnd < newEnd {
+		subBins = append(subBins, intRange(prevEnd, newEnd)...)
+	}
+
+	return subBins, quitBins
+}
+
+// syncBins returns the range to which proximity order bins syncing
+// subscriptions need to be requested, based on peer proximity and
+// kademlia neighbourhood depth. Returned range is [start,end), inclusive for
+// start and exclusive for end.
+func syncBins(peerPO, depth, max int) (start, end int) {
+	if peerPO < depth {
+		// subscribe only to peerPO bin if it is not
+		// in the nearest neighbourhood
+		return peerPO, peerPO + 1
+	}
+	// subscribe from depth to max bin if the peer
+	// is in the nearest neighbourhood
+	return depth, max + 1
+}
+
+// intRange returns the slice of integers [start,end). The start
+// is inclusive and the end is not.
+func intRange(start, end int) (r []int) {
+	for i := start; i < end; i++ {
+		r = append(r, i)
+	}
+	return r
 }
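
For reference, the sketch below copies the pure helpers syncBins, intRange and syncSubscriptionsDiff from the hunk above into a standalone program and walks one neighbourhood depth change. The package name, the example values (a peer at proximity order 3, depth moving from 2 to 5, max 16) and the printed expectations are illustrative only and not part of the commit.

// Standalone sketch of the bin-diff logic introduced by this commit
// (illustrative values only; the helper bodies are copied from the diff above).
package main

import "fmt"

func syncBins(peerPO, depth, max int) (start, end int) {
	if peerPO < depth {
		// peer outside the nearest neighbourhood: only its own bin
		return peerPO, peerPO + 1
	}
	// peer inside the nearest neighbourhood: every bin from depth to max
	return depth, max + 1
}

func intRange(start, end int) (r []int) {
	for i := start; i < end; i++ {
		r = append(r, i)
	}
	return r
}

func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) {
	newStart, newEnd := syncBins(peerPO, newDepth, max)
	if prevDepth < 0 {
		// no previous depth: subscribe to the whole new range, quit nothing
		return intRange(newStart, newEnd), nil
	}
	prevStart, prevEnd := syncBins(peerPO, prevDepth, max)
	if newStart < prevStart {
		subBins = append(subBins, intRange(newStart, prevStart)...)
	}
	if prevStart < newStart {
		quitBins = append(quitBins, intRange(prevStart, newStart)...)
	}
	if newEnd < prevEnd {
		quitBins = append(quitBins, intRange(newEnd, prevEnd)...)
	}
	if prevEnd < newEnd {
		subBins = append(subBins, intRange(prevEnd, newEnd)...)
	}
	return subBins, quitBins
}

func main() {
	// Initial subscriptions: prevDepth = -1 means no previous depth.
	// Peer po 3 with depth 2 is inside the neighbourhood, so bins 2..16.
	sub, quit := syncSubscriptionsDiff(3, -1, 2, 16)
	fmt.Println(sub, quit) // [2 3 4 5 6 7 8 9 10 11 12 13 14 15 16] []

	// Depth rises from 2 to 5: the peer at po 3 leaves the nearest
	// neighbourhood, keeps only bin 3 and quits all the others.
	sub, quit = syncSubscriptionsDiff(3, 2, 5, 16)
	fmt.Println(sub, quit) // [] [2 4 5 6 7 8 9 10 11 12 13 14 15 16]
}

This mirrors the comments in the hunk: a peer outside the neighbourhood depth syncs only its own proximity order bin, while a peer inside the neighbourhood syncs every bin from the depth up to max, and syncSubscriptionsDiff only emits the difference between the old and new ranges so existing subscriptions are left untouched.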