
New component: starlarktransform processor #27087

Status: Closed · opened Sep 24, 2023 by daidokoro (Contributor) · 7 comments
daidokoro commented Sep 24, 2023

The purpose and use-cases of the new component

starlarktransform

The starlarktransform processor modifies telemetry based on configuration using Starlark code.

Starlark is a configuration scripting language designed to resemble Python. It is fast, deterministic, and easily embedded into other software projects.

The processor leverages Starlark to modify telemetry data using familiar, pythonic syntax. Modifying telemetry data is as simple as modifying a Dict.

Why?

While there are already several transform processors, most notably the main OTTL transform processor, this processor aims to grant users more flexibility by allowing them to manipulate telemetry data using a familiar syntax.

Python is a popular, well-known language, even among non-developers. By accepting Starlark code as an option for transforming telemetry data, this processor lets users leverage their existing knowledge of Python.

How it works

The processor uses the Starlark-Go interpreter, which allows it to run without a Starlark interpreter being installed on the host.

Features

The starlarktransform processor gives you access to the full telemetry event payload. You can modify this payload with Starlark code in any way you want, which allows you to do various things such as (see the sketch after this list):

  • Filtering
  • Adding/Removing attributes
  • Modifying attributes
  • Modifying telemetry data directly
  • Telemetry injection
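
For example, filtering log records can be done with a list comprehension. A minimal, hypothetical sketch (not from the proposal; the field names follow the OTLP JSON layout used in the examples below):

def transform(event):
    event = json.decode(event)
    # keep only log records whose body does not contain "debug"
    for rl in event['resourceLogs']:
        for sl in rl['scopeLogs']:
            sl['logRecords'] = [
                lr for lr in sl['logRecords']
                if 'debug' not in lr['body']['stringValue']
            ]
    return event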

Libs, Functions and Functionality

While similar in syntax to Python, Starlark does not have all of Python's functionality. This processor does not have access to the Python standard library, and the implementation is further limited to only the following libraries and functions:

  • json

The JSON library allows you to encode and decode JSON strings. Using this library is mandatory, as telemetry data is passed to the processor as a JSON string: you must decode the JSON string to a Dict before you can modify it, and you must return a JSON-decoded Dict to the processor.

# encode a dict to a JSON string
x = json.encode({"foo": ["bar", "baz"]})
print(x)
# output: {"foo":["bar","baz"]}

# decode a JSON string to a dict
x = json.decode('{"foo": ["bar", "baz"]}')

You can read more on the JSON library here

  • print

You can use the print function to check the output of your code. Output from print is sent to the OpenTelemetry Collector runtime log, and is only visible when the Collector is running in Debug mode.

def transform(event):
    print("hello world")
    return json.decode(event)

The print statement above results in the following output in the OpenTelemetry runtime log. Again, this output is only visible when running the Collector in Debug mode.

2023-09-23T16:50:17.328+0200	debug	traces/processor.go:25	hello world	{"kind": "processor", "name": "starlarktransform/traces", "pipeline": "traces", "thread": "trace.processor", "source": "starlark/code"}
  • re (regex)

Support for Regular Expressions coming soon

Note that you can define your own functions within your Starlark code. However, there must be at least one function named transform that accepts a single argument, event, and returns a JSON-decoded Dict. This function can call your other functions as needed, as shown in the sketch below.
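
For example, a minimal sketch with a hypothetical helper function (add_attr is illustrative only):

def add_attr(record, key, value):
    # append a string attribute to a single log record
    record['attributes'].append({
        'key': key,
        'value': {'stringValue': value}
    })
    return record

def transform(event):
    event = json.decode(event)
    for rl in event['resourceLogs']:
        for sl in rl['scopeLogs']:
            sl['logRecords'] = [add_attr(lr, 'env', 'dev') for lr in sl['logRecords']]
    return event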

Warnings

The starlarktransform processor allows you to modify all aspects of your telemetry data. This can result in invalid or bad data being propagated if you are not careful. It is your responsibility to inspect the data and ensure it is valid.
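
For instance, this hypothetical transform silently corrupts the payload, and the processor will not stop you:

def transform(event):
    event = json.decode(event)
    # renaming a required top-level OTLP key produces a payload
    # that downstream components cannot interpret
    event['resource_logs'] = event.pop('resourceLogs', [])
    return event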

Example configuration for the component

processors:
  starlarktransform:
    code: |
      def transform(event):
        event = json.decode(event)

        <your starlark code>

        return event

You must define a function called transform that accepts a single argument, event. The processor calls this function, passing it the telemetry event. The function must return the modified, JSON-decoded event.

Full Configuration Example

The following configuration example demonstrates the starlarktransform processor handling telemetry events for logs, metrics, and traces.

receivers:
  otlp:
    protocols:
      http:
        endpoint: "0.0.0.0:4318"
      grpc:
        endpoint: "0.0.0.0:4317"

  filelog:
    start_at: beginning
    include_file_name: true
    include: 
      - $LOGFILE

    operators:
      - type: move
        from: attributes["log.file.name"]
        to: resource["log.file.name"]

      - type: add
        field: attributes.app
        value: dev

processors:

  # - change resource attribute log.file.name to source.log
  # - filter out any logs that contain the word internal
  # - add an attribute to each log: language: golang
  starlarktransform/logs:
    code: |
      def transform(event):
        event = json.decode(event)
        # edit resource attributes
        for data in event['resourceLogs']:
          for attr in data['resource']['attributes']:
            attr['value']['stringValue'] = 'source.log'

        # filter/delete logs
        for data in event['resourceLogs']:
          for slog in data['scopeLogs']:
            slog['logRecords'] = [ lr for lr in slog['logRecords'] if 'internal' not in lr['body']['stringValue']]
            
            # add an attribute to each log
            for lr in slog['logRecords']:
              lr['attributes'].append({
                'key': 'language',
                'value': {
                  'stringValue': 'golang'
                }})
                
        return event
  # - print event received to otel runtime log
  # - if there are no resources, add a resource attribute source starlarktransform
  # - prefix each metric name with starlarktransform
  starlarktransform/metrics:
    code: |
      def transform(event):
        print("received event", event)
        event = json.decode(event)
        for md in event['resourceMetrics']:
          # if resources are empty
          if not md['resource']:
            md['resource'] = {
              'attributes': [
                {
                  "key": "source",
                  "value": {
                    "stringValue": "starlarktransform"
                  }
                }
              ]
            }

          # prefix each metric name with starlarktransform
          for sm in md['scopeMetrics']:
            for m in sm['metrics']:
              m['name'] = 'starlarktransform.' + m['name']

        return event

  # - add resource attribute source starlarktransform
  # - filter out any spans with http.target /roll attribute
  starlarktransform/traces:
    code: |
      def transform(event):
        event = json.decode(event)
        for td in event['resourceSpans']:
          # add resource attribute
          td['resource']['attributes'].append({
            'key': 'source',
            'value': {
              'stringValue': 'starlarktransform'
            }
          })

          # filter out spans whose http.target attribute is /roll
          has_roll = lambda attrs: [a for a in attrs if a['key'] == 'http.target' and a['value']['stringValue'] == '/roll']
          for sd in td['scopeSpans']:
            sd['spans'] = [
              s for s in sd['spans']
              if not has_roll(s['attributes'])
            ]
        return event
exporters:
  logging:
    verbosity: detailed


service:
  pipelines:
    logs:
      receivers:
      - filelog
      processors:
      - starlarktransform/logs
      exporters:
      - logging

    metrics:
      receivers:
      - otlp
      processors:
      - starlarktransform/metrics
      exporters:
      - logging

    traces:
      receivers:
      - otlp
      processors:
      - starlarktransform/traces
      exporters:
      - logging

Telemetry data types supported

Supports:

  • Metrics
  • Logs
  • Traces

Is this a vendor-specific component?

  • This is a vendor-specific component
  • If this is a vendor-specific component, I am proposing to contribute and support it as a representative of the vendor.

Code Owner(s)

@daidokoro

Sponsor (optional)

No response

Additional context

This proposal follows the previous pytransform processor proposal and aims to address issues raised with the previous implementation.

Previous Concerns:

  1. This overlaps with the transformprocessor

This processor does not aim to compete with or replace the OTTL transform processor. The goal is to provide an approachable, familiar method for accomplishing telemetry data transformations. Benchmarks show the main transform processor is roughly 2x faster than the starlark processor, so if performance is a key requirement, the transform processor is recommended.

The main transform processor also offers significant abstraction, allowing users to accomplish more with less code for certain tasks.

Take the following transformation for example.

processors:
  transform:
    log_statements:
      - context: log
        statements:
        - set(attributes["test"], "pass") where body == "operationA"

To accomplish the same using the starlarktransform processor:

processors:
  starlarktransform:
    code: |
      def transform(event):
        e = json.decode(event)
        for r in e["resourceLogs"]:
          for sl in r["scopeLogs"]:
            for lr in sl["logRecords"]:
              if lr["body"]["stringValue"] == "operationA":
                lr["attributes"].append({
                  "key": "test",
                  "value": {"stringValue": "pass"}
                })
        return e

As you can see, there is no abstraction for dealing with the underlying data types when using the starlark processor. Again, this is not meant to compete with the transform processor, but only to provide a familiar alternative.

  2. Performance

As stated above, this processor is approximately 2x slower than the transform processor.

[benchmark results image]

  3. Spawning Subprocesses

Unlike the previous implementation, which used embedded Python, the Starlark implementation does not spawn subprocesses to execute Starlark code. The interpreter itself is embedded using the Starlark-Go package.

  4. Security

Starlark code has no access to the internet, the filesystem, or any system processes. There are also no additional libraries beyond those explicitly defined and allowed by the interpreter. It is completely sandboxed.
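
To illustrate, a hypothetical snippet, assuming only the json and print built-ins described above are exposed:

def transform(event):
    # any name outside the allowed set is simply undefined;
    # this line fails with an "undefined: open" style error
    open("/etc/passwd")
    return json.decode(event)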


Current code implementation can be found here

Active PR

daidokoro added the labels needs triage (New item requiring triage) and Sponsor Needed (New component seeking sponsor) on Sep 24, 2023
daidokoro (Contributor, Author) commented:

@TylerHelmuth, I created a follow-up proposal implementing changes based on the feedback from the Collector SIG.

It is still possible that this processor will not be accepted but I wanted to create a history showing the various objections for any future devs with similar ideas.

Let me know if you have any additional questions or issues.

Thanks.

TylerHelmuth (Member) commented:

@daidokoro can you share some scenarios/problems where the starlarktransform processor is the right solution?

daidokoro (Contributor, Author) commented Sep 26, 2023

Hey @TylerHelmuth,

Going through my list and comparing it to the current version of the transform processor, I would say that the transform processor has been updated sufficiently to handle most of my outliers.

Scenario:

Given:

{
    "resourceSpans": [
      {
        "resource": {
          "attributes": [
            {
              "key": "telemetry.sdk.language",
              "value": {
                "stringValue": "go"
              }
            },
            {
                "key": "zones",
                "value": {
                  "stringValue": "[\"eu-west-1a\", \"eu-west-1b\"]"
                }
            },
            {
                "key": "metadata",
                "value": {
                  "stringValue": "{\"os\":  \"linux\", \"arch\": \"x86\", \"instances\": 2}"
                }
            }
          ]
        },
        "scopeSpans": [
          {
            "scope": {
              "name": "opentelemetry.instrumentation.flask",
              "version": "0.40b0"
            },
            "spans": [
              {
                "traceId": "9cb5bf738137b2248dc7b20445ec2e1c",
                "spanId": "88079ad5c94b5b13",
                "parentSpanId": "",
                "name": "/roll",
                "kind": 2,
                "startTimeUnixNano": "1694388218052842000",
                "endTimeUnixNano": "1694388218053415000",
                "attributes": [],
                "status": {}
              }
            ]
          }
        ]
      }
    ]
  }
  

Desired outcome:

  1. The metadata resource attribute field should be unpacked into resource attributes and removed
  2. The zones resource attribute field should be unpacked into the keys zone_0 and zone_1 respectively, and the zones field should be removed

Solutions

Transform Processor

The transform processor could handle the first criterion by offloading the JSON into cache, setting the required keys from it, and then deleting the metadata field (a sketch follows below). For the second criterion, I'm not sure how it would be handled: I could extract the required values from the array using regex, but I'm not sure there is a way to enumerate key names, and I can't hardcode the key names since the array may be of arbitrary length.
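
For the first criterion, a rough OTTL sketch might look like the following (my reading of the transform processor's ParseJSON, merge_maps, and delete_key functions; not verified against the processor):

processors:
  transform:
    trace_statements:
      - context: resource
        statements:
          # unpack the JSON-encoded metadata attribute into resource attributes
          - merge_maps(attributes, ParseJSON(attributes["metadata"]), "insert") where attributes["metadata"] != nil
          # then drop the original metadata field
          - delete_key(attributes, "metadata")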

Starlark Processor

Both criteria can be handled by the following:

processors:
  starlarktransform/traces:
    code: |
      def unpack_zones(event):
        e = json.decode(event)
        for rs in e["resourceSpans"]:
          zones = [
            json.decode(attr["value"]["stringValue"])
            for attr in rs["resource"]["attributes"]
            if attr["key"] == "zones"
          ]

          if not zones:
            return e

          zones = zones[0]

          count = 0
          for zone in zones:
            rs["resource"]["attributes"].append({
              "key": "zone_{}".format(count),
              "value": {
                "stringValue": zone
              }
            })
            count += 1

          rs["resource"]["attributes"] = [
            r for r in rs["resource"]["attributes"]
            if r["key"] != "zones"
          ]
        return e

      def unpack_metadata(e):
        for rs in e["resourceSpans"]:
          metadata = [
            json.decode(attr["value"]["stringValue"])
            for attr in rs["resource"]["attributes"]
            if attr["key"] == "metadata"
          ]

          if not metadata:
            return e

          metadata = metadata[0]
          for k, v in metadata.items():
            rs["resource"]["attributes"].append({
              "key": k,
              "value": {
                "stringValue" if type(v) == "string" else "intValue": v
              }
            })

            # remove item from the list
            rs["resource"]["attributes"] = [
              r for r in rs["resource"]["attributes"]
              if r["key"] != "metadata"
            ]

          return e

      def transform(event):
        event = unpack_zones(event)
        return unpack_metadata(event)

This results in:

Resource SchemaURL:
Resource attributes:
     -> telemetry.sdk.language: Str(go)
     -> zone_0: Str(eu-west-1a)
     -> zone_1: Str(eu-west-1b)
     -> os: Str(linux)
     -> arch: Str(x86)
     -> instances: Int(2)
ScopeSpans #0
ScopeSpans SchemaURL:
InstrumentationScope opentelemetry.instrumentation.flask 0.40b0
Span #0
    Trace ID       : 9cb5bf738137b2248dc7b20445ec2e1c
    Parent ID      :
    ID             : 88079ad5c94b5b13
    Name           : /roll
    Kind           : Server
    Start time     : 2023-09-26 13:32:58.183039488 +0000 UTC
    End time       : 2023-09-26 13:32:58.18304 +0000 UTC
    Status code    : Unset
    Status message :
	{"kind": "exporter", "data_type": "traces", "name": "logging"}

Note that the scenario above isn't the reason the starlark processor was created. Whether or not this is possible using the transform processor isn't the issue; the issue is the lack of familiarity with the transform processor itself in complex cases. As I mentioned, I was unsure how to approach the problem using the transform processor, even though I've been through the docs a few times. It may well be possible to do this in the transform processor, but it was not obvious.

However, I instinctively knew how to solve the issue with the starlark processor, as it is simply managing a dict using a Python dialect.

This brings me to Scenario No. 2:

Any time-critical situation in which it is unclear how to solve a particular transform using the transform processor. Users can complete the transform using the starlark processor, then do the necessary research to figure out how to accomplish it with the main transform processor. Sometimes this may involve opening an issue on GitHub, etc.

bryan-aguilar (Contributor) commented:

I believe this has been discussed in a SIG meeting already, but in case I'm mistaken I'll drop the new component blurb below.
If you have not already, please make sure you review the new component guidelines.

If you have not found a volunteer sponsor yet, I encourage you to come to our weekly Collector SIG meetings. You can add an item to the agenda to discuss this new component proposal.


github-actions bot commented Feb 5, 2024

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

github-actions bot added the Stale label on Feb 5, 2024

github-actions bot commented Apr 5, 2024

This issue has been closed as inactive because it has been stale for 120 days with no activity.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Apr 5, 2024
daidokoro (Contributor, Author) commented Dec 21, 2024

This processor has been released as open source and is available here
