Skip to content

Commit

Permalink
Add extract_count_metric function to transform processor
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Aug 7, 2023
1 parent 74cffb5 commit 2b3c476
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_transformprocessor_extract_count_metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add extract_count_metric OTTL function to transform processor

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [22853]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
24 changes: 24 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,30 @@ Examples:

- `convert_gauge_to_sum("delta", true)`

### extract_count_metric

> [!NOTE]
> This function supports Histograms, ExponentialHistograms and Summaries.

`extract_count_metric(is_monotonic)`

The `extract_count_metric` function creates a new Sum metric from a Histogram, ExponentialHistogram or Summary's count value. A metric will only be created if there is at least one data point.

`is_monotonic` is a boolean representing the monotonicity of the new metric.

The name for the new metric will be `<original metric name>_count`. The fields that are copied are: `timestamp`, `starttimestamp`, `attibutes`, `description`, and `aggregation_temporality`. As metrics of type Summary don't have an `aggregation_temporality` field, this field will be set to `AGGREGATION_TEMPORALITY_CUMULATIVE` for those metrics.

The new metric that is created will be passed to all subsequent statements in the metrics statements list.

> [!WARNING]
> This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#sums). Use only if you're confident you know what the resulting monotonicity should be.

Examples:

- `extract_count_metric(true)`

- `extract_count_metric(false)`

### extract_sum_metric

> [!NOTE]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
)

type extractCountMetricArguments struct {
Monotonic bool `ottlarg:"0"`
}

func newExtractCountMetricFactory() ottl.Factory[ottlmetric.TransformContext] {
return ottl.NewFactory("extract_count_metric", &extractCountMetricArguments{}, createExtractCountMetricFunction)
}

func createExtractCountMetricFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
args, ok := oArgs.(*extractCountMetricArguments)

if !ok {
return nil, fmt.Errorf("extractCountMetricFactory args must be of type *extractCountMetricArguments")
}

return extractCountMetric(args.Monotonic)
}

func extractCountMetric(monotonic bool) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
return func(_ context.Context, tCtx ottlmetric.TransformContext) (interface{}, error) {
var aggTemp pmetric.AggregationTemporality
metric := tCtx.GetMetric()
invalidMetricTypeError := fmt.Errorf("extract_count_metric requires an input metric of type Histogram, ExponentialHistogram or Summary, got %s", metric.Type())

switch metric.Type() {
case pmetric.MetricTypeHistogram:
aggTemp = metric.Histogram().AggregationTemporality()
case pmetric.MetricTypeExponentialHistogram:
aggTemp = metric.ExponentialHistogram().AggregationTemporality()
case pmetric.MetricTypeSummary:
// Summaries don't have an aggregation temporality, but they *should* be cumulative based on the Openmetrics spec.
// This should become an optional argument once those are available in OTTL.
aggTemp = pmetric.AggregationTemporalityCumulative
default:
return nil, invalidMetricTypeError
}

countMetric := pmetric.NewMetric()
countMetric.SetDescription(metric.Description())
countMetric.SetName(metric.Name() + "_count")
countMetric.SetUnit(metric.Unit())
countMetric.SetEmptySum().SetAggregationTemporality(aggTemp)
countMetric.Sum().SetIsMonotonic(monotonic)

switch metric.Type() {
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
for i := 0; i < dataPoints.Len(); i++ {
addCountDataPoint(dataPoints.At(i), countMetric.Sum().DataPoints())
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
for i := 0; i < dataPoints.Len(); i++ {
addCountDataPoint(dataPoints.At(i), countMetric.Sum().DataPoints())
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
for i := 0; i < dataPoints.Len(); i++ {
addCountDataPoint(dataPoints.At(i), countMetric.Sum().DataPoints())
}
default:
return nil, invalidMetricTypeError
}

if countMetric.Sum().DataPoints().Len() > 0 {
countMetric.MoveTo(tCtx.GetMetrics().AppendEmpty())
}

return nil, nil
}, nil
}

func addCountDataPoint(dataPoint SumCountDataPoint, destination pmetric.NumberDataPointSlice) {
newDp := destination.AppendEmpty()
dataPoint.Attributes().CopyTo(newDp.Attributes())
newDp.SetIntValue(int64(dataPoint.Count()))
newDp.SetStartTimestamp(dataPoint.StartTimestamp())
newDp.SetTimestamp(dataPoint.Timestamp())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
)

func Test_extractCountMetric(t *testing.T) {
tests := []histogramTestCase{
{
name: "histogram (non-monotonic)",
input: getTestHistogramMetric(),
monotonicity: false,
want: func(metrics pmetric.MetricSlice) {
histogramMetric := getTestHistogramMetric()
histogramMetric.CopyTo(metrics.AppendEmpty())
countMetric := metrics.AppendEmpty()
countMetric.SetEmptySum()
countMetric.Sum().SetAggregationTemporality(histogramMetric.Histogram().AggregationTemporality())
countMetric.Sum().SetIsMonotonic(false)

countMetric.SetName(histogramMetric.Name() + "_count")
dp := countMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(histogramMetric.Histogram().DataPoints().At(0).Count()))

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "histogram (monotonic)",
input: getTestHistogramMetric(),
monotonicity: true,
want: func(metrics pmetric.MetricSlice) {
histogramMetric := getTestHistogramMetric()
histogramMetric.CopyTo(metrics.AppendEmpty())
countMetric := metrics.AppendEmpty()
countMetric.SetEmptySum()
countMetric.Sum().SetAggregationTemporality(histogramMetric.Histogram().AggregationTemporality())
countMetric.Sum().SetIsMonotonic(true)

countMetric.SetName(histogramMetric.Name() + "_count")
dp := countMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(histogramMetric.Histogram().DataPoints().At(0).Count()))

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "exponential histogram (non-monotonic)",
input: getTestExponentialHistogramMetric(),
monotonicity: false,
want: func(metrics pmetric.MetricSlice) {
expHistogramMetric := getTestExponentialHistogramMetric()
expHistogramMetric.CopyTo(metrics.AppendEmpty())
countMetric := metrics.AppendEmpty()
countMetric.SetEmptySum()
countMetric.Sum().SetAggregationTemporality(expHistogramMetric.ExponentialHistogram().AggregationTemporality())
countMetric.Sum().SetIsMonotonic(false)

countMetric.SetName(expHistogramMetric.Name() + "_count")
dp := countMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(expHistogramMetric.ExponentialHistogram().DataPoints().At(0).Count()))

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "exponential histogram (monotonic)",
input: getTestExponentialHistogramMetric(),
monotonicity: true,
want: func(metrics pmetric.MetricSlice) {
expHistogramMetric := getTestExponentialHistogramMetric()
expHistogramMetric.CopyTo(metrics.AppendEmpty())
countMetric := metrics.AppendEmpty()
countMetric.SetEmptySum()
countMetric.Sum().SetAggregationTemporality(expHistogramMetric.ExponentialHistogram().AggregationTemporality())
countMetric.Sum().SetIsMonotonic(true)

countMetric.SetName(expHistogramMetric.Name() + "_count")
dp := countMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(expHistogramMetric.ExponentialHistogram().DataPoints().At(0).Count()))

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "summary (non-monotonic)",
input: getTestSummaryMetric(),
monotonicity: false,
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
countMetric := metrics.AppendEmpty()
countMetric.SetEmptySum()
countMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
countMetric.Sum().SetIsMonotonic(false)

countMetric.SetName("summary_metric_count")
dp := countMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(summaryMetric.Summary().DataPoints().At(0).Count()))

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "summary (monotonic)",
input: getTestSummaryMetric(),
monotonicity: true,
want: func(metrics pmetric.MetricSlice) {
summaryMetric := getTestSummaryMetric()
summaryMetric.CopyTo(metrics.AppendEmpty())
countMetric := metrics.AppendEmpty()
countMetric.SetEmptySum()
countMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
countMetric.Sum().SetIsMonotonic(true)

countMetric.SetName("summary_metric_count")
dp := countMetric.Sum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(summaryMetric.Summary().DataPoints().At(0).Count()))

attrs := getTestAttributes()
attrs.CopyTo(dp.Attributes())
},
},
{
name: "gauge (error)",
input: getTestGaugeMetric(),
monotonicity: false,
wantErr: fmt.Errorf("extract_count_metric requires an input metric of type Histogram, ExponentialHistogram or Summary, got Gauge"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetrics := pmetric.NewMetricSlice()
tt.input.CopyTo(actualMetrics.AppendEmpty())

evaluate, err := extractCountMetric(tt.monotonicity)
assert.NoError(t, err)

_, err = evaluate(nil, ottlmetric.NewTransformContext(tt.input, actualMetrics, pcommon.NewInstrumentationScope(), pcommon.NewResource()))
assert.Equal(t, tt.wantErr, err)

if tt.want != nil {
expected := pmetric.NewMetricSlice()
tt.want(expected)
assert.Equal(t, expected, actualMetrics)
}
})
}
}
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func MetricFunctions() map[string]ottl.Factory[ottlmetric.TransformContext] {

metricFunctions := ottl.CreateFactoryMap(
newExtractSumMetricFactory(),
newExtractCountMetricFactory(),
)

for k, v := range metricFunctions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func Test_DataPointFunctions(t *testing.T) {
func Test_MetricFunctions(t *testing.T) {
expected := ottlfuncs.StandardFuncs[ottlmetric.TransformContext]()
expected["extract_sum_metric"] = newExtractSumMetricFactory()
expected["extract_count_metric"] = newExtractCountMetricFactory()
actual := MetricFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
Expand Down
30 changes: 30 additions & 0 deletions processor/transformprocessor/internal/metrics/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,35 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) {
// so we should only have one Sum datapoint
},
},
{
statements: []string{`extract_count_metric(true) where name == "operationB"`},
want: func(td pmetric.Metrics) {
countMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty()
countMetric.SetEmptySum()

histogramMetric := pmetric.NewMetric()
fillMetricTwo(histogramMetric)

countMetric.SetDescription(histogramMetric.Description())
countMetric.SetName(histogramMetric.Name() + "_count")
countMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
countMetric.Sum().SetIsMonotonic(true)
countMetric.SetUnit(histogramMetric.Unit())

histogramDp0 := histogramMetric.Histogram().DataPoints().At(0)
countDp0 := countMetric.Sum().DataPoints().AppendEmpty()
histogramDp0.Attributes().CopyTo(countDp0.Attributes())
countDp0.SetIntValue(int64(histogramDp0.Count()))
countDp0.SetStartTimestamp(StartTimestamp)

// we have two histogram datapoints
histogramDp1 := histogramMetric.Histogram().DataPoints().At(1)
countDp1 := countMetric.Sum().DataPoints().AppendEmpty()
histogramDp1.Attributes().CopyTo(countDp1.Attributes())
countDp1.SetIntValue(int64(histogramDp1.Count()))
countDp1.SetStartTimestamp(StartTimestamp)
},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -830,6 +859,7 @@ func fillMetricTwo(m pmetric.Metric) {
dataPoint1.Attributes().PutStr("attr3", "test3")
dataPoint1.Attributes().PutStr("flags", "C|D")
dataPoint1.Attributes().PutStr("total.string", "345678")
dataPoint1.SetCount(3)
}

func fillMetricThree(m pmetric.Metric) {
Expand Down

0 comments on commit 2b3c476

Please sign in to comment.