Skip to content

Commit

Permalink
Add Prometheus to Godeps and address PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldix committed Sep 4, 2017
1 parent 0e0e5f1 commit f0ffe20
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 32 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/sys 90796e5a05ce440b41c768bd9af257005e470461
github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133
25 changes: 15 additions & 10 deletions prometheus/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,24 @@ import (
)

const (
// MeasurementName is where all prometheus time series go to
MeasurementName = "_"
// measurementName is where all prometheus time series go to
measurementName = "_"

// FieldName is the field all prometheus values get written to
FieldName = "f64"
// fieldName is the field all prometheus values get written to
fieldName = "f64"
)

var ErrNaNDropped = errors.New("")

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into Points that can be written into Influx
func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
var points []models.Point
var maxPoints int
for _, ts := range req.Timeseries {
maxPoints += len(ts.Samples)
}
points := make([]models.Point, 0, maxPoints)

var droppedNaN error

for _, ts := range req.Timeseries {
Expand All @@ -42,8 +47,8 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {

// convert and append
t := time.Unix(0, s.TimestampMs*int64(time.Millisecond))
fields := map[string]interface{}{FieldName: s.Value}
p, err := models.NewPoint(MeasurementName, models.NewTags(tags), fields, t)
fields := map[string]interface{}{fieldName: s.Value}
p, err := models.NewPoint(measurementName, models.NewTags(tags), fields, t)
if err != nil {
return nil, err
}
Expand All @@ -67,10 +72,10 @@ func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influx
stmt := &influxql.SelectStatement{
IsRawQuery: true,
Fields: []*influxql.Field{
{Expr: &influxql.VarRef{Val: FieldName}},
{Expr: &influxql.VarRef{Val: fieldName}},
},
Sources: []influxql.Source{&influxql.Measurement{
Name: MeasurementName,
Name: measurementName,
Database: db,
RetentionPolicy: rp,
}},
Expand Down Expand Up @@ -150,7 +155,7 @@ func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influx

// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs
func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
var pairs []*remote.LabelPair
pairs := make([]*remote.LabelPair, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
Expand Down
39 changes: 19 additions & 20 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,20 +851,19 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me

reqBuf, err := snappy.Decode(nil, buf.Bytes())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}

// Convert the Prometheus remote write request to Influx Points
var req remote.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}

points, err := prometheus.WriteRequestToPoints(&req)
if err != nil {
h.Logger.Info(fmt.Sprintf("promerr: %s", err.Error()))
if h.Config.WriteTracing {
h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error()))
}
Expand All @@ -879,7 +878,6 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
level := r.URL.Query().Get("consistency")
consistency := models.ConsistencyLevelOne
if level != "" {
var err error
consistency, err = models.ParseConsistencyLevel(level)
if err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -916,35 +914,27 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}

var req remote.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if len(req.Queries) == 0 {
http.Error(w, "no prometheus queries passed", http.StatusBadRequest)
return
} else if len(req.Queries) > 1 {
http.Error(w, "only one prometheus remote query is supported per request", http.StatusBadRequest)
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}

// Query the DB and create a ReadResponse for Prometheus
db := r.FormValue("db")
q, err := prometheus.ReadRequestToInfluxQLQuery(&req, db, r.FormValue("rp"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}

Expand Down Expand Up @@ -1019,10 +1009,19 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
}

for _, v := range s.Values {
timestamp := v[0].(time.Time).UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond)
t, ok := v[0].(time.Time)
if !ok {
h.httpError(w, fmt.Sprintf("value %v wasn't a time", v[0]), http.StatusBadRequest)
return
}
val, ok := v[1].(float64)
if !ok {
h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest)
}
timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond)
ts.Samples = append(ts.Samples, &remote.Sample{
TimestampMs: timestamp,
Value: v[1].(float64),
Value: val,
})
}

Expand All @@ -1032,7 +1031,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met

data, err := proto.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

Expand All @@ -1041,7 +1040,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met

compressed = snappy.Encode(nil, data)
if _, err := w.Write(compressed); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

Expand Down
4 changes: 2 additions & 2 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ func TestHandler_PromWrite(t *testing.T) {
h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error {
called = true
point := points[0]
if point.UnixNano() != 1*int64(time.Millisecond) {
t.Fatalf("Exp point time %d but got %d", 1*int64(time.Millisecond), point.UnixNano())
if point.UnixNano() != int64(time.Millisecond) {
t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano())
}
tags := point.Tags()
expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}
Expand Down

0 comments on commit f0ffe20

Please sign in to comment.