aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/messages.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/messages.go')
-rw-r--r--swarm/network/stream/messages.go29
1 files changed, 19 insertions, 10 deletions
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 5668a73e9..a19f63589 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -17,6 +17,7 @@
package stream
import (
+ "context"
"errors"
"fmt"
"sync"
@@ -25,7 +26,9 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
// Stream defines a unique stream identifier.
@@ -71,17 +74,17 @@ type RequestSubscriptionMsg struct {
Priority uint8 // delivered on priority channel
}
-func (p *Peer) handleRequestSubscription(req *RequestSubscriptionMsg) (err error) {
+func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) {
log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr.ID(), p.ID(), req.Stream))
return p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority)
}
-func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) {
+func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err error) {
metrics.GetOrRegisterCounter("peer.handlesubscribemsg", nil).Inc(1)
defer func() {
if err != nil {
- if e := p.Send(SubscribeErrorMsg{
+ if e := p.Send(context.TODO(), SubscribeErrorMsg{
Error: err.Error(),
}); e != nil {
log.Error("send stream subscribe error message", "err", err)
@@ -181,9 +184,15 @@ func (m OfferedHashesMsg) String() string {
// handleOfferedHashesMsg protocol msg handler calls the incoming streamer interface
// Filter method
-func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
+func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1)
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ "handle.offered.hashes")
+ defer sp.Finish()
+
c, _, err := p.getOrSetClient(req.Stream, req.From, req.To)
if err != nil {
return err
@@ -197,7 +206,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
- if wait := c.NeedData(hash); wait != nil {
+ if wait := c.NeedData(ctx, hash); wait != nil {
want.Set(i/HashSize, true)
wg.Add(1)
// create request and wait until the chunk data arrives and is stored
@@ -260,7 +269,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(msg, c.priority)
+ err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority err, so dropping peer", "err", err)
p.Drop(err)
@@ -285,7 +294,7 @@ func (m WantedHashesMsg) String() string {
// handleWantedHashesMsg protocol msg handler
// * sends the next batch of unsynced keys
// * sends the actual data chunks as per WantedHashesMsg
-func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
+func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1)
log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
@@ -314,7 +323,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg.actualget", nil).Inc(1)
hash := hashes[i*HashSize : (i+1)*HashSize]
- data, err := s.GetData(hash)
+ data, err := s.GetData(ctx, hash)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
@@ -323,7 +332,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
if length := len(chunk.SData); length < 9 {
log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
}
- if err := p.Deliver(chunk, s.priority); err != nil {
+ if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}
}
@@ -363,7 +372,7 @@ func (m TakeoverProofMsg) String() string {
return fmt.Sprintf("Stream: '%v' [%v-%v], Root: %x, Sig: %x", m.Stream, m.Start, m.End, m.Root, m.Sig)
}
-func (p *Peer) handleTakeoverProofMsg(req *TakeoverProofMsg) error {
+func (p *Peer) handleTakeoverProofMsg(ctx context.Context, req *TakeoverProofMsg) error {
_, err := p.getServer(req.Stream)
// store the strongest takeoverproof for the stream in streamer
return err