diff options
Diffstat (limited to 'swarm/network/stream')
-rw-r--r-- | swarm/network/stream/delivery.go | 7 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/peer.go | 26 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 5 |
4 files changed, 26 insertions, 14 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 988afcce8..fae6994f0 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * ctx, osp = spancontext.StartSpan( ctx, "retrieve.request") - defer osp.Finish() s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) if err != nil { @@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * }() go func() { + defer osp.Finish() chunk, err := d.chunkStore.Get(ctx, req.Addr) if err != nil { retrieveChunkFail.Inc(1) @@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch ctx, osp = spancontext.StartSpan( ctx, "chunk.delivery") - defer osp.Finish() processReceivedChunksCount.Inc(1) go func() { + defer osp.Finish() + req.peer = sp err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) if err != nil { @@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( Addr: req.Addr, SkipCheck: req.SkipCheck, HopCount: req.HopCount, - }, Top) + }, Top, "request.from.peers") if err != nil { return nil, nil, err } diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc..de4e8a3bb 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(ctx, msg, c.priority) + err := p.SendPriority(ctx, msg, c.priority, "") if err != nil { log.Warn("SendPriority error", "err", err) } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 4bccf56f5..68da8f44a 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -65,6 +65,7 @@ type Peer struct { // on creating a new client in offered hashes handler. clientParams map[Stream]*clientParams quit chan struct{} + spans sync.Map } type WrappedPriorityMsg struct { @@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { clients: make(map[Stream]*client), clientParams: make(map[Stream]*clientParams), quit: make(chan struct{}), + spans: sync.Map{}, } ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) + defer p.spans.Delete(wmsg.Context) + sp, ok := p.spans.Load(wmsg.Context) + if ok { + defer sp.(opentracing.Span).Finish() + } err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) @@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { // Deliver sends a storeRequestMsg protocol message to the peer // Depending on the `syncing` parameter we send different message types func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { - var sp opentracing.Span var msg interface{} spanName := "send.chunk.delivery" @@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, } spanName += ".retrieval" } - ctx, sp = spancontext.StartSpan( - ctx, - spanName) - defer sp.Finish() - return p.SendPriority(ctx, msg, priority) + return p.SendPriority(ctx, msg, priority, spanName) } // SendPriority sends message to the peer using the outgoing priority queue -func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { +func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) + if traceId != "" { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + traceId, + ) + p.spans.Store(ctx, sp) + } wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, @@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) - return p.SendPriority(ctx, msg, s.priority) + return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes") } func (p *Peer) getServer(s Stream) (*server, error) { diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index e06048053..ee4f57c1a 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.SendPriority(context.TODO(), msg, priority, "") } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -730,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error if err != nil { return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + + if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { |