aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/messages.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/messages.go')
-rw-r--r--swarm/network/stream/messages.go72
1 files changed, 42 insertions, 30 deletions
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..339101b88 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -24,9 +24,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/opentracing/opentracing-go"
)
var syncBatchTimeout = 30 * time.Second
@@ -175,7 +173,11 @@ type QuitMsg struct {
}
func (p *Peer) handleQuitMsg(req *QuitMsg) error {
- return p.removeClient(req.Stream)
+ err := p.removeClient(req.Stream)
+ if _, ok := err.(*notFoundError); ok {
+ return nil
+ }
+ return err
}
// OfferedHashesMsg is the protocol msg for offering to hand over a
@@ -197,12 +199,6 @@ func (m OfferedHashesMsg) String() string {
func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1)
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "handle.offered.hashes")
- defer sp.Finish()
-
c, _, err := p.getOrSetClient(req.Stream, req.From, req.To)
if err != nil {
return err
@@ -219,6 +215,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err)
}
+ var wantDelaySet bool
+ var wantDelay time.Time
+
ctr := 0
errC := make(chan error)
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
@@ -230,6 +229,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if wait := c.NeedData(ctx, hash); wait != nil {
ctr++
want.Set(i/HashSize, true)
+
+ // measure how long it takes before we mark chunks for retrieval, and actually send the request
+ if !wantDelaySet {
+ wantDelaySet = true
+ wantDelay = time.Now()
+ }
+
// create request and wait until the chunk data arrives and is stored
go func(w func(context.Context) error) {
select {
@@ -247,7 +253,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
case err := <-errC:
if err != nil {
log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
- p.Drop(err)
+ p.Drop()
return
}
case <-ctx.Done():
@@ -283,28 +289,34 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
From: from,
To: to,
}
- go func() {
- log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- select {
- case err := <-c.next:
- if err != nil {
- log.Warn("c.next error dropping peer", "err", err)
- p.Drop(err)
- return
- }
- case <-c.quit:
- log.Debug("client.handleOfferedHashesMsg() quit")
- return
- case <-ctx.Done():
- log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
- return
- }
- log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(ctx, msg, c.priority)
+
+ log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
+ select {
+ case err := <-c.next:
if err != nil {
- log.Warn("SendPriority error", "err", err)
+ log.Warn("c.next error dropping peer", "err", err)
+ p.Drop()
+ return err
}
- }()
+ case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return nil
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
+ return nil
+ }
+ log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
+
+ // record want delay
+ if wantDelaySet {
+ metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay)
+ }
+
+ err = p.SendPriority(ctx, msg, c.priority)
+ if err != nil {
+ log.Warn("SendPriority error", "err", err)
+ }
+
return nil
}