diff options
Diffstat (limited to 'metrics/influxdb')
-rw-r--r-- | metrics/influxdb/LICENSE | 19 | ||||
-rw-r--r-- | metrics/influxdb/README.md | 30 | ||||
-rw-r--r-- | metrics/influxdb/influxdb.go | 227 |
3 files changed, 276 insertions, 0 deletions
diff --git a/metrics/influxdb/LICENSE b/metrics/influxdb/LICENSE new file mode 100644 index 000000000..e5bf20cdb --- /dev/null +++ b/metrics/influxdb/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2015 Vincent Rischmann + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/metrics/influxdb/README.md b/metrics/influxdb/README.md new file mode 100644 index 000000000..b76b1a3f9 --- /dev/null +++ b/metrics/influxdb/README.md @@ -0,0 +1,30 @@ +go-metrics-influxdb +=================== + +This is a reporter for the [go-metrics](https://github.com/rcrowley/go-metrics) library which will post the metrics to [InfluxDB](https://influxdb.com/). + +Note +---- + +This is only compatible with InfluxDB 0.9+. + +Usage +----- + +```go +import "github.com/vrischmann/go-metrics-influxdb" + +go influxdb.InfluxDB( + metrics.DefaultRegistry, // metrics registry + time.Second * 10, // interval + "http://localhost:8086", // the InfluxDB url + "mydb", // your InfluxDB database + "myuser", // your InfluxDB user + "mypassword", // your InfluxDB password +) +``` + +License +------- + +go-metrics-influxdb is licensed under the MIT license. See the LICENSE file for details. diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go new file mode 100644 index 000000000..d5cb4da66 --- /dev/null +++ b/metrics/influxdb/influxdb.go @@ -0,0 +1,227 @@ +package influxdb + +import ( + "fmt" + "log" + uurl "net/url" + "time" + + "github.com/ethereum/go-ethereum/metrics" + "github.com/influxdata/influxdb/client" +) + +type reporter struct { + reg metrics.Registry + interval time.Duration + + url uurl.URL + database string + username string + password string + namespace string + tags map[string]string + + client *client.Client + + cache map[string]int64 +} + +// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval. +func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) { + InfluxDBWithTags(r, d, url, database, username, password, namespace, nil) +} + +// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags +func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) { + u, err := uurl.Parse(url) + if err != nil { + log.Printf("unable to parse InfluxDB url %s. err=%v", url, err) + return + } + + rep := &reporter{ + reg: r, + interval: d, + url: *u, + database: database, + username: username, + password: password, + namespace: namespace, + tags: tags, + cache: make(map[string]int64), + } + if err := rep.makeClient(); err != nil { + log.Printf("unable to make InfluxDB client. err=%v", err) + return + } + + rep.run() +} + +func (r *reporter) makeClient() (err error) { + r.client, err = client.NewClient(client.Config{ + URL: r.url, + Username: r.username, + Password: r.password, + }) + + return +} + +func (r *reporter) run() { + intervalTicker := time.Tick(r.interval) + pingTicker := time.Tick(time.Second * 5) + + for { + select { + case <-intervalTicker: + if err := r.send(); err != nil { + log.Printf("unable to send to InfluxDB. err=%v", err) + } + case <-pingTicker: + _, _, err := r.client.Ping() + if err != nil { + log.Printf("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err) + + if err = r.makeClient(); err != nil { + log.Printf("unable to make InfluxDB client. err=%v", err) + } + } + } + } +} + +func (r *reporter) send() error { + var pts []client.Point + + r.reg.Each(func(name string, i interface{}) { + now := time.Now() + namespace := r.namespace + + switch metric := i.(type) { + case metrics.Counter: + v := metric.Count() + l := r.cache[name] + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.count", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "value": v - l, + }, + Time: now, + }) + r.cache[name] = v + case metrics.Gauge: + ms := metric.Snapshot() + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.gauge", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "value": ms.Value(), + }, + Time: now, + }) + case metrics.GaugeFloat64: + ms := metric.Snapshot() + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.gauge", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "value": ms.Value(), + }, + Time: now, + }) + case metrics.Histogram: + ms := metric.Snapshot() + ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.histogram", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "count": ms.Count(), + "max": ms.Max(), + "mean": ms.Mean(), + "min": ms.Min(), + "stddev": ms.StdDev(), + "variance": ms.Variance(), + "p50": ps[0], + "p75": ps[1], + "p95": ps[2], + "p99": ps[3], + "p999": ps[4], + "p9999": ps[5], + }, + Time: now, + }) + case metrics.Meter: + ms := metric.Snapshot() + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.meter", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "count": ms.Count(), + "m1": ms.Rate1(), + "m5": ms.Rate5(), + "m15": ms.Rate15(), + "mean": ms.RateMean(), + }, + Time: now, + }) + case metrics.Timer: + ms := metric.Snapshot() + ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.timer", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "count": ms.Count(), + "max": ms.Max(), + "mean": ms.Mean(), + "min": ms.Min(), + "stddev": ms.StdDev(), + "variance": ms.Variance(), + "p50": ps[0], + "p75": ps[1], + "p95": ps[2], + "p99": ps[3], + "p999": ps[4], + "p9999": ps[5], + "m1": ms.Rate1(), + "m5": ms.Rate5(), + "m15": ms.Rate15(), + "meanrate": ms.RateMean(), + }, + Time: now, + }) + case metrics.ResettingTimer: + t := metric.Snapshot() + + if len(t.Values()) > 0 { + ps := t.Percentiles([]float64{50, 95, 99}) + val := t.Values() + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.span", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "count": len(val), + "max": val[len(val)-1], + "mean": t.Mean(), + "min": val[0], + "p50": ps[0], + "p95": ps[1], + "p99": ps[2], + }, + Time: now, + }) + } + } + }) + + bps := client.BatchPoints{ + Points: pts, + Database: r.database, + } + + _, err := r.client.Write(bps) + return err +} |