aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/uber/jaeger-client-go/tracer.go
blob: 198c32eb4fde7317df702d650e4cf8a9c1330b5d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import (
    "fmt"
    "io"
    "os"
    "reflect"
    "strconv"
    "sync"
    "time"

    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"

    "github.com/uber/jaeger-client-go/internal/baggage"
    "github.com/uber/jaeger-client-go/internal/throttler"
    "github.com/uber/jaeger-client-go/log"
    "github.com/uber/jaeger-client-go/utils"
)

// Tracer implements opentracing.Tracer.
// NewTracer also returns it as the io.Closer that shuts the tracer down.
type Tracer struct {
    // serviceName is the service name passed to NewTracer.
    serviceName string
    hostIPv4    uint32 // this is for zipkin endpoint conversion

    // sampler is asked for a sampling decision when a new trace is started.
    sampler  Sampler
    // reporter receives finished spans whose context is sampled.
    reporter Reporter
    // metrics holds the counters incremented when spans start and finish.
    metrics  Metrics
    // logger receives tracer error messages; NewTracer defaults it to log.NullLogger.
    logger   log.Logger

    // timeNow supplies the start time for spans created without an explicit
    // one; NewTracer defaults it to time.Now.
    timeNow      func() time.Time
    // randomNumber is the source of trace/span IDs and of the process UUID;
    // NewTracer installs a time-seeded generator when none was configured.
    randomNumber func() uint64

    options struct {
        // poolSpans makes newSpan/reportSpan recycle Span objects through spanPool.
        poolSpans            bool
        gen128Bit            bool // whether to generate 128bit trace IDs
        // zipkinSharedRPCSpan makes an RPC-server span reuse its parent's
        // span ID and parent ID instead of getting a fresh span ID.
        zipkinSharedRPCSpan  bool
        highTraceIDGenerator func() uint64 // custom high trace ID generator
        // maxTagValueLength falls back to DefaultMaxTagValueLength when left at 0.
        maxTagValueLength    int
        // more options to come
    }
    // pool for Span objects
    spanPool sync.Pool

    // injectors and extractors are keyed by propagation format.
    injectors  map[interface{}]Injector
    extractors map[interface{}]Extractor

    // observer is notified through OnStartSpan for every span that is started.
    observer compositeObserver

    // tags are the tracer-level tags; NewTracer appends the client version
    // and, when they can be determined, the hostname and host IP.
    tags    []Tag
    // process is built by NewTracer from the service name, a random UUID and tags.
    process Process

    // baggageRestrictionManager is optional; when nil, NewTracer builds
    // baggageSetter from a default restriction manager instead.
    baggageRestrictionManager baggage.RestrictionManager
    baggageSetter             *baggageSetter

    // debugThrottler decides whether a debug-ID request may force sampling;
    // NewTracer defaults it to throttler.DefaultThrottler{}.
    debugThrottler throttler.Throttler
}

// NewTracer builds a Tracer that reports tracing data to Jaeger.
//
// Options are applied first; afterwards every setting that is still unset
// receives its default, the built-in propagation codecs are registered for
// formats that no option has claimed, and the tracer-level tags and process
// description are assembled.
//
// The tracer is returned twice: as the opentracing.Tracer and as an io.Closer.
// The io.Closer can be used in shutdown hooks to ensure that the internal
// queue of the Reporter is drained and all buffered spans are submitted to
// collectors.
func NewTracer(
    serviceName string,
    sampler Sampler,
    reporter Reporter,
    options ...TracerOption,
) (opentracing.Tracer, io.Closer) {
    tr := &Tracer{
        serviceName: serviceName,
        sampler:     sampler,
        reporter:    reporter,
        injectors:   make(map[interface{}]Injector),
        extractors:  make(map[interface{}]Extractor),
        metrics:     *NewNullMetrics(),
        spanPool: sync.Pool{New: func() interface{} {
            return &Span{}
        }},
    }

    for i := range options {
        options[i](tr)
    }

    // Default codecs; addCodec leaves alone any format an option already registered.
    textMapCodec := newTextMapPropagator(getDefaultHeadersConfig(), tr.metrics)
    tr.addCodec(opentracing.TextMap, textMapCodec, textMapCodec)

    httpCodec := newHTTPHeaderPropagator(getDefaultHeadersConfig(), tr.metrics)
    tr.addCodec(opentracing.HTTPHeaders, httpCodec, httpCodec)

    binaryCodec := newBinaryPropagator(tr)
    tr.addCodec(opentracing.Binary, binaryCodec, binaryCodec)

    // TODO remove after TChannel supports OpenTracing
    interopCodec := &jaegerTraceContextPropagator{tracer: tr}
    tr.addCodec(SpanContextFormat, interopCodec, interopCodec)

    zipkinCodec := &zipkinPropagator{tracer: tr}
    tr.addCodec(ZipkinSpanFormat, zipkinCodec, zipkinCodec)

    if tr.baggageRestrictionManager == nil {
        tr.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &tr.metrics)
    } else {
        tr.baggageSetter = newBaggageSetter(tr.baggageRestrictionManager, &tr.metrics)
    }
    if tr.debugThrottler == nil {
        tr.debugThrottler = throttler.DefaultThrottler{}
    }

    if tr.randomNumber == nil {
        seeded := utils.NewRand(time.Now().UnixNano())
        tr.randomNumber = func() uint64 {
            return uint64(seeded.Int63())
        }
    }
    if tr.timeNow == nil {
        tr.timeNow = time.Now
    }
    if tr.logger == nil {
        tr.logger = log.NullLogger
    }

    // Tracer-level tags: client version always, hostname and IP when available.
    tr.tags = append(tr.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
    hostname, hostErr := os.Hostname()
    if hostErr == nil {
        tr.tags = append(tr.tags, Tag{key: TracerHostnameTagKey, value: hostname})
    }
    hostIP, ipErr := utils.HostIP()
    if ipErr != nil {
        tr.logger.Error("Unable to determine this host's IP address: " + ipErr.Error())
    } else {
        tr.tags = append(tr.tags, Tag{key: TracerIPTagKey, value: hostIP.String()})
        tr.hostIPv4 = utils.PackIPAsUint32(hostIP)
    }

    switch {
    case tr.options.gen128Bit:
        // Without a custom generator the high half comes from the same
        // random source as everything else.
        if tr.options.highTraceIDGenerator == nil {
            tr.options.highTraceIDGenerator = tr.randomNumber
        }
    case tr.options.highTraceIDGenerator != nil:
        tr.logger.Error("Overriding high trace ID generator but not generating " +
            "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
    }
    if tr.options.maxTagValueLength == 0 {
        tr.options.maxTagValueLength = DefaultMaxTagValueLength
    }

    processUUID := strconv.FormatUint(tr.randomNumber(), 16)
    tr.process = Process{
        Service: serviceName,
        UUID:    processUUID,
        Tags:    tr.tags,
    }
    if setter, ok := tr.debugThrottler.(ProcessSetter); ok {
        setter.SetProcess(tr.process)
    }

    return tr, tr
}

// addCodec registers the injector and the extractor for the given propagation
// format. A format that already has an injector (or extractor) keeps the one
// it has; the two maps are checked independently of each other.
func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
    _, haveInjector := t.injectors[format]
    _, haveExtractor := t.extractors[format]
    if !haveInjector {
        t.injectors[format] = injector
    }
    if !haveExtractor {
        t.extractors[format] = extractor
    }
}

// StartSpan implements StartSpan() method of opentracing.Tracer.
// Each option is applied, in order, to a zero-valued StartSpanOptions, which
// is then handed to startSpanWithOptions.
func (t *Tracer) StartSpan(
    operationName string,
    options ...opentracing.StartSpanOption,
) opentracing.Span {
    var collected opentracing.StartSpanOptions
    for i := range options {
        options[i].Apply(&collected)
    }
    return t.startSpanWithOptions(operationName, collected)
}

// startSpanWithOptions creates a span from already-collected start options.
//
// It picks a parent among the usable references (the first ChildOf reference,
// otherwise the last usable reference seen), then either continues the
// parent's trace or starts a new one with a fresh sampling decision, copies
// the parent's baggage, and hands the span to startSpanInternal.
func (t *Tracer) startSpanWithOptions(
    operationName string,
    options opentracing.StartSpanOptions,
) opentracing.Span {
    if options.StartTime.IsZero() {
        options.StartTime = t.timeNow()
    }

    // usable reports whether a referenced context carries anything worth
    // inheriting: it is valid, it is a debug-ID container, or it has baggage.
    usable := func(sc SpanContext) bool {
        if sc.IsValid() || sc.isDebugIDContainerOnly() {
            return true
        }
        return len(sc.baggage) != 0
    }

    var (
        refs      []Reference
        parent    SpanContext
        hasParent bool // tracked separately because `parent` is a value, not a pointer
    )
    for _, ref := range options.References {
        refCtx, isJaegerCtx := ref.ReferencedContext.(SpanContext)
        if !isJaegerCtx {
            t.logger.Error(fmt.Sprintf(
                "Reference contains invalid type of SpanReference: %s",
                reflect.ValueOf(ref.ReferencedContext)))
            continue
        }
        if !usable(refCtx) {
            continue
        }
        refs = append(refs, Reference{Type: ref.Type, Context: refCtx})
        if hasParent {
            continue
        }
        // Until a ChildOf reference settles the matter, every usable
        // reference becomes the tentative parent.
        parent = refCtx
        hasParent = ref.Type == opentracing.ChildOfRef
    }
    // No ChildOf reference was found: fall back to the tentative parent
    // (a FollowsFrom reference), provided there is one.
    hasParent = hasParent || usable(parent)

    spanKind, hasSpanKind := options.Tags[ext.SpanKindRPCServer.Key]
    rpcServer := hasSpanKind &&
        (spanKind == ext.SpanKindRPCServerEnum || spanKind == string(ext.SpanKindRPCServerEnum))

    var (
        samplerTags []Tag
        ctx         SpanContext // zero value: parentID and flags start at 0
    )
    newTrace := !hasParent || !parent.IsValid()
    if newTrace {
        ctx.traceID.Low = t.randomID()
        if t.options.gen128Bit {
            ctx.traceID.High = t.options.highTraceIDGenerator()
        }
        ctx.spanID = SpanID(ctx.traceID.Low)

        forceDebug := hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName)
        if forceDebug {
            ctx.flags |= flagSampled | flagDebug
            samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
        } else if sampled, decisionTags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
            ctx.flags |= flagSampled
            samplerTags = decisionTags
        }
    } else {
        ctx.traceID = parent.traceID
        ctx.flags = parent.flags
        if rpcServer && t.options.zipkinSharedRPCSpan {
            // Support Zipkin's one-span-per-RPC model
            ctx.spanID, ctx.parentID = parent.spanID, parent.parentID
        } else {
            ctx.spanID, ctx.parentID = SpanID(t.randomID()), parent.spanID
        }
    }

    // Baggage is copied so the child never shares a map with its parent.
    if hasParent && len(parent.baggage) > 0 {
        ctx.baggage = make(map[string]string, len(parent.baggage))
        for key, val := range parent.baggage {
            ctx.baggage[key] = val
        }
    }

    span := t.newSpan()
    span.context = ctx
    span.observer = t.observer.OnStartSpan(span, operationName, options)
    return t.startSpanInternal(
        span,
        operationName,
        options.StartTime,
        samplerTags,
        options.Tags,
        newTrace,
        rpcServer,
        refs,
    )
}

// Inject implements Inject() method of opentracing.Tracer.
// It returns opentracing.ErrInvalidSpanContext when ctx is not a SpanContext
// of this package and opentracing.ErrUnsupportedFormat when no injector is
// registered for format; otherwise the registered injector's result is returned.
func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
    jaegerCtx, isJaegerCtx := ctx.(SpanContext)
    if !isJaegerCtx {
        return opentracing.ErrInvalidSpanContext
    }
    injector, supported := t.injectors[format]
    if !supported {
        return opentracing.ErrUnsupportedFormat
    }
    return injector.Inject(jaegerCtx, carrier)
}

// Extract implements Extract() method of opentracing.Tracer.
// It delegates to the extractor registered for format and returns
// opentracing.ErrUnsupportedFormat when there is none.
func (t *Tracer) Extract(
    format interface{},
    carrier interface{},
) (opentracing.SpanContext, error) {
    extractor, supported := t.extractors[format]
    if !supported {
        return nil, opentracing.ErrUnsupportedFormat
    }
    return extractor.Extract(carrier)
}

// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
//
// The reporter and the sampler are closed first. The baggage restriction
// manager and the debug throttler are closed afterwards, but only when they
// implement io.Closer. Every component is closed even if an earlier one
// fails; the first error returned by an io.Closer component is reported to
// the caller instead of being discarded. Close returns nil when nothing failed.
func (t *Tracer) Close() error {
    t.reporter.Close()
    t.sampler.Close()

    var firstErr error
    if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
        if err := mgr.Close(); err != nil {
            firstErr = err
        }
    }
    if thr, ok := t.debugThrottler.(io.Closer); ok {
        // Keep the earlier error, if any; it is the one closest to the root cause.
        if err := thr.Close(); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

// Tags returns a slice of tracer-level tags.
// The slice is freshly built on every call, so callers may modify it without
// touching the tracer's own tag list.
func (t *Tracer) Tags() []opentracing.Tag {
    converted := make([]opentracing.Tag, 0, len(t.tags))
    for i := range t.tags {
        converted = append(converted, opentracing.Tag{
            Key:   t.tags[i].key,
            Value: t.tags[i].value,
        })
    }
    return converted
}

// newSpan hands out a Span ready to be populated.
// With options.poolSpans enabled the span is taken from the object pool and
// its context, tracer, tags and logs are cleared before it is returned;
// otherwise a brand-new Span is allocated.
func (t *Tracer) newSpan() *Span {
    if t.options.poolSpans {
        recycled := t.spanPool.Get().(*Span)
        recycled.context, recycled.tracer = emptyContext, nil
        recycled.tags, recycled.logs = nil, nil
        return recycled
    }
    return &Span{}
}

// startSpanInternal fills in the remaining fields of sp, applies the initial
// tags and emits the span/trace start metrics. sp.context and sp.observer
// must already be set by the caller.
//
// internalTags are copied onto the span first. Each entry of tags is then
// announced to the observer and stored, except a sampling-priority tag that
// setSamplingPriority rejects. newTrace says whether the span opens a new
// trace; rpcServer marks the span as the first one in this process.
func (t *Tracer) startSpanInternal(
    sp *Span,
    operationName string,
    startTime time.Time,
    internalTags []Tag,
    tags opentracing.Tags,
    newTrace bool,
    rpcServer bool,
    references []Reference,
) *Span {
    sp.tracer = t
    sp.operationName = operationName
    sp.startTime = startTime
    sp.duration = 0
    sp.references = references
    sp.firstInProcess = rpcServer || sp.context.parentID == 0

    if total := len(tags) + len(internalTags); total > 0 {
        sp.tags = append(make([]Tag, 0, total), internalTags...)
        for key, value := range tags {
            sp.observer.OnSetTag(key, value)
            // A sampling-priority tag is stored only if it was accepted.
            if key != string(ext.SamplingPriority) || setSamplingPriority(sp, value) {
                sp.setTagNoLocking(key, value)
            }
        }
    }

    // emit metrics
    if !sp.context.IsSampled() {
        t.metrics.SpansStartedNotSampled.Inc(1)
        switch {
        case newTrace:
            t.metrics.TracesStartedNotSampled.Inc(1)
        case sp.firstInProcess:
            t.metrics.TracesJoinedNotSampled.Inc(1)
        }
        return sp
    }

    t.metrics.SpansStartedSampled.Inc(1)
    switch {
    case newTrace:
        // We cannot simply check for parentID==0 because in Zipkin model the
        // server-side RPC span has the exact same trace/span/parent IDs as the
        // calling client-side span, but obviously the server side span is
        // no longer a root span of the trace.
        t.metrics.TracesStartedSampled.Inc(1)
    case sp.firstInProcess:
        t.metrics.TracesJoinedSampled.Inc(1)
    }
    return sp
}

// reportSpan records that sp has finished: it bumps the finished-spans
// counter, forwards the span to the reporter when its context is sampled,
// and returns the span to the pool when span pooling is enabled.
func (t *Tracer) reportSpan(sp *Span) {
    t.metrics.SpansFinished.Inc(1)
    if sampled := sp.context.IsSampled(); sampled {
        t.reporter.Report(sp)
    }
    if !t.options.poolSpans {
        return
    }
    t.spanPool.Put(sp)
}

// randomID produces a random trace/span ID by drawing from the tracer's
// randomNumber generator until a non-zero value comes up.
// It never returns 0.
func (t *Tracer) randomID() uint64 {
    for {
        if id := t.randomNumber(); id != 0 {
            return id
        }
    }
}

// setBaggage stores a baggage item on sp by delegating to the tracer's
// baggage setter.
// (NB) the span's lock must already be held by the caller.
func (t *Tracer) setBaggage(sp *Span, key, value string) {
    setter := t.baggageSetter
    setter.setBaggage(sp, key, value)
}

// isDebugAllowed asks the debug throttler whether a debug request for the
// given operation may go ahead.
// (NB) span must hold the lock before making this call.
// NOTE(review): startSpanWithOptions calls this before any span has been
// created, so the locking note presumably concerns callers that act on an
// existing span; confirm against those callers.
func (t *Tracer) isDebugAllowed(operation string) bool {
    allowed := t.debugThrottler.IsAllowed(operation)
    return allowed
}