aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2019-03-21 04:30:34 +0800
committerGitHub <noreply@github.com>2019-03-21 04:30:34 +0800
commitbaded64d8819ece2bb715bf707882017dca03ae4 (patch)
treef7a198688d2be68ce4fad5030363ae31b1500eef
parentc53c5e616f04ae8b041bfb64309cbc7f3e70303a (diff)
downloadgo-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar
go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.gz
go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.bz2
go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.lz
go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.xz
go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.tar.zst
go-tangerine-baded64d8819ece2bb715bf707882017dca03ae4.zip
swarm/network: measure time of messages in priority queue (#19250)
-rw-r--r--cmd/swarm/swarm-smoke/main.go7
-rw-r--r--cmd/swarm/swarm-smoke/upload_and_sync.go35
-rw-r--r--metrics/influxdb/influxdb.go1
-rw-r--r--swarm/network/fetcher.go7
-rw-r--r--swarm/network/priorityqueue/priorityqueue.go23
-rw-r--r--swarm/network/stream/delivery.go12
-rw-r--r--swarm/network/stream/stream.go2
-rw-r--r--swarm/storage/chunker.go5
-rw-r--r--swarm/storage/netstore.go3
-rw-r--r--swarm/swarm.go2
-rw-r--r--vendor/github.com/opentracing/opentracing-go/CHANGELOG.md2
-rw-r--r--vendor/github.com/opentracing/opentracing-go/Makefile18
-rw-r--r--vendor/github.com/opentracing/opentracing-go/README.md4
-rw-r--r--vendor/github.com/opentracing/opentracing-go/globaltracer.go18
-rw-r--r--vendor/github.com/opentracing/opentracing-go/propagation.go2
-rw-r--r--vendor/vendor.json6
16 files changed, 87 insertions, 60 deletions
diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go
index 43d2c1ff5..860fbcc1d 100644
--- a/cmd/swarm/swarm-smoke/main.go
+++ b/cmd/swarm/swarm-smoke/main.go
@@ -40,6 +40,7 @@ var (
allhosts string
hosts []string
filesize int
+ inputSeed int
syncDelay int
httpPort int
wsPort int
@@ -75,6 +76,12 @@ func main() {
Destination: &wsPort,
},
cli.IntFlag{
+ Name: "seed",
+ Value: 0,
+ Usage: "input seed in case we need deterministic upload",
+ Destination: &inputSeed,
+ },
+ cli.IntFlag{
Name: "filesize",
Value: 1024,
Usage: "file size for generated random file in KB",
diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go
index d1032f821..6c20a4fa6 100644
--- a/cmd/swarm/swarm-smoke/upload_and_sync.go
+++ b/cmd/swarm/swarm-smoke/upload_and_sync.go
@@ -39,6 +39,11 @@ import (
)
func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
+ // use input seed if it has been set
+ if inputSeed != 0 {
+ seed = inputSeed
+ }
+
randomBytes := testutil.RandomBytes(seed, filesize*1000)
errc := make(chan error)
@@ -47,37 +52,28 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
errc <- uploadAndSync(ctx, randomBytes, tuid)
}()
+ var err error
select {
- case err := <-errc:
+ case err = <-errc:
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
}
- return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
- e := fmt.Errorf("timeout after %v sec", timeout)
- // trigger debug functionality on randomBytes
- err := trackChunks(randomBytes[:])
- if err != nil {
- e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err)
- }
-
- return e
+ err = fmt.Errorf("timeout after %v sec", timeout)
}
- // trigger debug functionality on randomBytes even on successful runs
- err := trackChunks(randomBytes[:])
- if err != nil {
- log.Error(err.Error())
+ // trigger debug functionality on randomBytes
+ e := trackChunks(randomBytes[:])
+ if e != nil {
+ log.Error(e.Error())
}
- return nil
+ return err
}
func trackChunks(testData []byte) error {
- log.Warn("Test timed out, running chunk debug sequence")
-
addrs, err := getAllRefs(testData)
if err != nil {
return err
@@ -94,14 +90,14 @@ func trackChunks(testData []byte) error {
rpcClient, err := rpc.Dial(httpHost)
if err != nil {
- log.Error("Error dialing host", "err", err)
+ log.Error("error dialing host", "err", err, "host", httpHost)
continue
}
var hasInfo []api.HasInfo
err = rpcClient.Call(&hasInfo, "bzz_has", addrs)
if err != nil {
- log.Error("Error calling host", "err", err)
+ log.Error("error calling rpc client", "err", err, "host", httpHost)
continue
}
@@ -125,7 +121,6 @@ func trackChunks(testData []byte) error {
}
func getAllRefs(testData []byte) (storage.AddressCollection, error) {
- log.Trace("Getting all references for given root hash")
datadir, err := ioutil.TempDir("", "chunk-debug")
if err != nil {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go
index c4ef92723..6619915fd 100644
--- a/metrics/influxdb/influxdb.go
+++ b/metrics/influxdb/influxdb.go
@@ -91,6 +91,7 @@ func (r *reporter) makeClient() (err error) {
URL: r.url,
Username: r.username,
Password: r.password,
+ Timeout: 10 * time.Second,
})
return
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index f7deead3d..5c0dfefce 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -204,24 +204,24 @@ func (f *Fetcher) run(peers *sync.Map) {
// incoming request
case hopCount = <-f.requestC:
- log.Trace("new request", "request addr", f.addr)
// 2) chunk is requested, set requested flag
// launch a request iff none been launched yet
doRequest = !requested
+ log.Trace("new request", "request addr", f.addr, "doRequest", doRequest)
requested = true
// peer we requested from is gone. fall back to another
// and remove the peer from the peers map
case id := <-gone:
- log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
peers.Delete(id.String())
doRequest = requested
+ log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest)
// search timeout: too much time passed since the last request,
// extend the search to a new peer if we can find one
case <-waitC:
- log.Trace("search timed out: requesting", "request addr", f.addr)
doRequest = requested
+ log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest)
// all Fetcher context closed, can quit
case <-f.ctx.Done():
@@ -288,6 +288,7 @@ func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
+ log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String())
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go
index 538502605..056e85ec1 100644
--- a/swarm/network/priorityqueue/priorityqueue.go
+++ b/swarm/network/priorityqueue/priorityqueue.go
@@ -28,8 +28,9 @@ package priorityqueue
import (
"context"
"errors"
+ "time"
- "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
)
var (
@@ -69,13 +70,16 @@ READ:
case <-ctx.Done():
return
case x := <-q:
- log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
- f(x)
+ val := x.(struct {
+ v interface{}
+ t time.Time
+ })
+ f(val.v)
+ metrics.GetOrRegisterResettingTimer("pq.run", nil).UpdateSince(val.t)
p = top
default:
if p > 0 {
p--
- log.Trace("priority.queue p > 0", "p", p)
continue READ
}
p = top
@@ -83,7 +87,6 @@ READ:
case <-ctx.Done():
return
case <-pq.wakeup:
- log.Trace("priority.queue wakeup", "p", p)
}
}
}
@@ -95,9 +98,15 @@ func (pq *PriorityQueue) Push(x interface{}, p int) error {
if p < 0 || p >= len(pq.Queues) {
return errBadPriority
}
- log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
+ val := struct {
+ v interface{}
+ t time.Time
+ }{
+ x,
+ time.Now(),
+ }
select {
- case pq.Queues[p] <- x:
+ case pq.Queues[p] <- val:
default:
return ErrContention
}
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 01ae7f943..bc4f1f665 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -185,6 +185,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
+ osp.LogFields(olog.Bool("delivered", true))
return
}
osp.LogFields(olog.Bool("skipCheck", false))
@@ -216,6 +217,10 @@ type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
// chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "handle.chunk.delivery")
processReceivedChunksCount.Inc(1)
@@ -223,13 +228,18 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
span := tracing.ShiftSpanByKey(spanId)
+ log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())
+
go func() {
+ defer osp.Finish()
+
if span != nil {
span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
defer span.Finish()
}
req.peer = sp
+ log.Trace("handle.chunk.delivery", "put", req.Addr)
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
@@ -239,6 +249,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
req.peer.Drop(err)
}
}
+ log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
}()
return nil
}
@@ -284,6 +295,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
// this span will finish only when delivery is handled (or times out)
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
+ log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index c7c489152..1038e52d0 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -910,7 +910,7 @@ func (r *Registry) APIs() []rpc.API {
Namespace: "stream",
Version: "3.0",
Service: r.api,
- Public: true,
+ Public: false,
},
}
}
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index 5b36b477e..b2f0f5633 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -536,7 +536,6 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
chunkData, err := r.getter.Get(ctx, Reference(childAddress))
if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
- log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
select {
case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
case <-quitC:
@@ -561,12 +560,12 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
// Read keeps a cursor so cannot be called simulateously, see ReadAt
func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
- log.Debug("lazychunkreader.read", "key", r.addr)
+ log.Trace("lazychunkreader.read", "key", r.addr)
metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)
read, err = r.ReadAt(b, r.off)
if err != nil && err != io.EOF {
- log.Debug("lazychunkreader.readat", "read", read, "err", err)
+ log.Trace("lazychunkreader.readat", "read", read, "err", err)
metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
}
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index e3845489e..7741b8f7b 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -87,7 +87,9 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
// if chunk is now put in the store, check if there was an active fetcher and call deliver on it
// (this delivers the chunk to requestors via the fetcher)
+ log.Trace("n.getFetcher", "ref", ch.Address())
if f := n.getFetcher(ch.Address()); f != nil {
+ log.Trace("n.getFetcher deliver", "ref", ch.Address())
f.deliver(ctx, ch)
}
return nil
@@ -341,5 +343,6 @@ func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
f.chunk = ch
// closing the deliveredC channel will terminate ongoing requests
close(f.deliveredC)
+ log.Trace("n.getFetcher close deliveredC", "ref", ch.Address())
})
}
diff --git a/swarm/swarm.go b/swarm/swarm.go
index b4b08c5c5..ae78ccd48 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -522,6 +522,8 @@ func (s *Swarm) APIs() []rpc.API {
apis = append(apis, s.bzz.APIs()...)
+ apis = append(apis, s.streamer.APIs()...)
+
if s.ps != nil {
apis = append(apis, s.ps.APIs()...)
}
diff --git a/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md b/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md
index 1fc9fdf7f..ecfb7e3b7 100644
--- a/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md
+++ b/vendor/github.com/opentracing/opentracing-go/CHANGELOG.md
@@ -10,5 +10,5 @@ Changes by Version
1.0.0 (2016-09-26)
-------------------
-- This release implements OpenTracing Specification 1.0 (http://opentracing.io/spec)
+- This release implements OpenTracing Specification 1.0 (https://opentracing.io/spec)
diff --git a/vendor/github.com/opentracing/opentracing-go/Makefile b/vendor/github.com/opentracing/opentracing-go/Makefile
index d49a5c0d4..62abb63f5 100644
--- a/vendor/github.com/opentracing/opentracing-go/Makefile
+++ b/vendor/github.com/opentracing/opentracing-go/Makefile
@@ -1,26 +1,15 @@
-PACKAGES := . ./mocktracer/... ./ext/...
-
.DEFAULT_GOAL := test-and-lint
-.PHONE: test-and-lint
-
+.PHONY: test-and-lint
test-and-lint: test lint
.PHONY: test
test:
go test -v -cover -race ./...
+.PHONY: cover
cover:
- @rm -rf cover-all.out
- $(foreach pkg, $(PACKAGES), $(MAKE) cover-pkg PKG=$(pkg) || true;)
- @grep mode: cover.out > coverage.out
- @cat cover-all.out >> coverage.out
- go tool cover -html=coverage.out -o cover.html
- @rm -rf cover.out cover-all.out coverage.out
-
-cover-pkg:
- go test -coverprofile cover.out $(PKG)
- @grep -v mode: cover.out >> cover-all.out
+ go test -v -coverprofile=coverage.txt -covermode=atomic -race ./...
.PHONY: lint
lint:
@@ -29,4 +18,3 @@ lint:
@# Run again with magic to exit non-zero if golint outputs anything.
@! (golint ./... | read dummy)
go vet ./...
-
diff --git a/vendor/github.com/opentracing/opentracing-go/README.md b/vendor/github.com/opentracing/opentracing-go/README.md
index 007ee237c..6ef1d7c9d 100644
--- a/vendor/github.com/opentracing/opentracing-go/README.md
+++ b/vendor/github.com/opentracing/opentracing-go/README.md
@@ -8,8 +8,8 @@ This package is a Go platform API for OpenTracing.
## Required Reading
In order to understand the Go platform API, one must first be familiar with the
-[OpenTracing project](http://opentracing.io) and
-[terminology](http://opentracing.io/documentation/pages/spec.html) more specifically.
+[OpenTracing project](https://opentracing.io) and
+[terminology](https://opentracing.io/specification/) more specifically.
## API overview for those adding instrumentation
diff --git a/vendor/github.com/opentracing/opentracing-go/globaltracer.go b/vendor/github.com/opentracing/opentracing-go/globaltracer.go
index 8c8e793ff..4f7066a92 100644
--- a/vendor/github.com/opentracing/opentracing-go/globaltracer.go
+++ b/vendor/github.com/opentracing/opentracing-go/globaltracer.go
@@ -1,7 +1,12 @@
package opentracing
+type registeredTracer struct {
+ tracer Tracer
+ isRegistered bool
+}
+
var (
- globalTracer Tracer = NoopTracer{}
+ globalTracer = registeredTracer{NoopTracer{}, false}
)
// SetGlobalTracer sets the [singleton] opentracing.Tracer returned by
@@ -11,22 +16,27 @@ var (
// Prior to calling `SetGlobalTracer`, any Spans started via the `StartSpan`
// (etc) globals are noops.
func SetGlobalTracer(tracer Tracer) {
- globalTracer = tracer
+ globalTracer = registeredTracer{tracer, true}
}
// GlobalTracer returns the global singleton `Tracer` implementation.
// Before `SetGlobalTracer()` is called, the `GlobalTracer()` is a noop
// implementation that drops all data handed to it.
func GlobalTracer() Tracer {
- return globalTracer
+ return globalTracer.tracer
}
// StartSpan defers to `Tracer.StartSpan`. See `GlobalTracer()`.
func StartSpan(operationName string, opts ...StartSpanOption) Span {
- return globalTracer.StartSpan(operationName, opts...)
+ return globalTracer.tracer.StartSpan(operationName, opts...)
}
// InitGlobalTracer is deprecated. Please use SetGlobalTracer.
func InitGlobalTracer(tracer Tracer) {
SetGlobalTracer(tracer)
}
+
+// IsGlobalTracerRegistered returns a `bool` to indicate if a tracer has been globally registered
+func IsGlobalTracerRegistered() bool {
+ return globalTracer.isRegistered
+}
diff --git a/vendor/github.com/opentracing/opentracing-go/propagation.go b/vendor/github.com/opentracing/opentracing-go/propagation.go
index 0dd466a37..b0c275eb0 100644
--- a/vendor/github.com/opentracing/opentracing-go/propagation.go
+++ b/vendor/github.com/opentracing/opentracing-go/propagation.go
@@ -160,7 +160,7 @@ type HTTPHeadersCarrier http.Header
// Set conforms to the TextMapWriter interface.
func (c HTTPHeadersCarrier) Set(key, val string) {
h := http.Header(c)
- h.Add(key, val)
+ h.Set(key, val)
}
// ForeachKey conforms to the TextMapReader interface.
diff --git a/vendor/vendor.json b/vendor/vendor.json
index c2c7d7c3d..e965b4d7f 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -352,10 +352,10 @@
"revisionTime": "2017-01-28T05:05:32Z"
},
{
- "checksumSHA1": "wIcN7tZiF441h08RHAm4NV8cYO4=",
+ "checksumSHA1": "a/DHmc9bdsYlZZcwp6i3xhvV7Pk=",
"path": "github.com/opentracing/opentracing-go",
- "revision": "bd9c3193394760d98b2fa6ebb2291f0cd1d06a7d",
- "revisionTime": "2018-06-06T20:41:48Z"
+ "revision": "25a84ff92183e2f8ac018ba1db54f8a07b3c0e04",
+ "revisionTime": "2019-02-18T02:30:34Z"
},
{
"checksumSHA1": "uhDxBvLEqRAMZKgpTZ8MFuLIIM8=",