From a4b0e5928e46c23a0ab42e2a72d468156d62051b Mon Sep 17 00:00:00 2001 From: lkwronski <45148751+lkwronski@users.noreply.github.com> Date: Fri, 10 May 2024 00:06:25 +0200 Subject: [PATCH] [processor/transform] Add common where clause (#31491) **Description:** Add global conditions with where clause **Link to tracking Issue:** Fixes #27830 **Testing:** Unit tests **Documentation:** TODO ~~The main objective is to extend the `ContextStatements` struct by adding a new `Conditions` parameter. By introducing `Conditions` to `ContextStatements`, we can now apply a global condition to all related statements in `WithStatementSequenceGlobalConditions` function.~~ Thanks in advance for your feedback! If this changes will be fine, I will add common where clause into another context `span`, `metrics`. --- ...onski.issue-27830-common-where-clause.yaml | 27 + internal/filter/expr/matcher.go | 10 + internal/filter/filterottl/filter.go | 17 + internal/filter/filterottl/filter_test.go | 51 ++ internal/filter/filterottl/functions.go | 5 + processor/transformprocessor/README.md | 32 +- processor/transformprocessor/config_test.go | 33 + processor/transformprocessor/factory_test.go | 564 ++++++++++++++++++ processor/transformprocessor/go.mod | 4 + processor/transformprocessor/go.sum | 2 + .../internal/common/config.go | 1 + .../internal/common/logs.go | 17 +- .../internal/common/metrics.go | 56 +- .../internal/common/processor.go | 77 ++- .../internal/common/traces.go | 32 +- .../transformprocessor/testdata/config.yaml | 20 + 16 files changed, 923 insertions(+), 25 deletions(-) create mode 100755 .chloggen/lkwronski.issue-27830-common-where-clause.yaml diff --git a/.chloggen/lkwronski.issue-27830-common-where-clause.yaml b/.chloggen/lkwronski.issue-27830-common-where-clause.yaml new file mode 100755 index 000000000000..d35728bcbe88 --- /dev/null +++ b/.chloggen/lkwronski.issue-27830-common-where-clause.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow common where clause + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27830] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/filter/expr/matcher.go b/internal/filter/expr/matcher.go index f73e9f8e8a8c..037b4888d46a 100644 --- a/internal/filter/expr/matcher.go +++ b/internal/filter/expr/matcher.go @@ -25,6 +25,16 @@ func Not[K any](matcher BoolExpr[K]) BoolExpr[K] { return notMatcher[K]{matcher: matcher} } +type alwaysTrueMatcher[K any] struct{} + +func (alm alwaysTrueMatcher[K]) Eval(_ context.Context, _ K) (bool, error) { + return true, nil +} + +func AlwaysTrue[K any]() BoolExpr[K] { + return alwaysTrueMatcher[K]{} +} + type orMatcher[K any] struct { matchers []BoolExpr[K] } diff --git a/internal/filter/filterottl/filter.go b/internal/filter/filterottl/filter.go index 6324c8a35bd9..e4dad6ee9359 100644 --- a/internal/filter/filterottl/filter.go +++ b/internal/filter/filterottl/filter.go @@ -12,6 +12,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" ) @@ -111,3 +112,19 @@ func NewBoolExprForResource(conditions []string, functions map[string]ottl.Facto c := ottlresource.NewConditionSequence(statements, set, ottlresource.WithConditionSequenceErrorMode(errorMode)) return &c, nil } + +// NewBoolExprForScope creates a BoolExpr[ottlscope.TransformContext] that will return true if any of the given OTTL conditions evaluate to true. +// The passed in functions should use the ottlresource.TransformContext. +// If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected +func NewBoolExprForScope(conditions []string, functions map[string]ottl.Factory[ottlscope.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (expr.BoolExpr[ottlscope.TransformContext], error) { + parser, err := ottlscope.NewParser(functions, set) + if err != nil { + return nil, err + } + statements, err := parser.ParseConditions(conditions) + if err != nil { + return nil, err + } + c := ottlscope.NewConditionSequence(statements, set, ottlscope.WithConditionSequenceErrorMode(errorMode)) + return &c, nil +} diff --git a/internal/filter/filterottl/filter_test.go b/internal/filter/filterottl/filter_test.go index 8e6a65ebc4c8..d198f8924ec5 100644 --- a/internal/filter/filterottl/filter_test.go +++ b/internal/filter/filterottl/filter_test.go @@ -15,6 +15,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" ) @@ -270,3 +271,53 @@ func Test_NewBoolExprForResource(t *testing.T) { }) } } + +func Test_NewBoolExprForScope(t *testing.T) { + tests := []struct { + name string + conditions []string + expectedResult bool + }{ + { + name: "basic", + conditions: []string{ + "true == true", + }, + expectedResult: true, + }, + { + name: "multiple conditions resulting true", + conditions: []string{ + "false == true", + "true == true", + }, + expectedResult: true, + }, + { + name: "multiple conditions resulting false", + conditions: []string{ + "false == true", + "true == false", + }, + expectedResult: false, + }, + { + name: "With Converter", + conditions: []string{ + `IsMatch("test", "pass")`, + }, + expectedResult: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resBoolExpr, err := NewBoolExprForScope(tt.conditions, StandardScopeFuncs(), ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + assert.NotNil(t, resBoolExpr) + result, err := resBoolExpr.Eval(context.Background(), ottlscope.TransformContext{}) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + }) + } +} diff --git a/internal/filter/filterottl/functions.go b/internal/filter/filterottl/functions.go index c86ee64f89ae..c3ce56ce4abf 100644 --- a/internal/filter/filterottl/functions.go +++ b/internal/filter/filterottl/functions.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" @@ -40,6 +41,10 @@ func StandardDataPointFuncs() map[string]ottl.Factory[ottldatapoint.TransformCon return ottlfuncs.StandardConverters[ottldatapoint.TransformContext]() } +func StandardScopeFuncs() map[string]ottl.Factory[ottlscope.TransformContext] { + return ottlfuncs.StandardConverters[ottlscope.TransformContext]() +} + func StandardLogFuncs() map[string]ottl.Factory[ottllog.TransformContext] { return ottlfuncs.StandardConverters[ottllog.TransformContext]() } diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index a347c12bebad..acb4ebd451f8 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -14,8 +14,8 @@ The transform processor modifies telemetry based on configuration using the [OpenTelemetry Transformation Language](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl). -For each signal type, the processor takes a list of statements associated to a [Context type](#contexts) and executes the statements against the incoming telemetry in the order specified in the config. -Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed. +For each signal type, the processor takes a list of conditions and statements associated to a [Context type](#contexts) and executes the conditions and statements against the incoming telemetry in the order specified in the config. +Each condition and statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed. - [Config](#config) - [Grammar](#grammar) @@ -28,8 +28,8 @@ Each statement can access and transform telemetry using functions and allow the The transform processor allows configuring multiple context statements for traces, metrics, and logs. The value of `context` specifies which [OTTL Context](#contexts) to use when interpreting the associated statements. -The statement strings, which must be OTTL compatible, will be passed to the OTTL and interpreted using the associated context. -Each context will be processed in the order specified and each statement for a context will be executed in the order specified. +The conditions and statement strings, which must be OTTL compatible, will be passed to the OTTL and interpreted using the associated context. The conditions string should contain a string with a WHERE clause body without the `where` keyword at the beginning. +Each context will be processed in the order specified and each condition and statement for a context will be executed in the order specified. Conditions are executed first, if a context doesn't meet the conditions, the associated statement will be skipped. The transform processor also allows configuring an optional field, `error_mode`, which will determine how the processor reacts to errors that occur while processing a statement. @@ -46,6 +46,9 @@ transform: error_mode: ignore _statements: - context: string + conditions: + - string + - string statements: - string - string @@ -67,6 +70,27 @@ Valid values for `context` are: | metric_statements | `resource`, `scope`, `metric`, and `datapoint` | | log_statements | `resource`, `scope`, and `log` | +`conditions` is a list comprised of multiple where clauses, which will be processed as global conditions for the accompanying set of statements. + +```yaml +transform: + error_mode: ignore + metric_statements: + - context: metric + conditions: + - type == METRIC_DATA_TYPE_SUM + statements: + - set(description, "Sum") + + log_statements: + - context: log + conditions: + - IsMap(body) and body["object"] != nil + statements: + - set(body, attributes["http.route"]) +``` + + ### Example The example takes advantage of context efficiency by grouping transformations with the context which it intends to transform. diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 256f69b9b6b7..1048ba19c36b 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -76,6 +76,39 @@ func TestLoadConfig(t *testing.T) { }, }, }, + { + id: component.NewIDWithName(metadata.Type, "with_conditions"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + Context: "span", + Conditions: []string{`attributes["http.path"] == "/animal"`}, + Statements: []string{ + `set(name, "bear")`, + }, + }, + }, + MetricStatements: []common.ContextStatements{ + { + Context: "datapoint", + Conditions: []string{`attributes["http.path"] == "/animal"`}, + Statements: []string{ + `set(metric.name, "bear")`, + }, + }, + }, + LogStatements: []common.ContextStatements{ + { + Context: "log", + Conditions: []string{`attributes["http.path"] == "/animal"`}, + Statements: []string{ + `set(body, "bear")`, + }, + }, + }, + }, + }, { id: component.NewIDWithName(metadata.Type, "ignore_errors"), expected: &Config{ diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index c9cc89ca2459..b56724a992ee 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -189,3 +189,567 @@ func TestFactoryCreateLogsProcessor_InvalidActions(t *testing.T) { assert.Error(t, err) assert.Nil(t, ap) } + +func TestFactoryCreateLogProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(plog.Logs) + createLogs func() plog.Logs + }{ + { + name: "create logs processor and pass log context is passed with a global condition that meets the specified condition", + conditions: []string{`body == "operationA"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(td plog.Logs) { + newLog := td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + newLog.Attributes().PutStr("test", "pass") + }, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + log := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + log.Body().SetStr("operationA") + return ld + }, + }, + { + name: "create logs processor and pass log context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(attributes["test"], "pass") where body == "operationA"`}, + want: func(td plog.Logs) { + newLog := td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + newLog.Attributes().PutStr("test", "pass") + }, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + log := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + log.Body().SetStr("operationA") + return ld + }, + }, + { + name: "create logs processor and pass log context is passed with a global condition that fails the specified condition", + conditions: []string{`body == "operationB"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(_ plog.Logs) {}, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + log := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + log.Body().SetStr("operationA") + return ld + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "log", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, lp) + assert.NoError(t, err) + + ld := tt.createLogs() + + err = lp.ConsumeLogs(context.Background(), ld) + assert.NoError(t, err) + + exLd := tt.createLogs() + tt.want(exLd) + + assert.Equal(t, exLd, ld) + }) + } +} + +func TestFactoryCreateResourceProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(plog.Logs) + createLogs func() plog.Logs + }{ + { + name: "create logs processor and pass resource context is passed with a global condition that meets the specified condition", + conditions: []string{`attributes["test"] == "foo"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("test", "foo") + return ld + }, + }, + { + name: "create logs processor and pass resource context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(attributes["test"], "pass") where attributes["test"] == "foo"`}, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("test", "foo") + return ld + }, + }, + { + name: "create logs processor and pass resource context is passed with a global condition that fails the specified condition", + conditions: []string{`attributes["test"] == "wrong"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(_ plog.Logs) {}, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("test", "foo") + return ld + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "resource", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, lp) + assert.NoError(t, err) + + ld := tt.createLogs() + + err = lp.ConsumeLogs(context.Background(), ld) + assert.NoError(t, err) + + exLd := tt.createLogs() + tt.want(exLd) + + assert.Equal(t, exLd, ld) + }) + } +} + +func TestFactoryCreateScopeProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(plog.Logs) + createLogs func() plog.Logs + }{ + { + name: "create logs processor and pass scope context is passed with a global condition that meets the specified condition", + conditions: []string{`attributes["test"] == "foo"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().Scope().Attributes().PutStr("test", "foo") + return ld + }, + }, + { + name: "create logs processor and pass scope context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(attributes["test"], "pass") where attributes["test"] == "foo"`}, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().Scope().Attributes().PutStr("test", "foo") + return ld + }, + }, + { + name: "create logs processor and pass scope context is passed with a global condition that fails the specified condition", + conditions: []string{`attributes["test"] == "wrong"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(_ plog.Logs) {}, + createLogs: func() plog.Logs { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().Scope().Attributes().PutStr("test", "foo") + return ld + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.LogStatements = []common.ContextStatements{ + { + Context: "scope", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, lp) + assert.NoError(t, err) + + ld := tt.createLogs() + + err = lp.ConsumeLogs(context.Background(), ld) + assert.NoError(t, err) + + exLd := tt.createLogs() + tt.want(exLd) + + assert.Equal(t, exLd, ld) + }) + } +} + +func TestFactoryCreateMetricProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(pmetric.Metrics) + createMetrics func() pmetric.Metrics + }{ + { + name: "create metrics processor and pass metric context is passed with a global condition that meets the specified condition", + conditions: []string{`name == "operationA"`}, + statements: []string{`set(description, "Sum")`}, + want: func(td pmetric.Metrics) { + newMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + newMetric.SetDescription("Sum") + }, + createMetrics: func() pmetric.Metrics { + td := pmetric.NewMetrics() + metric := td.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("operationA") + return td + }, + }, + { + name: "create metrics processor and pass metric context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(description, "Sum") where name == "operationA"`}, + want: func(td pmetric.Metrics) { + newMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + newMetric.SetDescription("Sum") + }, + createMetrics: func() pmetric.Metrics { + td := pmetric.NewMetrics() + metric := td.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("operationA") + return td + }, + }, + { + name: "create metrics processor and pass metric context is passed with a global condition that fails the specified condition", + conditions: []string{`name == "operationA"`}, + statements: []string{`set(description, "Sum")`}, + want: func(_ pmetric.Metrics) {}, + createMetrics: func() pmetric.Metrics { + td := pmetric.NewMetrics() + metric := td.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("operationB") + return td + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.MetricStatements = []common.ContextStatements{ + { + Context: "metric", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, mp) + assert.NoError(t, err) + + td := tt.createMetrics() + + err = mp.ConsumeMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := tt.createMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func TestFactoryCreateDataPointProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(pmetric.Metrics) + createMetrics func() pmetric.Metrics + }{ + { + name: "create metrics processor and pass datapoint context is passed with a global condition that meets the specified condition", + conditions: []string{`metric.name == "operationA"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(td pmetric.Metrics) { + newMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + newMetric.SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr("test", "pass") + }, + createMetrics: func() pmetric.Metrics { + td := pmetric.NewMetrics() + metric := td.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptySum().DataPoints().AppendEmpty() + metric.SetName("operationA") + return td + }, + }, + { + name: "create metrics processor and pass datapoint context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(attributes["test"], "pass") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + newMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + newMetric.SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr("test", "pass") + }, + createMetrics: func() pmetric.Metrics { + td := pmetric.NewMetrics() + metric := td.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptySum().DataPoints().AppendEmpty() + metric.SetName("operationA") + return td + }, + }, + { + name: "create metrics processor and pass datapoint context is passed with a global condition that fails the specified condition", + conditions: []string{`metric.name == "operationB"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(_ pmetric.Metrics) {}, + createMetrics: func() pmetric.Metrics { + td := pmetric.NewMetrics() + metric := td.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptySum().DataPoints().AppendEmpty() + metric.SetName("operationA") + return td + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.MetricStatements = []common.ContextStatements{ + { + Context: "datapoint", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, mp) + assert.NoError(t, err) + + td := tt.createMetrics() + + err = mp.ConsumeMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := tt.createMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func TestFactoryCreateSpanProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(ptrace.Traces) + createTraces func() ptrace.Traces + }{ + { + name: "create traces processor and pass span context is passed with a global condition that meets the specified condition", + conditions: []string{`name == "operationA"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(td ptrace.Traces) { + newSpan := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + newSpan.Attributes().PutStr("test", "pass") + }, + createTraces: func() ptrace.Traces { + td := ptrace.NewTraces() + span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("operationA") + return td + }, + }, + { + name: "create traces processor and pass span context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(attributes["test"], "pass") where name == "operationA"`}, + want: func(td ptrace.Traces) { + newSpan := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + newSpan.Attributes().PutStr("test", "pass") + }, + createTraces: func() ptrace.Traces { + td := ptrace.NewTraces() + span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("operationA") + return td + }, + }, + { + name: "create traces processor and pass span context is passed with a global condition that fails the specified condition", + conditions: []string{`name == "operationB"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(_ ptrace.Traces) {}, + createTraces: func() ptrace.Traces { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + return td + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.TraceStatements = []common.ContextStatements{ + { + Context: "span", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + mp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, mp) + assert.NoError(t, err) + + td := tt.createTraces() + + err = mp.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := tt.createTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func TestFactoryCreateSpanEventProcessor(t *testing.T) { + tests := []struct { + name string + conditions []string + statements []string + want func(ptrace.Traces) + createTraces func() ptrace.Traces + }{ + { + name: "create traces processor and pass spanevent context is passed with a global condition that meets the specified condition", + conditions: []string{`name == "eventA"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") + }, + createTraces: func() ptrace.Traces { + td := ptrace.NewTraces() + event := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().Events().AppendEmpty() + event.SetName("eventA") + return td + }, + }, + { + name: "create traces processor and pass spanevent context is passed with a statement condition that meets the specified condition", + conditions: []string{}, + statements: []string{`set(attributes["test"], "pass") where name == "eventA"`}, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") + }, + createTraces: func() ptrace.Traces { + td := ptrace.NewTraces() + event := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().Events().AppendEmpty() + event.SetName("eventA") + return td + }, + }, + { + name: "create traces processor and pass spanevent context is passed with a global condition that fails the specified condition", + conditions: []string{`name == "eventB"`}, + statements: []string{`set(attributes["test"], "pass")`}, + want: func(_ ptrace.Traces) {}, + createTraces: func() ptrace.Traces { + td := ptrace.NewTraces() + event := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().Events().AppendEmpty() + event.SetName("eventA") + return td + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError + oCfg.TraceStatements = []common.ContextStatements{ + { + Context: "spanevent", + Conditions: tt.conditions, + Statements: tt.statements, + }, + } + mp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, mp) + assert.NoError(t, err) + + td := tt.createTraces() + + err = mp.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := tt.createTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod index f8eaac7eba42..7e817f252ea1 100644 --- a/processor/transformprocessor/go.mod +++ b/processor/transformprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.100.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.100.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.100.0 github.com/stretchr/testify v1.9.0 @@ -32,6 +33,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/iancoleman/strcase v0.3.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -82,3 +84,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => ../../internal/filter diff --git a/processor/transformprocessor/go.sum b/processor/transformprocessor/go.sum index 383306744162..1f9536f9a6c3 100644 --- a/processor/transformprocessor/go.sum +++ b/processor/transformprocessor/go.sum @@ -29,6 +29,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go index 2747ac11d4db..c0f293457329 100644 --- a/processor/transformprocessor/internal/common/config.go +++ b/processor/transformprocessor/internal/common/config.go @@ -33,5 +33,6 @@ func (c *ContextID) UnmarshalText(text []byte) error { type ContextStatements struct { Context ContextID `mapstructure:"context"` + Conditions []string `mapstructure:"conditions"` Statements []string `mapstructure:"statements"` } diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 83f2e81e9bce..fb350bc22137 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -10,6 +10,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" @@ -20,6 +22,7 @@ var _ consumer.Logs = &logStatements{} type logStatements struct { ottl.StatementSequence[ottllog.TransformContext] + expr.BoolExpr[ottllog.TransformContext] } func (l logStatements) Capabilities() consumer.Capabilities { @@ -36,10 +39,16 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { logs := slogs.LogRecords() for k := 0; k < logs.Len(); k++ { tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) - err := l.Execute(ctx, tCtx) + condition, err := l.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := l.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } } @@ -105,8 +114,12 @@ func (pc LogParserCollection) ParseContextStatements(contextStatements ContextSt if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.parserCollection, filterottl.StandardLogFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } lStatements := ottllog.NewStatementSequence(parsedStatements, pc.settings, ottllog.WithStatementSequenceErrorMode(pc.errorMode)) - return logStatements{lStatements}, nil + return logStatements{lStatements, globalExpr}, nil default: statements, err := pc.parseCommonContextStatements(contextStatements) if err != nil { diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 602245ac0015..dd63e820487d 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -11,6 +11,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" @@ -22,6 +24,7 @@ var _ consumer.Metrics = &metricStatements{} type metricStatements struct { ottl.StatementSequence[ottlmetric.TransformContext] + expr.BoolExpr[ottlmetric.TransformContext] } func (m metricStatements) Capabilities() consumer.Capabilities { @@ -38,10 +41,16 @@ func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics metrics := smetrics.Metrics() for k := 0; k < metrics.Len(); k++ { tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - err := m.Execute(ctx, tCtx) + condition, err := m.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := m.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } } @@ -52,6 +61,7 @@ var _ consumer.Metrics = &dataPointStatements{} type dataPointStatements struct { ottl.StatementSequence[ottldatapoint.TransformContext] + expr.BoolExpr[ottldatapoint.TransformContext] } func (d dataPointStatements) Capabilities() consumer.Capabilities { @@ -94,10 +104,16 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.Execute(ctx, tCtx) + condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := d.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -105,10 +121,16 @@ func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pme func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.Execute(ctx, tCtx) + condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := d.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -116,10 +138,16 @@ func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.Execute(ctx, tCtx) + condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := d.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -127,10 +155,16 @@ func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Con func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { for i := 0; i < dps.Len(); i++ { tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource) - err := d.Execute(ctx, tCtx) + condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := d.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -206,15 +240,23 @@ func (pc MetricParserCollection) ParseContextStatements(contextStatements Contex if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.parserCollection, filterottl.StandardMetricFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } mStatements := ottlmetric.NewStatementSequence(parseStatements, pc.settings, ottlmetric.WithStatementSequenceErrorMode(pc.errorMode)) - return metricStatements{mStatements}, nil + return metricStatements{mStatements, globalExpr}, nil case DataPoint: parsedStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, contextStatements.Conditions, pc.parserCollection, filterottl.StandardDataPointFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.settings, ottldatapoint.WithStatementSequenceErrorMode(pc.errorMode)) - return dataPointStatements{dpStatements}, nil + return dataPointStatements{dpStatements, globalExpr}, nil default: statements, err := pc.parseCommonContextStatements(contextStatements) if err != nil { diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go index e3f291c4242f..77de35b81eeb 100644 --- a/processor/transformprocessor/internal/common/processor.go +++ b/processor/transformprocessor/internal/common/processor.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" @@ -25,6 +27,7 @@ var _ baseContext = &resourceStatements{} type resourceStatements struct { ottl.StatementSequence[ottlresource.TransformContext] + expr.BoolExpr[ottlresource.TransformContext] } func (r resourceStatements) Capabilities() consumer.Capabilities { @@ -37,10 +40,16 @@ func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) tCtx := ottlresource.NewTransformContext(rspans.Resource()) - err := r.Execute(ctx, tCtx) + condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := r.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -49,10 +58,16 @@ func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metri for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) tCtx := ottlresource.NewTransformContext(rmetrics.Resource()) - err := r.Execute(ctx, tCtx) + condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := r.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -61,10 +76,16 @@ func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) tCtx := ottlresource.NewTransformContext(rlogs.Resource()) - err := r.Execute(ctx, tCtx) + condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := r.Execute(ctx, tCtx) + if err != nil { + return err + } + } } return nil } @@ -76,6 +97,7 @@ var _ baseContext = &scopeStatements{} type scopeStatements struct { ottl.StatementSequence[ottlscope.TransformContext] + expr.BoolExpr[ottlscope.TransformContext] } func (s scopeStatements) Capabilities() consumer.Capabilities { @@ -90,10 +112,16 @@ func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er for j := 0; j < rspans.ScopeSpans().Len(); j++ { sspans := rspans.ScopeSpans().At(j) tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource()) - err := s.Execute(ctx, tCtx) + condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := s.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } return nil @@ -105,10 +133,16 @@ func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { smetrics := rmetrics.ScopeMetrics().At(j) tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource()) - err := s.Execute(ctx, tCtx) + condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := s.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } return nil @@ -120,10 +154,16 @@ func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { for j := 0; j < rlogs.ScopeLogs().Len(); j++ { slogs := rlogs.ScopeLogs().At(j) tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource()) - err := s.Execute(ctx, tCtx) + condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := s.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } return nil @@ -149,16 +189,37 @@ func (pc parserCollection) parseCommonContextStatements(contextStatement Context if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, contextStatement.Conditions, pc, filterottl.StandardResourceFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.settings, ottlresource.WithStatementSequenceErrorMode(pc.errorMode)) - return resourceStatements{rStatements}, nil + return resourceStatements{rStatements, globalExpr}, nil case Scope: parsedStatements, err := pc.scopeParser.ParseStatements(contextStatement.Statements) if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, contextStatement.Conditions, pc, filterottl.StandardScopeFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.settings, ottlscope.WithStatementSequenceErrorMode(pc.errorMode)) - return scopeStatements{sStatements}, nil + return scopeStatements{sStatements, globalExpr}, nil default: return nil, fmt.Errorf("unknown context %v", contextStatement.Context) } } + +func parseGlobalExpr[K any]( + boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings) (expr.BoolExpr[K], error), + conditions []string, + pc parserCollection, + standardFuncs map[string]ottl.Factory[K]) (expr.BoolExpr[K], error) { + + if len(conditions) > 0 { + return boolExprFunc(conditions, standardFuncs, pc.errorMode, pc.settings) + } + // By default, set the global expression to always true unless conditions are specified. + return expr.AlwaysTrue[K](), nil +} diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index fe4c59709770..517b9e80969b 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -10,6 +10,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" @@ -21,6 +23,7 @@ var _ consumer.Traces = &traceStatements{} type traceStatements struct { ottl.StatementSequence[ottlspan.TransformContext] + expr.BoolExpr[ottlspan.TransformContext] } func (t traceStatements) Capabilities() consumer.Capabilities { @@ -37,10 +40,16 @@ func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er spans := sspans.Spans() for k := 0; k < spans.Len(); k++ { tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource()) - err := t.Execute(ctx, tCtx) + condition, err := t.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := t.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } } @@ -51,6 +60,7 @@ var _ consumer.Traces = &spanEventStatements{} type spanEventStatements struct { ottl.StatementSequence[ottlspanevent.TransformContext] + expr.BoolExpr[ottlspanevent.TransformContext] } func (s spanEventStatements) Capabilities() consumer.Capabilities { @@ -70,10 +80,16 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces spanEvents := span.Events() for n := 0; n < spanEvents.Len(); n++ { tCtx := ottlspanevent.NewTransformContext(spanEvents.At(n), span, sspans.Scope(), rspans.Resource()) - err := s.Execute(ctx, tCtx) + condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err } + if condition { + err := s.Execute(ctx, tCtx) + if err != nil { + return err + } + } } } } @@ -152,15 +168,23 @@ func (pc TraceParserCollection) ParseContextStatements(contextStatements Context if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.settings, ottlspan.WithStatementSequenceErrorMode(pc.errorMode)) - return traceStatements{sStatements}, nil + return traceStatements{sStatements, globalExpr}, nil case SpanEvent: parsedStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements) if err != nil { return nil, err } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanEventFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.settings, ottlspanevent.WithStatementSequenceErrorMode(pc.errorMode)) - return spanEventStatements{seStatements}, nil + return spanEventStatements{seStatements, globalExpr}, nil default: return pc.parseCommonContextStatements(contextStatements) } diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index 81a097e1098c..8cf295298e54 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -24,6 +24,26 @@ transform: statements: - set(attributes["name"], "bear") +transform/with_conditions: + trace_statements: + - context: span + conditions: + - attributes["http.path"] == "/animal" + statements: + - set(name, "bear") + metric_statements: + - context: datapoint + conditions: + - attributes["http.path"] == "/animal" + statements: + - set(metric.name, "bear") + log_statements: + - context: log + conditions: + - attributes["http.path"] == "/animal" + statements: + - set(body, "bear") + transform/ignore_errors: error_mode: ignore trace_statements: