diff options
author | Anton Evangelatov <anton.evangelatov@gmail.com> | 2018-07-13 23:40:28 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-07-13 23:40:28 +0800 |
commit | 7c9314f231a7ddffbbbc5fec16c65519a0121eeb (patch) | |
tree | dbc4021b66ee8968ad747036741fac7e1b972a39 /swarm/network/stream/messages.go | |
parent | f7d3678c28c4b92e45a458e4785bd0f1cdc20e34 (diff) | |
download | dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.gz dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.bz2 dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.lz dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.xz dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.zst dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.zip |
swarm: integrate OpenTracing; propagate ctx to internal APIs (#17169)
* swarm: propagate ctx, enable opentracing
* swarm/tracing: log error when tracing is misconfigured
Diffstat (limited to 'swarm/network/stream/messages.go')
-rw-r--r-- | swarm/network/stream/messages.go | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 5668a73e9..a19f63589 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -17,6 +17,7 @@ package stream import ( + "context" "errors" "fmt" "sync" @@ -25,7 +26,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Stream defines a unique stream identifier. @@ -71,17 +74,17 @@ type RequestSubscriptionMsg struct { Priority uint8 // delivered on priority channel } -func (p *Peer) handleRequestSubscription(req *RequestSubscriptionMsg) (err error) { +func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) { log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr.ID(), p.ID(), req.Stream)) return p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority) } -func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) { +func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err error) { metrics.GetOrRegisterCounter("peer.handlesubscribemsg", nil).Inc(1) defer func() { if err != nil { - if e := p.Send(SubscribeErrorMsg{ + if e := p.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), }); e != nil { log.Error("send stream subscribe error message", "err", err) @@ -181,9 +184,15 @@ func (m OfferedHashesMsg) String() string { // handleOfferedHashesMsg protocol msg handler calls the incoming streamer interface // Filter method -func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { +func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1) + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "handle.offered.hashes") + defer sp.Finish() + c, _, err := p.getOrSetClient(req.Stream, req.From, req.To) if err != nil { return err @@ -197,7 +206,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] - if wait := c.NeedData(hash); wait != nil { + if wait := c.NeedData(ctx, hash); wait != nil { want.Set(i/HashSize, true) wg.Add(1) // create request and wait until the chunk data arrives and is stored @@ -260,7 +269,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error { return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(msg, c.priority) + err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority err, so dropping peer", "err", err) p.Drop(err) @@ -285,7 +294,7 @@ func (m WantedHashesMsg) String() string { // handleWantedHashesMsg protocol msg handler // * sends the next batch of unsynced keys // * sends the actual data chunks as per WantedHashesMsg -func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { +func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1) log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) @@ -314,7 +323,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg.actualget", nil).Inc(1) hash := hashes[i*HashSize : (i+1)*HashSize] - data, err := s.GetData(hash) + data, err := s.GetData(ctx, hash) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } @@ -323,7 +332,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error { if length := len(chunk.SData); length < 9 { log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) } - if err := p.Deliver(chunk, s.priority); err != nil { + if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } } @@ -363,7 +372,7 @@ func (m TakeoverProofMsg) String() string { return fmt.Sprintf("Stream: '%v' [%v-%v], Root: %x, Sig: %x", m.Stream, m.Start, m.End, m.Root, m.Sig) } -func (p *Peer) handleTakeoverProofMsg(req *TakeoverProofMsg) error { +func (p *Peer) handleTakeoverProofMsg(ctx context.Context, req *TakeoverProofMsg) error { _, err := p.getServer(req.Stream) // store the strongest takeoverproof for the stream in streamer return err |