diff options
Diffstat (limited to 'swarm')
-rw-r--r-- | swarm/api/client/client.go | 76 | ||||
-rw-r--r-- | swarm/api/http/middleware.go | 11 | ||||
-rw-r--r-- | swarm/metrics/flags.go | 32 | ||||
-rw-r--r-- | swarm/storage/chunker.go | 8 |
4 files changed, 107 insertions, 20 deletions
diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index d9837ca73..f793ca8b8 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -19,6 +19,7 @@ package client import ( "archive/tar" "bytes" + "context" "encoding/json" "errors" "fmt" @@ -26,6 +27,7 @@ import ( "io/ioutil" "mime/multipart" "net/http" + "net/http/httptrace" "net/textproto" "net/url" "os" @@ -33,9 +35,14 @@ import ( "regexp" "strconv" "strings" + "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/pborman/uuid" ) var ( @@ -474,6 +481,11 @@ type UploadFn func(file *File) error // TarUpload uses the given Uploader to upload files to swarm as a tar stream, // returning the resulting manifest hash func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) { + ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload") + defer sp.Finish() + + var tn time.Time + reqR, reqW := io.Pipe() defer reqR.Close() addr := hash @@ -489,6 +501,12 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t if err != nil { return "", err } + + trace := GetClientTrace("swarm api client - upload tar", "api.client.uploadtar", uuid.New()[:8], &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + req.Header.Set("Content-Type", "application/x-tar") if defaultPath != "" { q := req.URL.Query() @@ -529,8 +547,8 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t } reqW.CloseWithError(err) }() - - res, err := http.DefaultClient.Do(req) + tn = time.Now() + res, err := transport.RoundTrip(req) if err != nil { return "", err } @@ -728,3 +746,57 @@ func (c *Client) GetFeedRequest(query *feed.Query, manifestAddressOrDomain strin } return &metadata, nil } + +func GetClientTrace(traceMsg, metricPrefix, ruid string, tn *time.Time) *httptrace.ClientTrace { + trace := &httptrace.ClientTrace{ + GetConn: func(_ string) { + log.Trace(traceMsg+" - http get", "event", "GetConn", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".getconn", nil).Update(time.Since(*tn)) + }, + GotConn: func(_ httptrace.GotConnInfo) { + log.Trace(traceMsg+" - http get", "event", "GotConn", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".gotconn", nil).Update(time.Since(*tn)) + }, + PutIdleConn: func(err error) { + log.Trace(traceMsg+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err) + metrics.GetOrRegisterResettingTimer(metricPrefix+".putidle", nil).Update(time.Since(*tn)) + }, + GotFirstResponseByte: func() { + log.Trace(traceMsg+" - http get", "event", "GotFirstResponseByte", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".firstbyte", nil).Update(time.Since(*tn)) + }, + Got100Continue: func() { + log.Trace(traceMsg, "event", "Got100Continue", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".got100continue", nil).Update(time.Since(*tn)) + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + log.Trace(traceMsg, "event", "DNSStart", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsstart", nil).Update(time.Since(*tn)) + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + log.Trace(traceMsg, "event", "DNSDone", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsdone", nil).Update(time.Since(*tn)) + }, + ConnectStart: func(network, addr string) { + log.Trace(traceMsg, "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr) + metrics.GetOrRegisterResettingTimer(metricPrefix+".connectstart", nil).Update(time.Since(*tn)) + }, + ConnectDone: func(network, addr string, err error) { + log.Trace(traceMsg, "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err) + metrics.GetOrRegisterResettingTimer(metricPrefix+".connectdone", nil).Update(time.Since(*tn)) + }, + WroteHeaders: func() { + log.Trace(traceMsg, "event", "WroteHeaders(request)", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wroteheaders", nil).Update(time.Since(*tn)) + }, + Wait100Continue: func() { + log.Trace(traceMsg, "event", "Wait100Continue", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wait100continue", nil).Update(time.Since(*tn)) + }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + log.Trace(traceMsg, "event", "WroteRequest", "ruid", ruid) + metrics.GetOrRegisterResettingTimer(metricPrefix+".wroterequest", nil).Update(time.Since(*tn)) + }, + } + return trace +} diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 115a00856..f7f819eab 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -74,13 +74,15 @@ func ParseURI(h http.Handler) http.Handler { func InitLoggingResponseWriter(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(startTime) + tn := time.Now() writer := newLoggingResponseWriter(w) h.ServeHTTP(writer, r) - log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode) - metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).UpdateSince(startTime) + + ts := time.Since(tn) + log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode, "time", ts*time.Millisecond) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).Update(ts) + metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).Update(ts) }) } @@ -93,6 +95,7 @@ func InstrumentOpenTracing(h http.Handler) http.Handler { } spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme) ctx, sp := spancontext.StartSpan(r.Context(), spanName) + defer sp.Finish() h.ServeHTTP(w, r.WithContext(ctx)) }) diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go index 79490fd36..7c12120a6 100644 --- a/swarm/metrics/flags.go +++ b/swarm/metrics/flags.go @@ -27,26 +27,26 @@ import ( ) var ( - metricsEnableInfluxDBExportFlag = cli.BoolFlag{ + MetricsEnableInfluxDBExportFlag = cli.BoolFlag{ Name: "metrics.influxdb.export", Usage: "Enable metrics export/push to an external InfluxDB database", } - metricsInfluxDBEndpointFlag = cli.StringFlag{ + MetricsInfluxDBEndpointFlag = cli.StringFlag{ Name: "metrics.influxdb.endpoint", Usage: "Metrics InfluxDB endpoint", Value: "http://127.0.0.1:8086", } - metricsInfluxDBDatabaseFlag = cli.StringFlag{ + MetricsInfluxDBDatabaseFlag = cli.StringFlag{ Name: "metrics.influxdb.database", Usage: "Metrics InfluxDB database", Value: "metrics", } - metricsInfluxDBUsernameFlag = cli.StringFlag{ + MetricsInfluxDBUsernameFlag = cli.StringFlag{ Name: "metrics.influxdb.username", Usage: "Metrics InfluxDB username", Value: "", } - metricsInfluxDBPasswordFlag = cli.StringFlag{ + MetricsInfluxDBPasswordFlag = cli.StringFlag{ Name: "metrics.influxdb.password", Usage: "Metrics InfluxDB password", Value: "", @@ -55,7 +55,7 @@ var ( // It is used so that we can group all nodes and average a measurement across all of them, but also so // that we can select a specific node and inspect its measurements. // https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key - metricsInfluxDBHostTagFlag = cli.StringFlag{ + MetricsInfluxDBHostTagFlag = cli.StringFlag{ Name: "metrics.influxdb.host.tag", Usage: "Metrics InfluxDB `host` tag attached to all measurements", Value: "localhost", @@ -65,20 +65,24 @@ var ( // Flags holds all command-line flags required for metrics collection. var Flags = []cli.Flag{ utils.MetricsEnabledFlag, - metricsEnableInfluxDBExportFlag, - metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag, + MetricsEnableInfluxDBExportFlag, + MetricsInfluxDBEndpointFlag, + MetricsInfluxDBDatabaseFlag, + MetricsInfluxDBUsernameFlag, + MetricsInfluxDBPasswordFlag, + MetricsInfluxDBHostTagFlag, } func Setup(ctx *cli.Context) { if gethmetrics.Enabled { log.Info("Enabling swarm metrics collection") var ( - enableExport = ctx.GlobalBool(metricsEnableInfluxDBExportFlag.Name) - endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name) - database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name) - username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name) - password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name) - hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name) + enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name) + endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name) ) // Start system runtime metrics collection diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 40292e88f..cbe65372a 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/ethereum/go-ethereum/metrics" ch "github.com/ethereum/go-ethereum/swarm/chunk" @@ -410,10 +411,14 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e log.Debug("lazychunkreader.size", "addr", r.addr) if r.chunkData == nil { + + startTime := time.Now() chunkData, err := r.getter.Get(cctx, Reference(r.addr)) if err != nil { + metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) return 0, err } + metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime) r.chunkData = chunkData s := r.chunkData.Size() log.Debug("lazychunkreader.size", "key", r.addr, "size", s) @@ -542,8 +547,10 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS wg.Add(1) go func(j int64) { childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] + startTime := time.Now() chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) if err != nil { + metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime) log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): @@ -551,6 +558,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS } return } + metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime) if l := len(chunkData); l < 9 { select { case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l): |