aboutsummaryrefslogblamecommitdiffstats
path: root/vendor/github.com/uber/jaeger-client-go/rpcmetrics/observer.go
blob: eca5ff6f3b9882a840779c865982121fbac85f47 (plain) (tree)










































































































































































                                                                                       
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import (
    "strconv"
    "sync"
    "time"

    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/uber/jaeger-lib/metrics"

    jaeger "github.com/uber/jaeger-client-go"
)

const defaultMaxNumberOfEndpoints = 200

// Observer is an observer that can emit RPC metrics.
type Observer struct {
    metricsByEndpoint *MetricsByEndpoint
}

// NewObserver creates a new observer that can emit RPC metrics.
func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
    return &Observer{
        metricsByEndpoint: newMetricsByEndpoint(
            metricsFactory,
            normalizer,
            defaultMaxNumberOfEndpoints,
        ),
    }
}

// OnStartSpan creates a new Observer for the span.
func (o *Observer) OnStartSpan(
    operationName string,
    options opentracing.StartSpanOptions,
) jaeger.SpanObserver {
    return NewSpanObserver(o.metricsByEndpoint, operationName, options)
}

// SpanKind identifies the span as inboud, outbound, or internal
type SpanKind int

const (
    // Local span kind
    Local SpanKind = iota
    // Inbound span kind
    Inbound
    // Outbound span kind
    Outbound
)

// SpanObserver collects RPC metrics
type SpanObserver struct {
    metricsByEndpoint *MetricsByEndpoint
    operationName     string
    startTime         time.Time
    mux               sync.Mutex
    kind              SpanKind
    httpStatusCode    uint16
    err               bool
}

// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
func NewSpanObserver(
    metricsByEndpoint *MetricsByEndpoint,
    operationName string,
    options opentracing.StartSpanOptions,
) *SpanObserver {
    so := &SpanObserver{
        metricsByEndpoint: metricsByEndpoint,
        operationName:     operationName,
        startTime:         options.StartTime,
    }
    for k, v := range options.Tags {
        so.handleTagInLock(k, v)
    }
    return so
}

// handleTags watches for special tags
// - SpanKind
// - HttpStatusCode
// - Error
func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
    if key == string(ext.SpanKind) {
        if v, ok := value.(ext.SpanKindEnum); ok {
            value = string(v)
        }
        if v, ok := value.(string); ok {
            if v == string(ext.SpanKindRPCClientEnum) {
                so.kind = Outbound
            } else if v == string(ext.SpanKindRPCServerEnum) {
                so.kind = Inbound
            }
        }
        return
    }
    if key == string(ext.HTTPStatusCode) {
        if v, ok := value.(uint16); ok {
            so.httpStatusCode = v
        } else if v, ok := value.(int); ok {
            so.httpStatusCode = uint16(v)
        } else if v, ok := value.(string); ok {
            if vv, err := strconv.Atoi(v); err == nil {
                so.httpStatusCode = uint16(vv)
            }
        }
        return
    }
    if key == string(ext.Error) {
        if v, ok := value.(bool); ok {
            so.err = v
        } else if v, ok := value.(string); ok {
            if vv, err := strconv.ParseBool(v); err == nil {
                so.err = vv
            }
        }
        return
    }
}

// OnFinish emits the RPC metrics. It only has an effect when operation name
// is not blank, and the span kind is an RPC server.
func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
    so.mux.Lock()
    defer so.mux.Unlock()

    if so.operationName == "" || so.kind != Inbound {
        return
    }

    mets := so.metricsByEndpoint.get(so.operationName)
    latency := options.FinishTime.Sub(so.startTime)
    if so.err {
        mets.RequestCountFailures.Inc(1)
        mets.RequestLatencyFailures.Record(latency)
    } else {
        mets.RequestCountSuccess.Inc(1)
        mets.RequestLatencySuccess.Record(latency)
    }
    mets.recordHTTPStatusCode(so.httpStatusCode)
}

// OnSetOperationName records new operation name.
func (so *SpanObserver) OnSetOperationName(operationName string) {
    so.mux.Lock()
    so.operationName = operationName
    so.mux.Unlock()
}

// OnSetTag implements SpanObserver
func (so *SpanObserver) OnSetTag(key string, value interface{}) {
    so.mux.Lock()
    so.handleTagInLock(key, value)
    so.mux.Unlock()
}