diff options
Diffstat (limited to 'swarm/network/stream/messages.go')
-rw-r--r-- | swarm/network/stream/messages.go | 72 |
1 files changed, 42 insertions, 30 deletions
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc..339101b88 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -24,9 +24,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/opentracing/opentracing-go" ) var syncBatchTimeout = 30 * time.Second @@ -175,7 +173,11 @@ type QuitMsg struct { } func (p *Peer) handleQuitMsg(req *QuitMsg) error { - return p.removeClient(req.Stream) + err := p.removeClient(req.Stream) + if _, ok := err.(*notFoundError); ok { + return nil + } + return err } // OfferedHashesMsg is the protocol msg for offering to hand over a @@ -197,12 +199,6 @@ func (m OfferedHashesMsg) String() string { func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1) - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "handle.offered.hashes") - defer sp.Finish() - c, _, err := p.getOrSetClient(req.Stream, req.From, req.To) if err != nil { return err @@ -219,6 +215,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err) } + var wantDelaySet bool + var wantDelay time.Time + ctr := 0 errC := make(chan error) ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) @@ -230,6 +229,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if wait := c.NeedData(ctx, hash); wait != nil { ctr++ want.Set(i/HashSize, true) + + // measure how long it takes before we mark chunks for retrieval, and actually send the request + if !wantDelaySet { + wantDelaySet = true + wantDelay = time.Now() + } + // create request and wait until the chunk data arrives and is stored go func(w func(context.Context) error) { select { @@ -247,7 +253,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-errC: if err != nil { log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() return } case <-ctx.Done(): @@ -283,28 +289,34 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg From: from, To: to, } - go func() { - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - select { - case err := <-c.next: - if err != nil { - log.Warn("c.next error dropping peer", "err", err) - p.Drop(err) - return - } - case <-c.quit: - log.Debug("client.handleOfferedHashesMsg() quit") - return - case <-ctx.Done(): - log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) - return - } - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(ctx, msg, c.priority) + + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + select { + case err := <-c.next: if err != nil { - log.Warn("SendPriority error", "err", err) + log.Warn("c.next error dropping peer", "err", err) + p.Drop() + return err } - }() + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return nil + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return nil + } + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + + // record want delay + if wantDelaySet { + metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay) + } + + err = p.SendPriority(ctx, msg, c.priority) + if err != nil { + log.Warn("SendPriority error", "err", err) + } + return nil } |