add conversion from Int64 and Double OTLP metrics for Prometheus Remote Write Exporter (#1577)

add prometheus remote write exporter

address lint issues

improve test coverage

fix lint error

add prometheus remote write exporter to default components

change metric name label

add conversion from ns to ms

rename tests

add attribute to label functionality in exporter.go

format code

resolve conflicts with master

switch to fmt.Errorf

add conversion from Int64 and Double OTLP metrics

add check for nil

change data format to dataold
huyan0 authored Aug 28, 2020
1 parent 42dd4c9 commit 03faf67
Showing 5 changed files with 694 additions and 18 deletions.
170 changes: 168 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,13 +16,24 @@
package prometheusremotewriteexporter

import (
"bufio"
"bytes"
"context"
"errors"
"io"
"net/http"
"net/url"
"strings"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1old"
"go.opentelemetry.io/collector/internal/dataold"
)

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
@@ -68,5 +79,160 @@ func (prwe *prwExporter) shutdown(context.Context) error {
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
	return 0, nil
	prwe.wg.Add(1)
	defer prwe.wg.Done()
	select {
	case <-prwe.closeChan:
		return pdatautil.MetricCount(md), errors.New("shutdown has been called")
	default:
		tsMap := map[string]*prompb.TimeSeries{}
		dropped := 0
		errs := []string{}

		resourceMetrics := dataold.MetricDataToOtlp(pdatautil.MetricsToOldInternalMetrics(md))
		for _, resourceMetric := range resourceMetrics {
			if resourceMetric == nil {
				continue
			}
			// TODO: add resource attributes as labels, probably in next PR
			for _, instrumentationMetrics := range resourceMetric.InstrumentationLibraryMetrics {
				if instrumentationMetrics == nil {
					continue
				}
				// TODO: decide if instrumentation library information should be exported as labels
				for _, metric := range instrumentationMetrics.Metrics {
					if metric == nil {
						continue
					}
					// check for valid type and temporality combination
					if ok := validateMetrics(metric.MetricDescriptor); !ok {
						dropped++
						errs = append(errs, "invalid temporality and type combination")
						continue
					}
					// handle individual metric based on type
					switch metric.GetMetricDescriptor().GetType() {
					case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64,
						otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
						if err := prwe.handleScalarMetric(tsMap, metric); err != nil {
							dropped++
							errs = append(errs, err.Error())
						}
					}
				}
			}
		}

		if err := prwe.export(ctx, tsMap); err != nil {
			return pdatautil.MetricCount(md), err
		}

		if dropped != 0 {
			return dropped, errors.New(strings.Join(errs, "\n"))
		}

		return 0, nil
	}
}

// handleScalarMetric processes data points in a single OTLP scalar metric by adding each point as a Sample into
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {

	mType := metric.MetricDescriptor.Type

	switch mType {
	// int points
	case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64:
		if metric.Int64DataPoints == nil {
			return errors.New("nil data point field in metric " + metric.GetMetricDescriptor().Name)
		}

		for _, pt := range metric.Int64DataPoints {

			// create parameters for addSample
			name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
			labels := createLabelSet(pt.GetLabels(), nameStr, name)
			sample := &prompb.Sample{
				Value: float64(pt.Value),
				// convert ns to ms
				Timestamp: convertTimeStamp(pt.TimeUnixNano),
			}

			addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType())
		}
		return nil

	// double points
	case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
		if metric.DoubleDataPoints == nil {
			return errors.New("nil data point field in metric " + metric.GetMetricDescriptor().Name)
		}
		for _, pt := range metric.DoubleDataPoints {

			// create parameters for addSample
			name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
			labels := createLabelSet(pt.GetLabels(), nameStr, name)
			sample := &prompb.Sample{
				Value:     pt.Value,
				Timestamp: convertTimeStamp(pt.TimeUnixNano),
			}

			addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType())
		}
		return nil
	}

	return errors.New("invalid metric type: wants int or double data points")
}

// export sends a Snappy-compressed WriteRequest containing the TimeSeries to a remote write endpoint
func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
	// Calls the helper function to convert the tsMap to the desired format
	req, err := wrapTimeSeries(tsMap)
	if err != nil {
		return err
	}

	// Uses proto.Marshal to convert the WriteRequest into a byte array
	data, err := proto.Marshal(req)
	if err != nil {
		return err
	}
	buf := make([]byte, len(data), cap(data))
	compressedData := snappy.Encode(buf, data)

	// Create the HTTP POST request to send to the endpoint
	httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
	if err != nil {
		return err
	}

	// Add necessary headers specified by:
	// https://cortexmetrics.io/docs/apis/#remote-api
	httpReq.Header.Add("Content-Encoding", "snappy")
	httpReq.Header.Set("Content-Type", "application/x-protobuf")
	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

	httpReq = httpReq.WithContext(ctx)

	_, cancel := context.WithTimeout(context.Background(), prwe.client.Timeout)
	defer cancel()

	httpResp, err := prwe.client.Do(httpReq)
	if err != nil {
		return err
	}

	if httpResp.StatusCode/100 != 2 {
		scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256))
		line := ""
		if scanner.Scan() {
			line = scanner.Text()
		}
		errMsg := "server returned HTTP status " + httpResp.Status + ": " + line
		return errors.New(errMsg)
	}
	return nil
}
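
The exporter code above calls several helpers — validateMetrics, getPromMetricName, createLabelSet, convertTimeStamp, addSample, and wrapTimeSeries — that are defined elsewhere in the package and are not shown in this diff. The sketch below only illustrates the two conversion details the commit message highlights, the nanosecond-to-millisecond timestamp conversion and the label-keyed grouping of samples into TimeSeries; the names, signatures, and key format here are assumptions for illustration, not the committed implementations.

// Illustrative sketch only — not the committed helper implementations.
// It approximates the ns-to-ms conversion and the label-based grouping
// that exporter.go relies on; names and key format are assumptions.
package main

import (
	"fmt"
	"sort"
	"strings"

	"github.com/prometheus/prometheus/prompb"
)

// convertTimeStamp turns an OTLP timestamp in nanoseconds since the Unix epoch
// into the millisecond precision expected by Prometheus remote write.
func convertTimeStamp(timestampNano uint64) int64 {
	return int64(timestampNano / uint64(1e6))
}

// timeSeriesSignature builds a map key from the metric type and the sorted
// label pairs, so samples that share a label set land in the same TimeSeries.
func timeSeriesSignature(metricType string, labels []prompb.Label) string {
	sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })
	parts := []string{metricType}
	for _, l := range labels {
		parts = append(parts, l.Name+"="+l.Value)
	}
	return strings.Join(parts, "-")
}

func main() {
	// A single Int64 data point taken at 2020-08-28T00:00:00Z (1598572800000000000 ns).
	labels := []prompb.Label{
		{Name: "__name__", Value: "test_namespace_counter_total"}, // metric name label (nameStr)
		{Name: "host", Value: "node-1"},
	}
	sample := prompb.Sample{
		Value:     42,                                    // Int64 value widened to float64
		Timestamp: convertTimeStamp(1598572800000000000), // ns -> ms: 1598572800000
	}
	fmt.Println(timeSeriesSignature("MONOTONIC_INT64", labels), sample.Timestamp, sample.Value)
}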