// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import (
	"strconv"
	"sync"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/uber/jaeger-lib/metrics"

	jaeger "github.com/uber/jaeger-client-go"
)

// defaultMaxNumberOfEndpoints is the limit NewObserver passes to
// newMetricsByEndpoint.
// NOTE(review): presumably caps how many distinct endpoint names get their
// own metrics; confirm against newMetricsByEndpoint.
const defaultMaxNumberOfEndpoints = 200

// Observer is an observer that can emit RPC metrics.
type Observer struct {
	// metricsByEndpoint is shared with every SpanObserver this Observer
	// creates in OnStartSpan.
	metricsByEndpoint *MetricsByEndpoint
}

// NewObserver creates a new observer that can emit RPC metrics.
// The metrics factory and the name normalizer are handed to
// newMetricsByEndpoint together with defaultMaxNumberOfEndpoints.
func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
	return &Observer{
		metricsByEndpoint: newMetricsByEndpoint(
			metricsFactory,
			normalizer,
			defaultMaxNumberOfEndpoints,
		),
	}
}

// OnStartSpan creates a new SpanObserver for the span, built from the
// operation name and start options and backed by this Observer's
// per-endpoint metrics.
func (o *Observer) OnStartSpan(
	operationName string,
	options opentracing.StartSpanOptions,
) jaeger.SpanObserver {
	return NewSpanObserver(o.metricsByEndpoint, operationName, options)
}

// SpanKind identifies the span as inbound, outbound, or internal.
type SpanKind int

const (
	// Local span kind. It is the zero value, so a SpanObserver whose kind
	// was never set is Local.
	Local SpanKind = iota
	// Inbound span kind
	Inbound
	// Outbound span kind
	Outbound
)

// SpanObserver collects RPC metrics
type SpanObserver struct {
	// metricsByEndpoint is the metrics store received from the caller of
	// NewSpanObserver.
	metricsByEndpoint *MetricsByEndpoint
	// operationName is the span's operation name.
	operationName string
	// startTime is the span start time.
	startTime time.Time
	// mux is the observer's mutex.
	// NOTE(review): presumably guards the fields of this struct; confirm
	// against the methods that take it.
	mux sync.Mutex
	// kind is the span kind; the zero value is Local.
	kind SpanKind
	// httpStatusCode is an HTTP status code; 0 is the zero value.
	httpStatusCode uint16
	// err is an error flag.
	// NOTE(review): presumably set from the span's error tag; confirm.
	err bool
}

// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
func NewSpanObserver( metricsByEndpoint *MetricsByEndpoint, operationName string, options opentracing.StartSpanOptions, ) *SpanObserver { so := &SpanObserver{ metricsByEndpoint: metricsByEndpoint, operationName: operationName, startTime: options.StartTime, } for k, v := range options.Tags { so.handleTagInLock(k, v) } return so } // handleTags watches for special tags // - SpanKind // - HttpStatusCode // - Error func (so *SpanObserver) handleTagInLock(key string, value interface{}) { if key == string(ext.SpanKind) { if v, ok := value.(ext.SpanKindEnum); ok { value = string(v) } if v, ok := value.(string); ok { if v == string(ext.SpanKindRPCClientEnum) { so.kind = Outbound } else if v == string(ext.SpanKindRPCServerEnum) { so.kind = Inbound } } return } if key == string(ext.HTTPStatusCode) { if v, ok := value.(uint16); ok { so.httpStatusCode = v } else if v, ok := value.(int); ok { so.httpStatusCode = uint16(v) } else if v, ok := value.(string); ok { if vv, err := strconv.Atoi(v); err == nil { so.httpStatusCode = uint16(vv) } } return } if key == string(ext.Error) { if v, ok := value.(bool); ok { so.err = v } else if v, ok := value.(string); ok { if vv, err := strconv.ParseBool(v); err == nil { so.err = vv } } return } } // OnFinish emits the RPC metrics. It only has an effect when operation name // is not blank, and the span kind is an RPC server. func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) { so.mux.Lock() defer so.mux.Unlock() if so.operationName == "" || so.kind != Inbound { return } mets := so.metricsByEndpoint.get(so.operationName) latency := options.FinishTime.Sub(so.startTime) if so.err { mets.RequestCountFailures.Inc(1) mets.RequestLatencyFailures.Record(latency) } else { mets.RequestCountSuccess.Inc(1) mets.RequestLatencySuccess.Record(latency) } mets.recordHTTPStatusCode(so.httpStatusCode) } // OnSetOperationName records new operation name. 
func (so *SpanObserver) OnSetOperationName(operationName string) { so.mux.Lock() so.operationName = operationName so.mux.Unlock() } // OnSetTag implements SpanObserver func (so *SpanObserver) OnSetTag(key string, value interface{}) { so.mux.Lock() so.handleTagInLock(key, value) so.mux.Unlock() }