From 993b145f25845e50e8af41ffb1116eaee381d693 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 11 Apr 2019 10:26:52 +0200 Subject: swarm/storage/localstore: fix export db.Put signature cmd/swarm/swarm-smoke: improve smoke tests (#1337) swarm/network: remove dead code (#1339) swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342) --- swarm/network/stream/messages.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'swarm/network/stream/messages.go') diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc..b60d2fcc9 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -247,7 +247,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-errC: if err != nil { log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() return } case <-ctx.Done(): @@ -289,7 +289,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-c.next: if err != nil { log.Warn("c.next error dropping peer", "err", err) - p.Drop(err) + p.Drop() return } case <-c.quit: -- cgit v1.2.3 From 3030893a21b17a0e90ddd0047d0f310fee8335a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Tue, 30 Apr 2019 09:28:46 +0200 Subject: swarm/network: update syncing --- swarm/network/stream/messages.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'swarm/network/stream/messages.go') diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b60d2fcc9..821cdaa9a 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -175,7 +175,11 @@ type QuitMsg struct { } func (p *Peer) handleQuitMsg(req *QuitMsg) error { - return p.removeClient(req.Stream) + err := p.removeClient(req.Stream) + if _, ok := err.(*notFoundError); ok { + return nil + } + return err } // OfferedHashesMsg is the protocol msg for offering to hand over a -- cgit v1.2.3 From 8802b9ce7fe8f9764bd50dfe072b37ebc9e84cb8 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 6 May 2019 10:58:42 +0200 Subject: swarm-smoke: add syncDelay flag swarm/network: add want delay timer to syncing (#1367) swarm/network: synchronise peer.close() (#1369) --- swarm/network/stream/messages.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'swarm/network/stream/messages.go') diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 821cdaa9a..b43fdeee2 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -223,6 +223,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err) } + var wantDelaySet bool + var wantDelay time.Time + ctr := 0 errC := make(chan error) ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) @@ -234,6 +237,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if wait := c.NeedData(ctx, hash); wait != nil { ctr++ want.Set(i/HashSize, true) + + // measure how long it takes before we mark chunks for retrieval, and actually send the request + if !wantDelaySet { + wantDelaySet = true + wantDelay = time.Now() + } + // create request and wait until the chunk data arrives and is stored go func(w func(context.Context) error) { select { @@ -304,6 +314,12 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + + // record want delay + if wantDelaySet { + metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay) + } + err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority error", "err", err) -- cgit v1.2.3 From 3e9ba576694e7df018b3c9fa2c1d3aa7d55031fe Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Tue, 7 May 2019 13:46:26 +0200 Subject: swarm/storage: improve instrumentation swarm/storage/localstore: fix broken metric (#1373) p2p/protocols: count different messages (#1374) cmd/swarm: disable snapshot create test due to constant flakes (#1376) swarm/network: remove redundant goroutine (#1377) --- swarm/network/stream/messages.go | 58 +++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 33 deletions(-) (limited to 'swarm/network/stream/messages.go') diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b43fdeee2..339101b88 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -24,9 +24,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/opentracing/opentracing-go" ) var syncBatchTimeout = 30 * time.Second @@ -201,12 +199,6 @@ func (m OfferedHashesMsg) String() string { func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1) - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "handle.offered.hashes") - defer sp.Finish() - c, _, err := p.getOrSetClient(req.Stream, req.From, req.To) if err != nil { return err @@ -297,34 +289,34 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg From: from, To: to, } - go func() { - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - select { - case err := <-c.next: - if err != nil { - log.Warn("c.next error dropping peer", "err", err) - p.Drop() - return - } - case <-c.quit: - log.Debug("client.handleOfferedHashesMsg() quit") - return - case <-ctx.Done(): - log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) - return - } - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - - // record want delay - if wantDelaySet { - metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay) - } - err := p.SendPriority(ctx, msg, c.priority) + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + select { + case err := <-c.next: if err != nil { - log.Warn("SendPriority error", "err", err) + log.Warn("c.next error dropping peer", "err", err) + p.Drop() + return err } - }() + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return nil + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return nil + } + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + + // record want delay + if wantDelaySet { + metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay) + } + + err = p.SendPriority(ctx, msg, c.priority) + if err != nil { + log.Warn("SendPriority error", "err", err) + } + return nil } -- cgit v1.2.3