From 0e0e5f1a372e87c985f4fb9cacd92ba8ab9f8944 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sun, 3 Sep 2017 17:00:55 -0400 Subject: [PATCH 1/8] Add support for Prometheus remote read and write API. Adds a new package prometheus for converting from remote reads and writes to Influx queries and points. Adds two new endpoints to the httpd handler to support prometheus remote read at /api/v1/prom/read and remote write at /api/v1/prom/write. --- prometheus/converters.go | 170 +++++++++++++++++++ services/httpd/handler.go | 288 +++++++++++++++++++++++++++++++++ services/httpd/handler_test.go | 136 ++++++++++++++++ services/httpd/service.go | 4 + 4 files changed, 598 insertions(+) create mode 100644 prometheus/converters.go diff --git a/prometheus/converters.go b/prometheus/converters.go new file mode 100644 index 00000000000..ba1db0fa8fa --- /dev/null +++ b/prometheus/converters.go @@ -0,0 +1,170 @@ +package prometheus + +import ( + "errors" + "fmt" + "math" + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/prometheus/prometheus/storage/remote" +) + +const ( + // MeasurementName is where all prometheus time series go to + MeasurementName = "_" + + // FieldName is the field all prometheus values get written to + FieldName = "f64" +) + +var ErrNaNDropped = errors.New("") + +// WriteRequestToPoints converts a Prometheus remote write request of time series and their +// samples into Points that can be written into Influx +func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { + var points []models.Point + var droppedNaN error + + for _, ts := range req.Timeseries { + tags := make(map[string]string, len(ts.Labels)) + for _, l := range ts.Labels { + tags[l.Name] = l.Value + } + + for _, s := range ts.Samples { + // skip NaN values, which are valid in Prometheus + if math.IsNaN(s.Value) { + droppedNaN = ErrNaNDropped + continue + } + + // convert and append + t := time.Unix(0, s.TimestampMs*int64(time.Millisecond)) + fields := map[string]interface{}{FieldName: s.Value} + p, err := models.NewPoint(MeasurementName, models.NewTags(tags), fields, t) + if err != nil { + return nil, err + } + + points = append(points, p) + } + } + return points, droppedNaN +} + +// ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL +// query that will return the requested data when executed +func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) { + if len(req.Queries) != 1 { + return nil, errors.New("Prometheus read endpoint currently only supports one query at a time") + } + promQuery := req.Queries[0] + + q := &influxql.Query{} + + stmt := &influxql.SelectStatement{ + IsRawQuery: true, + Fields: []*influxql.Field{ + {Expr: &influxql.VarRef{Val: FieldName}}, + }, + Sources: []influxql.Source{&influxql.Measurement{ + Name: MeasurementName, + Database: db, + RetentionPolicy: rp, + }}, + Dimensions: []*influxql.Dimension{{Expr: &influxql.Wildcard{}}}, + } + + cond, err := condFromMatchers(promQuery, promQuery.Matchers) + if err != nil { + return nil, err + } + + stmt.Condition = cond + q.Statements = append(q.Statements, stmt) + + return q, nil +} + +// condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr +func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) { + var op influxql.Token + switch m.Type { + case remote.MatchType_EQUAL: + op = influxql.EQ + case remote.MatchType_NOT_EQUAL: + op = 
influxql.NEQ + case remote.MatchType_REGEX_MATCH: + op = influxql.EQREGEX + case remote.MatchType_REGEX_NO_MATCH: + op = influxql.NEQREGEX + default: + return nil, fmt.Errorf("unknown match type %v", m.Type) + } + + return &influxql.BinaryExpr{ + Op: op, + LHS: &influxql.VarRef{Val: m.Name}, + RHS: &influxql.StringLiteral{Val: m.Value}, + }, nil +} + +// condFromMatchers converts a Prometheus remote query and a collection of Prometheus label matchers +// into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus +// remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels +// are kept equivalent. +func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) { + if len(matchers) > 0 { + lhs, err := condFromMatcher(matchers[0]) + if err != nil { + return nil, err + } + rhs, err := condFromMatchers(q, matchers[1:]) + if err != nil { + return nil, err + } + + return &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: lhs, + RHS: rhs, + }, nil + } + + return &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: &influxql.BinaryExpr{ + Op: influxql.GTE, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond))}, + }, + RHS: &influxql.BinaryExpr{ + Op: influxql.LTE, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.TimeLiteral{Val: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond))}, + }, + }, nil +} + +// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs +func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair { + var pairs []*remote.LabelPair + for k, v := range tags { + if v == "" { + // If we select metrics with different sets of labels names, + // InfluxDB returns *all* possible tag names on all returned + // series, with empty tag values on series where they don't + // apply. In Prometheus, an empty label value is equivalent + // to a non-existent label, so we just skip empty ones here + // to make the result correct. + continue + } + pairs = append(pairs, &remote.LabelPair{ + Name: k, + Value: v, + }) + } + return pairs +} diff --git a/services/httpd/handler.go b/services/httpd/handler.go index b835b7c2701..243741394f9 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -8,6 +8,7 @@ import ( "expvar" "fmt" "io" + "io/ioutil" "log" "math" "net/http" @@ -20,15 +21,19 @@ import ( "github.com/bmizerany/pat" "github.com/dgrijalva/jwt-go" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/monitor/diagnostics" + "github.com/influxdata/influxdb/prometheus" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/uuid" + "github.com/prometheus/prometheus/storage/remote" "github.com/uber-go/zap" ) @@ -141,6 +146,14 @@ func NewHandler(c Config) *Handler { "write", // Data-ingest route. 
"POST", "/write", true, true, h.serveWrite, }, + Route{ + "prometheus-write", // Prometheus remote write + "POST", "/api/v1/prom/write", false, true, h.servePromWrite, + }, + Route{ + "prometheus-read", // Prometheus remote read + "POST", "/api/v1/prom/read", true, true, h.servePromRead, + }, Route{ // Ping "ping", "GET", "/ping", false, true, h.servePing, @@ -184,6 +197,8 @@ type Statistics struct { ClientErrors int64 ServerErrors int64 RecoveredPanics int64 + PromWriteRequests int64 + PromReadRequests int64 } // Statistics returns statistics for periodic monitoring. @@ -211,6 +226,8 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic { statClientError: atomic.LoadInt64(&h.stats.ClientErrors), statServerError: atomic.LoadInt64(&h.stats.ServerErrors), statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics), + statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests), + statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests), }, }} } @@ -760,6 +777,277 @@ func convertToEpoch(r *query.Result, epoch string) { } } +// servePromWrite receives data in the Prometheus remote write protocol and writes it +// to the database +func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) { + atomic.AddInt64(&h.stats.WriteRequests, 1) + atomic.AddInt64(&h.stats.ActiveWriteRequests, 1) + atomic.AddInt64(&h.stats.PromWriteRequests, 1) + defer func(start time.Time) { + atomic.AddInt64(&h.stats.ActiveWriteRequests, -1) + atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds()) + }(time.Now()) + h.requestTracker.Add(r, user) + + database := r.URL.Query().Get("db") + if database == "" { + h.httpError(w, "database is required", http.StatusBadRequest) + return + } + + if di := h.MetaClient.Database(database); di == nil { + h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound) + return + } + + if h.Config.AuthEnabled { + if user == nil { + h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden) + return + } + + if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil { + h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden) + return + } + } + + body := r.Body + if h.Config.MaxBodySize > 0 { + body = truncateReader(body, int64(h.Config.MaxBodySize)) + } + + var bs []byte + if r.ContentLength > 0 { + if h.Config.MaxBodySize > 0 && r.ContentLength > int64(h.Config.MaxBodySize) { + h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + return + } + + // This will just be an initial hint for the reader, as the + // bytes.Buffer will grow as needed when ReadFrom is called + bs = make([]byte, 0, r.ContentLength) + } + buf := bytes.NewBuffer(bs) + + _, err := buf.ReadFrom(body) + if err != nil { + if err == errTruncated { + h.httpError(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + return + } + + if h.Config.WriteTracing { + h.Logger.Info("Prom write handler unable to read bytes from request body") + } + h.httpError(w, err.Error(), http.StatusBadRequest) + return + } + atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len())) + + if h.Config.WriteTracing { + h.Logger.Info(fmt.Sprintf("Prom write body received by handler: %s", buf.Bytes())) + } + + reqBuf, err := snappy.Decode(nil, buf.Bytes()) + if err != nil { + http.Error(w, err.Error(), 
http.StatusBadRequest) + return + } + + // Convert the Prometheus remote write request to Influx Points + var req remote.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + points, err := prometheus.WriteRequestToPoints(&req) + if err != nil { + h.Logger.Info(fmt.Sprintf("promerr: %s", err.Error())) + if h.Config.WriteTracing { + h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error())) + } + + if err != prometheus.ErrNaNDropped { + h.httpError(w, err.Error(), http.StatusBadRequest) + return + } + } + + // Determine required consistency level. + level := r.URL.Query().Get("consistency") + consistency := models.ConsistencyLevelOne + if level != "" { + var err error + consistency, err = models.ParseConsistencyLevel(level) + if err != nil { + h.httpError(w, err.Error(), http.StatusBadRequest) + return + } + } + + // Write points. + if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err) { + atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) + h.httpError(w, err.Error(), http.StatusBadRequest) + return + } else if influxdb.IsAuthorizationError(err) { + atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) + h.httpError(w, err.Error(), http.StatusForbidden) + return + } else if werr, ok := err.(tsdb.PartialWriteError); ok { + atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)-werr.Dropped)) + atomic.AddInt64(&h.stats.PointsWrittenDropped, int64(werr.Dropped)) + h.httpError(w, werr.Error(), http.StatusBadRequest) + return + } else if err != nil { + atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points))) + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points))) + h.writeHeader(w, http.StatusNoContent) +} + +// servePromRead will convert a Prometheus remote read request into an InfluxQL query and +// return data in Prometheus remote read protobuf format. +func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req remote.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if len(req.Queries) == 0 { + http.Error(w, "no prometheus queries passed", http.StatusBadRequest) + return + } else if len(req.Queries) > 1 { + http.Error(w, "only one prometheus remote query is supported per request", http.StatusBadRequest) + return + } + + // Query the DB and create a ReadResponse for Prometheus + db := r.FormValue("db") + q, err := prometheus.ReadRequestToInfluxQLQuery(&req, db, r.FormValue("rp")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Check authorization. 
+ if h.Config.AuthEnabled { + if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil { + if err, ok := err.(meta.ErrAuthorize); ok { + h.Logger.Info(fmt.Sprintf("Unauthorized request | user: %q | query: %q | database %q", err.User, err.Query.String(), err.Database)) + } + h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden) + return + } + } + + opts := query.ExecutionOptions{ + Database: db, + ChunkSize: DefaultChunkSize, + ReadOnly: true, + } + + if h.Config.AuthEnabled { + // The current user determines the authorized actions. + opts.Authorizer = user + } else { + // Auth is disabled, so allow everything. + opts.Authorizer = query.OpenAuthorizer{} + } + + // Make sure if the client disconnects we signal the query to abort + var closing chan struct{} + closing = make(chan struct{}) + if notifier, ok := w.(http.CloseNotifier); ok { + // CloseNotify() is not guaranteed to send a notification when the query + // is closed. Use this channel to signal that the query is finished to + // prevent lingering goroutines that may be stuck. + done := make(chan struct{}) + defer close(done) + + notify := notifier.CloseNotify() + go func() { + // Wait for either the request to finish + // or for the client to disconnect + select { + case <-done: + case <-notify: + close(closing) + } + }() + opts.AbortCh = done + } else { + defer close(closing) + } + + // Execute query. + results := h.QueryExecutor.ExecuteQuery(q, opts, closing) + + resp := &remote.ReadResponse{ + Results: []*remote.QueryResult{{}}, + } + + // pull all results from the channel + for r := range results { + // Ignore nil results. + if r == nil { + continue + } + + // read the series data and convert into Prometheus samples + for _, s := range r.Series { + ts := &remote.TimeSeries{ + Labels: prometheus.TagsToLabelPairs(s.Tags), + } + + for _, v := range s.Values { + timestamp := v[0].(time.Time).UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) + ts.Samples = append(ts.Samples, &remote.Sample{ + TimestampMs: timestamp, + Value: v[1].(float64), + }) + } + + resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts) + } + } + + data, err := proto.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed = snappy.Encode(nil, data) + if _, err := w.Write(compressed); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed))) +} + // serveExpvar serves internal metrics in /debug/vars format over HTTP. func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { // Retrieve statistics from the monitor. 
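For reference, a minimal client sketch (not part of the patch) that exercises the new write endpoint the same way the handler_test.go changes below do: it builds a remote.WriteRequest using the Prometheus storage/remote types (a later patch in this series copies the proto definitions into the local prometheus package), marshals it with gogo protobuf, snappy-compresses the payload, and POSTs it to /api/v1/prom/write. The host, port, and database name are placeholders.

package main

import (
	"bytes"
	"fmt"
	"net/http"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	// One series with one sample; Prometheus labels become Influx tags on write.
	req := &remote.WriteRequest{
		Timeseries: []*remote.TimeSeries{{
			Labels:  []*remote.LabelPair{{Name: "host", Value: "a"}},
			Samples: []*remote.Sample{{TimestampMs: 1, Value: 1.2}},
		}},
	}

	// The endpoint expects a snappy-compressed protobuf body.
	data, err := proto.Marshal(req)
	if err != nil {
		panic(err)
	}
	compressed := snappy.Encode(nil, data)

	// "localhost:8086" and "mydb" are placeholders; the db query parameter is required.
	resp, err := http.Post("http://localhost:8086/api/v1/prom/write?db=mydb",
		"application/x-protobuf", bytes.NewReader(compressed))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // a successful write returns 204 No Content
}
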
diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 530734d2953..89bcdf79da5 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -6,21 +6,26 @@ import ( "fmt" "io" "log" + "math" "mime/multipart" "net/http" "net/http/httptest" "net/url" + "reflect" "strings" "testing" "time" "github.com/dgrijalva/jwt-go" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" "github.com/influxdata/influxdb/services/meta" + "github.com/prometheus/prometheus/storage/remote" ) // Ensure the handler returns results from a query (including nil results). @@ -524,6 +529,137 @@ func TestHandler_Query_CloseNotify(t *testing.T) { } } +// Ensure the prometheus remote write works +func TestHandler_PromWrite(t *testing.T) { + req := &remote.WriteRequest{ + Timeseries: []*remote.TimeSeries{ + { + Labels: []*remote.LabelPair{ + {Name: "host", Value: "a"}, + {Name: "region", Value: "west"}, + }, + Samples: []*remote.Sample{ + {TimestampMs: 1, Value: 1.2}, + {TimestampMs: 2, Value: math.NaN()}, + }, + }, + }, + } + + data, err := proto.Marshal(req) + if err != nil { + t.Fatal("couldn't marshal prometheus request") + } + compressed := snappy.Encode(nil, data) + + b := bytes.NewReader(compressed) + h := NewHandler(false) + h.MetaClient.DatabaseFn = func(name string) *meta.DatabaseInfo { + return &meta.DatabaseInfo{} + } + called := false + h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { + called = true + point := points[0] + if point.UnixNano() != 1*int64(time.Millisecond) { + t.Fatalf("Exp point time %d but got %d", 1*int64(time.Millisecond), point.UnixNano()) + } + tags := point.Tags() + expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}} + if !reflect.DeepEqual(tags, expectedTags) { + t.Fatalf("tags don't match\n\texp: %v\n\tgot: %v", expectedTags, tags) + } + + fields, err := point.Fields() + if err != nil { + t.Fatal(err.Error()) + } + expFields := models.Fields{"f64": 1.2} + if !reflect.DeepEqual(fields, expFields) { + t.Fatalf("fields don't match\n\texp: %v\n\tgot: %v", expFields, fields) + } + return nil + } + + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewRequest("POST", "/api/v1/prom/write?db=foo", b)) + if !called { + t.Fatal("WritePoints: expected call") + } + if w.Code != http.StatusNoContent { + t.Fatalf("unexpected status: %d", w.Code) + } +} + +// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and +// data is returned +func TestHandler_PromRead(t *testing.T) { + req := &remote.ReadRequest{ + Queries: []*remote.Query{{ + Matchers: []*remote.LabelMatcher{ + {Type: remote.MatchType_EQUAL, Name: "eq", Value: "a"}, + {Type: remote.MatchType_NOT_EQUAL, Name: "neq", Value: "b"}, + {Type: remote.MatchType_REGEX_MATCH, Name: "regex", Value: "c"}, + {Type: remote.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"}, + }, + StartTimestampMs: 1, + EndTimestampMs: 2, + }}, + } + data, err := proto.Marshal(req) + if err != nil { + t.Fatal("couldn't marshal prometheus request") + } + compressed := snappy.Encode(nil, data) + b := bytes.NewReader(compressed) + + h := NewHandler(false) + h.StatementExecutor.ExecuteStatementFn = func(stmt 
influxql.Statement, ctx query.ExecutionContext) error { + if stmt.String() != `SELECT f64 FROM foo.._ WHERE eq = 'a' AND neq != 'b' AND regex =~ 'c' AND neqregex !~ 'd' AND time >= '1970-01-01T00:00:00.001Z' AND time <= '1970-01-01T00:00:00.002Z' GROUP BY *` { + t.Fatalf("unexpected query: %s", stmt.String()) + } else if ctx.Database != `foo` { + t.Fatalf("unexpected db: %s", ctx.Database) + } + row := &models.Row{ + Name: "_", + Tags: map[string]string{"foo": "bar"}, + Columns: []string{"time", "f64"}, + Values: [][]interface{}{{time.Unix(23, 0), 1.2}}, + } + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{row})} + return nil + } + + w := httptest.NewRecorder() + + h.ServeHTTP(w, MustNewJSONRequest("POST", "/api/v1/prom/read?db=foo", b)) + if w.Code != http.StatusOK { + t.Fatalf("unexpected status: %d", w.Code) + } + + reqBuf, err := snappy.Decode(nil, w.Body.Bytes()) + if err != nil { + t.Fatal(err.Error()) + } + + var resp remote.ReadResponse + if err := proto.Unmarshal(reqBuf, &resp); err != nil { + t.Fatal(err.Error()) + } + + expLabels := []*remote.LabelPair{{Name: "foo", Value: "bar"}} + expSamples := []*remote.Sample{{TimestampMs: 23000, Value: 1.2}} + + ts := resp.Results[0].Timeseries[0] + + if !reflect.DeepEqual(expLabels, ts.Labels) { + t.Fatalf("unexpected labels\n\texp: %v\n\tgot: %v", expLabels, ts.Labels) + } + if !reflect.DeepEqual(expSamples, ts.Samples) { + t.Fatalf("unexpectd samples\n\texp: %v\n\tgot: %v", expSamples, ts.Samples) + } +} + // Ensure the handler handles ping requests correctly. // TODO: This should be expanded to verify the MetaClient check in servePing is working correctly func TestHandler_Ping(t *testing.T) { diff --git a/services/httpd/service.go b/services/httpd/service.go index 74eba09bfc0..c4c88090fc4 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -38,6 +38,10 @@ const ( statClientError = "clientError" // Number of HTTP responses due to client error. statServerError = "serverError" // Number of HTTP responses due to server error. statRecoveredPanics = "recoveredPanics" // Number of panics recovered by HTTP handler. + + // Prometheus stats + statPromWriteRequest = "promWriteReq" // Number of write requests to the promtheus endpoint + statPromReadRequest = "promReadReq" // Number of read requests to the prometheus endpoint ) // Service manages the listener and handler for an HTTP endpoint. From f0ffe201615dc9d75c9a6fd064faa076eb99eb16 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 4 Sep 2017 12:34:53 -0400 Subject: [PATCH 2/8] Add Prometheus to Godeps and address PR comments. 
--- Godeps | 1 + prometheus/converters.go | 25 +++++++++++++--------- services/httpd/handler.go | 39 +++++++++++++++++----------------- services/httpd/handler_test.go | 4 ++-- 4 files changed, 37 insertions(+), 32 deletions(-) diff --git a/Godeps b/Godeps index 271e4c47f2f..762afc010f6 100644 --- a/Godeps +++ b/Godeps @@ -20,3 +20,4 @@ github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6 github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577 golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd golang.org/x/sys 90796e5a05ce440b41c768bd9af257005e470461 +github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133 diff --git a/prometheus/converters.go b/prometheus/converters.go index ba1db0fa8fa..91e90aa35ab 100644 --- a/prometheus/converters.go +++ b/prometheus/converters.go @@ -12,11 +12,11 @@ import ( ) const ( - // MeasurementName is where all prometheus time series go to - MeasurementName = "_" + // measurementName is where all prometheus time series go to + measurementName = "_" - // FieldName is the field all prometheus values get written to - FieldName = "f64" + // fieldName is the field all prometheus values get written to + fieldName = "f64" ) var ErrNaNDropped = errors.New("") @@ -24,7 +24,12 @@ var ErrNaNDropped = errors.New("") // WriteRequestToPoints converts a Prometheus remote write request of time series and their // samples into Points that can be written into Influx func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { - var points []models.Point + var maxPoints int + for _, ts := range req.Timeseries { + maxPoints += len(ts.Samples) + } + points := make([]models.Point, 0, maxPoints) + var droppedNaN error for _, ts := range req.Timeseries { @@ -42,8 +47,8 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { // convert and append t := time.Unix(0, s.TimestampMs*int64(time.Millisecond)) - fields := map[string]interface{}{FieldName: s.Value} - p, err := models.NewPoint(MeasurementName, models.NewTags(tags), fields, t) + fields := map[string]interface{}{fieldName: s.Value} + p, err := models.NewPoint(measurementName, models.NewTags(tags), fields, t) if err != nil { return nil, err } @@ -67,10 +72,10 @@ func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influx stmt := &influxql.SelectStatement{ IsRawQuery: true, Fields: []*influxql.Field{ - {Expr: &influxql.VarRef{Val: FieldName}}, + {Expr: &influxql.VarRef{Val: fieldName}}, }, Sources: []influxql.Source{&influxql.Measurement{ - Name: MeasurementName, + Name: measurementName, Database: db, RetentionPolicy: rp, }}, @@ -150,7 +155,7 @@ func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influx // TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair { - var pairs []*remote.LabelPair + pairs := make([]*remote.LabelPair, 0, len(tags)) for k, v := range tags { if v == "" { // If we select metrics with different sets of labels names, diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 243741394f9..0623b3d5c71 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -851,20 +851,19 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me reqBuf, err := snappy.Decode(nil, buf.Bytes()) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } // Convert the Prometheus 
remote write request to Influx Points var req remote.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } points, err := prometheus.WriteRequestToPoints(&req) if err != nil { - h.Logger.Info(fmt.Sprintf("promerr: %s", err.Error())) if h.Config.WriteTracing { h.Logger.Info(fmt.Sprintf("Prom write handler: %s", err.Error())) } @@ -879,7 +878,6 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me level := r.URL.Query().Get("consistency") consistency := models.ConsistencyLevelOne if level != "" { - var err error consistency, err = models.ParseConsistencyLevel(level) if err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) @@ -916,27 +914,19 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) { compressed, err := ioutil.ReadAll(r.Body) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, err.Error(), http.StatusInternalServerError) return } reqBuf, err := snappy.Decode(nil, compressed) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } var req remote.ReadRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if len(req.Queries) == 0 { - http.Error(w, "no prometheus queries passed", http.StatusBadRequest) - return - } else if len(req.Queries) > 1 { - http.Error(w, "only one prometheus remote query is supported per request", http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } @@ -944,7 +934,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met db := r.FormValue("db") q, err := prometheus.ReadRequestToInfluxQLQuery(&req, db, r.FormValue("rp")) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + h.httpError(w, err.Error(), http.StatusBadRequest) return } @@ -1019,10 +1009,19 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met } for _, v := range s.Values { - timestamp := v[0].(time.Time).UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) + t, ok := v[0].(time.Time) + if !ok { + h.httpError(w, fmt.Sprintf("value %v wasn't a time", v[0]), http.StatusBadRequest) + return + } + val, ok := v[1].(float64) + if !ok { + h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest) + } + timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) ts.Samples = append(ts.Samples, &remote.Sample{ TimestampMs: timestamp, - Value: v[1].(float64), + Value: val, }) } @@ -1032,7 +1031,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met data, err := proto.Marshal(resp) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, err.Error(), http.StatusInternalServerError) return } @@ -1041,7 +1040,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met compressed = snappy.Encode(nil, data) if _, err := w.Write(compressed); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + h.httpError(w, err.Error(), http.StatusInternalServerError) return } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 89bcdf79da5..dbe61cec3fd 100644 
--- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -561,8 +561,8 @@ func TestHandler_PromWrite(t *testing.T) { h.PointsWriter.WritePointsFn = func(db, rp string, _ models.ConsistencyLevel, _ meta.User, points []models.Point) error { called = true point := points[0] - if point.UnixNano() != 1*int64(time.Millisecond) { - t.Fatalf("Exp point time %d but got %d", 1*int64(time.Millisecond), point.UnixNano()) + if point.UnixNano() != int64(time.Millisecond) { + t.Fatalf("Exp point time %d but got %d", int64(time.Millisecond), point.UnixNano()) } tags := point.Tags() expectedTags := models.Tags{models.Tag{Key: []byte("host"), Value: []byte("a")}, models.Tag{Key: []byte("region"), Value: []byte("west")}} From a2d7024f4aeea92643c84bf6d47e5179224d7286 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 4 Sep 2017 12:37:23 -0400 Subject: [PATCH 3/8] Add error message for dropping NaN in Prometheus --- prometheus/converters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus/converters.go b/prometheus/converters.go index 91e90aa35ab..5bb12e4c4ae 100644 --- a/prometheus/converters.go +++ b/prometheus/converters.go @@ -19,7 +19,7 @@ const ( fieldName = "f64" ) -var ErrNaNDropped = errors.New("") +var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not supported") // WriteRequestToPoints converts a Prometheus remote write request of time series and their // samples into Points that can be written into Influx From 8a0d781cd9aafacf3f0d99b32989861c358ecb4d Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 4 Sep 2017 13:23:55 -0400 Subject: [PATCH 4/8] Move Prometheus dep to correct spot in GoDeps. --- Godeps | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Godeps b/Godeps index 762afc010f6..e4e93a9dd62 100644 --- a/Godeps +++ b/Godeps @@ -14,10 +14,10 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac +github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6 github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577 golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd golang.org/x/sys 90796e5a05ce440b41c768bd9af257005e470461 -github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133 From e7a85a1a21d7c7302de93b8561d7fd631236282b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 5 Sep 2017 10:52:33 +0100 Subject: [PATCH 5/8] Update license of dependencies --- LICENSE_OF_DEPENDENCIES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index 8c506bf4726..49f811b184a 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -17,6 +17,7 @@ - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) +- github.com/prometheus/prometheus [APACHE LICENSE](https://github.com/prometheus/prometheus/blob/master/LICENSE) - 
github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) - github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) - github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt) From 64f483570852b6687132b53b20f9c37224baf60c Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 7 Sep 2017 11:55:32 -0400 Subject: [PATCH 6/8] Remove Prometheus dependency in favor of copying over proto definition. The only thing used from Prometheus was the storage/remote files that are generated from the remote.proto file. Copied that file into promtheus package and removed the dependency. --- Godeps | 1 - LICENSE_OF_DEPENDENCIES.md | 1 - prometheus/converters.go | 25 +- prometheus/remote.pb.go | 1759 ++++++++++++++++++++++++++++++++ prometheus/remote.proto | 70 ++ services/httpd/handler.go | 13 +- services/httpd/handler_test.go | 30 +- 7 files changed, 1863 insertions(+), 36 deletions(-) create mode 100644 prometheus/remote.pb.go create mode 100644 prometheus/remote.proto diff --git a/Godeps b/Godeps index e4e93a9dd62..271e4c47f2f 100644 --- a/Godeps +++ b/Godeps @@ -14,7 +14,6 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac -github.com/prometheus/prometheus 3afb3fffa3a29c3de865e1172fb740442e9d0133 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6 diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index 49f811b184a..8c506bf4726 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -17,7 +17,6 @@ - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) -- github.com/prometheus/prometheus [APACHE LICENSE](https://github.com/prometheus/prometheus/blob/master/LICENSE) - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) - github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) - github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt) diff --git a/prometheus/converters.go b/prometheus/converters.go index 5bb12e4c4ae..458010cc7fa 100644 --- a/prometheus/converters.go +++ b/prometheus/converters.go @@ -8,9 +8,10 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" - "github.com/prometheus/prometheus/storage/remote" ) +//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. 
remote.proto + const ( // measurementName is where all prometheus time series go to measurementName = "_" @@ -23,7 +24,7 @@ var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not s // WriteRequestToPoints converts a Prometheus remote write request of time series and their // samples into Points that can be written into Influx -func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { +func WriteRequestToPoints(req *WriteRequest) ([]models.Point, error) { var maxPoints int for _, ts := range req.Timeseries { maxPoints += len(ts.Samples) @@ -61,7 +62,7 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { // ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL // query that will return the requested data when executed -func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) { +func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Query, error) { if len(req.Queries) != 1 { return nil, errors.New("Prometheus read endpoint currently only supports one query at a time") } @@ -94,16 +95,16 @@ func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influx } // condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr -func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) { +func condFromMatcher(m *LabelMatcher) (*influxql.BinaryExpr, error) { var op influxql.Token switch m.Type { - case remote.MatchType_EQUAL: + case MatchType_EQUAL: op = influxql.EQ - case remote.MatchType_NOT_EQUAL: + case MatchType_NOT_EQUAL: op = influxql.NEQ - case remote.MatchType_REGEX_MATCH: + case MatchType_REGEX_MATCH: op = influxql.EQREGEX - case remote.MatchType_REGEX_NO_MATCH: + case MatchType_REGEX_NO_MATCH: op = influxql.NEQREGEX default: return nil, fmt.Errorf("unknown match type %v", m.Type) @@ -120,7 +121,7 @@ func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) { // into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus // remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels // are kept equivalent. -func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) { +func condFromMatchers(q *Query, matchers []*LabelMatcher) (*influxql.BinaryExpr, error) { if len(matchers) > 0 { lhs, err := condFromMatcher(matchers[0]) if err != nil { @@ -154,8 +155,8 @@ func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influx } // TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs -func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair { - pairs := make([]*remote.LabelPair, 0, len(tags)) +func TagsToLabelPairs(tags map[string]string) []*LabelPair { + pairs := make([]*LabelPair, 0, len(tags)) for k, v := range tags { if v == "" { // If we select metrics with different sets of labels names, @@ -166,7 +167,7 @@ func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair { // to make the result correct. continue } - pairs = append(pairs, &remote.LabelPair{ + pairs = append(pairs, &LabelPair{ Name: k, Value: v, }) diff --git a/prometheus/remote.pb.go b/prometheus/remote.pb.go new file mode 100644 index 00000000000..cf2c848497a --- /dev/null +++ b/prometheus/remote.pb.go @@ -0,0 +1,1759 @@ +// Code generated by protoc-gen-gogo. +// source: remote.proto +// DO NOT EDIT! 
+ +/* + Package prometheus is a generated protocol buffer package. + + It is generated from these files: + remote.proto + + It has these top-level messages: + Sample + LabelPair + TimeSeries + WriteRequest + ReadRequest + ReadResponse + Query + LabelMatcher + QueryResult +*/ +package prometheus + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type MatchType int32 + +const ( + MatchType_EQUAL MatchType = 0 + MatchType_NOT_EQUAL MatchType = 1 + MatchType_REGEX_MATCH MatchType = 2 + MatchType_REGEX_NO_MATCH MatchType = 3 +) + +var MatchType_name = map[int32]string{ + 0: "EQUAL", + 1: "NOT_EQUAL", + 2: "REGEX_MATCH", + 3: "REGEX_NO_MATCH", +} +var MatchType_value = map[string]int32{ + "EQUAL": 0, + "NOT_EQUAL": 1, + "REGEX_MATCH": 2, + "REGEX_NO_MATCH": 3, +} + +func (x MatchType) String() string { + return proto.EnumName(MatchType_name, int32(x)) +} +func (MatchType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRemote, []int{0} } + +type Sample struct { + Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` +} + +func (m *Sample) Reset() { *m = Sample{} } +func (m *Sample) String() string { return proto.CompactTextString(m) } +func (*Sample) ProtoMessage() {} +func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{0} } + +func (m *Sample) GetValue() float64 { + if m != nil { + return m.Value + } + return 0 +} + +func (m *Sample) GetTimestampMs() int64 { + if m != nil { + return m.TimestampMs + } + return 0 +} + +type LabelPair struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *LabelPair) Reset() { *m = LabelPair{} } +func (m *LabelPair) String() string { return proto.CompactTextString(m) } +func (*LabelPair) ProtoMessage() {} +func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{1} } + +func (m *LabelPair) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *LabelPair) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +type TimeSeries struct { + Labels []*LabelPair `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"` + // Sorted by time, oldest sample first. 
+ Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"` +} + +func (m *TimeSeries) Reset() { *m = TimeSeries{} } +func (m *TimeSeries) String() string { return proto.CompactTextString(m) } +func (*TimeSeries) ProtoMessage() {} +func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{2} } + +func (m *TimeSeries) GetLabels() []*LabelPair { + if m != nil { + return m.Labels + } + return nil +} + +func (m *TimeSeries) GetSamples() []*Sample { + if m != nil { + return m.Samples + } + return nil +} + +type WriteRequest struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{3} } + +func (m *WriteRequest) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +type ReadRequest struct { + Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"` +} + +func (m *ReadRequest) Reset() { *m = ReadRequest{} } +func (m *ReadRequest) String() string { return proto.CompactTextString(m) } +func (*ReadRequest) ProtoMessage() {} +func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{4} } + +func (m *ReadRequest) GetQueries() []*Query { + if m != nil { + return m.Queries + } + return nil +} + +type ReadResponse struct { + // In same order as the request's queries. + Results []*QueryResult `protobuf:"bytes,1,rep,name=results" json:"results,omitempty"` +} + +func (m *ReadResponse) Reset() { *m = ReadResponse{} } +func (m *ReadResponse) String() string { return proto.CompactTextString(m) } +func (*ReadResponse) ProtoMessage() {} +func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{5} } + +func (m *ReadResponse) GetResults() []*QueryResult { + if m != nil { + return m.Results + } + return nil +} + +type Query struct { + StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` + EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` + Matchers []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"` +} + +func (m *Query) Reset() { *m = Query{} } +func (m *Query) String() string { return proto.CompactTextString(m) } +func (*Query) ProtoMessage() {} +func (*Query) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{6} } + +func (m *Query) GetStartTimestampMs() int64 { + if m != nil { + return m.StartTimestampMs + } + return 0 +} + +func (m *Query) GetEndTimestampMs() int64 { + if m != nil { + return m.EndTimestampMs + } + return 0 +} + +func (m *Query) GetMatchers() []*LabelMatcher { + if m != nil { + return m.Matchers + } + return nil +} + +type LabelMatcher struct { + Type MatchType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MatchType" json:"type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Value string `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } +func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } +func (*LabelMatcher) ProtoMessage() {} +func (*LabelMatcher) 
Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{7} } + +func (m *LabelMatcher) GetType() MatchType { + if m != nil { + return m.Type + } + return MatchType_EQUAL +} + +func (m *LabelMatcher) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *LabelMatcher) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +type QueryResult struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *QueryResult) Reset() { *m = QueryResult{} } +func (m *QueryResult) String() string { return proto.CompactTextString(m) } +func (*QueryResult) ProtoMessage() {} +func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{8} } + +func (m *QueryResult) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +func init() { + proto.RegisterType((*Sample)(nil), "prometheus.Sample") + proto.RegisterType((*LabelPair)(nil), "prometheus.LabelPair") + proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") + proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") + proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") + proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") + proto.RegisterType((*Query)(nil), "prometheus.Query") + proto.RegisterType((*LabelMatcher)(nil), "prometheus.LabelMatcher") + proto.RegisterType((*QueryResult)(nil), "prometheus.QueryResult") + proto.RegisterEnum("prometheus.MatchType", MatchType_name, MatchType_value) +} +func (m *Sample) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Sample) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Value != 0 { + dAtA[i] = 0x9 + i++ + i = encodeFixed64Remote(dAtA, i, uint64(math.Float64bits(float64(m.Value)))) + } + if m.TimestampMs != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.TimestampMs)) + } + return i, nil +} + +func (m *LabelPair) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LabelPair) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Name) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) + } + if len(m.Value) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintRemote(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + return i, nil +} + +func (m *TimeSeries) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TimeSeries) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Samples) > 0 { + for _, msg := range m.Samples { + dAtA[i] = 0x12 + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *WriteRequest) Marshal() (dAtA []byte, err 
error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, msg := range m.Timeseries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *ReadRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Queries) > 0 { + for _, msg := range m.Queries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *ReadResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Results) > 0 { + for _, msg := range m.Results { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *Query) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Query) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.StartTimestampMs != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.StartTimestampMs)) + } + if m.EndTimestampMs != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.EndTimestampMs)) + } + if len(m.Matchers) > 0 { + for _, msg := range m.Matchers { + dAtA[i] = 0x1a + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *LabelMatcher) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LabelMatcher) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Type != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.Type)) + } + if len(m.Name) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintRemote(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) + } + if len(m.Value) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintRemote(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + return i, nil +} + +func (m *QueryResult) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryResult) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, msg := range m.Timeseries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err 
:= msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func encodeFixed64Remote(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Remote(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Sample) Size() (n int) { + var l int + _ = l + if m.Value != 0 { + n += 9 + } + if m.TimestampMs != 0 { + n += 1 + sovRemote(uint64(m.TimestampMs)) + } + return n +} + +func (m *LabelPair) Size() (n int) { + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovRemote(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovRemote(uint64(l)) + } + return n +} + +func (m *TimeSeries) Size() (n int) { + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + if len(m.Samples) > 0 { + for _, e := range m.Samples { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *WriteRequest) Size() (n int) { + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *ReadRequest) Size() (n int) { + var l int + _ = l + if len(m.Queries) > 0 { + for _, e := range m.Queries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *ReadResponse) Size() (n int) { + var l int + _ = l + if len(m.Results) > 0 { + for _, e := range m.Results { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *Query) Size() (n int) { + var l int + _ = l + if m.StartTimestampMs != 0 { + n += 1 + sovRemote(uint64(m.StartTimestampMs)) + } + if m.EndTimestampMs != 0 { + n += 1 + sovRemote(uint64(m.EndTimestampMs)) + } + if len(m.Matchers) > 0 { + for _, e := range m.Matchers { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *LabelMatcher) Size() (n int) { + var l int + _ = l + if m.Type != 0 { + n += 1 + sovRemote(uint64(m.Type)) + } + l = len(m.Name) + if l > 0 { + n += 1 + l + sovRemote(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovRemote(uint64(l)) + } + return n +} + +func (m *QueryResult) Size() (n int) { + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func sovRemote(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRemote(x uint64) (n int) { + return sovRemote(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Sample) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 
0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Sample: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Sample: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + iNdEx += 8 + v = uint64(dAtA[iNdEx-8]) + v |= uint64(dAtA[iNdEx-7]) << 8 + v |= uint64(dAtA[iNdEx-6]) << 16 + v |= uint64(dAtA[iNdEx-5]) << 24 + v |= uint64(dAtA[iNdEx-4]) << 32 + v |= uint64(dAtA[iNdEx-3]) << 40 + v |= uint64(dAtA[iNdEx-2]) << 48 + v |= uint64(dAtA[iNdEx-1]) << 56 + m.Value = float64(math.Float64frombits(v)) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimestampMs", wireType) + } + m.TimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimestampMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LabelPair) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + 
if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TimeSeries) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TimeSeries: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TimeSeries: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, &LabelPair{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Samples = append(m.Samples, &Sample{}) + if err := m.Samples[len(m.Samples)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *WriteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) 
& 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, &TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Queries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Queries = append(m.Queries, &Query{}) + if err := m.Queries[len(m.Queries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Results", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + 
m.Results = append(m.Results, &QueryResult{}) + if err := m.Results[len(m.Results)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Query) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Query: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Query: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTimestampMs", wireType) + } + m.StartTimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTimestampMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTimestampMs", wireType) + } + m.EndTimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndTimestampMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Matchers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Matchers = append(m.Matchers, &LabelMatcher{}) + if err := m.Matchers[len(m.Matchers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LabelMatcher) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelMatcher: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: 
LabelMatcher: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= (MatchType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryResult) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryResult: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryResult: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, &TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + 
} + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRemote(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthRemote + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRemote(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthRemote = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRemote = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("remote.proto", fileDescriptorRemote) } + +var fileDescriptorRemote = []byte{ + // 460 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x4d, 0x6f, 0xd3, 0x4e, + 0x10, 0xc6, 0xbb, 0x71, 0x93, 0xfc, 0x3d, 0xf6, 0x3f, 0x98, 0x11, 0x2f, 0x3e, 0x45, 0xc1, 0xa7, + 0x00, 0x25, 0x12, 0xaf, 0x07, 0x6e, 0x01, 0x05, 0x10, 0x6a, 0x5a, 0xba, 0x35, 0x82, 0x9b, 0xe5, + 0x36, 0x23, 0xd5, 0x92, 0x1d, 0xbb, 0xbb, 0x6b, 0xa4, 0x7c, 0x0b, 0x2e, 0x7c, 0x27, 0x8e, 0x7c, + 0x04, 0x14, 0xbe, 0x08, 0xca, 0x6e, 0xec, 0x6c, 0xd5, 0x9e, 0xb8, 0x65, 0x67, 0x7e, 0xf3, 0xec, + 0xb3, 0x79, 0xc6, 0xe0, 0x0b, 0x2a, 0x4a, 0x45, 0x93, 0x4a, 0x94, 0xaa, 0x44, 0xa8, 0x44, 0x59, + 0x90, 0xba, 0xa0, 0x5a, 0x46, 0x53, 0xe8, 0x9d, 0xa6, 0x45, 0x95, 0x13, 0xde, 0x81, 0xee, 0xb7, + 0x34, 0xaf, 0x29, 0x64, 0x23, 0x36, 0x66, 0xdc, 0x1c, 0xf0, 0x01, 0xf8, 0x2a, 0x2b, 0x48, 0xaa, + 0xb4, 0xa8, 0x92, 0x42, 0x86, 0x9d, 0x11, 0x1b, 0x3b, 0xdc, 0x6b, 0x6b, 0x73, 0x19, 0xbd, 0x04, + 0xf7, 0x30, 0x3d, 0xa3, 0xfc, 0x53, 0x9a, 0x09, 0x44, 0xd8, 0x5f, 0xa6, 0x85, 0x11, 0x71, 0xb9, + 0xfe, 0xbd, 0x53, 0xee, 0xe8, 0xa2, 0x39, 0x44, 0x19, 0x40, 0x9c, 0x15, 0x74, 0x4a, 0x22, 0x23, + 0x89, 0x4f, 0xa0, 0x97, 0x6f, 0x44, 0x64, 0xc8, 0x46, 0xce, 0xd8, 0x7b, 0x76, 0x77, 0xb2, 0x33, + 0x39, 0x69, 0xe5, 0xf9, 0x16, 0xc2, 0x03, 0xe8, 0x4b, 0x6d, 0x7b, 0xe3, 0x68, 0xc3, 0xa3, 0xcd, + 0x9b, 0x17, 0xf1, 0x06, 0x89, 
0xde, 0x81, 0xff, 0x45, 0x64, 0x8a, 0x38, 0x5d, 0xd6, 0x24, 0x15, + 0xbe, 0x02, 0xd0, 0x0f, 0xd0, 0x57, 0x6f, 0x2f, 0xbc, 0x67, 0x0b, 0xec, 0x8c, 0x71, 0x8b, 0x8c, + 0x5e, 0x83, 0xc7, 0x29, 0x5d, 0x34, 0x32, 0x8f, 0xa1, 0x7f, 0x59, 0xdb, 0x1a, 0xb7, 0x6d, 0x8d, + 0x93, 0x9a, 0xc4, 0x8a, 0x37, 0x44, 0x34, 0x05, 0xdf, 0xcc, 0xca, 0xaa, 0x5c, 0x4a, 0xc2, 0xa7, + 0xd0, 0x17, 0x24, 0xeb, 0x5c, 0x35, 0xc3, 0xf7, 0xaf, 0x0f, 0xeb, 0x3e, 0x6f, 0xb8, 0xe8, 0x07, + 0x83, 0xae, 0x6e, 0xe0, 0x01, 0xa0, 0x54, 0xa9, 0x50, 0xc9, 0x95, 0x6c, 0x98, 0xce, 0x26, 0xd0, + 0x9d, 0x78, 0x17, 0x10, 0x8e, 0x21, 0xa0, 0xe5, 0x22, 0xb9, 0x21, 0xc7, 0x01, 0x2d, 0x17, 0x36, + 0xf9, 0x02, 0xfe, 0x2b, 0x52, 0x75, 0x7e, 0x41, 0x42, 0x86, 0x8e, 0x76, 0x15, 0x5e, 0xcb, 0x61, + 0x6e, 0x00, 0xde, 0x92, 0xd1, 0x39, 0xf8, 0x76, 0x07, 0x1f, 0xc2, 0xbe, 0x5a, 0x55, 0x66, 0x07, + 0x06, 0x57, 0x93, 0xd4, 0x48, 0xbc, 0xaa, 0x88, 0x6b, 0xa4, 0x5d, 0x97, 0xce, 0x4d, 0xeb, 0xe2, + 0xd8, 0xeb, 0x32, 0x03, 0xcf, 0xfa, 0x53, 0xfe, 0x35, 0xc2, 0x47, 0x1f, 0xc1, 0x6d, 0x3d, 0xa0, + 0x0b, 0xdd, 0xd9, 0xc9, 0xe7, 0xe9, 0x61, 0xb0, 0x87, 0xff, 0x83, 0x7b, 0x74, 0x1c, 0x27, 0xe6, + 0xc8, 0xf0, 0x16, 0x78, 0x7c, 0xf6, 0x7e, 0xf6, 0x35, 0x99, 0x4f, 0xe3, 0xb7, 0x1f, 0x82, 0x0e, + 0x22, 0x0c, 0x4c, 0xe1, 0xe8, 0x78, 0x5b, 0x73, 0xde, 0x04, 0x3f, 0xd7, 0x43, 0xf6, 0x6b, 0x3d, + 0x64, 0xbf, 0xd7, 0x43, 0xf6, 0xfd, 0xcf, 0x70, 0xef, 0xac, 0xa7, 0x3f, 0xb0, 0xe7, 0x7f, 0x03, + 0x00, 0x00, 0xff, 0xff, 0x33, 0x2b, 0x39, 0x47, 0x70, 0x03, 0x00, 0x00, +} diff --git a/prometheus/remote.proto b/prometheus/remote.proto new file mode 100644 index 00000000000..e8bf2280284 --- /dev/null +++ b/prometheus/remote.proto @@ -0,0 +1,70 @@ +// This file is copied (except for package name) from https://github.com/prometheus/prometheus/blob/master/storage/remote/remote.proto + +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package prometheus; // change package from remote to prometheus + +message Sample { + double value = 1; + int64 timestamp_ms = 2; +} + +message LabelPair { + string name = 1; + string value = 2; +} + +message TimeSeries { + repeated LabelPair labels = 1; + // Sorted by time, oldest sample first. + repeated Sample samples = 2; +} + +message WriteRequest { + repeated TimeSeries timeseries = 1; +} + +message ReadRequest { + repeated Query queries = 1; +} + +message ReadResponse { + // In same order as the request's queries. 
+ repeated QueryResult results = 1; +} + +message Query { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + repeated LabelMatcher matchers = 3; +} + +enum MatchType { + EQUAL = 0; + NOT_EQUAL = 1; + REGEX_MATCH = 2; + REGEX_NO_MATCH = 3; +} + +message LabelMatcher { + MatchType type = 1; + string name = 2; + string value = 3; +} + +message QueryResult { + repeated TimeSeries timeseries = 1; +} \ No newline at end of file diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 0623b3d5c71..567290afee9 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -33,7 +33,6 @@ import ( "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/uuid" - "github.com/prometheus/prometheus/storage/remote" "github.com/uber-go/zap" ) @@ -856,7 +855,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me } // Convert the Prometheus remote write request to Influx Points - var req remote.WriteRequest + var req prometheus.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) return @@ -924,7 +923,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met return } - var req remote.ReadRequest + var req prometheus.ReadRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) return @@ -991,8 +990,8 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) - resp := &remote.ReadResponse{ - Results: []*remote.QueryResult{{}}, + resp := &prometheus.ReadResponse{ + Results: []*prometheus.QueryResult{{}}, } // pull all results from the channel @@ -1004,7 +1003,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met // read the series data and convert into Prometheus samples for _, s := range r.Series { - ts := &remote.TimeSeries{ + ts := &prometheus.TimeSeries{ Labels: prometheus.TagsToLabelPairs(s.Tags), } @@ -1019,7 +1018,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest) } timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) - ts.Samples = append(ts.Samples, &remote.Sample{ + ts.Samples = append(ts.Samples, &prometheus.Sample{ TimestampMs: timestamp, Value: val, }) diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index dbe61cec3fd..054b277e3cb 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -22,10 +22,10 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/prometheus" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" "github.com/influxdata/influxdb/services/meta" - "github.com/prometheus/prometheus/storage/remote" ) // Ensure the handler returns results from a query (including nil results). 
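A minimal client-side sketch, offered as illustration only and not part of any patch in this series: per the remote.proto schema above, a remote write body is a protobuf-encoded WriteRequest compressed with snappy. The import path assumes the final package layout this series arrives at (prometheus/remote); the target URL, port, and db parameter are assumptions for the example.

package main

import (
	"bytes"
	"log"
	"net/http"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/influxdata/influxdb/prometheus/remote"
)

func main() {
	// One series with one label and one sample, mirroring the message shapes in remote.proto.
	req := &remote.WriteRequest{
		Timeseries: []*remote.TimeSeries{{
			Labels:  []*remote.LabelPair{{Name: "host", Value: "a"}},
			Samples: []*remote.Sample{{TimestampMs: 1, Value: 1.2}},
		}},
	}

	// Encode as protobuf, then apply snappy block compression.
	data, err := proto.Marshal(req)
	if err != nil {
		log.Fatal(err)
	}
	body := snappy.Encode(nil, data)

	// URL, port, and db value are placeholders for this sketch.
	resp, err := http.Post("http://localhost:8086/api/v1/prom/write?db=prometheus",
		"application/x-protobuf", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()
}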
@@ -531,14 +531,14 @@ func TestHandler_Query_CloseNotify(t *testing.T) { // Ensure the prometheus remote write works func TestHandler_PromWrite(t *testing.T) { - req := &remote.WriteRequest{ - Timeseries: []*remote.TimeSeries{ + req := &prometheus.WriteRequest{ + Timeseries: []*prometheus.TimeSeries{ { - Labels: []*remote.LabelPair{ + Labels: []*prometheus.LabelPair{ {Name: "host", Value: "a"}, {Name: "region", Value: "west"}, }, - Samples: []*remote.Sample{ + Samples: []*prometheus.Sample{ {TimestampMs: 1, Value: 1.2}, {TimestampMs: 2, Value: math.NaN()}, }, @@ -594,13 +594,13 @@ func TestHandler_PromWrite(t *testing.T) { // Ensure Prometheus remote read requests are converted to the correct InfluxQL query and // data is returned func TestHandler_PromRead(t *testing.T) { - req := &remote.ReadRequest{ - Queries: []*remote.Query{{ - Matchers: []*remote.LabelMatcher{ - {Type: remote.MatchType_EQUAL, Name: "eq", Value: "a"}, - {Type: remote.MatchType_NOT_EQUAL, Name: "neq", Value: "b"}, - {Type: remote.MatchType_REGEX_MATCH, Name: "regex", Value: "c"}, - {Type: remote.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"}, + req := &prometheus.ReadRequest{ + Queries: []*prometheus.Query{{ + Matchers: []*prometheus.LabelMatcher{ + {Type: prometheus.MatchType_EQUAL, Name: "eq", Value: "a"}, + {Type: prometheus.MatchType_NOT_EQUAL, Name: "neq", Value: "b"}, + {Type: prometheus.MatchType_REGEX_MATCH, Name: "regex", Value: "c"}, + {Type: prometheus.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"}, }, StartTimestampMs: 1, EndTimestampMs: 2, @@ -642,13 +642,13 @@ func TestHandler_PromRead(t *testing.T) { t.Fatal(err.Error()) } - var resp remote.ReadResponse + var resp prometheus.ReadResponse if err := proto.Unmarshal(reqBuf, &resp); err != nil { t.Fatal(err.Error()) } - expLabels := []*remote.LabelPair{{Name: "foo", Value: "bar"}} - expSamples := []*remote.Sample{{TimestampMs: 23000, Value: 1.2}} + expLabels := []*prometheus.LabelPair{{Name: "foo", Value: "bar"}} + expSamples := []*prometheus.Sample{{TimestampMs: 23000, Value: 1.2}} ts := resp.Results[0].Timeseries[0] From a4cc826556879809ba18ef360863fa0143506301 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 7 Sep 2017 12:04:05 -0400 Subject: [PATCH 7/8] Update prometheus RequestToQuery converter --- prometheus/converters.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/prometheus/converters.go b/prometheus/converters.go index 458010cc7fa..276273fe0df 100644 --- a/prometheus/converters.go +++ b/prometheus/converters.go @@ -68,8 +68,6 @@ func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Quer } promQuery := req.Queries[0] - q := &influxql.Query{} - stmt := &influxql.SelectStatement{ IsRawQuery: true, Fields: []*influxql.Field{ @@ -89,9 +87,8 @@ func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Quer } stmt.Condition = cond - q.Statements = append(q.Statements, stmt) - return q, nil + return &influxql.Query{Statements: []influxql.Statement{stmt}}, nil } // condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr From b1dcdaa099e1cbcc8364c3109268879dc353357a Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 7 Sep 2017 13:26:10 -0400 Subject: [PATCH 8/8] Move prometheus remote proto to remote package --- prometheus/converters.go | 25 ++++---- prometheus/remote/generate.go | 3 + prometheus/{ => remote}/remote.pb.go | 86 ++++++++++++++-------------- prometheus/{ => remote}/remote.proto | 2 +- services/httpd/handler.go | 13 +++-- 
services/httpd/handler_test.go | 30 +++++----- 6 files changed, 81 insertions(+), 78 deletions(-) create mode 100644 prometheus/remote/generate.go rename prometheus/{ => remote}/remote.pb.go (91%) rename prometheus/{ => remote}/remote.proto (96%) diff --git a/prometheus/converters.go b/prometheus/converters.go index 276273fe0df..9b726db52f7 100644 --- a/prometheus/converters.go +++ b/prometheus/converters.go @@ -8,10 +8,9 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/prometheus/remote" ) -//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. remote.proto - const ( // measurementName is where all prometheus time series go to measurementName = "_" @@ -24,7 +23,7 @@ var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not s // WriteRequestToPoints converts a Prometheus remote write request of time series and their // samples into Points that can be written into Influx -func WriteRequestToPoints(req *WriteRequest) ([]models.Point, error) { +func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) { var maxPoints int for _, ts := range req.Timeseries { maxPoints += len(ts.Samples) @@ -62,7 +61,7 @@ func WriteRequestToPoints(req *WriteRequest) ([]models.Point, error) { // ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL // query that will return the requested data when executed -func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Query, error) { +func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) { if len(req.Queries) != 1 { return nil, errors.New("Prometheus read endpoint currently only supports one query at a time") } @@ -92,16 +91,16 @@ func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Quer } // condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr -func condFromMatcher(m *LabelMatcher) (*influxql.BinaryExpr, error) { +func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) { var op influxql.Token switch m.Type { - case MatchType_EQUAL: + case remote.MatchType_EQUAL: op = influxql.EQ - case MatchType_NOT_EQUAL: + case remote.MatchType_NOT_EQUAL: op = influxql.NEQ - case MatchType_REGEX_MATCH: + case remote.MatchType_REGEX_MATCH: op = influxql.EQREGEX - case MatchType_REGEX_NO_MATCH: + case remote.MatchType_REGEX_NO_MATCH: op = influxql.NEQREGEX default: return nil, fmt.Errorf("unknown match type %v", m.Type) @@ -118,7 +117,7 @@ func condFromMatcher(m *LabelMatcher) (*influxql.BinaryExpr, error) { // into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus // remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels // are kept equivalent. 
-func condFromMatchers(q *Query, matchers []*LabelMatcher) (*influxql.BinaryExpr, error) { +func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) { if len(matchers) > 0 { lhs, err := condFromMatcher(matchers[0]) if err != nil { @@ -152,8 +151,8 @@ func condFromMatchers(q *Query, matchers []*LabelMatcher) (*influxql.BinaryExpr, } // TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs -func TagsToLabelPairs(tags map[string]string) []*LabelPair { - pairs := make([]*LabelPair, 0, len(tags)) +func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair { + pairs := make([]*remote.LabelPair, 0, len(tags)) for k, v := range tags { if v == "" { // If we select metrics with different sets of labels names, @@ -164,7 +163,7 @@ func TagsToLabelPairs(tags map[string]string) []*LabelPair { // to make the result correct. continue } - pairs = append(pairs, &LabelPair{ + pairs = append(pairs, &remote.LabelPair{ Name: k, Value: v, }) diff --git a/prometheus/remote/generate.go b/prometheus/remote/generate.go new file mode 100644 index 00000000000..26e2eaec47c --- /dev/null +++ b/prometheus/remote/generate.go @@ -0,0 +1,3 @@ +package remote + +//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. remote.proto diff --git a/prometheus/remote.pb.go b/prometheus/remote/remote.pb.go similarity index 91% rename from prometheus/remote.pb.go rename to prometheus/remote/remote.pb.go index cf2c848497a..a9e78261e14 100644 --- a/prometheus/remote.pb.go +++ b/prometheus/remote/remote.pb.go @@ -3,7 +3,7 @@ // DO NOT EDIT! /* - Package prometheus is a generated protocol buffer package. + Package remote is a generated protocol buffer package. It is generated from these files: remote.proto @@ -19,7 +19,7 @@ LabelMatcher QueryResult */ -package prometheus +package remote import proto "github.com/gogo/protobuf/proto" import fmt "fmt" @@ -220,7 +220,7 @@ func (m *Query) GetMatchers() []*LabelMatcher { } type LabelMatcher struct { - Type MatchType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MatchType" json:"type,omitempty"` + Type MatchType `protobuf:"varint,1,opt,name=type,proto3,enum=remote.MatchType" json:"type,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Value string `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` } @@ -268,16 +268,16 @@ func (m *QueryResult) GetTimeseries() []*TimeSeries { } func init() { - proto.RegisterType((*Sample)(nil), "prometheus.Sample") - proto.RegisterType((*LabelPair)(nil), "prometheus.LabelPair") - proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") - proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") - proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") - proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") - proto.RegisterType((*Query)(nil), "prometheus.Query") - proto.RegisterType((*LabelMatcher)(nil), "prometheus.LabelMatcher") - proto.RegisterType((*QueryResult)(nil), "prometheus.QueryResult") - proto.RegisterEnum("prometheus.MatchType", MatchType_name, MatchType_value) + proto.RegisterType((*Sample)(nil), "remote.Sample") + proto.RegisterType((*LabelPair)(nil), "remote.LabelPair") + proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries") + proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest") + proto.RegisterType((*ReadRequest)(nil), "remote.ReadRequest") + proto.RegisterType((*ReadResponse)(nil), "remote.ReadResponse") + proto.RegisterType((*Query)(nil), 
"remote.Query") + proto.RegisterType((*LabelMatcher)(nil), "remote.LabelMatcher") + proto.RegisterType((*QueryResult)(nil), "remote.QueryResult") + proto.RegisterEnum("remote.MatchType", MatchType_name, MatchType_value) } func (m *Sample) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -1726,34 +1726,34 @@ var ( func init() { proto.RegisterFile("remote.proto", fileDescriptorRemote) } var fileDescriptorRemote = []byte{ - // 460 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x4d, 0x6f, 0xd3, 0x4e, - 0x10, 0xc6, 0xbb, 0x71, 0x93, 0xfc, 0x3d, 0xf6, 0x3f, 0x98, 0x11, 0x2f, 0x3e, 0x45, 0xc1, 0xa7, - 0x00, 0x25, 0x12, 0xaf, 0x07, 0x6e, 0x01, 0x05, 0x10, 0x6a, 0x5a, 0xba, 0x35, 0x82, 0x9b, 0xe5, - 0x36, 0x23, 0xd5, 0x92, 0x1d, 0xbb, 0xbb, 0x6b, 0xa4, 0x7c, 0x0b, 0x2e, 0x7c, 0x27, 0x8e, 0x7c, - 0x04, 0x14, 0xbe, 0x08, 0xca, 0x6e, 0xec, 0x6c, 0xd5, 0x9e, 0xb8, 0x65, 0x67, 0x7e, 0xf3, 0xec, - 0xb3, 0x79, 0xc6, 0xe0, 0x0b, 0x2a, 0x4a, 0x45, 0x93, 0x4a, 0x94, 0xaa, 0x44, 0xa8, 0x44, 0x59, - 0x90, 0xba, 0xa0, 0x5a, 0x46, 0x53, 0xe8, 0x9d, 0xa6, 0x45, 0x95, 0x13, 0xde, 0x81, 0xee, 0xb7, - 0x34, 0xaf, 0x29, 0x64, 0x23, 0x36, 0x66, 0xdc, 0x1c, 0xf0, 0x01, 0xf8, 0x2a, 0x2b, 0x48, 0xaa, - 0xb4, 0xa8, 0x92, 0x42, 0x86, 0x9d, 0x11, 0x1b, 0x3b, 0xdc, 0x6b, 0x6b, 0x73, 0x19, 0xbd, 0x04, - 0xf7, 0x30, 0x3d, 0xa3, 0xfc, 0x53, 0x9a, 0x09, 0x44, 0xd8, 0x5f, 0xa6, 0x85, 0x11, 0x71, 0xb9, - 0xfe, 0xbd, 0x53, 0xee, 0xe8, 0xa2, 0x39, 0x44, 0x19, 0x40, 0x9c, 0x15, 0x74, 0x4a, 0x22, 0x23, - 0x89, 0x4f, 0xa0, 0x97, 0x6f, 0x44, 0x64, 0xc8, 0x46, 0xce, 0xd8, 0x7b, 0x76, 0x77, 0xb2, 0x33, - 0x39, 0x69, 0xe5, 0xf9, 0x16, 0xc2, 0x03, 0xe8, 0x4b, 0x6d, 0x7b, 0xe3, 0x68, 0xc3, 0xa3, 0xcd, - 0x9b, 0x17, 0xf1, 0x06, 0x89, 0xde, 0x81, 0xff, 0x45, 0x64, 0x8a, 0x38, 0x5d, 0xd6, 0x24, 0x15, - 0xbe, 0x02, 0xd0, 0x0f, 0xd0, 0x57, 0x6f, 0x2f, 0xbc, 0x67, 0x0b, 0xec, 0x8c, 0x71, 0x8b, 0x8c, - 0x5e, 0x83, 0xc7, 0x29, 0x5d, 0x34, 0x32, 0x8f, 0xa1, 0x7f, 0x59, 0xdb, 0x1a, 0xb7, 0x6d, 0x8d, - 0x93, 0x9a, 0xc4, 0x8a, 0x37, 0x44, 0x34, 0x05, 0xdf, 0xcc, 0xca, 0xaa, 0x5c, 0x4a, 0xc2, 0xa7, - 0xd0, 0x17, 0x24, 0xeb, 0x5c, 0x35, 0xc3, 0xf7, 0xaf, 0x0f, 0xeb, 0x3e, 0x6f, 0xb8, 0xe8, 0x07, - 0x83, 0xae, 0x6e, 0xe0, 0x01, 0xa0, 0x54, 0xa9, 0x50, 0xc9, 0x95, 0x6c, 0x98, 0xce, 0x26, 0xd0, - 0x9d, 0x78, 0x17, 0x10, 0x8e, 0x21, 0xa0, 0xe5, 0x22, 0xb9, 0x21, 0xc7, 0x01, 0x2d, 0x17, 0x36, - 0xf9, 0x02, 0xfe, 0x2b, 0x52, 0x75, 0x7e, 0x41, 0x42, 0x86, 0x8e, 0x76, 0x15, 0x5e, 0xcb, 0x61, - 0x6e, 0x00, 0xde, 0x92, 0xd1, 0x39, 0xf8, 0x76, 0x07, 0x1f, 0xc2, 0xbe, 0x5a, 0x55, 0x66, 0x07, - 0x06, 0x57, 0x93, 0xd4, 0x48, 0xbc, 0xaa, 0x88, 0x6b, 0xa4, 0x5d, 0x97, 0xce, 0x4d, 0xeb, 0xe2, - 0xd8, 0xeb, 0x32, 0x03, 0xcf, 0xfa, 0x53, 0xfe, 0x35, 0xc2, 0x47, 0x1f, 0xc1, 0x6d, 0x3d, 0xa0, - 0x0b, 0xdd, 0xd9, 0xc9, 0xe7, 0xe9, 0x61, 0xb0, 0x87, 0xff, 0x83, 0x7b, 0x74, 0x1c, 0x27, 0xe6, - 0xc8, 0xf0, 0x16, 0x78, 0x7c, 0xf6, 0x7e, 0xf6, 0x35, 0x99, 0x4f, 0xe3, 0xb7, 0x1f, 0x82, 0x0e, - 0x22, 0x0c, 0x4c, 0xe1, 0xe8, 0x78, 0x5b, 0x73, 0xde, 0x04, 0x3f, 0xd7, 0x43, 0xf6, 0x6b, 0x3d, - 0x64, 0xbf, 0xd7, 0x43, 0xf6, 0xfd, 0xcf, 0x70, 0xef, 0xac, 0xa7, 0x3f, 0xb0, 0xe7, 0x7f, 0x03, - 0x00, 0x00, 0xff, 0xff, 0x33, 0x2b, 0x39, 0x47, 0x70, 0x03, 0x00, 0x00, + // 449 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x4d, 0x6f, 0xd3, 0x40, + 0x10, 0xed, 0xc6, 0x4d, 0x82, 0xc7, 0x6e, 0x30, 0x43, 0x0f, 0x39, 0x45, 0xc1, 0x12, 0xc2, 0x20, + 0xa8, 0x50, 
0x11, 0xdc, 0x38, 0xa4, 0x28, 0x02, 0xa1, 0xa6, 0xa5, 0x5b, 0x23, 0xb8, 0x59, 0x5b, + 0x32, 0x12, 0x96, 0xec, 0xc4, 0xdd, 0x5d, 0x23, 0xe5, 0x5f, 0xc0, 0xbf, 0xe2, 0xc8, 0x4f, 0x40, + 0xe1, 0x8f, 0xa0, 0xec, 0xc6, 0x1f, 0x91, 0x72, 0xea, 0x2d, 0x33, 0xef, 0xbd, 0x99, 0x97, 0x7d, + 0x63, 0xf0, 0x25, 0xe5, 0x4b, 0x4d, 0x27, 0x85, 0x5c, 0xea, 0x25, 0xf6, 0x6c, 0x15, 0x4e, 0xa0, + 0x77, 0x2d, 0xf2, 0x22, 0x23, 0x3c, 0x86, 0xee, 0x0f, 0x91, 0x95, 0x34, 0x64, 0x63, 0x16, 0x31, + 0x6e, 0x0b, 0x7c, 0x04, 0xbe, 0x4e, 0x73, 0x52, 0x5a, 0xe4, 0x45, 0x92, 0xab, 0x61, 0x67, 0xcc, + 0x22, 0x87, 0x7b, 0x75, 0x6f, 0xa6, 0xc2, 0xd7, 0xe0, 0x9e, 0x8b, 0x1b, 0xca, 0x3e, 0x89, 0x54, + 0x22, 0xc2, 0xe1, 0x42, 0xe4, 0x76, 0x88, 0xcb, 0xcd, 0xef, 0x66, 0x72, 0xc7, 0x34, 0x6d, 0x11, + 0x0a, 0x80, 0x38, 0xcd, 0xe9, 0x9a, 0x64, 0x4a, 0x0a, 0x9f, 0x42, 0x2f, 0xdb, 0x0c, 0x51, 0x43, + 0x36, 0x76, 0x22, 0xef, 0xf4, 0xc1, 0xc9, 0xd6, 0x6e, 0x3d, 0x9a, 0x6f, 0x09, 0x18, 0x41, 0x5f, + 0x19, 0xcb, 0x1b, 0x37, 0x1b, 0xee, 0xa0, 0xe2, 0xda, 0x7f, 0xc2, 0x2b, 0x38, 0x3c, 0x03, 0xff, + 0x8b, 0x4c, 0x35, 0x71, 0xba, 0x2d, 0x49, 0x69, 0x3c, 0x05, 0x30, 0xc6, 0xcd, 0xca, 0xed, 0x22, + 0xac, 0xc4, 0x8d, 0x19, 0xde, 0x62, 0x85, 0x6f, 0xc0, 0xe3, 0x24, 0xe6, 0xd5, 0x88, 0x27, 0xd0, + 0xbf, 0x2d, 0xdb, 0xfa, 0xa3, 0x4a, 0x7f, 0x55, 0x92, 0x5c, 0xf1, 0x0a, 0x0d, 0xdf, 0x82, 0x6f, + 0x75, 0xaa, 0x58, 0x2e, 0x14, 0xe1, 0x0b, 0xe8, 0x4b, 0x52, 0x65, 0xa6, 0x2b, 0xe1, 0xc3, 0x5d, + 0xa1, 0xc1, 0x78, 0xc5, 0x09, 0x7f, 0x31, 0xe8, 0x1a, 0x00, 0x9f, 0x03, 0x2a, 0x2d, 0xa4, 0x4e, + 0x76, 0x72, 0x60, 0x26, 0x87, 0xc0, 0x20, 0x71, 0x13, 0x06, 0x46, 0x10, 0xd0, 0x62, 0x9e, 0xec, + 0xc9, 0x6c, 0x40, 0x8b, 0x79, 0x9b, 0xf9, 0x12, 0xee, 0xe5, 0x42, 0x7f, 0xfb, 0x4e, 0x52, 0x0d, + 0x1d, 0xe3, 0xe8, 0x78, 0xe7, 0xcd, 0x67, 0x16, 0xe4, 0x35, 0x2b, 0x4c, 0xc0, 0x6f, 0x23, 0xf8, + 0x18, 0x0e, 0xf5, 0xaa, 0xb0, 0x59, 0x0f, 0x9a, 0xc4, 0x0c, 0x1c, 0xaf, 0x0a, 0xe2, 0x06, 0xae, + 0x4f, 0xa2, 0xb3, 0xef, 0x24, 0x9c, 0xf6, 0x49, 0x4c, 0xc0, 0x6b, 0x3d, 0xc6, 0x5d, 0xe2, 0x7a, + 0xf6, 0x11, 0xdc, 0x7a, 0x3f, 0xba, 0xd0, 0x9d, 0x5e, 0x7d, 0x9e, 0x9c, 0x07, 0x07, 0x78, 0x04, + 0xee, 0xc5, 0x65, 0x9c, 0xd8, 0x92, 0xe1, 0x7d, 0xf0, 0xf8, 0xf4, 0xfd, 0xf4, 0x6b, 0x32, 0x9b, + 0xc4, 0xef, 0x3e, 0x04, 0x1d, 0x44, 0x18, 0xd8, 0xc6, 0xc5, 0xe5, 0xb6, 0xe7, 0x9c, 0x05, 0xbf, + 0xd7, 0x23, 0xf6, 0x67, 0x3d, 0x62, 0x7f, 0xd7, 0x23, 0xf6, 0xf3, 0xdf, 0xe8, 0xe0, 0xa6, 0x67, + 0x3e, 0x9e, 0x57, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x28, 0xd6, 0xf1, 0x18, 0x4c, 0x03, 0x00, + 0x00, } diff --git a/prometheus/remote.proto b/prometheus/remote/remote.proto similarity index 96% rename from prometheus/remote.proto rename to prometheus/remote/remote.proto index e8bf2280284..4e429e47226 100644 --- a/prometheus/remote.proto +++ b/prometheus/remote/remote.proto @@ -15,7 +15,7 @@ syntax = "proto3"; -package prometheus; // change package from remote to prometheus +package remote; message Sample { double value = 1; diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 567290afee9..50ef5ffb71a 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -29,6 +29,7 @@ import ( "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/prometheus" + "github.com/influxdata/influxdb/prometheus/remote" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" @@ -855,7 +856,7 @@ func (h *Handler) servePromWrite(w 
http.ResponseWriter, r *http.Request, user me } // Convert the Prometheus remote write request to Influx Points - var req prometheus.WriteRequest + var req remote.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) return @@ -923,7 +924,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met return } - var req prometheus.ReadRequest + var req remote.ReadRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) return @@ -990,8 +991,8 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met // Execute query. results := h.QueryExecutor.ExecuteQuery(q, opts, closing) - resp := &prometheus.ReadResponse{ - Results: []*prometheus.QueryResult{{}}, + resp := &remote.ReadResponse{ + Results: []*remote.QueryResult{{}}, } // pull all results from the channel @@ -1003,7 +1004,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met // read the series data and convert into Prometheus samples for _, s := range r.Series { - ts := &prometheus.TimeSeries{ + ts := &remote.TimeSeries{ Labels: prometheus.TagsToLabelPairs(s.Tags), } @@ -1018,7 +1019,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest) } timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond) - ts.Samples = append(ts.Samples, &prometheus.Sample{ + ts.Samples = append(ts.Samples, &remote.Sample{ TimestampMs: timestamp, Value: val, }) diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 054b277e3cb..2d1dd73778a 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -22,7 +22,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/prometheus" + "github.com/influxdata/influxdb/prometheus/remote" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" "github.com/influxdata/influxdb/services/meta" @@ -531,14 +531,14 @@ func TestHandler_Query_CloseNotify(t *testing.T) { // Ensure the prometheus remote write works func TestHandler_PromWrite(t *testing.T) { - req := &prometheus.WriteRequest{ - Timeseries: []*prometheus.TimeSeries{ + req := &remote.WriteRequest{ + Timeseries: []*remote.TimeSeries{ { - Labels: []*prometheus.LabelPair{ + Labels: []*remote.LabelPair{ {Name: "host", Value: "a"}, {Name: "region", Value: "west"}, }, - Samples: []*prometheus.Sample{ + Samples: []*remote.Sample{ {TimestampMs: 1, Value: 1.2}, {TimestampMs: 2, Value: math.NaN()}, }, @@ -594,13 +594,13 @@ func TestHandler_PromWrite(t *testing.T) { // Ensure Prometheus remote read requests are converted to the correct InfluxQL query and // data is returned func TestHandler_PromRead(t *testing.T) { - req := &prometheus.ReadRequest{ - Queries: []*prometheus.Query{{ - Matchers: []*prometheus.LabelMatcher{ - {Type: prometheus.MatchType_EQUAL, Name: "eq", Value: "a"}, - {Type: prometheus.MatchType_NOT_EQUAL, Name: "neq", Value: "b"}, - {Type: prometheus.MatchType_REGEX_MATCH, Name: "regex", Value: "c"}, - {Type: prometheus.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"}, + req := &remote.ReadRequest{ + Queries: []*remote.Query{{ + Matchers: []*remote.LabelMatcher{ + {Type: remote.MatchType_EQUAL, Name: "eq", Value: "a"}, + {Type: 
remote.MatchType_NOT_EQUAL, Name: "neq", Value: "b"}, + {Type: remote.MatchType_REGEX_MATCH, Name: "regex", Value: "c"}, + {Type: remote.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"}, }, StartTimestampMs: 1, EndTimestampMs: 2, @@ -642,13 +642,13 @@ func TestHandler_PromRead(t *testing.T) { t.Fatal(err.Error()) } - var resp prometheus.ReadResponse + var resp remote.ReadResponse if err := proto.Unmarshal(reqBuf, &resp); err != nil { t.Fatal(err.Error()) } - expLabels := []*prometheus.LabelPair{{Name: "foo", Value: "bar"}} - expSamples := []*prometheus.Sample{{TimestampMs: 23000, Value: 1.2}} + expLabels := []*remote.LabelPair{{Name: "foo", Value: "bar"}} + expSamples := []*remote.Sample{{TimestampMs: 23000, Value: 1.2}} ts := resp.Results[0].Timeseries[0]
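As a hedged companion sketch (again illustrative only, not part of the patches), the read-path converter this series ends up with can be exercised directly: build a remote.ReadRequest and render the InfluxQL it maps to. The database and retention policy names below are placeholders.

package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/prometheus"
	"github.com/influxdata/influxdb/prometheus/remote"
)

func main() {
	req := &remote.ReadRequest{
		Queries: []*remote.Query{{
			StartTimestampMs: 1,
			EndTimestampMs:   2,
			Matchers: []*remote.LabelMatcher{
				{Type: remote.MatchType_EQUAL, Name: "host", Value: "a"},
			},
		}},
	}

	q, err := prometheus.ReadRequestToInfluxQLQuery(req, "prometheus", "autogen")
	if err != nil {
		log.Fatal(err)
	}

	// Expect a raw SELECT of the f64 field from measurement _ with the matcher
	// and both time bounds folded into the WHERE clause.
	fmt.Println(q.String())
}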