[processor/transform] Add replace_match and replace_all_matches functions #10132

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@

- `transformprocessor`: Add transformation of metrics (#10100)
- `kubeletstatsreceiver`: Update receiver to use new Metrics Builder. All emitted metrics remain the same. (#9744)
- `transformprocessor`: Add new `replace_match` and `replace_all_matches` functions (#10132)

### 🧰 Bug fixes 🧰

15 changes: 12 additions & 3 deletions processor/transformprocessor/README.md
@@ -31,6 +31,10 @@ the fields specified by the list of strings. e.g., `keep_keys(attributes, "http.

- `limit(target, limit)` - `target` is a path expression to a map type field. `limit` is a non-negative integer. The map will be mutated such that the number of items does not exceed the limit. e.g., `limit(attributes, 100)` will limit `attributes` to no more than 100 items. Which items are dropped is random.

- `replace_match(target, pattern, replacement)` - `target` is a path expression to a telemetry field, `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match), and `replacement` is a string. If `target` matches `pattern` it will get replaced with `replacement`. e.g., `replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")`

- `replace_all_matches(target, pattern, replacement)` - `target` is a path expression to a map type field, `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match), and `replacement` is a string. Each string value in `target` that matches `pattern` will get replaced with `replacement`. e.g., `replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")`
Contributor

I'm not sure there is a significant use case for this; I'd guess it's rare for many attributes to match the same pattern. If it's just two or so, which seems imaginable, two `replace_match` calls seem reasonable.

Member Author

Probably not a significant number of use cases, but I like the flexibility. Also a good example of function syntax standards.
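
To make the two README entries above concrete, here is a minimal Go sketch of the behaviour they describe. It is an illustration only, not the processor's code: it uses the standard library's `filepath.Match` (the syntax the README links to), whereas the PR's `functions.go` compiles patterns with `github.com/gobwas/glob`, so the two may differ on edge cases such as whether `*` crosses a `/`. The plain `map[string]interface{}` stands in for an attribute map, and the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// replaceMatch returns replacement when value matches pattern and returns
// value unchanged otherwise. Hypothetical helper for illustration.
func replaceMatch(value, pattern, replacement string) (string, error) {
	matched, err := filepath.Match(pattern, value)
	if err != nil {
		return value, fmt.Errorf("invalid pattern %q: %w", pattern, err)
	}
	if matched {
		return replacement, nil
	}
	return value, nil
}

// replaceAllMatches applies replaceMatch to every string value in attrs.
// Non-string values are skipped, as the README entry says "each string value".
func replaceAllMatches(attrs map[string]interface{}, pattern, replacement string) error {
	// Check the pattern first so a malformed one is rejected before any value changes.
	if _, err := filepath.Match(pattern, ""); err != nil {
		return fmt.Errorf("invalid pattern %q: %w", pattern, err)
	}
	for key, value := range attrs {
		text, ok := value.(string)
		if !ok {
			continue
		}
		updated, err := replaceMatch(text, pattern, replacement)
		if err != nil {
			return err
		}
		attrs[key] = updated
	}
	return nil
}

func main() {
	route, err := replaceMatch("/user/42/list/7", "/user/*/list/*", "/user/{userId}/list/{listId}")
	fmt.Println(route, err)

	attrs := map[string]interface{}{
		"http.target": "/user/42/list/7",
		"http.status": int64(200),
	}
	err = replaceAllMatches(attrs, "/user/*/list/*", "/user/{userId}/list/{listId}")
	fmt.Println(attrs, err)
}
```

The sketch checks the pattern before touching any value, which mirrors the PR's choice to compile the pattern when the function is constructed rather than per record.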


Supported where operations:
- `==` - matches telemetry where the values are equal to each other
- `!=` - matches telemetry where the values are not equal to each other
@@ -52,6 +56,7 @@ processors:
- set(status.code, 1) where attributes["http.path"] == "/health"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(name, attributes["http.route"])
- replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
- limit(attributes, 100)
- limit(resource.attributes, 100)
- truncate_all(attributes, 4096)
@@ -66,8 +71,9 @@ processors:
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")
- set(body, attributes["http.route"])
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
service:
pipelines:
logs:
@@ -82,12 +88,12 @@ service:

This processor will perform the operations in order for


All spans

1) Set status code to OK for all spans with a path `/health`
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `name` to the `http.route` attribute if it is set
4) Replace the value of an attribute named `http.target` with `/user/{userId}/list/{listId}` if the value matches `/user/*/list/*`
5) Limit all span attributes such that each span has no more than 100 attributes.
6) Limit all resource attributes such that each resource has no more than 100 attributes.
7) Truncate all span attributes such that no string value has more than 4096 characters.
@@ -104,5 +110,8 @@ All metrics and their data points
All logs

1) Set severity text to FAIL if the body equals the string "request failed"
2) Replace any attribute value that matches `/user/*/list/*` with `/user/{userId}/list/{listId}`
3) Set `body` to the `http.route` attribute if it is set
4) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes

[In development]: https://github.com/open-telemetry/opentelemetry-collector#in-development
1 change: 1 addition & 0 deletions processor/transformprocessor/go.mod
@@ -4,6 +4,7 @@ go 1.17

require (
github.com/alecthomas/participle/v2 v2.0.0-alpha8
github.com/gobwas/glob v0.2.3
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.51.0
go.opentelemetry.io/collector/pdata v0.51.0
2 changes: 2 additions & 0 deletions processor/transformprocessor/go.sum

Some generated files are not rendered by default.

63 changes: 59 additions & 4 deletions processor/transformprocessor/internal/common/functions.go
@@ -18,14 +18,18 @@ import (
"fmt"
"reflect"

"github.com/gobwas/glob"

"go.opentelemetry.io/collector/pdata/pcommon"
)

var registry = map[string]interface{}{
	"keep_keys":           keepKeys,
	"set":                 set,
	"truncate_all":        truncateAll,
	"limit":               limit,
	"replace_match":       replaceMatch,
	"replace_all_matches": replaceAllMatches,
}

type PathExpressionParser func(*Path) (GetSetter, error)
@@ -140,6 +144,52 @@ func limit(target GetSetter, limit int64) (ExprFunc, error) {
}, nil
}

func replaceMatch(target GetSetter, pattern string, replacement string) (ExprFunc, error) {
	// Compile once, when the function is constructed, so an invalid pattern is rejected up front.
	compiled, err := glob.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("the pattern supplied to replace_match is not a valid pattern, %v", err)
	}
	return func(ctx TransformContext) interface{} {
		val := target.Get(ctx)
		if val == nil {
			return nil
		}
		if valStr, ok := val.(string); ok {
			if compiled.Match(valStr) {
				target.Set(ctx, replacement)
			}
		}
		return nil
	}, nil
}

func replaceAllMatches(target GetSetter, pattern string, replacement string) (ExprFunc, error) {
	compiled, err := glob.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("the pattern supplied to replace_all_matches is not a valid pattern, %v", err)
	}
	return func(ctx TransformContext) interface{} {
		val := target.Get(ctx)
		if val == nil {
			return nil
		}
		if attrs, ok := val.(pcommon.Map); ok {
			updated := pcommon.NewMap()
			updated.EnsureCapacity(attrs.Len())
			attrs.Range(func(key string, value pcommon.Value) bool {
				// Only string values are candidates; StringVal() is empty for other types
				// and would otherwise match patterns such as "*".
				if value.Type() == pcommon.ValueTypeString && compiled.Match(value.StringVal()) {
					updated.InsertString(key, replacement)
				} else {
					updated.Insert(key, value)
				}
				return true
			})
			target.Set(ctx, updated)
		}
		return nil
	}, nil
}

// TODO(anuraaga): See if reflection can be avoided without complicating definition of transform functions.
// Visible for testing
func NewFunctionCall(inv Invocation, functions map[string]interface{}, pathParser PathExpressionParser) (ExprFunc, error) {
@@ -192,6 +242,11 @@ func NewFunctionCall(inv Invocation, functions map[string]interface{}, pathParse
return nil, fmt.Errorf("invalid argument at position %v, must be an int", i)
}
args = append(args, reflect.ValueOf(*argDef.Int))
case "string":
if argDef.String == nil {
return nil, fmt.Errorf("invalid argument at position %v, must be a string", i)
}
args = append(args, reflect.ValueOf(*argDef.String))
}
}
val := reflect.ValueOf(f)
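
The hunk above adds a `string` case to the reflection-based argument handling in `NewFunctionCall`. As a rough standalone sketch of that idea (not the PR's implementation), the code below builds a `[]reflect.Value` from parsed literals and calls a function with it. The `argument` type and the `call` and `repeat` helpers are hypothetical, and the sketch switches on `reflect.Kind` where the hunk matches on a type name (`case "string":`).

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// argument is a hypothetical stand-in for a parsed literal: at most one
// field is set, depending on what the parser saw.
type argument struct {
	String *string
	Int    *int64
}

// call invokes fn with values taken from parsed, checking each literal
// against the parameter type fn declares at the same position.
func call(fn interface{}, parsed []argument) (interface{}, error) {
	fnType := reflect.TypeOf(fn)
	if fnType == nil || fnType.Kind() != reflect.Func {
		return nil, fmt.Errorf("expected a function, got %T", fn)
	}
	if fnType.NumIn() != len(parsed) {
		return nil, fmt.Errorf("expected %d arguments, got %d", fnType.NumIn(), len(parsed))
	}
	if fnType.NumOut() != 1 {
		return nil, fmt.Errorf("expected exactly one return value, got %d", fnType.NumOut())
	}
	args := make([]reflect.Value, 0, len(parsed))
	for i := 0; i < fnType.NumIn(); i++ {
		switch fnType.In(i).Kind() {
		case reflect.String:
			if parsed[i].String == nil {
				return nil, fmt.Errorf("invalid argument at position %v, must be a string", i)
			}
			args = append(args, reflect.ValueOf(*parsed[i].String))
		case reflect.Int64:
			if parsed[i].Int == nil {
				return nil, fmt.Errorf("invalid argument at position %v, must be an int", i)
			}
			args = append(args, reflect.ValueOf(*parsed[i].Int))
		default:
			return nil, fmt.Errorf("unsupported parameter type %v at position %v", fnType.In(i), i)
		}
	}
	out := reflect.ValueOf(fn).Call(args)
	return out[0].Interface(), nil
}

// repeat is a sample target function with one string and one int64 parameter.
func repeat(text string, times int64) string {
	return strings.Repeat(text, int(times))
}

func main() {
	text, times := "ab", int64(3)
	out, err := call(repeat, []argument{{String: &text}, {Int: &times}})
	fmt.Println(out, err)

	// Passing an int where a string is declared is rejected before the call.
	_, err = call(repeat, []argument{{Int: &times}, {Int: &times}})
	fmt.Println(err)
}
```

Validating each literal against the declared parameter type before calling keeps `reflect.Value.Call` from panicking on a mismatched argument.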
47 changes: 47 additions & 0 deletions processor/transformprocessor/internal/common/functions_test.go
@@ -199,6 +199,53 @@ func Test_newFunctionCall_invalid(t *testing.T) {
Function: "testing_error",
},
},
{
name: "replace_match invalid pattern",
inv: Invocation{
Function: "replace_match",
Arguments: []Value{
{
Path: &Path{
Fields: []Field{
{
Name: "attributes",
MapKey: strp("test"),
},
},
},
},
{
String: strp("\\*"),
},
{
String: strp("test"),
},
},
},
},
{
name: "replace_all_matches invalid pattern",
inv: Invocation{
Function: "replace_all_matches",
Arguments: []Value{
{
Path: &Path{
Fields: []Field{
{
Name: "attributes",
},
},
},
},
{
String: strp("\\*"),
},
{
String: strp("test"),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {