Skip to content

Commit

Permalink
[prometheusremotewriteexporter] Support conversion of OTel Exponentia…
Browse files Browse the repository at this point in the history
…l Histogram to Prometheus Native Histogram (#17370)

Support conversion of OTel Exponential Histogram to Prometheus Native Histogram.

pmetric.ExponentialHistogramDataPoint translates to prompb.Histogram as follows:

- Exp. Scale -> Nat. Schema: histogram is dropped if Scale -4 < and > 8.
- TODO: We can potentially downscale hist to 8 if Scale > 8? (as a separate PR)
- Exp. Count -> Nat. Count: if Flags().NoRecordedValue() is false
- Exp. Sum_ -> Nat. Sum: if it Sum is set and Flags().NoRecordedValue() is false
- Exp. TimeUnixNano -> Nat. Timestamp: TimeUnixNano ns converted Timestamp ms
- Exp. ZeroCount -> Nat. ZeroCount
- Nat. ZeroThreshold is set to the default value 1e-128, once opentelemetry-proto#441 is merged, we can use Exp. ZeroThreshold to set the value.
- Exp. Positive -> Nat. PositiveSpans and Nat. PositiveDeltas.
- Exp. Negative -> Nat. NegativeSpans and Nat. NegativeDeltas.
- Exp. Min_ and Max_ are not used.
- Exp. StartTimeUnixNano is not used.

Exp. Exemplars, Labels, and ExponentialHistogramDataPoint will be translated to native hist data structures and will make up one or more prompb.TimeSeries.

Link to tracking Issue:
Resolves #16207

Testing:
Unit tests in pkg/transalator/prometheusremote and exporter/prometheusremotewriteexporter
  • Loading branch information
kovrus authored Jan 12, 2023
1 parent 78bf8a0 commit 9c08b92
Show file tree
Hide file tree
Showing 8 changed files with 701 additions and 9 deletions.
16 changes: 16 additions & 0 deletions .chloggen/exponetial-to-prom-native-histogram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for converting OTLP Exponential Histograms to Prometheus Native Histograms

# One or more tracking issues related to the change
issues: [16207]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
43 changes: 37 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"runtime"
"sync"
"testing"

Expand Down Expand Up @@ -384,10 +383,23 @@ func Test_PushMetrics(t *testing.T) {

doubleGaugeBatch := getMetricsFromMetricList(validMetrics1[validDoubleGauge], validMetrics2[validDoubleGauge])

histogramBatch := getMetricsFromMetricList(validMetrics1[validHistogram], validMetrics2[validHistogram])
expHistogramBatch := getMetricsFromMetricList(
getExpHistogramMetric("exponential_hist", lbs1, time1, &floatVal1, uint64(2), 2, []uint64{1, 1}),
getExpHistogramMetric("exponential_hist", lbs2, time2, &floatVal2, uint64(2), 0, []uint64{2, 2}),
)
emptyExponentialHistogramBatch := getMetricsFromMetricList(
getExpHistogramMetric("empty_exponential_hist", lbs1, time1, &floatValZero, uint64(0), 0, []uint64{}),
getExpHistogramMetric("empty_exponential_hist", lbs1, time1, &floatValZero, uint64(0), 1, []uint64{}),
getExpHistogramMetric("empty_exponential_hist", lbs2, time2, &floatValZero, uint64(0), 0, []uint64{}),
getExpHistogramMetric("empty_exponential_hist_two", lbs2, time2, &floatValZero, uint64(0), 0, []uint64{}),
)
exponentialNoSumHistogramBatch := getMetricsFromMetricList(
getExpHistogramMetric("no_sum_exponential_hist", lbs1, time1, nil, uint64(2), 0, []uint64{1, 1}),
getExpHistogramMetric("no_sum_exponential_hist", lbs1, time2, nil, uint64(2), 0, []uint64{2, 2}),
)

histogramBatch := getMetricsFromMetricList(validMetrics1[validHistogram], validMetrics2[validHistogram])
emptyDataPointHistogramBatch := getMetricsFromMetricList(validMetrics1[validEmptyHistogram], validMetrics2[validEmptyHistogram])

histogramNoSumBatch := getMetricsFromMetricList(validMetrics1[validHistogramNoSum], validMetrics2[validHistogramNoSum])

summaryBatch := getMetricsFromMetricList(validMetrics1[validSummary], validMetrics2[validSummary])
Expand Down Expand Up @@ -484,6 +496,27 @@ func Test_PushMetrics(t *testing.T) {
expectedTimeSeries: 2,
httpResponseCode: http.StatusAccepted,
},
{
name: "exponential_histogram_case",
metrics: expHistogramBatch,
reqTestFunc: checkFunc,
expectedTimeSeries: 2,
httpResponseCode: http.StatusAccepted,
},
{
name: "valid_empty_exponential_histogram_case",
metrics: emptyExponentialHistogramBatch,
reqTestFunc: checkFunc,
expectedTimeSeries: 3,
httpResponseCode: http.StatusAccepted,
},
{
name: "exponential_histogram_no_sum_case",
metrics: exponentialNoSumHistogramBatch,
reqTestFunc: checkFunc,
expectedTimeSeries: 1,
httpResponseCode: http.StatusAccepted,
},
{
name: "histogram_case",
metrics: histogramBatch,
Expand Down Expand Up @@ -798,9 +831,7 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) {
// and that we can retrieve those exact requests back from the WAL, when the
// exporter starts up once again, that it picks up where it left off.
func TestWALOnExporterRoundTrip(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10142")
}
t.Skip("skipping test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10142")
if testing.Short() {
t.Skip("This test could run for long")
}
Expand Down
29 changes: 29 additions & 0 deletions exporter/prometheusremotewriteexporter/testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,35 @@ func getHistogramMetricEmptyDataPoint(name string, attributes pcommon.Map, ts ui
return metric
}

func getExpHistogramMetric(
name string,
attributes pcommon.Map,
ts uint64,
sum *float64,
count uint64,
offset int32,
bucketCounts []uint64,
) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := metric.ExponentialHistogram().DataPoints().AppendEmpty()
if strings.HasPrefix(name, "staleNaN") {
dp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
}
dp.SetCount(count)

if sum != nil {
dp.SetSum(*sum)
}
dp.Positive().SetOffset(offset)
dp.Positive().BucketCounts().FromRaw(bucketCounts)
attributes.CopyTo(dp.Attributes())

dp.SetTimestamp(pcommon.Timestamp(ts))
return metric
}

func getHistogramMetric(name string, attributes pcommon.Map, ts uint64, sum *float64, count uint64, bounds []float64,
buckets []uint64) pmetric.Metric {
metric := pmetric.NewMetric()
Expand Down
16 changes: 14 additions & 2 deletions pkg/translator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool {
return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
case pmetric.MetricTypeHistogram:
return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
case pmetric.MetricTypeExponentialHistogram:
return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
}
return false
}
Expand Down Expand Up @@ -315,7 +317,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
// cumulative count for conversion to cumulative histogram
var cumulativeCount uint64

promExemplars := getPromExemplars(pt)
promExemplars := getPromExemplars[pmetric.HistogramDataPoint](pt)

var bucketBounds []bucketBoundsData

Expand Down Expand Up @@ -352,7 +354,12 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
addExemplars(tsMap, promExemplars, bucketBounds)
}

func getPromExemplars(pt pmetric.HistogramDataPoint) []prompb.Exemplar {
type exemplarType interface {
pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint
Exemplars() pmetric.ExemplarSlice
}

func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
var promExemplars []prompb.Exemplar

for i := 0; i < pt.Exemplars().Len(); i++ {
Expand Down Expand Up @@ -427,6 +434,11 @@ func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp {
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
Expand Down
20 changes: 20 additions & 0 deletions pkg/translator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ func Test_isValidAggregationTemporality(t *testing.T) {
"", l, pmetric.AggregationTemporalityCumulative, 0, 0, 0, []float64{}, []uint64{}),
want: true,
},
{
name: "cumulative exponential histogram",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
h := metric.SetEmptyExponentialHistogram()
h.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
return metric
}(),
want: true,
},
{
name: "missing type",
metric: pmetric.NewMetric(),
Expand All @@ -83,6 +93,16 @@ func Test_isValidAggregationTemporality(t *testing.T) {
"", l, pmetric.AggregationTemporalityDelta, 0, 0, 0, []float64{}, []uint64{}),
want: false,
},
{
name: "delta exponential histogram",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
h := metric.SetEmptyExponentialHistogram()
h.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
return metric
}(),
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
167 changes: 167 additions & 0 deletions pkg/translator/prometheusremotewrite/histograms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"fmt"
"math"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

const defaultZeroThreshold = 1e-128

func addSingleExponentialHistogramDataPoint(
metric string,
pt pmetric.ExponentialHistogramDataPoint,
resource pcommon.Resource,
settings Settings,
series map[string]*prompb.TimeSeries,
) error {
labels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
model.MetricNameLabel, metric,
)

sig := timeSeriesSignature(
pmetric.MetricTypeExponentialHistogram.String(),
&labels,
)
ts, ok := series[sig]
if !ok {
ts = &prompb.TimeSeries{
Labels: labels,
}
series[sig] = ts
}

histogram, err := exponentialToNativeHistogram(pt)
if err != nil {
return err
}
ts.Histograms = append(ts.Histograms, histogram)

exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt)
ts.Exemplars = append(ts.Exemplars, exemplars...)

return nil
}

// exponentialToNativeHistogram translates OTel Exponential Histogram data point
// to Prometheus Native Histogram.
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
scale := p.Scale()
if scale < -4 || scale > 8 {
return prompb.Histogram{},
fmt.Errorf("cannot convert exponential to native histogram."+
" Scale must be <= 8 and >= -4, was %d", scale)
// TODO: downscale to 8 if scale > 8
}

pSpans, pDeltas := convertBucketsLayout(p.Positive())
nSpans, nDeltas := convertBucketsLayout(p.Negative())

h := prompb.Histogram{
Schema: scale,

ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()},
// TODO use zero_threshold, if set, see
// https://github.com/open-telemetry/opentelemetry-proto/pull/441
ZeroThreshold: defaultZeroThreshold,

PositiveSpans: pSpans,
PositiveDeltas: pDeltas,
NegativeSpans: nSpans,
NegativeDeltas: nDeltas,

Timestamp: convertTimeStamp(p.Timestamp()),
}

if p.Flags().NoRecordedValue() {
h.Sum = math.Float64frombits(value.StaleNaN)
h.Count = &prompb.Histogram_CountInt{CountInt: value.StaleNaN}
} else {
if p.HasSum() {
h.Sum = p.Sum()
}
h.Count = &prompb.Histogram_CountInt{CountInt: p.Count()}
}
return h, nil
}

// convertBucketsLayout translates OTel Exponential Histogram dense buckets
// representation to Prometheus Native Histogram sparse bucket representation.
//
// The translation logic is taken from the client_golang `histogram.go#makeBuckets`
// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket
// index 0 corresponds to the range (1, base] while Prometheus bucket index 0
// to the range (base 1].
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets) ([]*prompb.BucketSpan, []int64) {
bucketCounts := buckets.BucketCounts()
if bucketCounts.Len() == 0 {
return nil, nil
}

var (
spans []*prompb.BucketSpan
deltas []int64
prevCount int64
nextBucketIdx int32
)

appendDelta := func(count int64) {
spans[len(spans)-1].Length++
deltas = append(deltas, count-prevCount)
prevCount = count
}

for i := 0; i < bucketCounts.Len(); i++ {
count := int64(bucketCounts.At(i))
if count == 0 {
continue
}

// The offset is adjusted by 1 as described above.
bucketIdx := int32(i) + buckets.Offset() + 1
delta := bucketIdx - nextBucketIdx
if i == 0 || delta > 2 {
// We have to create a new span, either because we are
// at the very beginning, or because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
spans = append(spans, &prompb.BucketSpan{
Offset: delta,
Length: 0,
})
} else {
// We have found a small gap (or no gap at all).
// Insert empty buckets as needed.
for j := int32(0); j < delta; j++ {
appendDelta(0)
}
}
appendDelta(count)
nextBucketIdx = bucketIdx + 1
}

return spans, deltas
}
Loading

0 comments on commit 9c08b92

Please sign in to comment.