Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] add data stream routing #1

Conversation

andrzej-stencel
Copy link
Owner

Description:

Adds support for routing telemetry based on data stream attributes data_stream.dataset and data_stream.namespace as described in the Elastic docs: Data stream naming scheme.

This PR is based on [exporter/elasticsearch] Add initial support for metrics #33513.

Link to tracking Issue:

Testing:

Added tests.

Documentation:

Added documentation for the new configuration options.

@andrzej-stencel andrzej-stencel force-pushed the elasticsearch-exporter-attribute-based-data-stream-routing branch from 5645f9d to 1671598 Compare June 17, 2024 10:46
@felixbarny
Copy link

Let's make sure to also read the data_stream.* properties from resource attributes and scope attributes. The order of precedence is:

  1. Signal-level attributes (highest precedence)
  2. Scope attributes
  3. Resource attributes

@andrzej-stencel andrzej-stencel force-pushed the elasticsearch-exporter-attribute-based-data-stream-routing branch from 1671598 to 287e924 Compare June 21, 2024 09:56
@felixbarny
Copy link

What happens when dynamic indexing in data_stream mode is enabled and the data doesn't contain any data_stream attributes?

What's supposed to happens is that we still add data_stream.* attributes to the resulting document.

data_stream:
  type: <logs|metrics|traces>
  dataset: generic
  namespace: default  

Similar to the reroute processor, we should also sanitize the values, i.e. replace invalid characters with underscores, and limit the length of dataset and namespace to 100 characters.

See also the code of the RerouteProcessors for which characters are disallowed: https://github.com/elastic/elasticsearch/blob/95c9aad670cb8d3429444c197ddfbb0001df025a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java#L205-L206.

if err != nil {
return nil, err
}
func (m *encodeModel) encodeMetricDataPoint(resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From reading the code, I didn't quite understand how grouping metrics with the same attributes to the same ES document works now that the hashing stuff is removed. Does each document contain a single metric/data point now or where is the grouping happening?

From a storage perspective, it's still best to group as many metrics into the same document as possible.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does each document contain a single metric/data point now

Yes, this is the state after removing hashing. To bring back grouping metrics in a document I would need to reimplement hashing in a different place in code (in exporter and not in model). I can do this if needed, in this PR or in a separate one.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, let's do this in a follow-up PR then. Can you create an issue so that we can track this?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

…ch-exporter-attribute-based-data-stream-routing
For consistency and to follow alphabetical order for modes.
@andrzej-stencel
Copy link
Owner Author

andrzej-stencel commented Jun 25, 2024

Closing this, will open a PR against upstream Contrib repo.

EDIT: Created open-telemetry#33755.

andrzej-stencel pushed a commit that referenced this pull request Jul 10, 2024
…try.log_response_body` config (open-telemetry#33854)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
- Add `telemetry.log_request_body` and `telemetry.log_response_body`
config for debugging. Debug log will contain field `request_body` and/or
`response_body` in the same log line instead of separate lines to avoid
interleaved log lines.
- Change "Request failed" log level to debug.

Output:
```
2024-07-02T14:09:24.983+0100	debug	elasticsearchexporter/elasticsearch_bulk.go:67	Request roundtrip completed.	{"kind": "exporter", "data_type": "logs", "name": "elasticsearch", "response_body": "{\"version\":{\"number\":\"1.2.3\"}}\n", "path": "/", "method": "GET", "duration": 0.000865486, "status": "200 OK"}
2024-07-02T14:09:24.984+0100	debug	elasticsearchexporter/elasticsearch_bulk.go:67	Request roundtrip completed.	{"kind": "exporter", "data_type": "logs", "name": "elasticsearch", "request_body": "{\"create\":{\"_index\":\"logs-test-idx\"}}\n{\"@timestamp\":\"2024-07-02T13:09:24.970187592Z\",\"Attributes\":{\"a\":\"test\",\"b\":5,\"batch_index\":\"batch_1\",\"c\":3,\"d\":true,\"item_index\":\"item_1\"},\"Body\":\"Load Generator Counter #0\",\"Scope\":{\"name\":\"\",\"version\":\"\"},\"SeverityNumber\":11,\"SeverityText\":\"INFO3\",\"TraceFlags\":1}\n{\"create\":{\"_index\":\"logs-test-idx\"}}\n{\"@timestamp\":\"2024-07-02T13:09:24.970187592Z\",\"Attributes\":{\"a\":\"test\",\"b\":5,\"batch_index\":\"batch_1\",\"c\":3,\"d\":true,\"item_index\":\"item_2\"},\"Body\":\"Load Generator Counter #1\",\"Scope\":{\"name\":\"\",\"version\":\"\"},\"SeverityNumber\":11,\"SeverityText\":\"INFO3\",\"TraceFlags\":1}\n", "response_body": "{\"took\":0,\"errors\":false,\"items\":[{\"create\":{\"_index\":\"logs-test-idx\",\"_id\":\"\",\"_version\":0,\"result\":\"\",\"status\":201,\"_seq_no\":0,\"_primary_term\":0,\"_shards\":{\"total\":0,\"successful\":0,\"failed\":0},\"error\":{\"type\":\"\",\"reason\":\"\",\"caused_by\":{\"type\":\"\",\"reason\":\"\"}}}},{\"create\":{\"_index\":\"logs-test-idx\",\"_id\":\"\",\"_version\":0,\"result\":\"\",\"status\":201,\"_seq_no\":0,\"_primary_term\":0,\"_shards\":{\"total\":0,\"successful\":0,\"failed\":0},\"error\":{\"type\":\"\",\"reason\":\"\",\"caused_by\":{\"type\":\"\",\"reason\":\"\"}}}}]}\n", "path": "/_bulk", "method": "POST", "duration": 0.000539979, "status": "200 OK"}
```

Required config to log
```
exporters:
  elasticsearch:
    telemetry:
      log_request_body: true
      log_response_body: true
    
service:
  telemetry:
    logs:
      level: debug
```

For easier analysis, limit the size of request body size. Use
`num_workers`=1 and lower `flush.bytes` and/or `flush.interval`.

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

Manually verified with a modified integration test.

**Documentation:** <Describe the documentation added.>
andrzej-stencel pushed a commit that referenced this pull request Oct 4, 2024
… Histo --> Histogram (open-telemetry#33824)

## Description

This PR adds a custom metric function to the transformprocessor to
convert exponential histograms to explicit histograms.

Link to tracking issue: Resolves open-telemetry#33827

**Function Name**
```
convert_exponential_histogram_to_explicit_histogram
```

**Arguments:**

- `distribution` (_upper, midpoint, uniform, random_)
- `ExplicitBoundaries: []float64`

**Usage example:**

```yaml
processors:
  transform:
    error_mode: propagate
    metric_statements:
    - context: metric
      statements:
        - convert_exponential_histogram_to_explicit_histogram("random", [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]) 
```

**Converts:**

```
Resource SchemaURL: 
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope  
Metric #0
Descriptor:
     -> Name: response_time
     -> Description: 
     -> Unit: 
     -> DataType: ExponentialHistogram
     -> AggregationTemporality: Delta
ExponentialHistogramDataPoints #0
Data point attributes:
     -> metric_type: Str(timing)
StartTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-07-31 09:35:25.212037 +0000 UTC
Count: 44
Sum: 999.000000
Min: 40.000000
Max: 245.000000
Bucket (32.000000, 64.000000], Count: 10
Bucket (64.000000, 128.000000], Count: 22
Bucket (128.000000, 256.000000], Count: 12
        {"kind": "exporter", "data_type": "metrics", "name": "debug"}
```

**To:**

```
Resource SchemaURL: 
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope  
Metric #0
Descriptor:
     -> Name: response_time
     -> Description: 
     -> Unit: 
     -> DataType: Histogram
     -> AggregationTemporality: Delta
HistogramDataPoints #0
Data point attributes:
     -> metric_type: Str(timing)
StartTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-07-30 21:37:07.830902 +0000 UTC
Count: 44
Sum: 999.000000
Min: 40.000000
Max: 245.000000
ExplicitBounds #0: 10.000000
ExplicitBounds #1: 20.000000
ExplicitBounds #2: 30.000000
ExplicitBounds #3: 40.000000
ExplicitBounds open-telemetry#4: 50.000000
ExplicitBounds open-telemetry#5: 60.000000
ExplicitBounds open-telemetry#6: 70.000000
ExplicitBounds open-telemetry#7: 80.000000
ExplicitBounds open-telemetry#8: 90.000000
ExplicitBounds open-telemetry#9: 100.000000
Buckets #0, Count: 0
Buckets #1, Count: 0
Buckets #2, Count: 0
Buckets #3, Count: 2
Buckets open-telemetry#4, Count: 5
Buckets open-telemetry#5, Count: 0
Buckets open-telemetry#6, Count: 3
Buckets open-telemetry#7, Count: 7
Buckets open-telemetry#8, Count: 2
Buckets open-telemetry#9, Count: 4
Buckets open-telemetry#10, Count: 21
        {"kind": "exporter", "data_type": "metrics", "name": "debug"}
```

### Testing

- Several unit tests have been created. We have also tested by ingesting
and converting exponential histograms from the `statsdreceiver` as well
as directly via the `otlpreceiver` over grpc over several hours with a
large amount of data.

- We have clients that have been running this solution in production for
a number of weeks.

### Readme description:

### convert_exponential_hist_to_explicit_hist

`convert_exponential_hist_to_explicit_hist([ExplicitBounds])`

the `convert_exponential_hist_to_explicit_hist` function converts an
ExponentialHistogram to an Explicit (_normal_) Histogram.

`ExplicitBounds` is represents the list of bucket boundaries for the new
histogram. This argument is __required__ and __cannot be empty__.

__WARNING:__

The process of converting an ExponentialHistogram to an Explicit
Histogram is not perfect and may result in a loss of precision. It is
important to define an appropriate set of bucket boundaries to minimize
this loss. For example, selecting Boundaries that are too high or too
low may result histogram buckets that are too wide or too narrow,
respectively.

---------

Co-authored-by: Kent Quirk <kentquirk@gmail.com>
Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
andrzej-stencel pushed a commit that referenced this pull request Oct 7, 2024
…etry#35544)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

As described at
open-telemetry#35491,
it is useful to provide the option to the users for defining
`receiver_creator`'s templates per container.

In this regard, the current PR introduces a new type of Endpoint called
`PodContainer` that matches the rule type `pod.container`. This Endpoint
is emitted for each container of the Pod similarly to how the `Port`
Endpoints are emitted per container that defines a port.

A complete example on how to use this feature to apply different parsing
on each of the Pod's container is provided in the `How to test this
manually` section.

**Link to tracking Issue:** <Issue number if applicable> Fixes
open-telemetry#35491

**Testing:** <Describe what testing was performed and which tests were
added.> TBA

**Documentation:** <Describe the documentation added.> TBA


### How to test this manually

1. Use the following values file to deploy the Collector's Helm chart
```yaml
mode: daemonset

image:
  repository: otelcontribcol-dev
  tag: "latest"
  pullPolicy: IfNotPresent

command:
  name: otelcontribcol

clusterRole:
  create: true
  rules:
   - apiGroups:
     - ''
     resources:
     - 'pods'
     - 'nodes'
     verbs:
     - 'get'
     - 'list'
     - 'watch'
   - apiGroups: [ "" ]
     resources: [ "nodes/proxy"]
     verbs: [ "get" ]
   - apiGroups:
       - ""
     resources:
       - nodes/stats
     verbs:
       - get
   - nonResourceURLs:
       - "/metrics"
     verbs:
       - get

extraVolumeMounts:
 - name: varlogpods
   mountPath: /var/log/pods
   readOnly: true

extraVolumes:
  - name: varlogpods
    hostPath:
      path: /var/log/pods

config:
  extensions:
    k8s_observer:
      auth_type: serviceAccount
      node: ${env:K8S_NODE_NAME}
      observe_nodes: true
  exporters:
    debug:
      verbosity: basic
  receivers:
    receiver_creator/logs:
      watch_observers: [ k8s_observer ]
      receivers:
        filelog/busybox:
          rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "busybox"
          config:
            include:
              - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
            include_file_name: false
            include_file_path: true
            operators:
              - id: container-parser
                type: container
              - type: add
                field: attributes.log.template
                value: busybox
        filelog/lazybox:
          rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "lazybox"
          config:
            include:
              - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
            include_file_name: false
            include_file_path: true
            operators:
              - id: container-parser
                type: container
              - type: add
                field: attributes.log.template
                value: lazybox
  service:
    extensions: [health_check, k8s_observer]
    pipelines:
      logs:
        receivers: [receiver_creator/logs]
        processors: [batch]
        exporters: [debug]
```
2. Follow the logs of the Collector's Pod i.e: `k logs -f
daemonset-opentelemetry-collector-agent-2hrg5`
3. Deploy a sample Pod which consists of 2 different containers:

```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: daemonset-logs
  labels:
    app: daemonset-logs
spec:
  selector:
    matchLabels:
      app.kubernetes.io/component: migration-logger
      otel.logs: "true"
  template:
    metadata:
      labels:
        app.kubernetes.io/component: migration-logger
        otel.logs: "true"
    spec:
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      containers:
        - name: lazybox
          image: busybox
          args:
            - /bin/sh
            - -c
            - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 0.1s; done
        - name: busybox
          image: busybox
          args:
            - /bin/sh
            - -c
            - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 0.1s; done
```

Verify in the logs that only 2 filelog receivers are started, one per
container:

```console
2024-10-02T12:05:17.506Z	info	receivercreator@v0.110.0/observerhandler.go:96	starting receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox", "endpoint": "10.244.0.13", "endpoint_id": "k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox"}
2024-10-02T12:05:17.508Z	info	adapter/receiver.go:47	Starting stanza receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox"}
2024-10-02T12:05:17.508Z	info	receivercreator@v0.110.0/observerhandler.go:96	starting receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox", "endpoint": "10.244.0.13", "endpoint_id": "k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox"}
2024-10-02T12:05:17.510Z	info	adapter/receiver.go:47	Starting stanza receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox"}
2024-10-02T12:05:17.709Z	info	fileconsumer/file.go:256	Started watching file	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox", "component": "fileconsumer", "path": "/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/lazybox/0.log"}
2024-10-02T12:05:17.712Z	info	fileconsumer/file.go:256	Started watching file	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox", "component": "fileconsumer", "path": "/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/busybox/0.log"}
```

In addition verify that the proper attributes are added per container
according to the 2 different filelog receiver definitions:


```console
2024-10-02T12:23:55.117Z	info	ResourceLog #0
Resource SchemaURL: 
Resource attributes:
     -> k8s.pod.name: Str(daemonset-logs-sz4zk)
     -> k8s.container.restart_count: Str(0)
     -> k8s.pod.uid: Str(01543800-cfea-4c10-8220-387e60f65151)
     -> k8s.container.name: Str(lazybox)
     -> k8s.namespace.name: Str(default)
     -> container.id: Str(63a8e69bdc6ee95ee7918baf913a548190f32838adeb0e6189a8210e05157b40)
     -> container.image.name: Str(busybox)
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2024-10-02 12:23:54.896772888 +0000 UTC
Timestamp: 2024-10-02 12:23:54.750904381 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(otel logs at 12:23:54)
Attributes:
     -> log.iostream: Str(stdout)
     -> logtag: Str(F)
     -> log: Map({"template":"lazybox"})
     -> log.file.path: Str(/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/lazybox/0.log)
Trace ID: 
Span ID: 
Flags: 0
ResourceLog #1
Resource SchemaURL: 
Resource attributes:
     -> k8s.container.restart_count: Str(0)
     -> k8s.pod.uid: Str(01543800-cfea-4c10-8220-387e60f65151)
     -> k8s.container.name: Str(busybox)
     -> k8s.namespace.name: Str(default)
     -> k8s.pod.name: Str(daemonset-logs-sz4zk)
     -> container.id: Str(47163758424f2bc5382b1e9702301be23cab368b590b5fbf0b30affa09b4a199)
     -> container.image.name: Str(busybox)
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2024-10-02 12:23:54.897788935 +0000 UTC
Timestamp: 2024-10-02 12:23:54.749885634 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(otel logs at 12:23:54)
Attributes:
     -> log.file.path: Str(/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/busybox/0.log)
     -> logtag: Str(F)
     -> log.iostream: Str(stdout)
     -> log: Map({"template":"busybox"})
Trace ID: 
Span ID: 
Flags: 0
```

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
@andrzej-stencel andrzej-stencel deleted the elasticsearch-exporter-attribute-based-data-stream-routing branch November 5, 2024 10:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants