Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus Remote Write Exporter supporting Cortex - conversion and export for Summary OTLP metrics #1649

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions exporter/prometheusremotewriteexporter/DESIGN.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


# **OpenTelemetry Collector Prometheus Remote Write/Cortex Exporter Design**

Authors: @huyan0, @danielbang907
Expand Down
6 changes: 3 additions & 3 deletions exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ Example:

```yaml
exporters:
prometheusremotewrite:
endpoint: "http://some.url:9411/api/prom/push"
prometheusremotewrite:
endpoint: "http://some.url:9411/api/prom/push"
```
The full list of settings exposed for this exporter are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).

_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_
_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_
1 change: 0 additions & 1 deletion exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func Test_loadConfig(t *testing.T) {
MaxElapsedTime: 10 * time.Minute,
},
Namespace: "test-space",

HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "localhost:8888",
TLSSetting: configtls.TLSClientSetting{
Expand Down
182 changes: 152 additions & 30 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,60 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Note: implementation for this class is in a separate PR
package prometheusremotewriteexporter

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"strconv"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old"
"go.opentelemetry.io/collector/internal/dataold"
)

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
type prwExporter struct {
// PrwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
type PrwExporter struct {
namespace string
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
}

// newPrwExporter initializes a new prwExporter instance and sets fields accordingly.
// newPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func newPrwExporter(namespace string, endpoint string, client *http.Client) (*prwExporter, error) {
func NewPrwExporter(namespace string, endpoint string, client *http.Client) (*PrwExporter, error) {

if client == nil {
return nil, errors.New("http client cannot be nil")
return nil, fmt.Errorf("http client cannot be nil")
}

endpointURL, err := url.ParseRequestURI(endpoint)
if err != nil {
return nil, errors.New("invalid endpoint")
return nil, fmt.Errorf("invalid endpoint")
}

return &prwExporter{
return &PrwExporter{
namespace: namespace,
endpointURL: endpointURL,
client: client,
Expand All @@ -69,7 +68,7 @@ func newPrwExporter(namespace string, endpoint string, client *http.Client) (*pr

// shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *prwExporter) shutdown(context.Context) error {
func (prwe *PrwExporter) Shutdown(context.Context) error {
close(prwe.closeChan)
prwe.wg.Wait()
return nil
Expand All @@ -78,16 +77,16 @@ func (prwe *prwExporter) shutdown(context.Context) error {
// pushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
prwe.wg.Add(1)
defer prwe.wg.Done()
select {
case <-prwe.closeChan:
return pdatautil.MetricCount(md), errors.New("shutdown has been called")
return pdatautil.MetricCount(md), fmt.Errorf("shutdown has been called")
default:
tsMap := map[string]*prompb.TimeSeries{}
dropped := 0
errs := []string{}
errs := []error{}

resourceMetrics := dataold.MetricDataToOtlp(pdatautil.MetricsToOldInternalMetrics(md))
for _, resourceMetric := range resourceMetrics {
Expand All @@ -107,7 +106,7 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
// check for valid type and temporality combination
if ok := validateMetrics(metric.MetricDescriptor); !ok {
dropped++
errs = append(errs, "invalid temporality and type combination")
errs = append(errs, fmt.Errorf("invalid temporality and type combination"))
continue
}
// handle individual metric based on type
Expand All @@ -116,19 +115,33 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
if err := prwe.handleScalarMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err.Error())
errs = append(errs, err)
}
case otlp.MetricDescriptor_HISTOGRAM:
if err := prwe.handleHistogramMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err)
}
case otlp.MetricDescriptor_SUMMARY:
if err := prwe.handleSummaryMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err)
}
default:
dropped++
errs = append(errs, fmt.Errorf("unsupported metric type"))
}
}
}
}

if err := prwe.export(ctx, tsMap); err != nil {
return pdatautil.MetricCount(md), err
dropped = pdatautil.MetricCount(md)
errs = append(errs, err)
}

if dropped != 0 {
return dropped, errors.New(strings.Join(errs, "\n"))
return dropped, componenterror.CombineErrors(errs)
}

return 0, nil
Expand All @@ -138,19 +151,20 @@ func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int
// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
mType := metric.MetricDescriptor.Type

switch mType {
// int points
case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64:
if metric.Int64DataPoints == nil {
return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name)
return fmt.Errorf("nil data point field in metric %v", metric.GetMetricDescriptor().Name)
}

for _, pt := range metric.Int64DataPoints {

if pt == nil {
continue
}
// create parameters for addSample
name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
labels := createLabelSet(pt.GetLabels(), nameStr, name)
Expand All @@ -167,10 +181,12 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
// double points
case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
if metric.DoubleDataPoints == nil {
return errors.New("nil data point field in metric" + metric.GetMetricDescriptor().Name)
return fmt.Errorf("nil data point field in metric %v", metric.GetMetricDescriptor().Name)
}
for _, pt := range metric.DoubleDataPoints {

if pt == nil {
continue
}
// create parameters for addSample
name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
labels := createLabelSet(pt.GetLabels(), nameStr, name)
Expand All @@ -183,18 +199,124 @@ func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
}
return nil
}
return fmt.Errorf("invalid metric type: wants int or double data points")
}

// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil; a nil metric.HistogramDataPoints slice is reported as an error,
// while individual nil data points are skipped.
func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
	if metric.HistogramDataPoints == nil {
		return fmt.Errorf("invalid metric type: wants histogram points")
	}

	for _, pt := range metric.HistogramDataPoints {
		if pt == nil {
			continue
		}
		time := convertTimeStamp(pt.TimeUnixNano)
		mType := metric.GetMetricDescriptor().GetType()

		// sum, count, and buckets of the histogram should append suffix to baseName
		baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)

		// treat sum as a sample in an individual TimeSeries
		sum := &prompb.Sample{
			Value:     pt.GetSum(),
			Timestamp: time,
		}
		sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
		addSample(tsMap, sum, sumlabels, mType)

		// treat count as a sample in an individual TimeSeries
		count := &prompb.Sample{
			Value:     float64(pt.GetCount()),
			Timestamp: time,
		}
		countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
		addSample(tsMap, count, countlabels, mType)

		// cumulative count for the le=+Inf bucket, accumulated over all finite buckets
		var totalCount uint64

		// process each bucket; each finite bucket becomes one Sample labeled with its upper bound.
		// NOTE(review): bk.Count is indexed against GetExplicitBounds() by position — assumes
		// len(Buckets) == len(ExplicitBounds); confirm against the OTLP proto contract.
		for le, bk := range pt.GetBuckets() {
			bucket := &prompb.Sample{
				Value:     float64(bk.Count),
				Timestamp: time,
			}
			boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f', -1, 64)
			labels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, boundStr)
			addSample(tsMap, bucket, labels, mType)

			totalCount += bk.GetCount()
		}
		// add le=+Inf bucket holding the total observation count, as Prometheus requires
		infBucket := &prompb.Sample{
			Value:     float64(totalCount),
			Timestamp: time,
		}
		infLabels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, pInfStr)
		addSample(tsMap, infBucket, infLabels, mType)
	}
	return nil
}

// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each
// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil; a nil metric.SummaryDataPoints slice is reported as an error,
// while individual nil data points are skipped.
func (prwe *PrwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
	if metric.SummaryDataPoints == nil {
		return fmt.Errorf("invalid metric type: wants summary points")
	}

	for _, pt := range metric.SummaryDataPoints {
		// skip nil points, consistent with the scalar and histogram handlers;
		// dereferencing pt.TimeUnixNano below would otherwise panic
		if pt == nil {
			continue
		}

		time := convertTimeStamp(pt.TimeUnixNano)
		mType := metric.GetMetricDescriptor().GetType()

		// sum and count of the Summary should append suffix to baseName
		baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)

		// treat sum as sample in an individual TimeSeries
		sum := &prompb.Sample{
			Value:     pt.GetSum(),
			Timestamp: time,
		}
		sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
		addSample(tsMap, sum, sumlabels, mType)

		// treat count as a sample in an individual TimeSeries
		count := &prompb.Sample{
			Value:     float64(pt.GetCount()),
			Timestamp: time,
		}
		countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
		addSample(tsMap, count, countlabels, mType)

		// process each percentile/quantile; each becomes a Sample on the base series
		// labeled with its quantile value
		for _, qt := range pt.GetPercentileValues() {
			quantile := &prompb.Sample{
				Value:     qt.Value,
				Timestamp: time,
			}
			percentileStr := strconv.FormatFloat(qt.Percentile, 'f', -1, 64)
			qtlabels := createLabelSet(pt.GetLabels(), nameStr, baseName, quantileStr, percentileStr)
			addSample(tsMap, quantile, qtlabels, mType)
		}
	}
	return nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
//Calls the helper function to convert the TsMap to the desired format
req, err := wrapTimeSeries(tsMap)
if err != nil {
return err
}

//Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(req)
if err != nil {
Expand Down Expand Up @@ -232,7 +354,7 @@ func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
line = scanner.Text()
}
errMsg := "server returned HTTP status " + httpResp.Status + ": " + line
return errors.New(errMsg)
return fmt.Errorf(errMsg)
}
return nil
}
Loading