aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network')
-rw-r--r--swarm/network/stream/delivery.go27
-rw-r--r--swarm/network/stream/messages.go2
-rw-r--r--swarm/network/stream/peer.go29
-rw-r--r--swarm/network/stream/stream.go4
4 files changed, 31 insertions, 31 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index fae6994f0..02c5f222c 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
)
@@ -143,7 +144,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
- "retrieve.request")
+ "stream.handle.retrieve")
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
@@ -207,17 +208,19 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
//defines a chunk delivery for syncing (without accounting)
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
-// TODO: Fix context SNAFU
+// chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
- var osp opentracing.Span
- ctx, osp = spancontext.StartSpan(
- ctx,
- "chunk.delivery")
processReceivedChunksCount.Inc(1)
+ // retrieve the span for the originating retrieverequest
+ spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
+ span := tracing.ShiftSpanByKey(spanId)
+
go func() {
- defer osp.Finish()
+ if span != nil {
+ defer span.(opentracing.Span).Finish()
+ }
req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
@@ -233,7 +236,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
return nil
}
-// RequestFromPeers sends a chunk retrieve request to
+// RequestFromPeers sends a chunk retrieve request to a peer
+// The most eligible peer that hasn't already been sent to is chosen
+// TODO: define "eligible"
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
var sp *Peer
@@ -268,11 +273,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
}
}
+ // setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
+ // this span will finish only when delivery is handled (or times out)
+ ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
+ ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
HopCount: req.HopCount,
- }, Top, "request.from.peers")
+ }, Top)
if err != nil {
return nil, nil, err
}
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index de4e8a3bb..b293724cc 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(ctx, msg, c.priority, "")
+ err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority error", "err", err)
}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 68da8f44a..c59799e08 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
)
@@ -65,7 +66,6 @@ type Peer struct {
// on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams
quit chan struct{}
- spans sync.Map
}
type WrappedPriorityMsg struct {
@@ -83,16 +83,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}),
- spans: sync.Map{},
}
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
- defer p.spans.Delete(wmsg.Context)
- sp, ok := p.spans.Load(wmsg.Context)
- if ok {
- defer sp.(opentracing.Span).Finish()
- }
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@@ -129,6 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
go func() {
<-p.quit
+
cancel()
}()
return p
@@ -158,21 +153,15 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
spanName += ".retrieval"
}
- return p.SendPriority(ctx, msg, priority, spanName)
+ ctx = context.WithValue(ctx, "stream_send_tag", nil)
+ return p.SendPriority(ctx, msg, priority)
}
// SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
+ tracing.StartSaveSpan(ctx)
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- if traceId != "" {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- traceId,
- )
- p.spans.Store(ctx, sp)
- }
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
@@ -190,7 +179,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
var sp opentracing.Span
ctx, sp := spancontext.StartSpan(
context.TODO(),
- "send.offered.hashes")
+ "send.offered.hashes",
+ )
defer sp.Finish()
hashes, from, to, proof, err := s.setNextBatch(f, t)
@@ -215,7 +205,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
- return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
+ ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
+ return p.SendPriority(ctx, msg, s.priority)
}
func (p *Peer) getServer(s Stream) (*server, error) {
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 622b46e4c..8e2a5f31a 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -381,7 +381,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority, "")
+ return peer.SendPriority(context.TODO(), msg, priority)
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -757,7 +757,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
+ if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {