aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/influxdata/influxdb/client/v2/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/influxdata/influxdb/client/v2/client.go')
-rw-r--r--vendor/github.com/influxdata/influxdb/client/v2/client.go635
1 files changed, 0 insertions, 635 deletions
diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client.go b/vendor/github.com/influxdata/influxdb/client/v2/client.go
deleted file mode 100644
index 77d44f2b3..000000000
--- a/vendor/github.com/influxdata/influxdb/client/v2/client.go
+++ /dev/null
@@ -1,635 +0,0 @@
-// Package client (v2) is the current official Go client for InfluxDB.
-package client // import "github.com/influxdata/influxdb/client/v2"
-
-import (
- "bytes"
- "crypto/tls"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "time"
-
- "github.com/influxdata/influxdb/models"
-)
-
-// HTTPConfig is the config data needed to create an HTTP Client.
-type HTTPConfig struct {
- // Addr should be of the form "http://host:port"
- // or "http://[ipv6-host%zone]:port".
- Addr string
-
- // Username is the influxdb username, optional.
- Username string
-
- // Password is the influxdb password, optional.
- Password string
-
- // UserAgent is the http User Agent, defaults to "InfluxDBClient".
- UserAgent string
-
- // Timeout for influxdb writes, defaults to no timeout.
- Timeout time.Duration
-
- // InsecureSkipVerify gets passed to the http client, if true, it will
- // skip https certificate verification. Defaults to false.
- InsecureSkipVerify bool
-
- // TLSConfig allows the user to set their own TLS config for the HTTP
- // Client. If set, this option overrides InsecureSkipVerify.
- TLSConfig *tls.Config
-}
-
-// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
-type BatchPointsConfig struct {
- // Precision is the write precision of the points, defaults to "ns".
- Precision string
-
- // Database is the database to write points to.
- Database string
-
- // RetentionPolicy is the retention policy of the points.
- RetentionPolicy string
-
- // Write consistency is the number of servers required to confirm write.
- WriteConsistency string
-}
-
-// Client is a client interface for writing & querying the database.
-type Client interface {
- // Ping checks that status of cluster, and will always return 0 time and no
- // error for UDP clients.
- Ping(timeout time.Duration) (time.Duration, string, error)
-
- // Write takes a BatchPoints object and writes all Points to InfluxDB.
- Write(bp BatchPoints) error
-
- // Query makes an InfluxDB Query on the database. This will fail if using
- // the UDP client.
- Query(q Query) (*Response, error)
-
- // Close releases any resources a Client may be using.
- Close() error
-}
-
-// NewHTTPClient returns a new Client from the provided config.
-// Client is safe for concurrent use by multiple goroutines.
-func NewHTTPClient(conf HTTPConfig) (Client, error) {
- if conf.UserAgent == "" {
- conf.UserAgent = "InfluxDBClient"
- }
-
- u, err := url.Parse(conf.Addr)
- if err != nil {
- return nil, err
- } else if u.Scheme != "http" && u.Scheme != "https" {
- m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
- " must start with http:// or https://", u.Scheme)
- return nil, errors.New(m)
- }
-
- tr := &http.Transport{
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: conf.InsecureSkipVerify,
- },
- }
- if conf.TLSConfig != nil {
- tr.TLSClientConfig = conf.TLSConfig
- }
- return &client{
- url: *u,
- username: conf.Username,
- password: conf.Password,
- useragent: conf.UserAgent,
- httpClient: &http.Client{
- Timeout: conf.Timeout,
- Transport: tr,
- },
- transport: tr,
- }, nil
-}
-
-// Ping will check to see if the server is up with an optional timeout on waiting for leader.
-// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
-func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
- now := time.Now()
- u := c.url
- u.Path = "ping"
-
- req, err := http.NewRequest("GET", u.String(), nil)
- if err != nil {
- return 0, "", err
- }
-
- req.Header.Set("User-Agent", c.useragent)
-
- if c.username != "" {
- req.SetBasicAuth(c.username, c.password)
- }
-
- if timeout > 0 {
- params := req.URL.Query()
- params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
- req.URL.RawQuery = params.Encode()
- }
-
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return 0, "", err
- }
- defer resp.Body.Close()
-
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return 0, "", err
- }
-
- if resp.StatusCode != http.StatusNoContent {
- var err = fmt.Errorf(string(body))
- return 0, "", err
- }
-
- version := resp.Header.Get("X-Influxdb-Version")
- return time.Since(now), version, nil
-}
-
-// Close releases the client's resources.
-func (c *client) Close() error {
- c.transport.CloseIdleConnections()
- return nil
-}
-
-// client is safe for concurrent use as the fields are all read-only
-// once the client is instantiated.
-type client struct {
- // N.B - if url.UserInfo is accessed in future modifications to the
- // methods on client, you will need to syncronise access to url.
- url url.URL
- username string
- password string
- useragent string
- httpClient *http.Client
- transport *http.Transport
-}
-
-// BatchPoints is an interface into a batched grouping of points to write into
-// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
-// batch for each goroutine.
-type BatchPoints interface {
- // AddPoint adds the given point to the Batch of points.
- AddPoint(p *Point)
- // AddPoints adds the given points to the Batch of points.
- AddPoints(ps []*Point)
- // Points lists the points in the Batch.
- Points() []*Point
-
- // Precision returns the currently set precision of this Batch.
- Precision() string
- // SetPrecision sets the precision of this batch.
- SetPrecision(s string) error
-
- // Database returns the currently set database of this Batch.
- Database() string
- // SetDatabase sets the database of this Batch.
- SetDatabase(s string)
-
- // WriteConsistency returns the currently set write consistency of this Batch.
- WriteConsistency() string
- // SetWriteConsistency sets the write consistency of this Batch.
- SetWriteConsistency(s string)
-
- // RetentionPolicy returns the currently set retention policy of this Batch.
- RetentionPolicy() string
- // SetRetentionPolicy sets the retention policy of this Batch.
- SetRetentionPolicy(s string)
-}
-
-// NewBatchPoints returns a BatchPoints interface based on the given config.
-func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
- if conf.Precision == "" {
- conf.Precision = "ns"
- }
- if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
- return nil, err
- }
- bp := &batchpoints{
- database: conf.Database,
- precision: conf.Precision,
- retentionPolicy: conf.RetentionPolicy,
- writeConsistency: conf.WriteConsistency,
- }
- return bp, nil
-}
-
-type batchpoints struct {
- points []*Point
- database string
- precision string
- retentionPolicy string
- writeConsistency string
-}
-
-func (bp *batchpoints) AddPoint(p *Point) {
- bp.points = append(bp.points, p)
-}
-
-func (bp *batchpoints) AddPoints(ps []*Point) {
- bp.points = append(bp.points, ps...)
-}
-
-func (bp *batchpoints) Points() []*Point {
- return bp.points
-}
-
-func (bp *batchpoints) Precision() string {
- return bp.precision
-}
-
-func (bp *batchpoints) Database() string {
- return bp.database
-}
-
-func (bp *batchpoints) WriteConsistency() string {
- return bp.writeConsistency
-}
-
-func (bp *batchpoints) RetentionPolicy() string {
- return bp.retentionPolicy
-}
-
-func (bp *batchpoints) SetPrecision(p string) error {
- if _, err := time.ParseDuration("1" + p); err != nil {
- return err
- }
- bp.precision = p
- return nil
-}
-
-func (bp *batchpoints) SetDatabase(db string) {
- bp.database = db
-}
-
-func (bp *batchpoints) SetWriteConsistency(wc string) {
- bp.writeConsistency = wc
-}
-
-func (bp *batchpoints) SetRetentionPolicy(rp string) {
- bp.retentionPolicy = rp
-}
-
-// Point represents a single data point.
-type Point struct {
- pt models.Point
-}
-
-// NewPoint returns a point with the given timestamp. If a timestamp is not
-// given, then data is sent to the database without a timestamp, in which case
-// the server will assign local time upon reception. NOTE: it is recommended to
-// send data with a timestamp.
-func NewPoint(
- name string,
- tags map[string]string,
- fields map[string]interface{},
- t ...time.Time,
-) (*Point, error) {
- var T time.Time
- if len(t) > 0 {
- T = t[0]
- }
-
- pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
- if err != nil {
- return nil, err
- }
- return &Point{
- pt: pt,
- }, nil
-}
-
-// String returns a line-protocol string of the Point.
-func (p *Point) String() string {
- return p.pt.String()
-}
-
-// PrecisionString returns a line-protocol string of the Point,
-// with the timestamp formatted for the given precision.
-func (p *Point) PrecisionString(precison string) string {
- return p.pt.PrecisionString(precison)
-}
-
-// Name returns the measurement name of the point.
-func (p *Point) Name() string {
- return string(p.pt.Name())
-}
-
-// Tags returns the tags associated with the point.
-func (p *Point) Tags() map[string]string {
- return p.pt.Tags().Map()
-}
-
-// Time return the timestamp for the point.
-func (p *Point) Time() time.Time {
- return p.pt.Time()
-}
-
-// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
-func (p *Point) UnixNano() int64 {
- return p.pt.UnixNano()
-}
-
-// Fields returns the fields for the point.
-func (p *Point) Fields() (map[string]interface{}, error) {
- return p.pt.Fields()
-}
-
-// NewPointFrom returns a point from the provided models.Point.
-func NewPointFrom(pt models.Point) *Point {
- return &Point{pt: pt}
-}
-
-func (c *client) Write(bp BatchPoints) error {
- var b bytes.Buffer
-
- for _, p := range bp.Points() {
- if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
- return err
- }
-
- if err := b.WriteByte('\n'); err != nil {
- return err
- }
- }
-
- u := c.url
- u.Path = "write"
- req, err := http.NewRequest("POST", u.String(), &b)
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "")
- req.Header.Set("User-Agent", c.useragent)
- if c.username != "" {
- req.SetBasicAuth(c.username, c.password)
- }
-
- params := req.URL.Query()
- params.Set("db", bp.Database())
- params.Set("rp", bp.RetentionPolicy())
- params.Set("precision", bp.Precision())
- params.Set("consistency", bp.WriteConsistency())
- req.URL.RawQuery = params.Encode()
-
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
-
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return err
- }
-
- if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
- var err = fmt.Errorf(string(body))
- return err
- }
-
- return nil
-}
-
-// Query defines a query to send to the server.
-type Query struct {
- Command string
- Database string
- Precision string
- Chunked bool
- ChunkSize int
- Parameters map[string]interface{}
-}
-
-// NewQuery returns a query object.
-// The database and precision arguments can be empty strings if they are not needed for the query.
-func NewQuery(command, database, precision string) Query {
- return Query{
- Command: command,
- Database: database,
- Precision: precision,
- Parameters: make(map[string]interface{}),
- }
-}
-
-// NewQueryWithParameters returns a query object.
-// The database and precision arguments can be empty strings if they are not needed for the query.
-// parameters is a map of the parameter names used in the command to their values.
-func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
- return Query{
- Command: command,
- Database: database,
- Precision: precision,
- Parameters: parameters,
- }
-}
-
-// Response represents a list of statement results.
-type Response struct {
- Results []Result
- Err string `json:"error,omitempty"`
-}
-
-// Error returns the first error from any statement.
-// It returns nil if no errors occurred on any statements.
-func (r *Response) Error() error {
- if r.Err != "" {
- return fmt.Errorf(r.Err)
- }
- for _, result := range r.Results {
- if result.Err != "" {
- return fmt.Errorf(result.Err)
- }
- }
- return nil
-}
-
-// Message represents a user message.
-type Message struct {
- Level string
- Text string
-}
-
-// Result represents a resultset returned from a single statement.
-type Result struct {
- Series []models.Row
- Messages []*Message
- Err string `json:"error,omitempty"`
-}
-
-// Query sends a command to the server and returns the Response.
-func (c *client) Query(q Query) (*Response, error) {
- u := c.url
- u.Path = "query"
-
- jsonParameters, err := json.Marshal(q.Parameters)
-
- if err != nil {
- return nil, err
- }
-
- req, err := http.NewRequest("POST", u.String(), nil)
- if err != nil {
- return nil, err
- }
-
- req.Header.Set("Content-Type", "")
- req.Header.Set("User-Agent", c.useragent)
-
- if c.username != "" {
- req.SetBasicAuth(c.username, c.password)
- }
-
- params := req.URL.Query()
- params.Set("q", q.Command)
- params.Set("db", q.Database)
- params.Set("params", string(jsonParameters))
- if q.Chunked {
- params.Set("chunked", "true")
- if q.ChunkSize > 0 {
- params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
- }
- }
-
- if q.Precision != "" {
- params.Set("epoch", q.Precision)
- }
- req.URL.RawQuery = params.Encode()
-
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
-
- // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
- // but instead some other service. If the error code is also a 500+ code, then some
- // downstream loadbalancer/proxy/etc had an issue and we should report that.
- if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil || len(body) == 0 {
- return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
- }
-
- return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
- }
-
- // If we get an unexpected content type, then it is also not from influx direct and therefore
- // we want to know what we received and what status code was returned for debugging purposes.
- if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
- // Read up to 1kb of the body to help identify downstream errors and limit the impact of things
- // like downstream serving a large file
- body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
- if err != nil || len(body) == 0 {
- return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
- }
-
- return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
- }
-
- var response Response
- if q.Chunked {
- cr := NewChunkedResponse(resp.Body)
- for {
- r, err := cr.NextResponse()
- if err != nil {
- // If we got an error while decoding the response, send that back.
- return nil, err
- }
-
- if r == nil {
- break
- }
-
- response.Results = append(response.Results, r.Results...)
- if r.Err != "" {
- response.Err = r.Err
- break
- }
- }
- } else {
- dec := json.NewDecoder(resp.Body)
- dec.UseNumber()
- decErr := dec.Decode(&response)
-
- // ignore this error if we got an invalid status code
- if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
- decErr = nil
- }
- // If we got a valid decode error, send that back
- if decErr != nil {
- return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
- }
- }
-
- // If we don't have an error in our json response, and didn't get statusOK
- // then send back an error
- if resp.StatusCode != http.StatusOK && response.Error() == nil {
- return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
- }
- return &response, nil
-}
-
-// duplexReader reads responses and writes it to another writer while
-// satisfying the reader interface.
-type duplexReader struct {
- r io.Reader
- w io.Writer
-}
-
-func (r *duplexReader) Read(p []byte) (n int, err error) {
- n, err = r.r.Read(p)
- if err == nil {
- r.w.Write(p[:n])
- }
- return n, err
-}
-
-// ChunkedResponse represents a response from the server that
-// uses chunking to stream the output.
-type ChunkedResponse struct {
- dec *json.Decoder
- duplex *duplexReader
- buf bytes.Buffer
-}
-
-// NewChunkedResponse reads a stream and produces responses from the stream.
-func NewChunkedResponse(r io.Reader) *ChunkedResponse {
- resp := &ChunkedResponse{}
- resp.duplex = &duplexReader{r: r, w: &resp.buf}
- resp.dec = json.NewDecoder(resp.duplex)
- resp.dec.UseNumber()
- return resp
-}
-
-// NextResponse reads the next line of the stream and returns a response.
-func (r *ChunkedResponse) NextResponse() (*Response, error) {
- var response Response
-
- if err := r.dec.Decode(&response); err != nil {
- if err == io.EOF {
- return nil, nil
- }
- // A decoding error happened. This probably means the server crashed
- // and sent a last-ditch error message to us. Ensure we have read the
- // entirety of the connection to get any remaining error text.
- io.Copy(ioutil.Discard, r.duplex)
- return nil, errors.New(strings.TrimSpace(r.buf.String()))
- }
-
- r.buf.Reset()
- return &response, nil
-}