diff options
author | Anton Evangelatov <anton.evangelatov@gmail.com> | 2019-03-21 04:30:34 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-21 04:30:34 +0800 |
commit | baded64d8819ece2bb715bf707882017dca03ae4 (patch) | |
tree | f7a198688d2be68ce4fad5030363ae31b1500eef /swarm | |
parent | c53c5e616f04ae8b041bfb64309cbc7f3e70303a (diff) | |
download | go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.gz go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.bz2 go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.lz go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.xz go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.zst go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.zip |
swarm/network: measure time of messages in priority queue (#19250)
Diffstat (limited to 'swarm')
-rw-r--r-- | swarm/network/fetcher.go | 7 | ||||
-rw-r--r-- | swarm/network/priorityqueue/priorityqueue.go | 23 | ||||
-rw-r--r-- | swarm/network/stream/delivery.go | 12 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 2 | ||||
-rw-r--r-- | swarm/storage/chunker.go | 5 | ||||
-rw-r--r-- | swarm/storage/netstore.go | 3 | ||||
-rw-r--r-- | swarm/swarm.go | 2 |
7 files changed, 40 insertions, 14 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go index f7deead3d..5c0dfefce 100644 --- a/swarm/network/fetcher.go +++ b/swarm/network/fetcher.go @@ -204,24 +204,24 @@ func (f *Fetcher) run(peers *sync.Map) { // incoming request case hopCount = <-f.requestC: - log.Trace("new request", "request addr", f.addr) // 2) chunk is requested, set requested flag // launch a request iff none been launched yet doRequest = !requested + log.Trace("new request", "request addr", f.addr, "doRequest", doRequest) requested = true // peer we requested from is gone. fall back to another // and remove the peer from the peers map case id := <-gone: - log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr) peers.Delete(id.String()) doRequest = requested + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest) // search timeout: too much time passed since the last request, // extend the search to a new peer if we can find one case <-waitC: - log.Trace("search timed out: requesting", "request addr", f.addr) doRequest = requested + log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest) // all Fetcher context closed, can quit case <-f.ctx.Done(): @@ -288,6 +288,7 @@ func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources for i = 0; i < len(sources); i++ { req.Source = sources[i] var err error + log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String()) sourceID, quit, err = f.protoRequestFunc(f.ctx, req) if err == nil { // remove the peer from known sources diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go index 538502605..056e85ec1 100644 --- a/swarm/network/priorityqueue/priorityqueue.go +++ b/swarm/network/priorityqueue/priorityqueue.go @@ -28,8 +28,9 @@ package priorityqueue import ( "context" "errors" + "time" - "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) var ( @@ -69,13 +70,16 @@ READ: case <-ctx.Done(): return case x := <-q: - log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p])) - f(x) + val := x.(struct { + v interface{} + t time.Time + }) + f(val.v) + metrics.GetOrRegisterResettingTimer("pq.run", nil).UpdateSince(val.t) p = top default: if p > 0 { p-- - log.Trace("priority.queue p > 0", "p", p) continue READ } p = top @@ -83,7 +87,6 @@ READ: case <-ctx.Done(): return case <-pq.wakeup: - log.Trace("priority.queue wakeup", "p", p) } } } @@ -95,9 +98,15 @@ func (pq *PriorityQueue) Push(x interface{}, p int) error { if p < 0 || p >= len(pq.Queues) { return errBadPriority } - log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p])) + val := struct { + v interface{} + t time.Time + }{ + x, + time.Now(), + } select { - case pq.Queues[p] <- x: + case pq.Queues[p] <- val: default: return ErrContention } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 01ae7f943..bc4f1f665 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -185,6 +185,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } + osp.LogFields(olog.Bool("delivered", true)) return } osp.LogFields(olog.Bool("skipCheck", false)) @@ -216,6 +217,10 @@ type ChunkDeliveryMsgSyncing ChunkDeliveryMsg // chunk delivery msg is response to retrieverequest msg func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "handle.chunk.delivery") processReceivedChunksCount.Inc(1) @@ -223,13 +228,18 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr) span := tracing.ShiftSpanByKey(spanId) + log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID()) + go func() { + defer osp.Finish() + if span != nil { span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg")) defer span.Finish() } req.peer = sp + log.Trace("handle.chunk.delivery", "put", req.Addr) err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) if err != nil { if err == storage.ErrChunkInvalid { @@ -239,6 +249,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch req.peer.Drop(err) } } + log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err) }() return nil } @@ -284,6 +295,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( // this span will finish only when delivery is handled (or times out) ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request") ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr)) + log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr) err := sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: req.Addr, SkipCheck: req.SkipCheck, diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index c7c489152..1038e52d0 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -910,7 +910,7 @@ func (r *Registry) APIs() []rpc.API { Namespace: "stream", Version: "3.0", Service: r.api, - Public: true, + Public: false, }, } } diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 5b36b477e..b2f0f5633 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -536,7 +536,6 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in chunkData, err := r.getter.Get(ctx, Reference(childAddress)) if err != nil { metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) - log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): case <-quitC: @@ -561,12 +560,12 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in // Read keeps a cursor so cannot be called simulateously, see ReadAt func (r *LazyChunkReader) Read(b []byte) (read int, err error) { - log.Debug("lazychunkreader.read", "key", r.addr) + log.Trace("lazychunkreader.read", "key", r.addr) metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1) read, err = r.ReadAt(b, r.off) if err != nil && err != io.EOF { - log.Debug("lazychunkreader.readat", "read", read, "err", err) + log.Trace("lazychunkreader.readat", "read", read, "err", err) metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1) } diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index e3845489e..7741b8f7b 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -87,7 +87,9 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error { // if chunk is now put in the store, check if there was an active fetcher and call deliver on it // (this delivers the chunk to requestors via the fetcher) + log.Trace("n.getFetcher", "ref", ch.Address()) if f := n.getFetcher(ch.Address()); f != nil { + log.Trace("n.getFetcher deliver", "ref", ch.Address()) f.deliver(ctx, ch) } return nil @@ -341,5 +343,6 @@ func (f *fetcher) deliver(ctx context.Context, ch Chunk) { f.chunk = ch // closing the deliveredC channel will terminate ongoing requests close(f.deliveredC) + log.Trace("n.getFetcher close deliveredC", "ref", ch.Address()) }) } diff --git a/swarm/swarm.go b/swarm/swarm.go index b4b08c5c5..ae78ccd48 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -522,6 +522,8 @@ func (s *Swarm) APIs() []rpc.API { apis = append(apis, s.bzz.APIs()...) + apis = append(apis, s.streamer.APIs()...) + if s.ps != nil { apis = append(apis, s.ps.APIs()...) } |