Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Prometheus remote read and write API. #8784

Merged
merged 8 commits into from
Sep 7, 2017
Merged
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce
github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
Expand Down
1 change: 1 addition & 0 deletions LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/prometheus/prometheus [APACHE LICENSE](https://github.com/prometheus/prometheus/blob/master/LICENSE)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
- github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt)
Expand Down
175 changes: 175 additions & 0 deletions prometheus/converters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package prometheus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be under services/prometheus to match the location of other packages such as opentsdb, graphite etc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe? I didn't put it under there because there wasn't an associated service running. I think of things under services as things that have network listeners.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point.


import (
"errors"
"fmt"
"math"
"time"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/prometheus/prometheus/storage/remote"
)

const (
// measurementName is where all prometheus time series go to
measurementName = "_"

// fieldName is the field all prometheus values get written to
fieldName = "f64"
)

var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported")

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into Points that can be written into Influx
func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
var maxPoints int
for _, ts := range req.Timeseries {
maxPoints += len(ts.Samples)
}
points := make([]models.Point, 0, maxPoints)

var droppedNaN error

for _, ts := range req.Timeseries {
tags := make(map[string]string, len(ts.Labels))
for _, l := range ts.Labels {
tags[l.Name] = l.Value
}

for _, s := range ts.Samples {
// skip NaN values, which are valid in Prometheus
if math.IsNaN(s.Value) {
droppedNaN = ErrNaNDropped
continue
}

// convert and append
t := time.Unix(0, s.TimestampMs*int64(time.Millisecond))
fields := map[string]interface{}{fieldName: s.Value}
p, err := models.NewPoint(measurementName, models.NewTags(tags), fields, t)
if err != nil {
return nil, err
}

points = append(points, p)
}
}
return points, droppedNaN
}

// ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL
// query that will return the requested data when executed
func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) {
if len(req.Queries) != 1 {
return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
}
promQuery := req.Queries[0]

q := &influxql.Query{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably be easier to just create this at the bottom when you need it with a literal. The literal will likely allocate less memory to boot since it doesn't use append.


stmt := &influxql.SelectStatement{
IsRawQuery: true,
Fields: []*influxql.Field{
{Expr: &influxql.VarRef{Val: fieldName}},
},
Sources: []influxql.Source{&influxql.Measurement{
Name: measurementName,
Database: db,
RetentionPolicy: rp,
}},
Dimensions: []*influxql.Dimension{{Expr: &influxql.Wildcard{}}},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't a Wildcard expression only used when doing a SELECT *?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also used when doing a GROUP BY *, which this needs to do.

}

cond, err := condFromMatchers(promQuery, promQuery.Matchers)
if err != nil {
return nil, err
}

stmt.Condition = cond
q.Statements = append(q.Statements, stmt)

return q, nil
}

// condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr
func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) {
var op influxql.Token
switch m.Type {
case remote.MatchType_EQUAL:
op = influxql.EQ
case remote.MatchType_NOT_EQUAL:
op = influxql.NEQ
case remote.MatchType_REGEX_MATCH:
op = influxql.EQREGEX
case remote.MatchType_REGEX_NO_MATCH:
op = influxql.NEQREGEX
default:
return nil, fmt.Errorf("unknown match type %v", m.Type)
}

return &influxql.BinaryExpr{
Op: op,
LHS: &influxql.VarRef{Val: m.Name},
RHS: &influxql.StringLiteral{Val: m.Value},
}, nil
}

// condFromMatchers converts a Prometheus remote query and a collection of Prometheus label matchers
// into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus
// remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels
// are kept equivalent.
func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) {
if len(matchers) > 0 {
lhs, err := condFromMatcher(matchers[0])
if err != nil {
return nil, err
}
rhs, err := condFromMatchers(q, matchers[1:])
if err != nil {
return nil, err
}

return &influxql.BinaryExpr{
Op: influxql.AND,
LHS: lhs,
RHS: rhs,
}, nil
}

return &influxql.BinaryExpr{
Op: influxql.AND,
LHS: &influxql.BinaryExpr{
Op: influxql.GTE,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond))},
},
RHS: &influxql.BinaryExpr{
Op: influxql.LTE,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond))},
},
}, nil
}

// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs
func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
// InfluxDB returns *all* possible tag names on all returned
// series, with empty tag values on series where they don't
// apply. In Prometheus, an empty label value is equivalent
// to a non-existent label, so we just skip empty ones here
// to make the result correct.
continue
}
pairs = append(pairs, &remote.LabelPair{
Name: k,
Value: v,
})
}
return pairs
}
Loading