diff --git a/Godeps b/Godeps index 271e4c47f2f..762afc010f6 100644 --- a/Godeps +++ b/Godeps @@ -20,3 +20,4 @@ github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6 github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577 golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd golang.org/x/sys 90796e5a05ce440b41c768bd9af257005e470461 +github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133 diff --git a/prometheus/converters.go b/prometheus/converters.go index ba1db0fa8fa..91e90aa35ab 100644 --- a/prometheus/converters.go +++ b/prometheus/converters.go @@ -12,11 +12,11 @@ import ( ) const ( - // MeasurementName is where all prometheus time series go to - MeasurementName = "_" + // measurementName is where all prometheus time series go to + measurementName = "_" - // FieldName is the field all prometheus values get written to - FieldName = "f64" + // fieldName is the field all prometheus values get written to + fieldName = "f64" ) var ErrNaNDropped = errors.New("") @@ -24,7 +24,12 @@ var ErrNaNDropped = errors.New("") // WriteRequestToPoints converts a Prometheus remote write request of time series and their // samples into Points that can be written into Influx func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { - var points []models.Point + var maxPoints int + for _, ts := range req.Timeseries { + maxPoints += len(ts.Samples) + } + points := make([]models.Point, 0, maxPoints) + var droppedNaN error for _, ts := range req.Timeseries { @@ -42,8 +47,8 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { // convert and append t := time.Unix(0, s.TimestampMs*int64(time.Millisecond)) - fields := map[string]interface{}{FieldName: s.Value} - p, err := models.NewPoint(MeasurementName, models.NewTags(tags), fields, t) + fields := map[string]interface{}{fieldName: s.Value} + p, err := models.NewPoint(measurementName, models.NewTags(tags), fields, t) if err != nil { return nil, err } @@ -67,10 +72,10 @@ func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influx stmt := &influxql.SelectStatement{ IsRawQuery: true, Fields: []*influxql.Field{ - {Expr: &influxql.VarRef{Val: FieldName}}, + {Expr: &influxql.VarRef{Val: fieldName}}, }, Sources: []influxql.Source{&influxql.Measurement{ - Name: MeasurementName, + Name: measurementName, Database: db, RetentionPolicy: rp, }}, @@ -150,7 +155,7 @@ func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influx // TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair { - var pairs []*remote.LabelPair + pairs := make([]*remote.LabelPair, 0, len(tags)) for k, v := range tags { if v == "" { // If we select metrics with different sets of labels names, diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 243741394f9..0623b3d5c71 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -851,20 +851,19 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me reqBuf, err := snappy.Decode(nil, buf.Bytes()) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } // Convert the Prometheus remote write request to Influx Points var req remote.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } points, err := prometheus.WriteRequestToPoints(&req) if err != nil { - h.Logger.Info(fmt.Sprintf("promerr: %s", err.Error())) if h.Config.WriteTracing { h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error())) } @@ -879,7 +878,6 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me level := r.URL.Query().Get("consistency") consistency := models.ConsistencyLevelOne if level != "" { - var err error consistency, err = models.ParseConsistencyLevel(level) if err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) @@ -916,27 +914,19 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) { compressed, err := ioutil.ReadAll(r.Body) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, err.Error(), http.StatusInternalServerError) return } reqBuf, err := snappy.Decode(nil, compressed) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } var req remote.ReadRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if len(req.Queries) == 0 { - http.Error(w, "no prometheus queries passed", http.StatusBadRequest) - return - } else if len(req.Queries) > 1 { - http.Error(w, "only one prometheus remote query is supported per request", http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } @@ -944,7 +934,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met db := r.FormValue("db") q, err := prometheus.ReadRequestToInfluxQLQuery(&req, db, r.FormValue("rp")) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } @@ -1019,10 +1009,19 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met } for _, v := range s.Values { - timestamp := v[0].(time.Time).UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) + t, ok := v[0].(time.Time) + if !ok { + h.httpError(w, fmt.Sprintf("value %v wasn't a time", v[0]), http.StatusBadRequest) + return + } + val, ok := v[1].(float64) + if !ok { + h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest) + } + timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) ts.Samples = append(ts.Samples, &remote.Sample{ TimestampMs: timestamp, - Value: v[1].(float64), + Value: val, }) } @@ -1032,7 +1031,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met data, err := proto.Marshal(resp) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, err.Error(), http.StatusInternalServerError) return } @@ -1041,7 +1040,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met compressed = snappy.Encode(nil, data) if _, err := w.Write(compressed); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, err.Error(), http.StatusInternalServerError) return } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 89bcdf79da5..dbe61cec3fd 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -561,8 +561,8 @@ func TestHandler_PromWrite(t *testing.T) { h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { called = true point := points[0] - if point.UnixNano() != 1*int64(time.Millisecond) { - t.Fatalf("Exp point time %d but got %d", 1*int64(time.Millisecond), point.UnixNano()) + if point.UnixNano() != int64(time.Millisecond) { + t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano()) } tags := point.Tags() expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}}