aboutsummaryrefslogblamecommitdiffstats
path: root/vendor/go.opencensus.io/trace/spanstore.go
blob: c442d990218a3a2df65cc93aeeb3b27866dbb7b2 (plain) (tree)

















































































































































































































































































































                                                                                                           
// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
    "sync"
    "time"

    "go.opencensus.io/internal"
)

const (
    maxBucketSize     = 100000
    defaultBucketSize = 10
)

var (
    ssmu       sync.RWMutex // protects spanStores
    spanStores = make(map[string]*spanStore)
)

// This exists purely to avoid exposing internal methods used by z-Pages externally.
type internalOnly struct{}

func init() {
    //TODO(#412): remove
    internal.Trace = &internalOnly{}
}

// ReportActiveSpans returns the active spans for the given name.
func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
    s := spanStoreForName(name)
    if s == nil {
        return nil
    }
    var out []*SpanData
    s.mu.Lock()
    defer s.mu.Unlock()
    for span := range s.active {
        out = append(out, span.makeSpanData())
    }
    return out
}

// ReportSpansByError returns a sample of error spans.
//
// If code is nonzero, only spans with that status code are returned.
func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
    s := spanStoreForName(name)
    if s == nil {
        return nil
    }
    var out []*SpanData
    s.mu.Lock()
    defer s.mu.Unlock()
    if code != 0 {
        if b, ok := s.errors[code]; ok {
            for _, sd := range b.buffer {
                if sd == nil {
                    break
                }
                out = append(out, sd)
            }
        }
    } else {
        for _, b := range s.errors {
            for _, sd := range b.buffer {
                if sd == nil {
                    break
                }
                out = append(out, sd)
            }
        }
    }
    return out
}

// ConfigureBucketSizes sets the number of spans to keep per latency and error
// bucket for different span names.
func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
    for _, bc := range bcs {
        latencyBucketSize := bc.MaxRequestsSucceeded
        if latencyBucketSize < 0 {
            latencyBucketSize = 0
        }
        if latencyBucketSize > maxBucketSize {
            latencyBucketSize = maxBucketSize
        }
        errorBucketSize := bc.MaxRequestsErrors
        if errorBucketSize < 0 {
            errorBucketSize = 0
        }
        if errorBucketSize > maxBucketSize {
            errorBucketSize = maxBucketSize
        }
        spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
    }
}

// ReportSpansPerMethod returns a summary of what spans are being stored for each span name.
func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
    out := make(map[string]internal.PerMethodSummary)
    ssmu.RLock()
    defer ssmu.RUnlock()
    for name, s := range spanStores {
        s.mu.Lock()
        p := internal.PerMethodSummary{
            Active: len(s.active),
        }
        for code, b := range s.errors {
            p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
                ErrorCode: code,
                Size:      b.size(),
            })
        }
        for i, b := range s.latency {
            min, max := latencyBucketBounds(i)
            p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
                MinLatency: min,
                MaxLatency: max,
                Size:       b.size(),
            })
        }
        s.mu.Unlock()
        out[name] = p
    }
    return out
}

// ReportSpansByLatency returns a sample of successful spans.
//
// minLatency is the minimum latency of spans to be returned.
// maxLatency, if nonzero, is the maximum latency of spans to be returned.
func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
    s := spanStoreForName(name)
    if s == nil {
        return nil
    }
    var out []*SpanData
    s.mu.Lock()
    defer s.mu.Unlock()
    for i, b := range s.latency {
        min, max := latencyBucketBounds(i)
        if i+1 != len(s.latency) && max <= minLatency {
            continue
        }
        if maxLatency != 0 && maxLatency < min {
            continue
        }
        for _, sd := range b.buffer {
            if sd == nil {
                break
            }
            if minLatency != 0 || maxLatency != 0 {
                d := sd.EndTime.Sub(sd.StartTime)
                if d < minLatency {
                    continue
                }
                if maxLatency != 0 && d > maxLatency {
                    continue
                }
            }
            out = append(out, sd)
        }
    }
    return out
}

// spanStore keeps track of spans stored for a particular span name.
//
// It contains all active spans; a sample of spans for failed requests,
// categorized by error code; and a sample of spans for successful requests,
// bucketed by latency.
type spanStore struct {
    mu                     sync.Mutex // protects everything below.
    active                 map[*Span]struct{}
    errors                 map[int32]*bucket
    latency                []bucket
    maxSpansPerErrorBucket int
}

// newSpanStore creates a span store.
func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
    s := &spanStore{
        active:                 make(map[*Span]struct{}),
        latency:                make([]bucket, len(defaultLatencies)+1),
        maxSpansPerErrorBucket: errorBucketSize,
    }
    for i := range s.latency {
        s.latency[i] = makeBucket(latencyBucketSize)
    }
    return s
}

// spanStoreForName returns the spanStore for the given name.
//
// It returns nil if it doesn't exist.
func spanStoreForName(name string) *spanStore {
    var s *spanStore
    ssmu.RLock()
    s, _ = spanStores[name]
    ssmu.RUnlock()
    return s
}

// spanStoreForNameCreateIfNew returns the spanStore for the given name.
//
// It creates it if it didn't exist.
func spanStoreForNameCreateIfNew(name string) *spanStore {
    ssmu.RLock()
    s, ok := spanStores[name]
    ssmu.RUnlock()
    if ok {
        return s
    }
    ssmu.Lock()
    defer ssmu.Unlock()
    s, ok = spanStores[name]
    if ok {
        return s
    }
    s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
    spanStores[name] = s
    return s
}

// spanStoreSetSize resizes the spanStore for the given name.
//
// It creates it if it didn't exist.
func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
    ssmu.RLock()
    s, ok := spanStores[name]
    ssmu.RUnlock()
    if ok {
        s.resize(latencyBucketSize, errorBucketSize)
        return
    }
    ssmu.Lock()
    defer ssmu.Unlock()
    s, ok = spanStores[name]
    if ok {
        s.resize(latencyBucketSize, errorBucketSize)
        return
    }
    s = newSpanStore(name, latencyBucketSize, errorBucketSize)
    spanStores[name] = s
}

func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
    s.mu.Lock()
    for i := range s.latency {
        s.latency[i].resize(latencyBucketSize)
    }
    for _, b := range s.errors {
        b.resize(errorBucketSize)
    }
    s.maxSpansPerErrorBucket = errorBucketSize
    s.mu.Unlock()
}

// add adds a span to the active bucket of the spanStore.
func (s *spanStore) add(span *Span) {
    s.mu.Lock()
    s.active[span] = struct{}{}
    s.mu.Unlock()
}

// finished removes a span from the active set, and adds a corresponding
// SpanData to a latency or error bucket.
func (s *spanStore) finished(span *Span, sd *SpanData) {
    latency := sd.EndTime.Sub(sd.StartTime)
    if latency < 0 {
        latency = 0
    }
    code := sd.Status.Code

    s.mu.Lock()
    delete(s.active, span)
    if code == 0 {
        s.latency[latencyBucket(latency)].add(sd)
    } else {
        if s.errors == nil {
            s.errors = make(map[int32]*bucket)
        }
        if b := s.errors[code]; b != nil {
            b.add(sd)
        } else {
            b := makeBucket(s.maxSpansPerErrorBucket)
            s.errors[code] = &b
            b.add(sd)
        }
    }
    s.mu.Unlock()
}