diff --git a/exporter/prometheusremotewriteexporter/DESIGN.md b/exporter/prometheusremotewriteexporter/DESIGN.md index d2d6539c646..a85f7d838bc 100644 --- a/exporter/prometheusremotewriteexporter/DESIGN.md +++ b/exporter/prometheusremotewriteexporter/DESIGN.md @@ -1,5 +1,3 @@ - - # **OpenTelemetry Collector Prometheus Remote Write/Cortex Exporter Design** Authors: @huyan0, @danielbang907 diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index 6444368801a..df10f1acdb7 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -22,10 +22,10 @@ Example: ```yaml exporters: -prometheusremotewrite: - endpoint: "http://some.url:9411/api/prom/push" + prometheusremotewrite: + endpoint: "http://some.url:9411/api/prom/push" ``` The full list of settings exposed for this exporter are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). -_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_ +_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_ \ No newline at end of file diff --git a/exporter/prometheusremotewriteexporter/config_test.go b/exporter/prometheusremotewriteexporter/config_test.go index 73927b35cff..29a3b535718 100644 --- a/exporter/prometheusremotewriteexporter/config_test.go +++ b/exporter/prometheusremotewriteexporter/config_test.go @@ -67,7 +67,6 @@ func Test_loadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, Namespace: "test-space", - HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "localhost:8888", TLSSetting: configtls.TLSClientSetting{ diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index c9b57eb3bb2..36f62bf8f2f 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -4,40 +4,39 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - -// Note: implementation for this class is in a separate PR package prometheusremotewriteexporter import ( "bufio" "bytes" "context" - "errors" + "fmt" "io" "net/http" "net/url" - "strings" + "strconv" "sync" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old" "go.opentelemetry.io/collector/internal/dataold" ) -// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint -type prwExporter struct { +// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint +type PrwExporter struct { namespace string endpointURL *url.URL client *http.Client @@ -45,20 +44,20 @@ type prwExporter struct { closeChan chan struct{} } -// newPrwExporter initializes a new prwExporter instance and sets fields accordingly. +// newPrwExporter initializes a new PrwExporter instance and sets fields accordingly. // client parameter cannot be nil. -func newPrwExporter(namespace string, endpoint string, client *http.Client) (*prwExporter, error) { +func NewPrwExporter(namespace string, endpoint string, client *http.Client) (*PrwExporter, error) { if client == nil { - return nil, errors.New("http client cannot be nil") + return nil, fmt.Errorf("http client cannot be nil") } endpointURL, err := url.ParseRequestURI(endpoint) if err != nil { - return nil, errors.New("invalid endpoint") + return nil, fmt.Errorf("invalid endpoint") } - return &prwExporter{ + return &PrwExporter{ namespace: namespace, endpointURL: endpointURL, client: client, @@ -69,7 +68,7 @@ func newPrwExporter(namespace string, endpoint string, client *http.Client) (*pr // shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations // to finish before returning -func (prwe *prwExporter) shutdown(context.Context) error { +func (prwe *PrwExporter) Shutdown(context.Context) error { close(prwe.closeChan) prwe.wg.Wait() return nil @@ -78,16 +77,16 @@ func (prwe *prwExporter) shutdown(context.Context) error { // pushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of // TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally // exports the map. -func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { +func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { prwe.wg.Add(1) defer prwe.wg.Done() select { case <-prwe.closeChan: - return pdatautil.MetricCount(md), errors.New("shutdown has been called") + return pdatautil.MetricCount(md), fmt.Errorf("shutdown has been called") default: tsMap := map[string]*prompb.TimeSeries{} dropped := 0 - errs := []string{} + errs := []error{} resourceMetrics := dataold.MetricDataToOtlp(pdatautil.MetricsToOldInternalMetrics(md)) for _, resourceMetric := range resourceMetrics { @@ -107,7 +106,7 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int // check for valid type and temporality combination if ok := validateMetrics(metric.MetricDescriptor); !ok { dropped++ - errs = append(errs, "invalid temporality and type combination") + errs = append(errs, fmt.Errorf("invalid temporality and type combination")) continue } // handle individual metric based on type @@ -116,19 +115,33 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: if err := prwe.handleScalarMetric(tsMap, metric); err != nil { dropped++ - errs = append(errs, err.Error()) + errs = append(errs, err) + } + case otlp.MetricDescriptor_HISTOGRAM: + if err := prwe.handleHistogramMetric(tsMap, metric); err != nil { + dropped++ + errs = append(errs, err) } + case otlp.MetricDescriptor_SUMMARY: + if err := prwe.handleSummaryMetric(tsMap, metric); err != nil { + dropped++ + errs = append(errs, err) + } + default: + dropped++ + errs = append(errs, fmt.Errorf("unsupported metric type")) } } } } if err := prwe.export(ctx, tsMap); err != nil { - return pdatautil.MetricCount(md), err + dropped = pdatautil.MetricCount(md) + errs = append(errs, err) } if dropped != 0 { - return dropped, errors.New(strings.Join(errs, "\n")) + return dropped, componenterror.CombineErrors(errs) } return 0, nil @@ -138,19 +151,20 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int // handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into // its corresponding TimeSeries in tsMap. // tsMap and metric cannot be nil, and metric must have a non-nil descriptor -func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { - +func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { mType := metric.MetricDescriptor.Type switch mType { // int points case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64: if metric.Int64DataPoints == nil { - return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name) + return fmt.Errorf("nil data point field in metric %v", metric.GetMetricDescriptor().Name) } for _, pt := range metric.Int64DataPoints { - + if pt == nil { + continue + } // create parameters for addSample name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) labels := createLabelSet(pt.GetLabels(), nameStr, name) @@ -167,10 +181,12 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, // double points case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: if metric.DoubleDataPoints == nil { - return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name) + return fmt.Errorf("nil data point field in metric %v", metric.GetMetricDescriptor().Name) } for _, pt := range metric.DoubleDataPoints { - + if pt == nil { + continue + } // create parameters for addSample name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) labels := createLabelSet(pt.GetLabels(), nameStr, name) @@ -183,18 +199,124 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, } return nil } + return fmt.Errorf("invalid metric type: wants int or double data points") +} + +// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each +// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries. +// tsMap and metric cannot be nil. +func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + + if metric.HistogramDataPoints == nil { + return fmt.Errorf("invalid metric type: wants histogram points") + } + + for _, pt := range metric.HistogramDataPoints { + if pt == nil { + continue + } + time := convertTimeStamp(pt.TimeUnixNano) + mType := metric.GetMetricDescriptor().GetType() + + // sum, count, and buckets of the histogram should append suffix to baseName + baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) + + // treat sum as a sample in an individual TimeSeries + sum := &prompb.Sample{ + Value: pt.GetSum(), + Timestamp: time, + } + sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr) + addSample(tsMap, sum, sumlabels, mType) + + // treat count as a sample in an individual TimeSeries + count := &prompb.Sample{ + Value: float64(pt.GetCount()), + Timestamp: time, + } + countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr) + addSample(tsMap, count, countlabels, mType) - return errors.New("invalid metric type: wants int or double data points") + // count for +Inf bound + var totalCount uint64 + + // process each bucket + for le, bk := range pt.GetBuckets() { + bucket := &prompb.Sample{ + Value: float64(bk.Count), + Timestamp: time, + } + boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f', -1, 64) + labels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, boundStr) + addSample(tsMap, bucket, labels, mType) + + totalCount += bk.GetCount() + } + // add le=+Inf bucket + infBucket := &prompb.Sample{ + Value: float64(totalCount), + Timestamp: time, + } + infLabels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, pInfStr) + addSample(tsMap, infBucket, infLabels, mType) + } + return nil +} + +// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each +// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries. +// tsMap and metric cannot be nil. +func (prwe *PrwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { + + if metric.SummaryDataPoints == nil { + return fmt.Errorf("invalid metric type: wants summary points") + } + + for _, pt := range metric.SummaryDataPoints { + + time := convertTimeStamp(pt.TimeUnixNano) + mType := metric.GetMetricDescriptor().GetType() + + // sum and count of the Summary should append suffix to baseName + baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace) + + // treat sum as sample in an individual TimeSeries + sum := &prompb.Sample{ + Value: pt.GetSum(), + Timestamp: time, + } + sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr) + addSample(tsMap, sum, sumlabels, mType) + + // treat count as a sample in an individual TimeSeries + count := &prompb.Sample{ + Value: float64(pt.GetCount()), + Timestamp: time, + } + countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr) + addSample(tsMap, count, countlabels, mType) + + // process each percentile/quantile + for _, qt := range pt.GetPercentileValues() { + quantile := &prompb.Sample{ + Value: qt.Value, + Timestamp: time, + } + percentileStr := strconv.FormatFloat(qt.Percentile, 'f', -1, 64) + qtlabels := createLabelSet(pt.GetLabels(), nameStr, baseName, quantileStr, percentileStr) + addSample(tsMap, quantile, qtlabels, mType) + } + } + return nil } // export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order -func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { +func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { //Calls the helper function to convert the TsMap to the desired format req, err := wrapTimeSeries(tsMap) if err != nil { return err } - //Uses proto.Marshal to convert the WriteRequest into bytes array data, err := proto.Marshal(req) if err != nil { @@ -232,7 +354,7 @@ func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti line = scanner.Text() } errMsg := "server returned HTTP status " + httpResp.Status + ": " + line - return errors.New(errMsg) + return fmt.Errorf(errMsg) } return nil } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 6a15745dd89..f1683894c28 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -16,11 +16,11 @@ package prometheusremotewriteexporter import ( "context" - "fmt" "io/ioutil" "net/http" "net/http/httptest" "net/url" + "strconv" "sync" "testing" @@ -40,6 +40,8 @@ import ( "go.opentelemetry.io/collector/internal/dataold/testdataold" ) +// TODO: add bucket and histogram test cases for Test_PushMetrics + // Test_handleScalarMetric checks whether data points within a single scalar metric can be added to a map of // TimeSeries correctly. // Test cases are two data point belonging to the same TimeSeries, two data point belonging different TimeSeries, @@ -80,9 +82,9 @@ func Test_handleScalarMetric(t *testing.T) { map[string]*prompb.TimeSeries{}, }, { - "invalid_type_array", + "invalid_metric_type", &otlp.Metric{ - MetricDescriptor: getDescriptor("invalid_type_array", histogramComb, validCombinations), + MetricDescriptor: getDescriptor("invalid_nil_array", histogramComb, validCombinations), Int64DataPoints: nil, DoubleDataPoints: nil, HistogramDataPoints: nil, @@ -91,6 +93,30 @@ func Test_handleScalarMetric(t *testing.T) { true, map[string]*prompb.TimeSeries{}, }, + { + "int_nil_point", + &otlp.Metric{ + MetricDescriptor: getDescriptor("int_nil_point", monotonicInt64Comb, validCombinations), + Int64DataPoints: []*otlp.Int64DataPoint{nil}, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + map[string]*prompb.TimeSeries{}, + }, + { + "double_nil_point", + &otlp.Metric{ + MetricDescriptor: getDescriptor("double_nil_point", monotonicDoubleComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: []*otlp.DoubleDataPoint{nil}, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + false, + map[string]*prompb.TimeSeries{}, + }, { "same_ts_int_points", &otlp.Metric{ @@ -126,7 +152,7 @@ func Test_handleScalarMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tsMap := map[string]*prompb.TimeSeries{} - prw := &prwExporter{} + prw := &PrwExporter{} ok := prw.handleScalarMetric(tsMap, tt.m) if tt.returnError { assert.Error(t, ok) @@ -142,8 +168,198 @@ func Test_handleScalarMetric(t *testing.T) { } } +// Test_handleHistogramMetric checks whether data points(sum, count, buckets) within a single Histogram metric can be +// added to a map of TimeSeries correctly. +// Test cases are a histogram data point with two buckets and nil data points case. +func Test_handleHistogramMetric(t *testing.T) { + sum := "sum" + count := "count" + bucket1 := "bucket1" + bucket2 := "bucket2" + bucketInf := "bucketInf" + histPoint := getHistogramDataPoint( + lbs1, + time1, + floatVal2, + uint64(intVal2), []float64{floatVal1, floatVal2}, + []uint64{uint64(intVal1), uint64(intVal1)}) + + // string signature of the data point is the key of the map + sigs := map[string]string{ + sum: typeHistogram + "-" + nameStr + "-" + name1 + "_sum" + lb1Sig, + count: typeHistogram + "-" + nameStr + "-" + name1 + "_count" + lb1Sig, + bucket1: typeHistogram + "-" + nameStr + "-" + name1 + "_bucket" + "-" + "le-" + + strconv.FormatFloat(floatVal1, 'f', -1, 64) + lb1Sig, + bucket2: typeHistogram + "-" + nameStr + "-" + name1 + "_bucket" + "-" + "le-" + + strconv.FormatFloat(floatVal2, 'f', -1, 64) + lb1Sig, + bucketInf: typeHistogram + "-" + nameStr + "-" + name1 + "_bucket" + "-" + "le-" + + "+Inf" + lb1Sig, + } + labels := map[string][]prompb.Label{ + sum: append(promLbs1, getPromLabels(nameStr, name1+"_sum")...), + count: append(promLbs1, getPromLabels(nameStr, name1+"_count")...), + bucket1: append(promLbs1, getPromLabels(nameStr, name1+"_bucket", "le", + strconv.FormatFloat(floatVal1, 'f', -1, 64))...), + bucket2: append(promLbs1, getPromLabels(nameStr, name1+"_bucket", "le", + strconv.FormatFloat(floatVal2, 'f', -1, 64))...), + bucketInf: append(promLbs1, getPromLabels(nameStr, name1+"_bucket", "le", + "+Inf")...), + } + tests := []struct { + name string + m otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "hist_nil_pt", + otlp.Metric{ + MetricDescriptor: getDescriptor("hist_nil_pt", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: []*otlp.HistogramDataPoint{nil}, + SummaryDataPoints: nil, + }, + false, + map[string]*prompb.TimeSeries{}, + }, + { + "single_histogram_point", + otlp.Metric{ + MetricDescriptor: getDescriptor(name1+"", histogramComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: []*otlp.HistogramDataPoint{histPoint}, + SummaryDataPoints: nil, + }, + false, + map[string]*prompb.TimeSeries{ + sigs[sum]: getTimeSeries(labels[sum], getSample(floatVal2, msTime1)), + sigs[count]: getTimeSeries(labels[count], getSample(float64(intVal2), msTime1)), + sigs[bucket1]: getTimeSeries(labels[bucket1], getSample(float64(intVal1), msTime1)), + sigs[bucket2]: getTimeSeries(labels[bucket2], getSample(float64(intVal1), msTime1)), + sigs[bucketInf]: getTimeSeries(labels[bucketInf], getSample(float64(intVal2), msTime1)), + }, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + prw := &PrwExporter{} + ok := prw.handleHistogramMetric(tsMap, &tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Exactly(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k], k) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + +// Test_handleSummaryMetric checks whether data points(sum, count, quantiles) within a single Summary metric can be +// added to a map of TimeSeries correctly. +// Test cases are a summary data point with two quantiles and nil data points case. +func Test_handleSummaryMetric(t *testing.T) { + sum := "sum" + count := "count" + q1 := "quantile1" + q2 := "quantile2" + // string signature is the key of the map + sigs := map[string]string{ + sum: typeSummary + "-" + nameStr + "-" + name1 + "_sum" + lb1Sig, + count: typeSummary + "-" + nameStr + "-" + name1 + "_count" + lb1Sig, + q1: typeSummary + "-" + nameStr + "-" + name1 + "-" + "quantile-" + + strconv.FormatFloat(floatVal1, 'f', -1, 64) + lb1Sig, + q2: typeSummary + "-" + nameStr + "-" + name1 + "-" + "quantile-" + + strconv.FormatFloat(floatVal2, 'f', -1, 64) + lb1Sig, + } + labels := map[string][]prompb.Label{ + sum: append(promLbs1, getPromLabels(nameStr, name1+"_sum")...), + count: append(promLbs1, getPromLabels(nameStr, name1+"_count")...), + q1: append(promLbs1, getPromLabels(nameStr, name1, "quantile", + strconv.FormatFloat(floatVal1, 'f', -1, 64))...), + q2: append(promLbs1, getPromLabels(nameStr, name1, "quantile", + strconv.FormatFloat(floatVal2, 'f', -1, 64))...), + } + + summaryPoint := getSummaryDataPoint(lbs1, time1, floatVal2, uint64(intVal2), []float64{floatVal1, floatVal2}, []float64{floatVal1, floatVal1}) + + tests := []struct { + name string + m otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", summaryComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "single_summary_point", + otlp.Metric{ + MetricDescriptor: getDescriptor(name1, summaryComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: []*otlp.SummaryDataPoint{summaryPoint}, + }, + false, + map[string]*prompb.TimeSeries{ + sigs[sum]: getTimeSeries(labels[sum], getSample(floatVal2, msTime1)), + sigs[count]: getTimeSeries(labels[count], getSample(float64(intVal2), msTime1)), + sigs[q1]: getTimeSeries(labels[q1], getSample(float64(intVal1), msTime1)), + sigs[q2]: getTimeSeries(labels[q2], getSample(float64(intVal1), msTime1)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + prw := &PrwExporter{} + ok := prw.handleSummaryMetric(tsMap, &tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Equal(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k], k) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + // Test_newPrwExporter checks that a new exporter instance with non-nil fields is initialized -func Test_newPrwExporter(t *testing.T) { +func Test_NewPrwExporter(t *testing.T) { config := &Config{ ExporterSettings: configmodels.ExporterSettings{}, TimeoutSettings: exporterhelper.TimeoutSettings{}, @@ -188,7 +404,8 @@ func Test_newPrwExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - prwe, err := newPrwExporter(tt.namespace, tt.endpoint, tt.client) + prwe, err := NewPrwExporter(tt.namespace, tt.endpoint, tt.client) + if tt.returnError { assert.Error(t, err) return @@ -203,22 +420,22 @@ func Test_newPrwExporter(t *testing.T) { } } -// Test_shutdown checks after shutdown is called, incoming calls to pushMetrics return error. -func Test_shutdown(t *testing.T) { - prwe := &prwExporter{ +// Test_Shutdown checks after Shutdown is called, incoming calls to pushMetrics return error. +func Test_Shutdown(t *testing.T) { + prwe := &PrwExporter{ wg: new(sync.WaitGroup), closeChan: make(chan struct{}), } wg := new(sync.WaitGroup) errChan := make(chan error, 5) - err := prwe.shutdown(context.Background()) + err := prwe.Shutdown(context.Background()) require.NoError(t, err) errChan = make(chan error, 5) for i := 0; i < 5; i++ { wg.Add(1) go func() { defer wg.Done() - _, ok := prwe.pushMetrics(context.Background(), + _, ok := prwe.PushMetrics(context.Background(), pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataEmpty())) errChan <- ok }() @@ -263,7 +480,6 @@ func Test_export(t *testing.T) { require.NotNil(t, writeReq.GetTimeseries()) assert.Equal(t, *ts1, writeReq.GetTimeseries()[0]) w.WriteHeader(code) - fmt.Fprintf(w, "error message") } // Create in test table format to check if different HTTP response codes or server errors @@ -299,7 +515,9 @@ func Test_export(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - handleFunc(w, r, tt.httpResponseCode) + if handleFunc != nil { + handleFunc(w, r, tt.httpResponseCode) + } })) defer server.Close() serverURL, uErr := url.Parse(server.URL) @@ -324,7 +542,7 @@ func runExportPipeline(t *testing.T, ts *prompb.TimeSeries, endpoint *url.URL) e HTTPClient := http.DefaultClient //after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint - prwe, err := newPrwExporter("test", endpoint.String(), HTTPClient) + prwe, err := NewPrwExporter("test", endpoint.String(), HTTPClient) if err != nil { return err } @@ -334,39 +552,66 @@ func runExportPipeline(t *testing.T, ts *prompb.TimeSeries, endpoint *url.URL) e // Test_pushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as // expected -func Test_pushMetrics(t *testing.T) { - // fail cases - noTempBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataManyMetricsSameResource(10)) - invalidTypeBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataMetricTypeInvalid()) +func Test_PushMetrics(t *testing.T) { + + noTempBatch := pdatautil.MetricsFromOldInternalMetrics((testdataold.GenerateMetricDataManyMetricsSameResource(10))) + invalidTypeBatch := pdatautil.MetricsFromOldInternalMetrics((testdataold.GenerateMetricDataMetricTypeInvalid())) + nilDescBatch := pdatautil.MetricsFromOldInternalMetrics((testdataold.GenerateMetricDataNilMetricDescriptor())) - invalidTemp := testdataold.GenerateMetricDataManyMetricsSameResource(10) - setTemporality(&invalidTemp, otlp.MetricDescriptor_INVALID_TEMPORALITY) - invalidTempBatch := pdatautil.MetricsFromOldInternalMetrics(invalidTemp) + // 10 counter metrics, 2 points in each. Two TimeSeries in total + batch := testdataold.GenerateMetricDataManyMetricsSameResource(10) + setCumulative(&batch) + scalarBatch := pdatautil.MetricsFromOldInternalMetrics((batch)) - nilDescBatch := pdatautil.MetricsFromOldInternalMetrics(testdataold.GenerateMetricDataNilMetricDescriptor()) nilBatch1 := testdataold.GenerateMetricDataManyMetricsSameResource(10) nilBatch2 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + nilBatch3 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + nilBatch4 := testdataold.GenerateMetricDataManyMetricsSameResource(10) + nilBatch5 := testdataold.GenerateMetricDataOneEmptyResourceMetrics() + nilBatch6 := testdataold.GenerateMetricDataOneEmptyInstrumentationLibrary() + nilBatch7 := testdataold.GenerateMetricDataOneMetric() + + nilResource := dataold.MetricDataToOtlp(nilBatch5) + nilResource[0] = nil + nilResourceBatch := pdatautil.MetricsFromOldInternalMetrics(dataold.MetricDataFromOtlp(nilResource)) + + nilInstrumentation := dataold.MetricDataToOtlp(nilBatch6) + nilInstrumentation[0].InstrumentationLibraryMetrics[0] = nil + nilInstrumentationBatch := pdatautil.MetricsFromOldInternalMetrics(dataold.MetricDataFromOtlp(nilInstrumentation)) + + nilMetric := dataold.MetricDataToOtlp(nilBatch7) + nilMetric[0].InstrumentationLibraryMetrics[0].Metrics[0] = nil + nilMetricBatch := pdatautil.MetricsFromOldInternalMetrics(dataold.MetricDataFromOtlp(nilMetric)) + + setCumulative(&nilBatch1) + setCumulative(&nilBatch2) + setCumulative(&nilBatch3) + setCumulative(&nilBatch4) - setTemporality(&nilBatch1, otlp.MetricDescriptor_CUMULATIVE) - setTemporality(&nilBatch2, otlp.MetricDescriptor_CUMULATIVE) setDataPointToNil(&nilBatch1, typeMonotonicInt64) setType(&nilBatch2, typeMonotonicDouble) + setType(&nilBatch3, typeHistogram) + setType(&nilBatch4, typeSummary) - nilIntDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics(nilBatch1) - nilDoubleDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics(nilBatch2) - - // Success cases: 10 counter metrics, 2 points in each. Two TimeSeries in total - batch1 := testdataold.GenerateMetricDataManyMetricsSameResource(10) - setTemporality(&batch1, otlp.MetricDescriptor_CUMULATIVE) - scalarBatch := pdatautil.MetricsFromOldInternalMetrics(batch1) + nilIntDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics((nilBatch1)) + nilDoubleDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics((nilBatch2)) + nilHistogramDataPointsBatch := pdatautil.MetricsFromOldInternalMetrics((nilBatch3)) - // Partial Success cases - batch2 := testdataold.GenerateMetricDataManyMetricsSameResource(10) - setTemporality(&batch2, otlp.MetricDescriptor_CUMULATIVE) - failDesc := dataold.MetricDataToOtlp(batch2)[0].InstrumentationLibraryMetrics[0].Metrics[0].GetMetricDescriptor() - failDesc.Temporality = otlp.MetricDescriptor_INVALID_TEMPORALITY - partialBatch := pdatautil.MetricsFromOldInternalMetrics(batch2) + hist := dataold.MetricDataToOtlp(testdataold.GenerateMetricDataOneMetric()) + hist[0].InstrumentationLibraryMetrics[0].Metrics[0] = &otlp.Metric{ + MetricDescriptor: getDescriptor("hist_test", histogramComb, validCombinations), + HistogramDataPoints: []*otlp.HistogramDataPoint{getHistogramDataPoint( + lbs1, + time1, + floatVal1, + uint64(intVal1), + []float64{floatVal1}, + []uint64{uint64(intVal1)}, + ), + }, + } + histBatch := pdatautil.MetricsFromOldInternalMetrics((dataold.MetricDataFromOtlp(hist))) checkFunc := func(t *testing.T, r *http.Request, expected int) { body, err := ioutil.ReadAll(r.Body) if err != nil { @@ -385,11 +630,25 @@ func Test_pushMetrics(t *testing.T) { assert.EqualValues(t, expected, len(wr.Timeseries)) } + summary := dataold.MetricDataToOtlp(testdataold.GenerateMetricDataOneMetric()) + summary[0].InstrumentationLibraryMetrics[0].Metrics[0] = &otlp.Metric{ + MetricDescriptor: getDescriptor("summary_test", summaryComb, validCombinations), + SummaryDataPoints: []*otlp.SummaryDataPoint{getSummaryDataPoint( + lbs1, + time1, + floatVal1, + uint64(intVal1), + []float64{floatVal1}, + []float64{floatVal2}, + ), + }, + } + summaryBatch := pdatautil.MetricsFromOldInternalMetrics(dataold.MetricDataFromOtlp(summary)) tests := []struct { name string md *pdata.Metrics reqTestFunc func(t *testing.T, r *http.Request, expected int) - expectedTimeSeries int + expected int httpResponseCode int numDroppedTimeSeries int returnErr bool @@ -404,21 +663,39 @@ func Test_pushMetrics(t *testing.T) { true, }, { - "invalid_temporality_case", - &invalidTempBatch, + "nil_desc_case", + &nilDescBatch, nil, 0, http.StatusAccepted, - pdatautil.MetricCount(invalidTempBatch), + pdatautil.MetricCount(nilDescBatch), true, }, { - "nil_desc_case", - &nilDescBatch, + "nil_resourece_case", + &nilResourceBatch, nil, 0, http.StatusAccepted, - pdatautil.MetricCount(nilDescBatch), + pdatautil.MetricCount(nilResourceBatch), + false, + }, + { + "nil_instrumentation_case", + &nilInstrumentationBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilInstrumentationBatch), + false, + }, + { + "nil_metric_case", + &nilMetricBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilMetricBatch), true, }, { @@ -439,6 +716,24 @@ func Test_pushMetrics(t *testing.T) { pdatautil.MetricCount(nilDoubleDataPointsBatch), true, }, + { + "nil_histogram_point_case", + &nilHistogramDataPointsBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilHistogramDataPointsBatch), + true, + }, + { + "nil_histogram_point_case", + &nilHistogramDataPointsBatch, + nil, + 0, + http.StatusAccepted, + pdatautil.MetricCount(nilHistogramDataPointsBatch), + true, + }, { "no_temp_case", &noTempBatch, @@ -466,14 +761,21 @@ func Test_pushMetrics(t *testing.T) { 0, false, }, - { - "partial_success_case", - &partialBatch, + {"histogram_case", + &histBatch, checkFunc, - 2, + 4, http.StatusAccepted, - 1, - true, + 0, + false, + }, + {"summary_case", + &summaryBatch, + checkFunc, + 3, + http.StatusAccepted, + 0, + false, }, } @@ -481,7 +783,7 @@ func Test_pushMetrics(t *testing.T) { t.Run(tt.name, func(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if tt.reqTestFunc != nil { - tt.reqTestFunc(t, r, tt.expectedTimeSeries) + tt.reqTestFunc(t, r, tt.expected) } w.WriteHeader(tt.httpResponseCode) })) @@ -497,6 +799,7 @@ func Test_pushMetrics(t *testing.T) { NameVal: "prometheusremotewrite", }, Namespace: "", + HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "http://some.url:9411/api/prom/push", // We almost read 0 bytes, so no need to tune ReadBufferSize. @@ -508,9 +811,9 @@ func Test_pushMetrics(t *testing.T) { // c, err := config.HTTPClientSettings.ToClient() // assert.Nil(t, err) c := http.DefaultClient - prwe, nErr := newPrwExporter(config.Namespace, serverURL.String(), c) + prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c) require.NoError(t, nErr) - numDroppedTimeSeries, err := prwe.pushMetrics(context.Background(), *tt.md) + numDroppedTimeSeries, err := prwe.PushMetrics(context.Background(), *tt.md) assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries) if tt.returnErr { assert.Error(t, err) diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 007128d239d..378ec7f9dfb 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -50,7 +50,7 @@ func createMetricsExporter(_ context.Context, _ component.ExporterCreateParams, return nil, err } - prwe, err := newPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client) + prwe, err := NewPrwExporter(prwCfg.Namespace, prwCfg.HTTPClientSettings.Endpoint, client) if err != nil { return nil, err @@ -58,13 +58,12 @@ func createMetricsExporter(_ context.Context, _ component.ExporterCreateParams, prwexp, err := exporterhelper.NewMetricsExporter( cfg, - prwe.pushMetrics, + prwe.PushMetrics, exporterhelper.WithTimeout(prwCfg.TimeoutSettings), exporterhelper.WithQueue(prwCfg.QueueSettings), exporterhelper.WithRetry(prwCfg.RetrySettings), - exporterhelper.WithShutdown(prwe.shutdown), + exporterhelper.WithShutdown(prwe.Shutdown), ) - return prwexp, err } @@ -72,7 +71,6 @@ func createDefaultConfig() configmodels.Exporter { // TODO: Enable the queued settings. qs := exporterhelper.CreateDefaultQueueSettings() qs.Enabled = false - return &Config{ ExporterSettings: configmodels.ExporterSettings{ TypeVal: typeStr, @@ -81,8 +79,10 @@ func createDefaultConfig() configmodels.Exporter { Namespace: "", TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(), - RetrySettings: exporterhelper.CreateDefaultRetrySettings(), - QueueSettings: qs, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: false, + }, + QueueSettings: qs, HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "http://some.url:9411/api/prom/push", // We almost read 0 bytes, so no need to tune ReadBufferSize. diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index 21a8efe9ab1..aea95e68afb 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -15,13 +15,13 @@ package prometheusremotewriteexporter import ( + "errors" "log" "sort" "strings" "time" "unicode" - "github.com/pkg/errors" "github.com/prometheus/prometheus/prompb" common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" @@ -29,10 +29,16 @@ import ( ) const ( - nameStr = "__name__" - totalStr = "total" - delimeter = "_" - keyStr = "key" + nameStr = "__name__" + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" + leStr = "le" + quantileStr = "quantile" + pInfStr = "+Inf" + totalStr = "total" + delimeter = "_" + keyStr = "key" ) // ByLabelName enables the usage of sort.Sort() with a slice of labels @@ -186,7 +192,7 @@ func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string { // and creates a WriteRequest from the struct -- can move to the helper.go file func wrapTimeSeries(tsMap map[string]*prompb.TimeSeries) (*prompb.WriteRequest, error) { if len(tsMap) == 0 { - return nil, errors.Errorf("invalid tsMap: cannot be empty map") + return nil, errors.New("invalid TsMap: cannot be empty map") } TsArray := []prompb.TimeSeries{} for _, v := range tsMap { diff --git a/exporter/prometheusremotewriteexporter/testdata/config.yaml b/exporter/prometheusremotewriteexporter/testdata/config.yaml index d18d81855bf..958fa9a7edf 100644 --- a/exporter/prometheusremotewriteexporter/testdata/config.yaml +++ b/exporter/prometheusremotewriteexporter/testdata/config.yaml @@ -23,7 +23,6 @@ exporters: headers: Prometheus-Remote-Write-Version: "0.1.0" X-Scope-OrgID: 234 - service: pipelines: metrics: diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go index be18b56553c..a583ede0820 100644 --- a/exporter/prometheusremotewriteexporter/testutil_test.go +++ b/exporter/prometheusremotewriteexporter/testutil_test.go @@ -156,6 +156,43 @@ func getDoubleDataPoint(labels []*commonpb.StringKeyValue, value float64, ts uin } } +func getHistogramDataPoint(labels []*commonpb.StringKeyValue, ts uint64, sum float64, count uint64, bounds []float64, buckets []uint64) *otlp.HistogramDataPoint { + bks := []*otlp.HistogramDataPoint_Bucket{} + for _, c := range buckets { + bks = append(bks, &otlp.HistogramDataPoint_Bucket{ + Count: c, + Exemplar: nil, + }) + } + return &otlp.HistogramDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Count: count, + Sum: sum, + Buckets: bks, + ExplicitBounds: bounds, + } +} + +func getSummaryDataPoint(labels []*commonpb.StringKeyValue, ts uint64, sum float64, count uint64, pcts []float64, values []float64) *otlp.SummaryDataPoint { + pcs := []*otlp.SummaryDataPoint_ValueAtPercentile{} + for i, v := range values { + pcs = append(pcs, &otlp.SummaryDataPoint_ValueAtPercentile{ + Percentile: pcts[i], + Value: v, + }) + } + return &otlp.SummaryDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Count: count, + Sum: sum, + PercentileValues: pcs, + } +} + // Prometheus TimeSeries func getPromLabels(lbs ...string) []prompb.Label { pbLbs := prompb.Labels{ @@ -188,18 +225,16 @@ func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) *prompb.Time } } -//setCumulative is for creating the dataold.MetricData to test with -func setTemporality(metricsData *dataold.MetricData, temp otlp.MetricDescriptor_Temporality) { +func setCumulative(metricsData *dataold.MetricData) { for _, r := range dataold.MetricDataToOtlp(*metricsData) { for _, instMetrics := range r.InstrumentationLibraryMetrics { for _, m := range instMetrics.Metrics { - m.MetricDescriptor.Temporality = temp + m.MetricDescriptor.Temporality = otlp.MetricDescriptor_CUMULATIVE } } } } -//setDataPointToNil is for creating the dataold.MetricData to test with func setDataPointToNil(metricsData *dataold.MetricData, dataField string) { for _, r := range dataold.MetricDataToOtlp(*metricsData) { for _, instMetrics := range r.InstrumentationLibraryMetrics { @@ -219,7 +254,6 @@ func setDataPointToNil(metricsData *dataold.MetricData, dataField string) { } } -//setType is for creating the dataold.MetricData to test with func setType(metricsData *dataold.MetricData, dataField string) { for _, r := range dataold.MetricDataToOtlp(*metricsData) { for _, instMetrics := range r.InstrumentationLibraryMetrics { diff --git a/go.mod b/go.mod index 36ad210e8e0..dcdc03abbb9 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/orijtech/prometheus-go-metrics-exporter v0.0.5 github.com/ory/go-acc v0.2.5 github.com/pavius/impi v0.0.3 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 github.com/prometheus/common v0.11.1 github.com/prometheus/prometheus v1.8.2-0.20200626085723-c448ada63d83 diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index 6a90913e6c5..93c509c1ecb 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/exporter/opencensusexporter" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/exporter/prometheusexporter" + "go.opentelemetry.io/collector/exporter/prometheusremotewriteexporter" "go.opentelemetry.io/collector/exporter/zipkinexporter" "go.opentelemetry.io/collector/extension/fluentbitextension" "go.opentelemetry.io/collector/extension/healthcheckextension" @@ -85,6 +86,7 @@ func Components() ( exporters, err := component.MakeExporterFactoryMap( opencensusexporter.NewFactory(), prometheusexporter.NewFactory(), + prometheusremotewriteexporter.NewFactory(), loggingexporter.NewFactory(), zipkinexporter.NewFactory(), jaegerexporter.NewFactory(), diff --git a/service/defaultcomponents/defaults_test.go b/service/defaultcomponents/defaults_test.go index 65af9b0e6f7..ab4b0e96d48 100644 --- a/service/defaultcomponents/defaults_test.go +++ b/service/defaultcomponents/defaults_test.go @@ -57,6 +57,7 @@ func TestDefaultComponents(t *testing.T) { expectedExporters := []configmodels.Type{ "opencensus", "prometheus", + "prometheusremotewrite", "logging", "zipkin", "jaeger",