aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/delivery.go
diff options
context:
space:
mode:
authorlash <nolash@users.noreply.github.com>2019-02-20 21:50:37 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2019-02-20 21:50:37 +0800
commitd36e974ba303d12d79d769d0811dd5babcf6688f (patch)
tree22073c5ae7d6e0b243ed7f4bed4ed9aae054a116 /swarm/network/stream/delivery.go
parent460d206f309fc0884c666bd191a1b6a4b63462fc (diff)
downloadgo-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.tar
go-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.tar.gz
go-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.tar.bz2
go-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.tar.lz
go-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.tar.xz
go-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.tar.zst
go-tangerine-d36e974ba303d12d79d769d0811dd5babcf6688f.zip
swarm/network: Keep span across roundtrip (#19140)
* swarm/newtork: WIP Span request span until delivery and put * swarm/storage: Introduce new trace across single fetcher lifespan * swarm/network: Put span ids for sendpriority in context value * swarm: Add global span store in tracing * swarm/tracing: Add context key constants * swarm/tracing: Add comments * swarm/storage: Remove redundant fix for filestore * swarm/tracing: Elaborate constants comments * swarm/network, swarm/storage, swarm:tracing: Minor cleanup
Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r--swarm/network/stream/delivery.go27
1 files changed, 18 insertions, 9 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index fae6994f0..02c5f222c 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
)
@@ -143,7 +144,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
- "retrieve.request")
+ "stream.handle.retrieve")
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
@@ -207,17 +208,19 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
//defines a chunk delivery for syncing (without accounting)
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
-// TODO: Fix context SNAFU
+// chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
- var osp opentracing.Span
- ctx, osp = spancontext.StartSpan(
- ctx,
- "chunk.delivery")
processReceivedChunksCount.Inc(1)
+ // retrieve the span for the originating retrieverequest
+ spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
+ span := tracing.ShiftSpanByKey(spanId)
+
go func() {
- defer osp.Finish()
+ if span != nil {
+ defer span.(opentracing.Span).Finish()
+ }
req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
@@ -233,7 +236,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
return nil
}
-// RequestFromPeers sends a chunk retrieve request to
+// RequestFromPeers sends a chunk retrieve request to a peer
+// The most eligible peer that hasn't already been sent to is chosen
+// TODO: define "eligible"
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
var sp *Peer
@@ -268,11 +273,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
}
}
+ // setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
+ // this span will finish only when delivery is handled (or times out)
+ ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
+ ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
HopCount: req.HopCount,
- }, Top, "request.from.peers")
+ }, Top)
if err != nil {
return nil, nil, err
}