Skip to content

Commit

Permalink
Remove internal protos usage in Prometheusremotewrite exporter (#3184)
Browse files Browse the repository at this point in the history
  • Loading branch information
sincejune authored May 14, 2021
1 parent 9ca82fe commit a96e010
Show file tree
Hide file tree
Showing 5 changed files with 532 additions and 945 deletions.
126 changes: 64 additions & 62 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
otlp "go.opentelemetry.io/collector/internal/data/protogen/metrics/v1"
resourcev1 "go.opentelemetry.io/collector/internal/data/protogen/resource/v1"
)

const (
Expand Down Expand Up @@ -108,24 +105,20 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro
tsMap := map[string]*prompb.TimeSeries{}
dropped := 0
var errs []error
resourceMetrics := internal.MetricsToOtlp(md.InternalRep()).ResourceMetrics
for _, resourceMetric := range resourceMetrics {
if resourceMetric == nil {
continue
}

resource := resourceMetric.Resource
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
resource := resourceMetrics.Resource()
instrumentationLibraryMetricsSlice := resourceMetrics.InstrumentationLibraryMetrics()
// TODO: add resource attributes as labels, probably in next PR
for _, instrumentationMetrics := range resourceMetric.InstrumentationLibraryMetrics {
if instrumentationMetrics == nil {
continue
}
for j := 0; j < instrumentationLibraryMetricsSlice.Len(); j++ {
instrumentationLibraryMetrics := instrumentationLibraryMetricsSlice.At(j)
metricSlice := instrumentationLibraryMetrics.Metrics()

// TODO: decide if instrumentation library information should be exported as labels
for _, metric := range instrumentationMetrics.Metrics {
if metric == nil {
dropped++
continue
}
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)

// check for valid type and temporality combination and for matching data field and type
if ok := validateMetrics(metric); !ok {
dropped++
Expand All @@ -134,18 +127,18 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro
}

// handle individual metric based on type
switch metric.Data.(type) {
case *otlp.Metric_DoubleSum, *otlp.Metric_IntSum, *otlp.Metric_DoubleGauge, *otlp.Metric_IntGauge:
switch metric.DataType() {
case pdata.MetricDataTypeDoubleSum, pdata.MetricDataTypeIntSum, pdata.MetricDataTypeDoubleGauge, pdata.MetricDataTypeIntGauge:
if err := prwe.handleScalarMetric(tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, consumererror.Permanent(err))
}
case *otlp.Metric_DoubleHistogram, *otlp.Metric_IntHistogram:
case pdata.MetricDataTypeHistogram, pdata.MetricDataTypeIntHistogram:
if err := prwe.handleHistogramMetric(tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, consumererror.Permanent(err))
}
case *otlp.Metric_DoubleSummary:
case pdata.MetricDataTypeSummary:
if err := prwe.handleSummaryMetric(tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, consumererror.Permanent(err))
Expand Down Expand Up @@ -193,36 +186,42 @@ func validateAndSanitizeExternalLabels(externalLabels map[string]string) (map[st
// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, resource resourcev1.Resource, metric *otlp.Metric) error {
switch metric.Data.(type) {
func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
switch metric.DataType() {
// int points
case *otlp.Metric_DoubleGauge:
if metric.GetDoubleGauge().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
case pdata.MetricDataTypeDoubleGauge:
dataPoints := metric.DoubleGauge().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetDoubleGauge().GetDataPoints() {
addSingleDoubleDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)

for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case *otlp.Metric_IntGauge:
if metric.GetIntGauge().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
case pdata.MetricDataTypeIntGauge:
dataPoints := metric.IntGauge().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetIntGauge().GetDataPoints() {
addSingleIntDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for i := 0; i < dataPoints.Len(); i++ {
addSingleIntDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case *otlp.Metric_DoubleSum:
if metric.GetDoubleSum().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
case pdata.MetricDataTypeDoubleSum:
dataPoints := metric.DoubleSum().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetDoubleSum().GetDataPoints() {
addSingleDoubleDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)

}
case *otlp.Metric_IntSum:
if metric.GetIntSum().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
case pdata.MetricDataTypeIntSum:
dataPoints := metric.IntSum().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetIntSum().GetDataPoints() {
addSingleIntDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for i := 0; i < dataPoints.Len(); i++ {
addSingleIntDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
}
return nil
Expand All @@ -231,21 +230,23 @@ func (prwe *PrwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries,
// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, resource resourcev1.Resource, metric *otlp.Metric) error {
switch metric.Data.(type) {
case *otlp.Metric_IntHistogram:
if metric.GetIntHistogram().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
switch metric.DataType() {
case pdata.MetricDataTypeIntHistogram:
dataPoints := metric.IntHistogram().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetIntHistogram().GetDataPoints() {
addSingleIntHistogramDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for i := 0; i < dataPoints.Len(); i++ {
addSingleIntHistogramDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case *otlp.Metric_DoubleHistogram:
if metric.GetDoubleHistogram().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
case pdata.MetricDataTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetDoubleHistogram().GetDataPoints() {
addSingleDoubleHistogramDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleHistogramDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
}
return nil
Expand All @@ -254,12 +255,13 @@ func (prwe *PrwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeri
// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each
// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *PrwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, resource resourcev1.Resource, metric *otlp.Metric) error {
if metric.GetDoubleSummary().GetDataPoints() == nil {
return fmt.Errorf("nil data point. %s is dropped", metric.GetName())
func (prwe *PrwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for _, pt := range metric.GetDoubleSummary().GetDataPoints() {
addSingleDoubleSummaryDataPoint(pt, resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleSummaryDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
return nil
}
Expand Down
Loading

0 comments on commit a96e010

Please sign in to comment.