-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[exporter/elasticsearch] add data stream routing #1
[exporter/elasticsearch] add data stream routing #1
Conversation
5645f9d
to
1671598
Compare
Let's make sure to also read the data_stream.* properties from resource attributes and scope attributes. The order of precedence is:
|
1671598
to
287e924
Compare
What happens when dynamic indexing in data_stream mode is enabled and the data doesn't contain any data_stream attributes? What's supposed to happens is that we still add
Similar to the See also the code of the |
if err != nil { | ||
return nil, err | ||
} | ||
func (m *encodeModel) encodeMetricDataPoint(resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From reading the code, I didn't quite understand how grouping metrics with the same attributes to the same ES document works now that the hashing stuff is removed. Does each document contain a single metric/data point now or where is the grouping happening?
From a storage perspective, it's still best to group as many metrics into the same document as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does each document contain a single metric/data point now
Yes, this is the state after removing hashing. To bring back grouping metrics in a document I would need to reimplement hashing in a different place in code (in exporter and not in model). I can do this if needed, in this PR or in a separate one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, let's do this in a follow-up PR then. Can you create an issue so that we can track this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created open-telemetry#33756.
…ch-exporter-attribute-based-data-stream-routing
For consistency and to follow alphabetical order for modes.
Closing this, will open a PR against upstream Contrib repo. EDIT: Created open-telemetry#33755. |
…try.log_response_body` config (open-telemetry#33854) **Description:** <Describe what has changed.> <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> - Add `telemetry.log_request_body` and `telemetry.log_response_body` config for debugging. Debug log will contain field `request_body` and/or `response_body` in the same log line instead of separate lines to avoid interleaved log lines. - Change "Request failed" log level to debug. Output: ``` 2024-07-02T14:09:24.983+0100 debug elasticsearchexporter/elasticsearch_bulk.go:67 Request roundtrip completed. {"kind": "exporter", "data_type": "logs", "name": "elasticsearch", "response_body": "{\"version\":{\"number\":\"1.2.3\"}}\n", "path": "/", "method": "GET", "duration": 0.000865486, "status": "200 OK"} 2024-07-02T14:09:24.984+0100 debug elasticsearchexporter/elasticsearch_bulk.go:67 Request roundtrip completed. {"kind": "exporter", "data_type": "logs", "name": "elasticsearch", "request_body": "{\"create\":{\"_index\":\"logs-test-idx\"}}\n{\"@timestamp\":\"2024-07-02T13:09:24.970187592Z\",\"Attributes\":{\"a\":\"test\",\"b\":5,\"batch_index\":\"batch_1\",\"c\":3,\"d\":true,\"item_index\":\"item_1\"},\"Body\":\"Load Generator Counter #0\",\"Scope\":{\"name\":\"\",\"version\":\"\"},\"SeverityNumber\":11,\"SeverityText\":\"INFO3\",\"TraceFlags\":1}\n{\"create\":{\"_index\":\"logs-test-idx\"}}\n{\"@timestamp\":\"2024-07-02T13:09:24.970187592Z\",\"Attributes\":{\"a\":\"test\",\"b\":5,\"batch_index\":\"batch_1\",\"c\":3,\"d\":true,\"item_index\":\"item_2\"},\"Body\":\"Load Generator Counter #1\",\"Scope\":{\"name\":\"\",\"version\":\"\"},\"SeverityNumber\":11,\"SeverityText\":\"INFO3\",\"TraceFlags\":1}\n", "response_body": "{\"took\":0,\"errors\":false,\"items\":[{\"create\":{\"_index\":\"logs-test-idx\",\"_id\":\"\",\"_version\":0,\"result\":\"\",\"status\":201,\"_seq_no\":0,\"_primary_term\":0,\"_shards\":{\"total\":0,\"successful\":0,\"failed\":0},\"error\":{\"type\":\"\",\"reason\":\"\",\"caused_by\":{\"type\":\"\",\"reason\":\"\"}}}},{\"create\":{\"_index\":\"logs-test-idx\",\"_id\":\"\",\"_version\":0,\"result\":\"\",\"status\":201,\"_seq_no\":0,\"_primary_term\":0,\"_shards\":{\"total\":0,\"successful\":0,\"failed\":0},\"error\":{\"type\":\"\",\"reason\":\"\",\"caused_by\":{\"type\":\"\",\"reason\":\"\"}}}}]}\n", "path": "/_bulk", "method": "POST", "duration": 0.000539979, "status": "200 OK"} ``` Required config to log ``` exporters: elasticsearch: telemetry: log_request_body: true log_response_body: true service: telemetry: logs: level: debug ``` For easier analysis, limit the size of request body size. Use `num_workers`=1 and lower `flush.bytes` and/or `flush.interval`. **Link to tracking Issue:** <Issue number if applicable> **Testing:** <Describe what testing was performed and which tests were added.> Manually verified with a modified integration test. **Documentation:** <Describe the documentation added.>
… Histo --> Histogram (open-telemetry#33824) ## Description This PR adds a custom metric function to the transformprocessor to convert exponential histograms to explicit histograms. Link to tracking issue: Resolves open-telemetry#33827 **Function Name** ``` convert_exponential_histogram_to_explicit_histogram ``` **Arguments:** - `distribution` (_upper, midpoint, uniform, random_) - `ExplicitBoundaries: []float64` **Usage example:** ```yaml processors: transform: error_mode: propagate metric_statements: - context: metric statements: - convert_exponential_histogram_to_explicit_histogram("random", [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]) ``` **Converts:** ``` Resource SchemaURL: ScopeMetrics #0 ScopeMetrics SchemaURL: InstrumentationScope Metric #0 Descriptor: -> Name: response_time -> Description: -> Unit: -> DataType: ExponentialHistogram -> AggregationTemporality: Delta ExponentialHistogramDataPoints #0 Data point attributes: -> metric_type: Str(timing) StartTimestamp: 1970-01-01 00:00:00 +0000 UTC Timestamp: 2024-07-31 09:35:25.212037 +0000 UTC Count: 44 Sum: 999.000000 Min: 40.000000 Max: 245.000000 Bucket (32.000000, 64.000000], Count: 10 Bucket (64.000000, 128.000000], Count: 22 Bucket (128.000000, 256.000000], Count: 12 {"kind": "exporter", "data_type": "metrics", "name": "debug"} ``` **To:** ``` Resource SchemaURL: ScopeMetrics #0 ScopeMetrics SchemaURL: InstrumentationScope Metric #0 Descriptor: -> Name: response_time -> Description: -> Unit: -> DataType: Histogram -> AggregationTemporality: Delta HistogramDataPoints #0 Data point attributes: -> metric_type: Str(timing) StartTimestamp: 1970-01-01 00:00:00 +0000 UTC Timestamp: 2024-07-30 21:37:07.830902 +0000 UTC Count: 44 Sum: 999.000000 Min: 40.000000 Max: 245.000000 ExplicitBounds #0: 10.000000 ExplicitBounds #1: 20.000000 ExplicitBounds #2: 30.000000 ExplicitBounds #3: 40.000000 ExplicitBounds open-telemetry#4: 50.000000 ExplicitBounds open-telemetry#5: 60.000000 ExplicitBounds open-telemetry#6: 70.000000 ExplicitBounds open-telemetry#7: 80.000000 ExplicitBounds open-telemetry#8: 90.000000 ExplicitBounds open-telemetry#9: 100.000000 Buckets #0, Count: 0 Buckets #1, Count: 0 Buckets #2, Count: 0 Buckets #3, Count: 2 Buckets open-telemetry#4, Count: 5 Buckets open-telemetry#5, Count: 0 Buckets open-telemetry#6, Count: 3 Buckets open-telemetry#7, Count: 7 Buckets open-telemetry#8, Count: 2 Buckets open-telemetry#9, Count: 4 Buckets open-telemetry#10, Count: 21 {"kind": "exporter", "data_type": "metrics", "name": "debug"} ``` ### Testing - Several unit tests have been created. We have also tested by ingesting and converting exponential histograms from the `statsdreceiver` as well as directly via the `otlpreceiver` over grpc over several hours with a large amount of data. - We have clients that have been running this solution in production for a number of weeks. ### Readme description: ### convert_exponential_hist_to_explicit_hist `convert_exponential_hist_to_explicit_hist([ExplicitBounds])` the `convert_exponential_hist_to_explicit_hist` function converts an ExponentialHistogram to an Explicit (_normal_) Histogram. `ExplicitBounds` is represents the list of bucket boundaries for the new histogram. This argument is __required__ and __cannot be empty__. __WARNING:__ The process of converting an ExponentialHistogram to an Explicit Histogram is not perfect and may result in a loss of precision. It is important to define an appropriate set of bucket boundaries to minimize this loss. For example, selecting Boundaries that are too high or too low may result histogram buckets that are too wide or too narrow, respectively. --------- Co-authored-by: Kent Quirk <kentquirk@gmail.com> Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
…etry#35544) **Description:** <Describe what has changed.> <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> As described at open-telemetry#35491, it is useful to provide the option to the users for defining `receiver_creator`'s templates per container. In this regard, the current PR introduces a new type of Endpoint called `PodContainer` that matches the rule type `pod.container`. This Endpoint is emitted for each container of the Pod similarly to how the `Port` Endpoints are emitted per container that defines a port. A complete example on how to use this feature to apply different parsing on each of the Pod's container is provided in the `How to test this manually` section. **Link to tracking Issue:** <Issue number if applicable> Fixes open-telemetry#35491 **Testing:** <Describe what testing was performed and which tests were added.> TBA **Documentation:** <Describe the documentation added.> TBA ### How to test this manually 1. Use the following values file to deploy the Collector's Helm chart ```yaml mode: daemonset image: repository: otelcontribcol-dev tag: "latest" pullPolicy: IfNotPresent command: name: otelcontribcol clusterRole: create: true rules: - apiGroups: - '' resources: - 'pods' - 'nodes' verbs: - 'get' - 'list' - 'watch' - apiGroups: [ "" ] resources: [ "nodes/proxy"] verbs: [ "get" ] - apiGroups: - "" resources: - nodes/stats verbs: - get - nonResourceURLs: - "/metrics" verbs: - get extraVolumeMounts: - name: varlogpods mountPath: /var/log/pods readOnly: true extraVolumes: - name: varlogpods hostPath: path: /var/log/pods config: extensions: k8s_observer: auth_type: serviceAccount node: ${env:K8S_NODE_NAME} observe_nodes: true exporters: debug: verbosity: basic receivers: receiver_creator/logs: watch_observers: [ k8s_observer ] receivers: filelog/busybox: rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "busybox" config: include: - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log include_file_name: false include_file_path: true operators: - id: container-parser type: container - type: add field: attributes.log.template value: busybox filelog/lazybox: rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "lazybox" config: include: - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log include_file_name: false include_file_path: true operators: - id: container-parser type: container - type: add field: attributes.log.template value: lazybox service: extensions: [health_check, k8s_observer] pipelines: logs: receivers: [receiver_creator/logs] processors: [batch] exporters: [debug] ``` 2. Follow the logs of the Collector's Pod i.e: `k logs -f daemonset-opentelemetry-collector-agent-2hrg5` 3. Deploy a sample Pod which consists of 2 different containers: ```yaml apiVersion: apps/v1 kind: DaemonSet metadata: name: daemonset-logs labels: app: daemonset-logs spec: selector: matchLabels: app.kubernetes.io/component: migration-logger otel.logs: "true" template: metadata: labels: app.kubernetes.io/component: migration-logger otel.logs: "true" spec: tolerations: - key: node-role.kubernetes.io/master effect: NoSchedule containers: - name: lazybox image: busybox args: - /bin/sh - -c - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 0.1s; done - name: busybox image: busybox args: - /bin/sh - -c - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 0.1s; done ``` Verify in the logs that only 2 filelog receivers are started, one per container: ```console 2024-10-02T12:05:17.506Z info receivercreator@v0.110.0/observerhandler.go:96 starting receiver {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox", "endpoint": "10.244.0.13", "endpoint_id": "k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox"} 2024-10-02T12:05:17.508Z info adapter/receiver.go:47 Starting stanza receiver {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox"} 2024-10-02T12:05:17.508Z info receivercreator@v0.110.0/observerhandler.go:96 starting receiver {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox", "endpoint": "10.244.0.13", "endpoint_id": "k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox"} 2024-10-02T12:05:17.510Z info adapter/receiver.go:47 Starting stanza receiver {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox"} 2024-10-02T12:05:17.709Z info fileconsumer/file.go:256 Started watching file {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox", "component": "fileconsumer", "path": "/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/lazybox/0.log"} 2024-10-02T12:05:17.712Z info fileconsumer/file.go:256 Started watching file {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox", "component": "fileconsumer", "path": "/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/busybox/0.log"} ``` In addition verify that the proper attributes are added per container according to the 2 different filelog receiver definitions: ```console 2024-10-02T12:23:55.117Z info ResourceLog #0 Resource SchemaURL: Resource attributes: -> k8s.pod.name: Str(daemonset-logs-sz4zk) -> k8s.container.restart_count: Str(0) -> k8s.pod.uid: Str(01543800-cfea-4c10-8220-387e60f65151) -> k8s.container.name: Str(lazybox) -> k8s.namespace.name: Str(default) -> container.id: Str(63a8e69bdc6ee95ee7918baf913a548190f32838adeb0e6189a8210e05157b40) -> container.image.name: Str(busybox) ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope LogRecord #0 ObservedTimestamp: 2024-10-02 12:23:54.896772888 +0000 UTC Timestamp: 2024-10-02 12:23:54.750904381 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Str(otel logs at 12:23:54) Attributes: -> log.iostream: Str(stdout) -> logtag: Str(F) -> log: Map({"template":"lazybox"}) -> log.file.path: Str(/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/lazybox/0.log) Trace ID: Span ID: Flags: 0 ResourceLog #1 Resource SchemaURL: Resource attributes: -> k8s.container.restart_count: Str(0) -> k8s.pod.uid: Str(01543800-cfea-4c10-8220-387e60f65151) -> k8s.container.name: Str(busybox) -> k8s.namespace.name: Str(default) -> k8s.pod.name: Str(daemonset-logs-sz4zk) -> container.id: Str(47163758424f2bc5382b1e9702301be23cab368b590b5fbf0b30affa09b4a199) -> container.image.name: Str(busybox) ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope LogRecord #0 ObservedTimestamp: 2024-10-02 12:23:54.897788935 +0000 UTC Timestamp: 2024-10-02 12:23:54.749885634 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Str(otel logs at 12:23:54) Attributes: -> log.file.path: Str(/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/busybox/0.log) -> logtag: Str(F) -> log.iostream: Str(stdout) -> log: Map({"template":"busybox"}) Trace ID: Span ID: Flags: 0 ``` Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
Description:
Adds support for routing telemetry based on data stream attributes
data_stream.dataset
anddata_stream.namespace
as described in the Elastic docs: Data stream naming scheme.This PR is based on [exporter/elasticsearch] Add initial support for metrics #33513.
Link to tracking Issue:
Testing:
Added tests.
Documentation:
Added documentation for the new configuration options.