From 6ceef084dddbf4be3b2a659f0b9a5be418929003 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Mon, 3 Jun 2024 14:27:09 +0200 Subject: [PATCH 01/24] [exporter/elasticsearch] route based on data stream attributes --- ...r-attribute-based-data-stream-routing.yaml | 27 +++ exporter/elasticsearchexporter/README.md | 58 +++++-- exporter/elasticsearchexporter/attribute.go | 8 +- exporter/elasticsearchexporter/config.go | 29 +++- exporter/elasticsearchexporter/config_test.go | 104 ++++++++++-- exporter/elasticsearchexporter/exporter.go | 112 ++++++++----- .../elasticsearchexporter/exporter_test.go | 155 +++++++++++++++++- exporter/elasticsearchexporter/factory.go | 22 ++- exporter/elasticsearchexporter/model.go | 136 +++------------ exporter/elasticsearchexporter/model_test.go | 14 +- .../testdata/config.yaml | 22 +++ exporter/elasticsearchexporter/utils_test.go | 11 ++ 12 files changed, 499 insertions(+), 199 deletions(-) create mode 100644 .chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml new file mode 100644 index 000000000000..842b7b62161b --- /dev/null +++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/elasticsearch + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add data stream routing + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 8b52bf4da343..bd69d937417b 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -14,7 +14,7 @@ [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch). +This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch). ## Configuration options @@ -83,32 +83,52 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export ### Elasticsearch document routing Telemetry data will be written to signal specific data streams by default: -logs to `logs-generic-default`, and traces to `traces-generic-default`. 
+logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
 This can be customised through the following settings:
-- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
+- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
   The default value is `logs-generic-default`.
+
 - `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
-- `logs_dynamic_index` (optional):
-  takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
+
+- `logs_dynamic_index` (optional): uses resource, scope or log record attributes to dynamically construct the index name. See `mode` for details.
   - `enabled`(default=false): Enable/Disable dynamic index for log records
-- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
+  - `mode` (default=`prefix_suffix`): defines how the dynamic index name is constructed.
+    - `data_stream` - uses resource, scope or log record attributes `data_stream.dataset` and `data_stream.namespace`
+      to dynamically construct the index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`.
+      Log record attributes take precedence over scope attributes, which take precedence over resource attributes.
+    - `prefix_suffix` - uses resource, scope or log record attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
+      to dynamically construct the index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > log record attribute)
+
+- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
   ⚠️ Note that metrics support is currently in development.
-- `metrics_dynamic_index` (optional):
-  takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `metrics_index`.
+
+- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct the index name. See `mode` for details.
   ⚠️ Note that metrics support is currently in development.
-  - `enabled`(default=false): Enable/Disable dynamic index for metrics
+  - `enabled`(default=true): Enable/disable dynamic index for metrics
+  - `mode` (default=`data_stream`): defines how the dynamic index name is constructed.
+    - `data_stream` - uses resource, scope or data point attributes `data_stream.dataset` and `data_stream.namespace`
+      to dynamically construct the index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`.
+      Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
+    - `prefix_suffix` - uses resource, scope or data point attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
+      to dynamically construct the index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`.
+      Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
+
 - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
-- `traces_dynamic_index` (optional):
-  takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
-  resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
+
+- `traces_dynamic_index` (optional): uses resource, scope or span attributes to dynamically construct the index name. See `mode` for details.
   - `enabled`(default=false): Enable/Disable dynamic index for trace spans
-- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
-  - `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
-    e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
-    The last string appended belongs to the date when the data is being generated.
+  - `mode` (default=`prefix_suffix`): defines how the dynamic index name is constructed.
+    - `data_stream` - uses resource, scope or span attributes `data_stream.dataset` and `data_stream.namespace`
+      to dynamically construct the index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`.
+      Span attributes take precedence over scope attributes, which take precedence over resource attributes.
+    - `prefix_suffix` - uses resource, scope or span attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
+      to dynamically construct the index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > span attribute)
+
+- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
+  - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as the prefix and the date as the suffix,
+    e.g. if `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, the index becomes `logs-generic-default-YYYY.MM.DD`.
+    The appended suffix is the date on which the data is generated.
   - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
   - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.
@@ -189,6 +209,8 @@ The only metric types supported are:
 
 Other metric types (Histogram, Exponential Histogram, Summary) are ignored.
 
+Dynamic indexing in `data_stream` mode is enabled by default for metrics. See the `metrics_dynamic_index` configuration property for details.
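
As a rough illustration of the routing described above (a sketch, not part of the diff; the endpoint and attribute values are made up, while the keys mirror the `testdata/config.yaml` entries added later in this patch), a configuration that opts every signal into data stream routing could look like:

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    logs_dynamic_index:
      enabled: true
      mode: data_stream
    metrics_dynamic_index:
      enabled: true   # already the default for metrics
      mode: data_stream
    traces_dynamic_index:
      enabled: true
      mode: data_stream
```

With this configuration, a log record carrying `data_stream.dataset: nginx.access` and `data_stream.namespace: production` would be indexed into `logs-nginx.access-production`; records without these attributes fall back to the `generic` dataset and `default` namespace, i.e. `logs-generic-default`.
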
+ [confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings [configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings [configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 987b13f807bb..85feec25f8bb 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -7,8 +7,12 @@ import "go.opentelemetry.io/collector/pdata/pcommon" // dynamic index attribute key constants const ( - indexPrefix = "elasticsearch.index.prefix" - indexSuffix = "elasticsearch.index.suffix" + indexPrefix = "elasticsearch.index.prefix" + indexSuffix = "elasticsearch.index.suffix" + dataStreamDataset = "data_stream.dataset" + dataStreamNamespace = "data_stream.namespace" + defaultDataStreamDataset = "generic" + defaultDataStreamNamespace = "default" ) // resource is higher priotized than record attribute diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index daf83e4da3ba..4feff0325d56 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -4,6 +4,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( + "encoding" "encoding/base64" "errors" "fmt" @@ -80,7 +81,33 @@ type LogstashFormatSettings struct { } type DynamicIndexSetting struct { - Enabled bool `mapstructure:"enabled"` + Enabled bool `mapstructure:"enabled"` + Mode DynamicIndexMode `mapstructure:"mode"` +} + +type DynamicIndexMode string + +const DynamicIndexModeDataStream DynamicIndexMode = "data_stream" +const DynamicIndexModePrefixSuffix DynamicIndexMode = "prefix_suffix" + +var _ encoding.TextUnmarshaler = (*DynamicIndexMode)(nil) + +func (m *DynamicIndexMode) UnmarshalText(text []byte) error { + if m == nil { + return errors.New("cannot unmarshal to a nil *DynamicIndexMode") + } + + str := string(text) + switch str { + case string(DynamicIndexModeDataStream): + *m = DynamicIndexModeDataStream + case string(DynamicIndexModePrefixSuffix): + *m = DynamicIndexModePrefixSuffix + default: + return fmt.Errorf("unknown dynamic index mode %s", str) + } + + return nil } // AuthenticationSettings defines user authentication related settings. 
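
The `UnmarshalText` method added to `config.go` above is what allows the collector's configuration unmarshaling to reject a mistyped `mode` value at load time rather than silently falling back to a default. A minimal test sketch of that behavior; the test name is hypothetical and is assumed to sit in the `elasticsearchexporter` package alongside `config.go`:

```go
package elasticsearchexporter

import "testing"

func TestDynamicIndexModeUnmarshalText(t *testing.T) {
	// Known values map onto the typed constants.
	var mode DynamicIndexMode
	if err := mode.UnmarshalText([]byte("data_stream")); err != nil {
		t.Fatal(err)
	}
	if mode != DynamicIndexModeDataStream {
		t.Fatalf("expected data_stream mode, got %q", mode)
	}

	// Anything else is rejected, so a config typo fails at startup.
	if err := mode.UnmarshalText([]byte("datastream")); err == nil {
		t.Fatal("expected error for unknown mode")
	}
}
```
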
diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 8ca137118a0a..c0739e29468e 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -57,12 +57,24 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"https://elastic.example.com:9200"}, - Index: "", - LogsIndex: "logs-generic-default", + Endpoints: []string{"https://elastic.example.com:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, MetricsIndex: "metrics-generic-default", - TracesIndex: "trace_index", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: "trace_index", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -110,12 +122,24 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "my_log_index", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "my_log_index", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, MetricsIndex: "metrics-generic-default", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: "traces-generic-default", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -163,12 +187,24 @@ func TestConfig(t *testing.T) { NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, - Endpoints: []string{"http://localhost:9200"}, - Index: "", - LogsIndex: "logs-generic-default", + Endpoints: []string{"http://localhost:9200"}, + Index: "", + LogsIndex: "logs-generic-default", + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, MetricsIndex: "my_metric_index", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: "traces-generic-default", + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ Timeout: 2 * time.Minute, MaxIdleConns: &defaultMaxIdleConns, @@ -239,6 +275,44 @@ func TestConfig(t *testing.T) { cfg.Endpoint = "https://elastic.example.com:9200" }), }, + { + id: component.NewIDWithName(metadata.Type, "data-stream-mode"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + cfg.LogsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + } + 
cfg.MetricsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + } + cfg.TracesDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + } + }), + }, + { + id: component.NewIDWithName(metadata.Type, "prefix-suffix-mode"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + cfg.LogsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModePrefixSuffix, + } + cfg.MetricsDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModePrefixSuffix, + } + cfg.TracesDynamicIndex = DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModePrefixSuffix, + } + }), + }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 01103adfa12f..ce696c274f7d 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -22,11 +22,12 @@ type elasticsearchExporter struct { component.TelemetrySettings userAgent string - config *Config - index string - logstashFormat LogstashFormatSettings - dynamicIndex bool - model mappingModel + config *Config + index string + logstashFormat LogstashFormatSettings + dynamicIndex bool + dynamicIndexMode DynamicIndexMode + model mappingModel bulkIndexer *esBulkIndexerCurrent } @@ -36,6 +37,7 @@ func newExporter( set exporter.Settings, index string, dynamicIndex bool, + dynamicIndexMode DynamicIndexMode, ) (*elasticsearchExporter, error) { if err := cfg.Validate(); err != nil { return nil, err @@ -59,11 +61,12 @@ func newExporter( TelemetrySettings: set.TelemetrySettings, userAgent: userAgent, - config: cfg, - index: index, - dynamicIndex: dynamicIndex, - model: model, - logstashFormat: cfg.LogstashFormat, + config: cfg, + index: index, + dynamicIndex: dynamicIndex, + dynamicIndexMode: dynamicIndexMode, + model: model, + logstashFormat: cfg.LogstashFormat, }, nil } @@ -117,10 +120,17 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromAttributes(indexPrefix, resource, scope, record) - suffix := getFromAttributes(indexSuffix, resource, scope, record) + if e.dynamicIndexMode == DynamicIndexModeDataStream { + dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + fIndex = fmt.Sprintf("logs-%s-%s", dataSet, namespace) + } else { + prefix := getFromAttributes(indexPrefix, resource, scope, record) + suffix := getFromAttributes(indexSuffix, resource, scope, record) + + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } } if e.logstashFormat.Enabled { @@ -150,50 +160,69 @@ func (e *elasticsearchExporter) pushMetricsData( resource := resourceMetric.Resource() scopeMetrics := resourceMetric.ScopeMetrics() for j := 0; j < scopeMetrics.Len(); j++ { - scope := scopeMetrics.At(j).Scope() - metricSlice := scopeMetrics.At(j).Metrics() - - if err := e.pushMetricSlice(ctx, resource, metricSlice, scope); err != nil { - if ctxErr := ctx.Err(); 
ctxErr != nil { - return ctxErr + scopeMetrics := scopeMetrics.At(j) + for k := 0; k < scopeMetrics.Metrics().Len(); k++ { + metric := scopeMetrics.Metrics().At(k) + + // We only support Sum and Gauge metrics at the moment. + var dataPoints pmetric.NumberDataPointSlice + switch metric.Type() { + case pmetric.MetricTypeSum: + dataPoints = metric.Sum().DataPoints() + case pmetric.MetricTypeGauge: + dataPoints = metric.Gauge().DataPoints() } - errs = append(errs, err) + for l := 0; l < dataPoints.Len(); l++ { + dataPoint := dataPoints.At(l) + if err := e.pushMetricDataPoint(ctx, resource, scopeMetrics.Scope(), metric, dataPoint); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + } } - } } return errors.Join(errs...) } -func (e *elasticsearchExporter) pushMetricSlice( +func (e *elasticsearchExporter) pushMetricDataPoint( ctx context.Context, resource pcommon.Resource, - slice pmetric.MetricSlice, scope pcommon.InstrumentationScope, + metric pmetric.Metric, + dataPoint pmetric.NumberDataPoint, ) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromAttributesNew(indexPrefix, "", resource.Attributes()) - suffix := getFromAttributesNew(indexSuffix, "", resource.Attributes()) - - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) - } - - documents, err := e.model.encodeMetrics(resource, slice, scope) - if err != nil { - return fmt.Errorf("failed to encode a metric event: %w", err) + if e.dynamicIndexMode == DynamicIndexModeDataStream { + dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + fIndex = fmt.Sprintf("metrics-%s-%s", dataSet, namespace) + } else { + prefix := getFromAttributesNew(indexPrefix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + suffix := getFromAttributesNew(indexSuffix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } } - for _, document := range documents { - err := pushDocuments(ctx, fIndex, document, e.bulkIndexer) + if e.logstashFormat.Enabled { + formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) if err != nil { return err } + fIndex = formattedIndex } - return nil + document, err := e.model.encodeMetricDataPoint(resource, scope, metric, dataPoint) + if err != nil { + return fmt.Errorf("failed to encode a metric data point: %w", err) + } + + return pushDocuments(ctx, fIndex, document, e.bulkIndexer) } func (e *elasticsearchExporter) pushTraceData( @@ -228,10 +257,17 @@ func (e *elasticsearchExporter) pushTraceData( func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromAttributes(indexPrefix, resource, scope, span) - suffix := getFromAttributes(indexSuffix, resource, scope, span) + if e.dynamicIndexMode == DynamicIndexModeDataStream { + dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) - fIndex = fmt.Sprintf("%s%s%s", prefix, 
fIndex, suffix) + fIndex = fmt.Sprintf("traces-%s-%s", dataSet, namespace) + } else { + prefix := getFromAttributes(indexPrefix, resource, scope, span) + suffix := getFromAttributes(indexSuffix, resource, scope, span) + + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } } if e.logstashFormat.Enabled { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index d2fda9b9aeb9..62006c5c3667 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -168,7 +168,7 @@ func TestExporterLogs(t *testing.T) { <-done }) - t.Run("publish with dynamic index", func(t *testing.T) { + t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -213,6 +213,43 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + assert.Equal(t, "logs-record.dataset-resource.namespace", create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicIndex.Enabled = true + cfg.LogsDynamicIndex.Mode = DynamicIndexModeDataStream + }) + logs := newLogsWithAttributeAndResourceMap( + map[string]string{ + dataStreamDataset: "record.dataset", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + dataStreamNamespace: "resource.namespace", + }, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { @@ -469,6 +506,81 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(2) }) + t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := "resource.prefix-metrics.index-data.point.suffix" + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + cfg.MetricsDynamicIndex.Mode = DynamicIndexModePrefixSuffix + }) + metrics := newMetricsWithAttributeAndResourceMap( + map[string]string{ + indexSuffix: "-data.point.suffix", + }, + map[string]string{ + indexPrefix: "resource.prefix-", + indexSuffix: "-resource.suffix", + }, + ) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + }) + + t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) 
([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := "metrics-resource.dataset-data.point.namespace" + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + }) + metrics := newMetricsWithAttributeAndResourceMap( + map[string]string{ + dataStreamNamespace: "data.point.namespace", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + dataStreamNamespace: "resource.namespace", + }, + ) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + }) + } func TestExporterTraces(t *testing.T) { @@ -486,7 +598,7 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index", func(t *testing.T) { + t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -531,6 +643,45 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + + rec := newBulkRecorder() + + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + + expected := "traces-span.dataset-default" + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesDynamicIndex.Enabled = true + cfg.TracesDynamicIndex.Mode = DynamicIndexModeDataStream + }) + + mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap( + map[string]string{ + dataStreamDataset: "span.dataset", + }, + map[string]string{ + dataStreamDataset: "resource.dataset", + }, + )) + + rec.WaitItems(1) + }) + t.Run("publish with logstash format index", func(t *testing.T) { var defaultCfg Config diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 8829215c8ac6..e0bcaf6884b3 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -49,8 +49,20 @@ func createDefaultConfig() component.Config { ClientConfig: httpClientConfig, Index: "", LogsIndex: defaultLogsIndex, - MetricsIndex: defaultMetricsIndex, - TracesIndex: defaultTracesIndex, + LogsDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, + MetricsIndex: defaultMetricsIndex, + MetricsDynamicIndex: DynamicIndexSetting{ + Enabled: true, + Mode: DynamicIndexModeDataStream, + }, + TracesIndex: defaultTracesIndex, + TracesDynamicIndex: DynamicIndexSetting{ + Enabled: false, + Mode: DynamicIndexModePrefixSuffix, + }, Retry: RetrySettings{ Enabled: true, MaxRequests: 3, @@ -93,7 +105,7 @@ func createLogsExporter( index = cf.Index } - exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) + exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled, cf.LogsDynamicIndex.Mode) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: 
%w", err) } @@ -116,7 +128,7 @@ func createMetricsExporter( ) (exporter.Metrics, error) { cf := cfg.(*Config) - exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) + exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled, cf.MetricsDynamicIndex.Mode) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } @@ -137,7 +149,7 @@ func createTracesExporter(ctx context.Context, cf := cfg.(*Config) - exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) + exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled, cf.TracesDynamicIndex.Mode) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 626b8f566e1f..7652d2453c2c 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -5,12 +5,8 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry import ( "bytes" - "encoding/binary" "encoding/json" "fmt" - "hash" - "hash/fnv" - "math" "time" "go.opentelemetry.io/collector/pdata/pcommon" @@ -64,7 +60,7 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) - encodeMetrics(pcommon.Resource, pmetric.MetricSlice, pcommon.InstrumentationScope) ([][]byte, error) + encodeMetricDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) } @@ -169,123 +165,31 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo return document } -func (m *encodeModel) encodeMetrics(resource pcommon.Resource, metrics pmetric.MetricSlice, _ pcommon.InstrumentationScope) ([][]byte, error) { - var baseDoc objmodel.Document - - baseDoc.AddAttributes("", resource.Attributes()) - - // Put all metrics that have the same attributes and timestamp in one document. - docs := map[uint32]*objmodel.Document{} - for i := 0; i < metrics.Len(); i++ { - metric := metrics.At(i) - - var dps pmetric.NumberDataPointSlice - - // Only Gauge and Sum metric types are supported at the moment. 
- switch metric.Type() { - case pmetric.MetricTypeGauge: - dps = metric.Gauge().DataPoints() - case pmetric.MetricTypeSum: - dps = metric.Sum().DataPoints() - } - - for j := 0; j < dps.Len(); j++ { - dp := dps.At(j) - - hash := metricHash(dp.Timestamp(), dp.Attributes()) - doc, docExists := docs[hash] - if !docExists { - doc = baseDoc.Clone() - doc.AddTimestamp("@timestamp", dp.Timestamp()) - doc.AddAttributes("", dp.Attributes()) - - docs[hash] = doc - } - - switch dp.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - doc.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) - case pmetric.NumberDataPointValueTypeInt: - doc.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) - } - } - } - - res := make([][]byte, 0, len(docs)) - - for _, doc := range docs { - if m.dedup { - doc.Dedup() - } else if m.dedot { - doc.Sort() - } - - var buf bytes.Buffer - err := doc.Serialize(&buf, m.dedot) - if err != nil { - return nil, err - } +func (m *encodeModel) encodeMetricDataPoint(resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) { + var document objmodel.Document - res = append(res, buf.Bytes()) + document.AddAttributes("", resource.Attributes()) + document.AddTimestamp("@timestamp", dataPoint.Timestamp()) + document.AddAttributes("", dataPoint.Attributes()) + switch dataPoint.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dataPoint.DoubleValue())) + case pmetric.NumberDataPointValueTypeInt: + document.AddAttribute(metric.Name(), pcommon.NewValueInt(dataPoint.IntValue())) } - return res, nil -} - -func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { - hasher := fnv.New32a() - - timestampBuf := make([]byte, 8) - binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) - hasher.Write(timestampBuf) - - mapHash(hasher, attributes) - - return hasher.Sum32() -} - -func mapHash(hasher hash.Hash, m pcommon.Map) { - m.Range(func(k string, v pcommon.Value) bool { - hasher.Write([]byte(k)) - valueHash(hasher, v) - - return true - }) -} - -func valueHash(h hash.Hash, v pcommon.Value) { - switch v.Type() { - case pcommon.ValueTypeEmpty: - h.Write([]byte{0}) - case pcommon.ValueTypeStr: - h.Write([]byte(v.Str())) - case pcommon.ValueTypeBool: - if v.Bool() { - h.Write([]byte{1}) - } else { - h.Write([]byte{0}) - } - case pcommon.ValueTypeDouble: - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double())) - h.Write(buf) - case pcommon.ValueTypeInt: - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(v.Int())) - h.Write(buf) - case pcommon.ValueTypeBytes: - h.Write(v.Bytes().AsRaw()) - case pcommon.ValueTypeMap: - mapHash(h, v.Map()) - case pcommon.ValueTypeSlice: - sliceHash(h, v.Slice()) + if m.dedup { + document.Dedup() + } else if m.dedot { + document.Sort() } -} -func sliceHash(h hash.Hash, s pcommon.Slice) { - for i := 0; i < s.Len(); i++ { - valueHash(h, s.At(i)) + var buf bytes.Buffer + err := document.Serialize(&buf, m.dedot) + if err != nil { + return nil, err } + return buf.Bytes(), nil } func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 26199ba8cca9..0cc56b186600 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ 
b/exporter/elasticsearchexporter/model_test.go @@ -92,10 +92,20 @@ func TestEncodeMetric(t *testing.T) { dedup: true, mode: MappingNone, } - docsBytes, err := model.encodeMetrics(metrics.ResourceMetrics().At(0).Resource(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope()) + var docsBytes [][]byte + for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { + documentBytes, err := model.encodeMetricDataPoint( + metrics.ResourceMetrics().At(0).Resource(), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) + require.NoError(t, err) + docsBytes = append(docsBytes, documentBytes) + } + + assert.Len(t, docsBytes, len(strings.Split(expectedMetricsEncoded, "\n"))) // Convert the byte arrays to strings and sort the docs to make the test deterministic. - require.NoError(t, err) docs := make([]string, 0, len(docsBytes)) for _, docBytes := range docsBytes { docs = append(docs, string(docBytes)) diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index acd6e92f9001..3f1409c6cfab 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -82,3 +82,25 @@ elasticsearch/deprecated_index: index: my_log_index elasticsearch/confighttp_endpoint: endpoint: https://elastic.example.com:9200 +elasticsearch/prefix-suffix-mode: + endpoint: https://elastic.example.com:9200 + logs_dynamic_index: + enabled: true + mode: prefix_suffix + metrics_dynamic_index: + enabled: true + mode: prefix_suffix + traces_dynamic_index: + enabled: true + mode: prefix_suffix +elasticsearch/data-stream-mode: + endpoint: https://elastic.example.com:9200 + logs_dynamic_index: + enabled: true + mode: data_stream + metrics_dynamic_index: + enabled: true + mode: data_stream + traces_dynamic_index: + enabled: true + mode: data_stream diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index de3d60418b24..21b839823604 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -236,6 +237,16 @@ func newLogsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[stri return logs } +func newMetricsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[string]string) pmetric.Metrics { + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + + fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), resMp) + fillResourceAttributeMap(resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes(), attrMp) + + return metrics +} + func newTracesWithAttributeAndResourceMap(attrMp map[string]string, resMp map[string]string) ptrace.Traces { traces := ptrace.NewTraces() resourceSpans := traces.ResourceSpans() From dc8e9803d5cd73de20285ed2759d60d41090bc05 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Tue, 25 Jun 2024 12:29:44 +0200 Subject: 
[PATCH 02/24] add issue number to changelog entry --- ...sticsearch-exporter-attribute-based-data-stream-routing.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml index 842b7b62161b..80f294615042 100644 --- a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml +++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml @@ -10,7 +10,7 @@ component: exporter/elasticsearch note: Add data stream routing # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [] +issues: [33755] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From cb88ebcef50226de5630e4c080f032ce59ef989b Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 26 Jun 2024 12:17:13 +0200 Subject: [PATCH 03/24] fill in missing data stream attributes --- exporter/elasticsearchexporter/attribute.go | 16 +++-- .../data-stream-router.go | 67 +++++++++++++++++++ exporter/elasticsearchexporter/exporter.go | 14 +--- exporter/elasticsearchexporter/factory.go | 4 ++ 4 files changed, 84 insertions(+), 17 deletions(-) create mode 100644 exporter/elasticsearchexporter/data-stream-router.go diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 85feec25f8bb..25f08568ec86 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -7,12 +7,16 @@ import "go.opentelemetry.io/collector/pdata/pcommon" // dynamic index attribute key constants const ( - indexPrefix = "elasticsearch.index.prefix" - indexSuffix = "elasticsearch.index.suffix" - dataStreamDataset = "data_stream.dataset" - dataStreamNamespace = "data_stream.namespace" - defaultDataStreamDataset = "generic" - defaultDataStreamNamespace = "default" + indexPrefix = "elasticsearch.index.prefix" + indexSuffix = "elasticsearch.index.suffix" + dataStreamDataset = "data_stream.dataset" + dataStreamNamespace = "data_stream.namespace" + dataStreamType = "data_stream.type" + defaultDataStreamDataset = "generic" + defaultDataStreamNamespace = "default" + defaultDataStreamTypeLogs = "logs" + defaultDataStreamTypeMetrics = "metrics" + defaultDataStreamTypeTraces = "traces" ) // resource is higher priotized than record attribute diff --git a/exporter/elasticsearchexporter/data-stream-router.go b/exporter/elasticsearchexporter/data-stream-router.go new file mode 100644 index 000000000000..7b1044115e89 --- /dev/null +++ b/exporter/elasticsearchexporter/data-stream-router.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes. +// It searches for the routing attributes on the log record, scope, and resource. +// It creates missing routing attributes on the log record if they are not found. 
+func routeLogRecord( + record *plog.LogRecord, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, +) string { + dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) + dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeLogs, record.Attributes(), scope.Attributes(), resource.Attributes()) + return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) +} + +// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes. +// It searches for the routing attributes on the data point, scope, and resource. +// It creates missing routing attributes on the data point if they are not found. +func routeDataPoint( + dataPoint pmetric.NumberDataPoint, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, +) string { + dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeMetrics, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) + return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) +} + +// routeSpan returns the name of the index to send the span to according to data stream routing attributes. +// It searches for the routing attributes on the span, scope, and resource. +// It creates missing routing attributes on the span if they are not found. +func routeSpan( + span ptrace.Span, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, +) string { + dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) + namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) + dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeTraces, span.Attributes(), scope.Attributes(), resource.Attributes()) + return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) +} + +func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string { + // Try to retrieve the attribute value from an existing attribute. + value := getFromAttributesNew(attributeName, "", recordAttributes, scopeAttributes, resourceAttributes) + + // If the value is not found, set the default value on the record. 
+ if value == "" { + value = defaultValue + recordAttributes.PutStr(attributeName, value) + } + return value +} diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ce696c274f7d..7dc564695168 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -121,10 +121,7 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { - dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) - - fIndex = fmt.Sprintf("logs-%s-%s", dataSet, namespace) + fIndex = routeLogRecord(&record, scope, resource) } else { prefix := getFromAttributes(indexPrefix, resource, scope, record) suffix := getFromAttributes(indexSuffix, resource, scope, record) @@ -199,9 +196,7 @@ func (e *elasticsearchExporter) pushMetricDataPoint( fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { - dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - fIndex = fmt.Sprintf("metrics-%s-%s", dataSet, namespace) + fIndex = routeDataPoint(dataPoint, scope, resource) } else { prefix := getFromAttributesNew(indexPrefix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) suffix := getFromAttributesNew(indexSuffix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) @@ -258,10 +253,7 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { - dataSet := getFromAttributesNew(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := getFromAttributesNew(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) - - fIndex = fmt.Sprintf("traces-%s-%s", dataSet, namespace) + fIndex = routeSpan(span, scope, resource) } else { prefix := getFromAttributes(indexPrefix, resource, scope, span) suffix := getFromAttributes(indexSuffix, resource, scope, span) diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index e0bcaf6884b3..c107fb090b2a 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -115,6 +116,7 @@ func createLogsExporter( set, cfg, exporter.pushLogsData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), @@ -137,6 +139,7 @@ func createMetricsExporter( set, cfg, exporter.pushMetricsData, + 
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), @@ -158,6 +161,7 @@ func createTracesExporter(ctx context.Context, set, cfg, exporter.pushTraceData, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), From 7f41b41c769086580fdcf3225b3e07bf89c1ef0f Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Wed, 26 Jun 2024 12:45:44 +0200 Subject: [PATCH 04/24] make gotidy --- exporter/elasticsearchexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index bbff538ff408..9c49a61022cb 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -17,6 +17,7 @@ require ( go.opentelemetry.io/collector/config/confighttp v0.103.0 go.opentelemetry.io/collector/config/configopaque v1.10.0 go.opentelemetry.io/collector/confmap v0.103.0 + go.opentelemetry.io/collector/consumer v0.103.0 go.opentelemetry.io/collector/exporter v0.103.0 go.opentelemetry.io/collector/extension/auth v0.103.0 go.opentelemetry.io/collector/pdata v1.10.0 @@ -70,7 +71,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/config/configtls v0.103.0 // indirect go.opentelemetry.io/collector/config/internal v0.103.0 // indirect - go.opentelemetry.io/collector/consumer v0.103.0 // indirect go.opentelemetry.io/collector/extension v0.103.0 // indirect go.opentelemetry.io/collector/featuregate v1.10.0 // indirect go.opentelemetry.io/collector/receiver v0.103.0 // indirect From 2aa1c7200aebb21841ef42acb46c23c3895fa93e Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 27 Jun 2024 17:47:32 +0100 Subject: [PATCH 05/24] Add back metrics grouping logic; Fix missing scope DS attrs --- exporter/elasticsearchexporter/exporter.go | 56 ++++++--- .../elasticsearchexporter/exporter_test.go | 105 +++++++++++++++++ exporter/elasticsearchexporter/model.go | 106 +++++++++++++++--- exporter/elasticsearchexporter/model_test.go | 70 ++++++------ exporter/elasticsearchexporter/utils_test.go | 24 ++++ 5 files changed, 295 insertions(+), 66 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 7dc564695168..c158f6cdb545 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -16,6 +16,8 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) type elasticsearchExporter struct { @@ -156,8 +158,12 @@ func (e *elasticsearchExporter) pushMetricsData( resourceMetric := resourceMetrics.At(i) resource := resourceMetric.Resource() scopeMetrics := resourceMetric.ScopeMetrics() + + resourceDocs := make(map[string]map[uint32]objmodel.Document) + for j := 0; j < scopeMetrics.Len(); j++ { scopeMetrics := scopeMetrics.At(j) + scope := scopeMetrics.Scope() for k := 0; k < scopeMetrics.Metrics().Len(); k++ { metric := scopeMetrics.Metrics().At(k) @@ -172,27 +178,51 @@ func (e *elasticsearchExporter) pushMetricsData( for l := 0; l < 
dataPoints.Len(); l++ { dataPoint := dataPoints.At(l) - if err := e.pushMetricDataPoint(ctx, resource, scopeMetrics.Scope(), metric, dataPoint); err != nil { - if cerr := ctx.Err(); cerr != nil { - return cerr - } + fIndex, err := e.getMetricDataPointIndex(resource, scope, dataPoint) + if err != nil { + errs = append(errs, err) + continue + } + if _, ok := resourceDocs[fIndex]; !ok { + resourceDocs[fIndex] = make(map[uint32]objmodel.Document) + } + if err := e.model.upsertMetricDataPoint(resourceDocs[fIndex], resource, scope, metric, dataPoint); err != nil { errs = append(errs, err) } } } } + + for fIndex, docs := range resourceDocs { + for _, doc := range docs { + var ( + docBytes []byte + err error + ) + docBytes, err = e.model.encodeDocument(doc) + if err != nil { + errs = append(errs, err) + continue + } + + if err := pushDocuments(ctx, fIndex, docBytes, e.bulkIndexer); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + } + } } return errors.Join(errs...) } -func (e *elasticsearchExporter) pushMetricDataPoint( - ctx context.Context, +func (e *elasticsearchExporter) getMetricDataPointIndex( resource pcommon.Resource, scope pcommon.InstrumentationScope, - metric pmetric.Metric, dataPoint pmetric.NumberDataPoint, -) error { +) (string, error) { fIndex := e.index if e.dynamicIndex { if e.dynamicIndexMode == DynamicIndexModeDataStream { @@ -207,17 +237,11 @@ func (e *elasticsearchExporter) pushMetricDataPoint( if e.logstashFormat.Enabled { formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) if err != nil { - return err + return "", err } fIndex = formattedIndex } - - document, err := e.model.encodeMetricDataPoint(resource, scope, metric, dataPoint) - if err != nil { - return fmt.Errorf("failed to encode a metric data point: %w", err) - } - - return pushDocuments(ctx, fIndex, document, e.bulkIndexer) + return fIndex, nil } func (e *elasticsearchExporter) pushTraceData( diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 62006c5c3667..7de3f3f2fda1 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -581,6 +581,111 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with metrics grouping", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.MetricsIndex = "metrics.index" + cfg.Mapping.Mode = "ecs" + cfg.MetricsDynamicIndex.Mode = "data_stream" + }) + + addToMetricSlice := func(metricSlice pmetric.MetricSlice) { + fooMetric := metricSlice.AppendEmpty() + fooMetric.SetName("metric.foo") + fooDps := fooMetric.SetEmptyGauge().DataPoints() + fooDp := fooDps.AppendEmpty() + fooDp.SetIntValue(1) + fooOtherDp := fooDps.AppendEmpty() + fillResourceAttributeMap(fooOtherDp.Attributes(), map[string]string{ + "dp.attribute": "dp.attribute.value", + }) + fooOtherDp.SetDoubleValue(1.0) + + barMetric := metricSlice.AppendEmpty() + barMetric.SetName("metric.bar") + barDps := barMetric.SetEmptyGauge().DataPoints() + barDp := barDps.AppendEmpty() + barDp.SetDoubleValue(1.0) + barOtherDp := barDps.AppendEmpty() + fillResourceAttributeMap(barOtherDp.Attributes(), map[string]string{ + "dp.attribute": "dp.attribute.value", + }) + barOtherDp.SetDoubleValue(1.0) + 
barOtherIndexDp := barDps.AppendEmpty() + fillResourceAttributeMap(barOtherIndexDp.Attributes(), map[string]string{ + "dp.attribute": "dp.attribute.value", + dataStreamNamespace: "bar", + }) + barOtherIndexDp.SetDoubleValue(1.0) + + bazMetric := metricSlice.AppendEmpty() + bazMetric.SetName("metric.baz") + bazDps := bazMetric.SetEmptyGauge().DataPoints() + bazDp := bazDps.AppendEmpty() + bazDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) + bazDp.SetDoubleValue(1.0) + } + + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), map[string]string{ + dataStreamNamespace: "resource.namespace", + }) + scopeA := resourceMetrics.ScopeMetrics().AppendEmpty() + addToMetricSlice(scopeA.Metrics()) + + scopeB := resourceMetrics.ScopeMetrics().AppendEmpty() + fillResourceAttributeMap(scopeB.Scope().Attributes(), map[string]string{ + dataStreamDataset: "scope.b", + }) + addToMetricSlice(scopeB.Metrics()) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(8) + + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic-bar"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"bar","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-generic-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic","namespace":"resource.namespace","type":"metrics"},"metric":{"baz":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-bar"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"bar","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"dp":{"attribute":"dp.attribute.value"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"metric":{"bar":1,"foo":1}}`), + }, + { + Action: []byte(`{"create":{"_index":"metrics-scope.b-resource.namespace"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"scope.b","namespace":"resource.namespace","type":"metrics"},"metric":{"baz":1}}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) } func TestExporterTraces(t *testing.T) { diff --git a/exporter/elasticsearchexporter/model.go 
b/exporter/elasticsearchexporter/model.go index 7652d2453c2c..b3a8be3ec2c6 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -5,8 +5,12 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry import ( "bytes" + "encoding/binary" "encoding/json" "fmt" + "hash" + "hash/fnv" + "math" "time" "go.opentelemetry.io/collector/pdata/pcommon" @@ -60,8 +64,9 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) - encodeMetricDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) + upsertMetricDataPoint(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, pmetric.NumberDataPoint) error + encodeDocument(objmodel.Document) ([]byte, error) } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. @@ -165,19 +170,7 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo return document } -func (m *encodeModel) encodeMetricDataPoint(resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dataPoint pmetric.NumberDataPoint) ([]byte, error) { - var document objmodel.Document - - document.AddAttributes("", resource.Attributes()) - document.AddTimestamp("@timestamp", dataPoint.Timestamp()) - document.AddAttributes("", dataPoint.Attributes()) - switch dataPoint.ValueType() { - case pmetric.NumberDataPointValueTypeDouble: - document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dataPoint.DoubleValue())) - case pmetric.NumberDataPointValueTypeInt: - document.AddAttribute(metric.Name(), pcommon.NewValueInt(dataPoint.IntValue())) - } - +func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) { if m.dedup { document.Dedup() } else if m.dedot { @@ -192,6 +185,35 @@ func (m *encodeModel) encodeMetricDataPoint(resource pcommon.Resource, _ pcommon return buf.Bytes(), nil } +func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error { + hash := metricHash(dp.Timestamp(), dp.Attributes()) + var ( + document objmodel.Document + ok bool + ) + if document, ok = documents[hash]; !ok { + document.AddAttributes("", resource.Attributes()) + document.AddTimestamp("@timestamp", dp.Timestamp()) + if scopeDataset, ok := scope.Attributes().Get(dataStreamDataset); ok { + document.AddString(dataStreamDataset, scopeDataset.Str()) + } + if scopeNamespace, ok := scope.Attributes().Get(dataStreamNamespace); ok { + document.AddString(dataStreamNamespace, scopeNamespace.Str()) + } + document.AddAttributes("", dp.Attributes()) + } + + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) + case pmetric.NumberDataPointValueTypeInt: + document.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) + } + + documents[hash] = document + return nil +} + func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { var document objmodel.Document document.AddTimestamp("@timestamp", span.StartTimestamp()) 
// We use @timestamp in order to ensure that we can index if the default data stream logs template is used. @@ -383,3 +405,59 @@ func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecor document.AddTimestamp("@timestamp", record.ObservedTimestamp()) } + +func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { + hasher := fnv.New32a() + + timestampBuf := make([]byte, 8) + binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) + hasher.Write(timestampBuf) + + mapHash(hasher, attributes) + + return hasher.Sum32() +} + +func mapHash(hasher hash.Hash, m pcommon.Map) { + m.Range(func(k string, v pcommon.Value) bool { + hasher.Write([]byte(k)) + valueHash(hasher, v) + + return true + }) +} + +func valueHash(h hash.Hash, v pcommon.Value) { + switch v.Type() { + case pcommon.ValueTypeEmpty: + h.Write([]byte{0}) + case pcommon.ValueTypeStr: + h.Write([]byte(v.Str())) + case pcommon.ValueTypeBool: + if v.Bool() { + h.Write([]byte{1}) + } else { + h.Write([]byte{0}) + } + case pcommon.ValueTypeDouble: + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double())) + h.Write(buf) + case pcommon.ValueTypeInt: + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(v.Int())) + h.Write(buf) + case pcommon.ValueTypeBytes: + h.Write(v.Bytes().AsRaw()) + case pcommon.ValueTypeMap: + mapHash(h, v.Map()) + case pcommon.ValueTypeSlice: + sliceHash(h, v.Slice()) + } +} + +func sliceHash(h hash.Hash, s pcommon.Slice) { + for i := 0; i < s.Len(); i++ { + valueHash(h, s.At(i)) + } +} diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 0cc56b186600..358faaa1c8be 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -6,8 +6,6 @@ package elasticsearchexporter import ( "fmt" "os" - "sort" - "strings" "testing" "time" @@ -82,40 +80,40 @@ func TestEncodeLog(t *testing.T) { }) } -func TestEncodeMetric(t *testing.T) { - // Prepare metrics to test. - metrics := createTestMetrics(t) - - // Encode the metrics. - model := &encodeModel{ - dedot: true, - dedup: true, - mode: MappingNone, - } - - var docsBytes [][]byte - for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { - documentBytes, err := model.encodeMetricDataPoint( - metrics.ResourceMetrics().At(0).Resource(), - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) - require.NoError(t, err) - docsBytes = append(docsBytes, documentBytes) - } - - assert.Len(t, docsBytes, len(strings.Split(expectedMetricsEncoded, "\n"))) - // Convert the byte arrays to strings and sort the docs to make the test deterministic. - docs := make([]string, 0, len(docsBytes)) - for _, docBytes := range docsBytes { - docs = append(docs, string(docBytes)) - } - sort.Strings(docs) - allDocsSorted := strings.Join(docs, "\n") - - // Test that the result matches the expected value. - assert.Equal(t, expectedMetricsEncoded, allDocsSorted) -} +//func TestEncodeMetric(t *testing.T) { +// // Prepare metrics to test. +// metrics := createTestMetrics(t) +// +// // Encode the metrics. 
+// model := &encodeModel{ +// dedot: true, +// dedup: true, +// mode: MappingNone, +// } +// +// var docsBytes [][]byte +// for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { +// documentBytes, err := model.encodeMetricDataPoint( +// metrics.ResourceMetrics().At(0).Resource(), +// metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), +// metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), +// metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) +// require.NoError(t, err) +// docsBytes = append(docsBytes, documentBytes) +// } +// +// assert.Len(t, docsBytes, len(strings.Split(expectedMetricsEncoded, "\n"))) +// // Convert the byte arrays to strings and sort the docs to make the test deterministic. +// docs := make([]string, 0, len(docsBytes)) +// for _, docBytes := range docsBytes { +// docs = append(docs, string(docBytes)) +// } +// sort.Strings(docs) +// allDocsSorted := strings.Join(docs, "\n") +// +// // Test that the result matches the expected value. +// assert.Equal(t, expectedMetricsEncoded, allDocsSorted) +//} func createTestMetrics(t *testing.T) pmetric.Metrics { metricsUnmarshaler := &pmetric.JSONUnmarshaler{} diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index 21b839823604..f57f16272c24 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -10,6 +10,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "slices" "strings" "sync" "testing" @@ -27,6 +28,29 @@ type itemRequest struct { Document json.RawMessage } +func itemRequestsSortFunc(a, b itemRequest) int { + comp := bytes.Compare(a.Action, b.Action) + if comp == 0 { + return bytes.Compare(a.Document, b.Document) + } + return comp +} + +func assertItemsEqual(t *testing.T, expected, actual []itemRequest, assertOrder bool) { + expectedItems := expected + actualItems := actual + if !assertOrder { + // Make copies to avoid mutating the args + expectedItems = make([]itemRequest, len(expected)) + copy(expectedItems, expected) + slices.SortFunc(expectedItems, itemRequestsSortFunc) + actualItems = make([]itemRequest, len(actual)) + copy(actualItems, actual) + slices.SortFunc(actualItems, itemRequestsSortFunc) + } + assert.Equal(t, expectedItems, actualItems) +} + type itemResponse struct { Status int `json:"status"` } From ad4a0195089eb496d2eb9c23595f2ffdd978be7a Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 27 Jun 2024 17:50:29 +0100 Subject: [PATCH 06/24] Update changelog --- ...icsearch-exporter-attribute-based-data-stream-routing.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml index 80f294615042..b3fc3e5c1dca 100644 --- a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml +++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml @@ -7,10 +7,10 @@ change_type: enhancement component: exporter/elasticsearch # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Add data stream routing +note: Add data stream routing; Add metrics grouping # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
-issues: [33755] +issues: [33755, 33756] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From 1eb99c533265293b927584affe401dc715c27fa2 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 27 Jun 2024 18:04:32 +0100 Subject: [PATCH 07/24] Fix scope override bug --- exporter/elasticsearchexporter/data-stream-router.go | 12 +++++------- exporter/elasticsearchexporter/model.go | 8 +------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/exporter/elasticsearchexporter/data-stream-router.go b/exporter/elasticsearchexporter/data-stream-router.go index 7b1044115e89..8e71f2c94ba7 100644 --- a/exporter/elasticsearchexporter/data-stream-router.go +++ b/exporter/elasticsearchexporter/data-stream-router.go @@ -55,13 +55,11 @@ func routeSpan( } func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string { - // Try to retrieve the attribute value from an existing attribute. - value := getFromAttributesNew(attributeName, "", recordAttributes, scopeAttributes, resourceAttributes) + // Fetch value according to precedence and default. + value := getFromAttributesNew(attributeName, defaultValue, recordAttributes, scopeAttributes, resourceAttributes) + + // Always set the value on the record, as record attributes have the highest precedence. + recordAttributes.PutStr(attributeName, value) - // If the value is not found, set the default value on the record. - if value == "" { - value = defaultValue - recordAttributes.PutStr(attributeName, value) - } return value } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index b3a8be3ec2c6..ad6534a3050f 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -185,7 +185,7 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) return buf.Bytes(), nil } -func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error { +func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error { hash := metricHash(dp.Timestamp(), dp.Attributes()) var ( document objmodel.Document @@ -194,12 +194,6 @@ func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Docume if document, ok = documents[hash]; !ok { document.AddAttributes("", resource.Attributes()) document.AddTimestamp("@timestamp", dp.Timestamp()) - if scopeDataset, ok := scope.Attributes().Get(dataStreamDataset); ok { - document.AddString(dataStreamDataset, scopeDataset.Str()) - } - if scopeNamespace, ok := scope.Attributes().Get(dataStreamNamespace); ok { - document.AddString(dataStreamNamespace, scopeNamespace.Str()) - } document.AddAttributes("", dp.Attributes()) } From d134ef793c83da5cf6610a791cd26e3e69d8013a Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 27 Jun 2024 18:08:43 +0100 Subject: [PATCH 08/24] Remove ability to override DS type --- exporter/elasticsearchexporter/data-stream-router.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/exporter/elasticsearchexporter/data-stream-router.go b/exporter/elasticsearchexporter/data-stream-router.go index 
8e71f2c94ba7..0079cb0f9030 100644 --- a/exporter/elasticsearchexporter/data-stream-router.go +++ b/exporter/elasticsearchexporter/data-stream-router.go @@ -22,8 +22,8 @@ func routeLogRecord( ) string { dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) - dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeLogs, record.Attributes(), scope.Attributes(), resource.Attributes()) - return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) + record.Attributes().PutStr(dataStreamType, defaultDataStreamTypeLogs) + return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeLogs, dataSet, namespace) } // routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes. @@ -36,8 +36,8 @@ func routeDataPoint( ) string { dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeMetrics, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) + dataPoint.Attributes().PutStr(dataStreamType, defaultDataStreamTypeMetrics) + return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeMetrics, dataSet, namespace) } // routeSpan returns the name of the index to send the span to according to data stream routing attributes. @@ -50,8 +50,8 @@ func routeSpan( ) string { dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) - dataType := ensureAttribute(dataStreamType, defaultDataStreamTypeTraces, span.Attributes(), scope.Attributes(), resource.Attributes()) - return fmt.Sprintf("%s-%s-%s", dataType, dataSet, namespace) + span.Attributes().PutStr(dataStreamType, defaultDataStreamTypeTraces) + return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeTraces, dataSet, namespace) } func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string { From cf382bcd66001e3011327533d5faa1b0ac2163d4 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 27 Jun 2024 22:42:46 +0100 Subject: [PATCH 09/24] Refactor tests --- .../elasticsearchexporter/exporter_test.go | 106 ++++-------------- 1 file changed, 20 insertions(+), 86 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 7de3f3f2fda1..436769f919f2 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -10,7 +10,6 @@ import ( "fmt" "net/http" "runtime" - "strings" "sync" "sync/atomic" "testing" @@ -180,16 +179,8 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := 
jsonVal["create"].(map[string]any) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, create["_index"].(string)) + assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -218,15 +209,7 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - assert.Equal(t, "logs-record.dataset-resource.namespace", create["_index"].(string)) + assert.Equal(t, "logs-record.dataset-resource.namespace", actionJsonToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -255,15 +238,7 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - assert.Contains(t, create["_index"], "not-used-index") + assert.Contains(t, actionJsonToIndex(t, docs[0].Action), "not-used-index") return itemsAllOK(docs) }) @@ -287,17 +262,8 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - - assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + assert.Contains(t, actionJsonToIndex(t, docs[0].Action), expected) return itemsAllOK(docs) }) @@ -511,16 +477,8 @@ func TestExporterMetrics(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := "resource.prefix-metrics.index-data.point.suffix" - assert.Equal(t, expected, create["_index"].(string)) + assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -549,16 +507,8 @@ func TestExporterMetrics(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := "metrics-resource.dataset-data.point.namespace" - assert.Equal(t, expected, create["_index"].(string)) + assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -755,17 +705,8 @@ func TestExporterTraces(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - expected := "traces-span.dataset-default" - assert.Equal(t, 
expected, create["_index"].(string)) + assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -794,16 +735,7 @@ func TestExporterTraces(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - - assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.TracesIndex), true) + assert.Contains(t, actionJsonToIndex(t, docs[0].Action), defaultCfg.TracesIndex) return itemsAllOK(docs) }) @@ -830,17 +762,8 @@ func TestExporterTraces(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - - assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + assert.Contains(t, actionJsonToIndex(t, docs[0].Action), expected) return itemsAllOK(docs) }) @@ -1032,3 +955,14 @@ type roundTripperFunc func(*http.Request) (*http.Response, error) func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +func actionJsonToIndex(t *testing.T, actionJson json.RawMessage) string { + action := struct { + Create struct { + Index string `json:"_index"` + } `json:"create"` + }{} + err := json.Unmarshal(actionJson, &action) + require.NoError(t, err) + return action.Create.Index +} From fcbdd959d82cef93d6709840507eab0914889aa0 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 10:26:22 +0100 Subject: [PATCH 10/24] Update exporter/elasticsearchexporter/model.go Co-authored-by: Andrew Wilkins --- exporter/elasticsearchexporter/model.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index ad6534a3050f..6df90e214d78 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -400,6 +400,7 @@ func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecor document.AddTimestamp("@timestamp", record.ObservedTimestamp()) } +// TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity func metricHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { hasher := fnv.New32a() From 608d88ee8f3c06e50aba2edb006749396be40f4b Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 10:29:50 +0100 Subject: [PATCH 11/24] Rename to data_stream_router.go --- .../{data-stream-router.go => data_stream_router.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename exporter/elasticsearchexporter/{data-stream-router.go => data_stream_router.go} (100%) diff --git a/exporter/elasticsearchexporter/data-stream-router.go b/exporter/elasticsearchexporter/data_stream_router.go similarity index 100% rename from exporter/elasticsearchexporter/data-stream-router.go rename to exporter/elasticsearchexporter/data_stream_router.go From 0567fb1d46d8f61a0491d069bc7641c28ed81ef4 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:02:15 +0100 Subject: [PATCH 12/24] Remove distinction between data_stream and prefix_suffix mode 
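
Routing now follows a single resolution order instead of a configured mode: explicit `data_stream.dataset`/`data_stream.namespace` attributes win, then the `elasticsearch.index.prefix`/`elasticsearch.index.suffix` attributes, then the static index with default data stream values. The snippet below is a minimal standalone sketch of that order, assuming one flattened attribute map rather than the record > scope > resource precedence (and the attribute write-back) that `routeWithDefaults` actually implements; `resolveIndex` is illustrative only, not part of this package.

```go
package main

import "fmt"

// resolveIndex sketches the unified routing decision: data_stream.* attributes
// first, then prefix/suffix attributes, then the default data stream name.
func resolveIndex(dsType, fallbackIndex string, attrs map[string]string) string {
	dataset, dsOK := attrs["data_stream.dataset"]
	namespace, nsOK := attrs["data_stream.namespace"]
	if !dsOK {
		dataset = "generic"
	}
	if !nsOK {
		namespace = "default"
	}
	// Prefix/suffix routing applies only when no data_stream.* attribute is set.
	if !dsOK && !nsOK {
		prefix, pOK := attrs["elasticsearch.index.prefix"]
		suffix, sOK := attrs["elasticsearch.index.suffix"]
		if pOK || sOK {
			return prefix + fallbackIndex + suffix
		}
	}
	return fmt.Sprintf("%s-%s-%s", dsType, dataset, namespace)
}

func main() {
	fmt.Println(resolveIndex("logs", "logs-generic-default", map[string]string{"data_stream.dataset": "nginx"}))      // logs-nginx-default
	fmt.Println(resolveIndex("logs", "logs-generic-default", map[string]string{"elasticsearch.index.prefix": "my-"})) // my-logs-generic-default
	fmt.Println(resolveIndex("logs", "logs-generic-default", nil))                                                    // logs-generic-default
}
```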
--- exporter/elasticsearchexporter/README.md | 6 +- exporter/elasticsearchexporter/attribute.go | 6 +- .../data_stream_router.go | 77 +++++++++++-------- exporter/elasticsearchexporter/exporter.go | 26 +------ .../elasticsearchexporter/exporter_test.go | 14 ++-- 5 files changed, 63 insertions(+), 66 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index bd69d937417b..1308534ef01c 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -98,7 +98,7 @@ This can be customised through the following settings: to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Log record attributes take precedence over scope attributes, which take precedence over resource attributes. - `prefix_suffix` - uses resource or log record attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix` - to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > log record attribute) + to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > scope attribute > log record attribute) - `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`. ⚠️ Note that metrics support is currently in development. @@ -112,7 +112,7 @@ This can be customised through the following settings: Data point attributes take precedence over scope attributes, which take precedence over resource attributes. - `prefix_suffix` - uses resource, scope or data point attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix` to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. - Data point attributes take precedence over scope attributes, which take precedence over resource attributes. + (priority: resource attribute > scope attribute > data point attribute) - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. @@ -123,7 +123,7 @@ This can be customised through the following settings: to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Span attributes take precedence over scope attributes, which take precedence over resource attributes. - `prefix_suffix` - uses resource or span attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix` - to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > span attribute) + to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > scope attribute > span attribute) - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format. - `enabled`(default=false): Enable/disable Logstash format compatibility. 
When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 25f08568ec86..78d036bdb73d 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -47,11 +47,11 @@ func getFromAttributes(name string, resource, scope, record attrGetter) string { return str } -func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string { +func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) { for _, attributeMap := range attributeMaps { if value, exists := attributeMap.Get(name); exists { - return value.AsString() + return value.AsString(), true } } - return defaultValue + return defaultValue, false } diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index 0079cb0f9030..a52c11dd991a 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -12,54 +12,71 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes. -// It searches for the routing attributes on the log record, scope, and resource. -// It creates missing routing attributes on the log record if they are not found. +func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func( + pcommon.Map, + pcommon.Map, + pcommon.Map, + string, +) string { + return func( + recordAttr pcommon.Map, + scopeAttr pcommon.Map, + resourceAttr pcommon.Map, + fIndex string, + ) string { + // Order: + // 1. read data_stream.* from attributes + // 2. read elasticsearch.index.* from attributes + // 3. use default hardcoded data_stream.* + dataset, datasetExists := getFromAttributesNew(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr) + namespace, namespaceExists := getFromAttributesNew(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr) + dataStreamMode := datasetExists || namespaceExists + if !dataStreamMode { + prefix, prefixExists := getFromAttributesNew(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) + suffix, suffixExists := getFromAttributesNew(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) + if prefixExists || suffixExists { + return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } + } + recordAttr.PutStr(dataStreamDataset, dataset) + recordAttr.PutStr(dataStreamNamespace, namespace) + recordAttr.PutStr(dataStreamType, defaultDSType) + return fmt.Sprintf("%s-%s-%s", defaultDSType, dataset, namespace) + } +} + +// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes and prefix/suffix attributes. +// This function may mutate record attributes. 
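+// The result is either `<type>-<dataset>-<namespace>` (when `data_stream.*` attributes are present, or by default) or `<prefix><fIndex><suffix>` (when only the prefix/suffix attributes are present).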
func routeLogRecord( - record *plog.LogRecord, + record plog.LogRecord, scope pcommon.InstrumentationScope, resource pcommon.Resource, + fIndex string, ) string { - dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes()) - record.Attributes().PutStr(dataStreamType, defaultDataStreamTypeLogs) - return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeLogs, dataSet, namespace) + route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace) + return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex) } // routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes. -// It searches for the routing attributes on the data point, scope, and resource. -// It creates missing routing attributes on the data point if they are not found. +// This function may mutate record attributes. func routeDataPoint( dataPoint pmetric.NumberDataPoint, scope pcommon.InstrumentationScope, resource pcommon.Resource, + fIndex string, ) string { - dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - dataPoint.Attributes().PutStr(dataStreamType, defaultDataStreamTypeMetrics) - return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeMetrics, dataSet, namespace) + route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace) + return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex) } // routeSpan returns the name of the index to send the span to according to data stream routing attributes. -// It searches for the routing attributes on the span, scope, and resource. -// It creates missing routing attributes on the span if they are not found. +// This function may mutate record attributes. func routeSpan( span ptrace.Span, scope pcommon.InstrumentationScope, resource pcommon.Resource, + fIndex string, ) string { - dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes()) - namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes()) - span.Attributes().PutStr(dataStreamType, defaultDataStreamTypeTraces) - return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeTraces, dataSet, namespace) -} - -func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string { - // Fetch value according to precedence and default. - value := getFromAttributesNew(attributeName, defaultValue, recordAttributes, scopeAttributes, resourceAttributes) - - // Always set the value on the record, as record attributes have the highest precedence. 
- recordAttributes.PutStr(attributeName, value) - - return value + route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace) + return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index c158f6cdb545..968168e0b547 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -122,14 +122,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - if e.dynamicIndexMode == DynamicIndexModeDataStream { - fIndex = routeLogRecord(&record, scope, resource) - } else { - prefix := getFromAttributes(indexPrefix, resource, scope, record) - suffix := getFromAttributes(indexSuffix, resource, scope, record) - - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) - } + fIndex = routeLogRecord(record, scope, resource, fIndex) } if e.logstashFormat.Enabled { @@ -225,13 +218,7 @@ func (e *elasticsearchExporter) getMetricDataPointIndex( ) (string, error) { fIndex := e.index if e.dynamicIndex { - if e.dynamicIndexMode == DynamicIndexModeDataStream { - fIndex = routeDataPoint(dataPoint, scope, resource) - } else { - prefix := getFromAttributesNew(indexPrefix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - suffix := getFromAttributesNew(indexSuffix, "", dataPoint.Attributes(), scope.Attributes(), resource.Attributes()) - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) - } + fIndex = routeDataPoint(dataPoint, scope, resource, fIndex) } if e.logstashFormat.Enabled { @@ -276,14 +263,7 @@ func (e *elasticsearchExporter) pushTraceData( func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - if e.dynamicIndexMode == DynamicIndexModeDataStream { - fIndex = routeSpan(span, scope, resource) - } else { - prefix := getFromAttributes(indexPrefix, resource, scope, span) - suffix := getFromAttributes(indexSuffix, resource, scope, span) - - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) - } + fIndex = routeSpan(span, scope, resource, fIndex) } if e.logstashFormat.Enabled { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 436769f919f2..cde04b751f12 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -167,7 +167,7 @@ func TestExporterLogs(t *testing.T) { <-done }) - t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { + t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -204,7 +204,7 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + t.Run("publish with dynamic index, data_stream", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) @@ -472,12 +472,12 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { + t.Run("publish with 
dynamic index, prefix_suffix", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - expected := "resource.prefix-metrics.index-data.point.suffix" + expected := "resource.prefix-metrics.index-resource.suffix" assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) return itemsAllOK(docs) @@ -502,7 +502,7 @@ func TestExporterMetrics(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + t.Run("publish with dynamic index, data_stream", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) @@ -653,7 +653,7 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(2) }) - t.Run("publish with dynamic index prefix_suffix mode", func(t *testing.T) { + t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { rec := newBulkRecorder() var ( @@ -698,7 +698,7 @@ func TestExporterTraces(t *testing.T) { rec.WaitItems(1) }) - t.Run("publish with dynamic index data_stream mode", func(t *testing.T) { + t.Run("publish with dynamic index, data_stream", func(t *testing.T) { rec := newBulkRecorder() From aa93bdee05bbb62c1c0f8e9ad3c23b0b6cadd26b Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:13:40 +0100 Subject: [PATCH 13/24] Update README --- exporter/elasticsearchexporter/README.md | 38 +++++++----------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 1308534ef01c..dab9f2b4081a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -91,39 +91,23 @@ This can be customised through the following settings: - `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default` -- `logs_dynamic_index` (optional): uses resource or log record attributes to dynamically construct index name. See `mode` for details. - - `enabled`(default=false): Enable/Disable dynamic index for log records - - `mode` (default=`prefix_suffix`): defines how dynamic index name is constructed. - - `data_stream` - uses resource, scope or log record attributes `data_stream.dataset` and `data_stream.namespace` - to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. - Log record attributes take precedence over scope attributes, which take precedence over resource attributes. - - `prefix_suffix` - uses resource or log record attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix` - to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > scope attribute > log record attribute) +- `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name. + - `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. 
Otherwise, if + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `logs-generic-default`. Except for prefix/suffix mode, the resulting docs will contain the corresponding `data_stream.*` fields. - `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`. ⚠️ Note that metrics support is currently in development. -- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. See `mode` for details. +- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. ⚠️ Note that metrics support is currently in development. - - `enabled`(default=true): Enable/disable dynamic index for metrics - - `mode` (default=`data_stream`): defines how dynamic index name is constructed. - - `data_stream` - uses resource, scope or data point attributes `data_stream.dataset` and `data_stream.namespace` - to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. - Data point attributes take precedence over scope attributes, which take precedence over resource attributes. - - `prefix_suffix` - uses resource, scope or data point attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix` - to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. - (priority: resource attribute > scope attribute > data point attribute) + - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `metrics-generic-default`. Except for prefix/suffix mode, the resulting docs will contain the corresponding `data_stream.*` fields. - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. -- `traces_dynamic_index` (optional): uses resource or span attributes to dynamically construct index name. See `mode` for details. - - `enabled`(default=false): Enable/Disable dynamic index for trace spans - - `mode` (default=`prefix_suffix`): defines how dynamic index name is constructed. - - `data_stream` - uses resource attributes `data_stream.dataset` and `data_stream.namespace` - to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. - Span attributes take precedence over scope attributes, which take precedence over resource attributes. 
- - `prefix_suffix` - uses resource or span attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix` - to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > scope attribute > span attribute) +- `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name. + - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `traces-generic-default`. Except for prefix/suffix mode, the resulting docs will contain the corresponding `data_stream.*` fields. - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format. - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, @@ -135,7 +119,7 @@ This can be customised through the following settings: ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing -behaviours, which may be configured throug the following settings: +behaviours, which may be configured through the following settings: - `mapping`: Events are encoded to JSON. The `mapping` allows users to configure additional mapping rules. @@ -161,7 +145,7 @@ behaviours, which may be configured throug the following settings: In ECS mapping mode, the Elastisearch Exporter attempts to map fields from [OpenTelemetry Semantic Conventions][SemConv] (version 1.22.0) to [Elastic Common Schema][ECS]. -This mode may be used for compatibility with existing dashboards that work with with ECS. +This mode may be used for compatibility with existing dashboards that work with ECS. 
### Elasticsearch ingest pipeline From 1a5e9baa9728c0263bdaee7de2df532bd3b73178 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:20:19 +0100 Subject: [PATCH 14/24] Remove mode config completely --- exporter/elasticsearchexporter/config.go | 3 +- exporter/elasticsearchexporter/config_test.go | 47 ------------------- exporter/elasticsearchexporter/exporter.go | 12 ++--- .../elasticsearchexporter/exporter_test.go | 4 -- exporter/elasticsearchexporter/factory.go | 9 ++-- .../testdata/config.yaml | 22 --------- 6 files changed, 9 insertions(+), 88 deletions(-) diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 6cda3cdbd0cd..d2f5c6d86c3d 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -82,8 +82,7 @@ type LogstashFormatSettings struct { } type DynamicIndexSetting struct { - Enabled bool `mapstructure:"enabled"` - Mode DynamicIndexMode `mapstructure:"mode"` + Enabled bool `mapstructure:"enabled"` } type DynamicIndexMode string diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index c0739e29468e..c409f175497e 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -62,17 +62,14 @@ func TestConfig(t *testing.T) { LogsIndex: "logs-generic-default", LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, MetricsIndex: "metrics-generic-default", MetricsDynamicIndex: DynamicIndexSetting{ Enabled: true, - Mode: DynamicIndexModeDataStream, }, TracesIndex: "trace_index", TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ @@ -127,17 +124,14 @@ func TestConfig(t *testing.T) { LogsIndex: "my_log_index", LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, MetricsIndex: "metrics-generic-default", MetricsDynamicIndex: DynamicIndexSetting{ Enabled: true, - Mode: DynamicIndexModeDataStream, }, TracesIndex: "traces-generic-default", TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ @@ -192,17 +186,14 @@ func TestConfig(t *testing.T) { LogsIndex: "logs-generic-default", LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, MetricsIndex: "my_metric_index", MetricsDynamicIndex: DynamicIndexSetting{ Enabled: true, - Mode: DynamicIndexModeDataStream, }, TracesIndex: "traces-generic-default", TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, Pipeline: "mypipeline", ClientConfig: confighttp.ClientConfig{ @@ -275,44 +266,6 @@ func TestConfig(t *testing.T) { cfg.Endpoint = "https://elastic.example.com:9200" }), }, - { - id: component.NewIDWithName(metadata.Type, "data-stream-mode"), - configFile: "config.yaml", - expected: withDefaultConfig(func(cfg *Config) { - cfg.Endpoint = "https://elastic.example.com:9200" - cfg.LogsDynamicIndex = DynamicIndexSetting{ - Enabled: true, - Mode: DynamicIndexModeDataStream, - } - cfg.MetricsDynamicIndex = DynamicIndexSetting{ - Enabled: true, - Mode: DynamicIndexModeDataStream, - } - cfg.TracesDynamicIndex = DynamicIndexSetting{ - Enabled: true, - Mode: DynamicIndexModeDataStream, - } - }), - }, - { - id: component.NewIDWithName(metadata.Type, "prefix-suffix-mode"), 
- configFile: "config.yaml", - expected: withDefaultConfig(func(cfg *Config) { - cfg.Endpoint = "https://elastic.example.com:9200" - cfg.LogsDynamicIndex = DynamicIndexSetting{ - Enabled: true, - Mode: DynamicIndexModePrefixSuffix, - } - cfg.MetricsDynamicIndex = DynamicIndexSetting{ - Enabled: true, - Mode: DynamicIndexModePrefixSuffix, - } - cfg.TracesDynamicIndex = DynamicIndexSetting{ - Enabled: true, - Mode: DynamicIndexModePrefixSuffix, - } - }), - }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 968168e0b547..ed017a24a1a5 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -39,7 +39,6 @@ func newExporter( set exporter.Settings, index string, dynamicIndex bool, - dynamicIndexMode DynamicIndexMode, ) (*elasticsearchExporter, error) { if err := cfg.Validate(); err != nil { return nil, err @@ -63,12 +62,11 @@ func newExporter( TelemetrySettings: set.TelemetrySettings, userAgent: userAgent, - config: cfg, - index: index, - dynamicIndex: dynamicIndex, - dynamicIndexMode: dynamicIndexMode, - model: model, - logstashFormat: cfg.LogstashFormat, + config: cfg, + index: index, + dynamicIndex: dynamicIndex, + model: model, + logstashFormat: cfg.LogstashFormat, }, nil } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index cde04b751f12..93bd960f72a3 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -216,7 +216,6 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.LogsDynamicIndex.Enabled = true - cfg.LogsDynamicIndex.Mode = DynamicIndexModeDataStream }) logs := newLogsWithAttributeAndResourceMap( map[string]string{ @@ -485,7 +484,6 @@ func TestExporterMetrics(t *testing.T) { exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { cfg.MetricsIndex = "metrics.index" - cfg.MetricsDynamicIndex.Mode = DynamicIndexModePrefixSuffix }) metrics := newMetricsWithAttributeAndResourceMap( map[string]string{ @@ -541,7 +539,6 @@ func TestExporterMetrics(t *testing.T) { exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { cfg.MetricsIndex = "metrics.index" cfg.Mapping.Mode = "ecs" - cfg.MetricsDynamicIndex.Mode = "data_stream" }) addToMetricSlice := func(metricSlice pmetric.MetricSlice) { @@ -713,7 +710,6 @@ func TestExporterTraces(t *testing.T) { exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { cfg.TracesDynamicIndex.Enabled = true - cfg.TracesDynamicIndex.Mode = DynamicIndexModeDataStream }) mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap( diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 6a831bf18d87..7826fb59a47e 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -52,17 +52,14 @@ func createDefaultConfig() component.Config { LogsIndex: defaultLogsIndex, LogsDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, MetricsIndex: defaultMetricsIndex, MetricsDynamicIndex: DynamicIndexSetting{ Enabled: true, - Mode: DynamicIndexModeDataStream, }, TracesIndex: defaultTracesIndex, TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, - Mode: DynamicIndexModePrefixSuffix, }, Retry: RetrySettings{ Enabled: true, @@ -107,7 +104,7 @@ func createLogsExporter( } 
logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled, cf.LogsDynamicIndex.Mode) + exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } @@ -132,7 +129,7 @@ func createMetricsExporter( cf := cfg.(*Config) logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled, cf.MetricsDynamicIndex.Mode) + exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } @@ -155,7 +152,7 @@ func createTracesExporter(ctx context.Context, cf := cfg.(*Config) logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled, cf.TracesDynamicIndex.Mode) + exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 3f1409c6cfab..acd6e92f9001 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -82,25 +82,3 @@ elasticsearch/deprecated_index: index: my_log_index elasticsearch/confighttp_endpoint: endpoint: https://elastic.example.com:9200 -elasticsearch/prefix-suffix-mode: - endpoint: https://elastic.example.com:9200 - logs_dynamic_index: - enabled: true - mode: prefix_suffix - metrics_dynamic_index: - enabled: true - mode: prefix_suffix - traces_dynamic_index: - enabled: true - mode: prefix_suffix -elasticsearch/data-stream-mode: - endpoint: https://elastic.example.com:9200 - logs_dynamic_index: - enabled: true - mode: data_stream - metrics_dynamic_index: - enabled: true - mode: data_stream - traces_dynamic_index: - enabled: true - mode: data_stream From 2283d7494df883f627e6d3ad36c81e3824317a7a Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:22:28 +0100 Subject: [PATCH 15/24] Remove other ref to modes --- exporter/elasticsearchexporter/README.md | 8 +++----- exporter/elasticsearchexporter/config.go | 26 ------------------------ 2 files changed, 3 insertions(+), 31 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5e54e6a8379c..a52330ea0535 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -93,7 +93,7 @@ This can be customised through the following settings: - `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. 
Otherwise, the index name will be `logs-generic-default`. Except for prefix/suffix mode, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `logs-generic-default`. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`. ⚠️ Note that metrics support is currently in development. @@ -101,13 +101,13 @@ This can be customised through the following settings: - `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. ⚠️ Note that metrics support is currently in development. - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `metrics-generic-default`. Except for prefix/suffix mode, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `metrics-generic-default`. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `traces-generic-default`. Except for prefix/suffix mode, the resulting docs will contain the corresponding `data_stream.*` fields. 
+ `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `traces-generic-default`. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format. - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, @@ -193,8 +193,6 @@ The only metric types supported are: Other metric types (Histogram, Exponential Histogram, Summary) are ignored. -Dynamic indexing in `data_stream` mode is enabled by default for metrics. See `metrics_dynamic_index` configuration property for details. - [confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings [configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings [configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index d2f5c6d86c3d..4d0907777677 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -4,7 +4,6 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( - "encoding" "encoding/base64" "errors" "fmt" @@ -85,31 +84,6 @@ type DynamicIndexSetting struct { Enabled bool `mapstructure:"enabled"` } -type DynamicIndexMode string - -const DynamicIndexModeDataStream DynamicIndexMode = "data_stream" -const DynamicIndexModePrefixSuffix DynamicIndexMode = "prefix_suffix" - -var _ encoding.TextUnmarshaler = (*DynamicIndexMode)(nil) - -func (m *DynamicIndexMode) UnmarshalText(text []byte) error { - if m == nil { - return errors.New("cannot unmarshal to a nil *DynamicIndexMode") - } - - str := string(text) - switch str { - case string(DynamicIndexModeDataStream): - *m = DynamicIndexModeDataStream - case string(DynamicIndexModePrefixSuffix): - *m = DynamicIndexModePrefixSuffix - default: - return fmt.Errorf("unknown dynamic index mode %s", str) - } - - return nil -} - // AuthenticationSettings defines user authentication related settings. type AuthenticationSettings struct { // User is used to configure HTTP Basic Authentication. 
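To make the new behavior concrete: with the `mode` setting gone, a single configuration serves both routing schemes, and the attributes present on each document decide which one applies. A minimal sketch (reusing the endpoint from the test fixtures; not part of the diff itself):

```yaml
exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    logs_dynamic_index:
      enabled: true  # no `mode` key anymore; routing is inferred per record
```

Under this config, a log record carrying `data_stream.dataset: nginx.access` and `data_stream.namespace: production` (hypothetical values) is indexed into `logs-nginx.access-production`; a record carrying only `elasticsearch.index.prefix: app-` goes to `app-logs-generic-default`; a record with none of these attributes falls back to `logs-generic-default`.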
From d2f7d37acfd03227bae44b7ed75f007e95f51c6b Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:31:14 +0100 Subject: [PATCH 16/24] Fix tests --- exporter/elasticsearchexporter/exporter.go | 11 ++- exporter/elasticsearchexporter/model_test.go | 79 +++++++++++--------- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ed017a24a1a5..6cb64da0983d 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -24,12 +24,11 @@ type elasticsearchExporter struct { component.TelemetrySettings userAgent string - config *Config - index string - logstashFormat LogstashFormatSettings - dynamicIndex bool - dynamicIndexMode DynamicIndexMode - model mappingModel + config *Config + index string + logstashFormat LogstashFormatSettings + dynamicIndex bool + model mappingModel bulkIndexer *esBulkIndexerCurrent } diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 358faaa1c8be..675f78af83f7 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -6,6 +6,8 @@ package elasticsearchexporter import ( "fmt" "os" + "sort" + "strings" "testing" "time" @@ -80,40 +82,49 @@ func TestEncodeLog(t *testing.T) { }) } -//func TestEncodeMetric(t *testing.T) { -// // Prepare metrics to test. -// metrics := createTestMetrics(t) -// -// // Encode the metrics. -// model := &encodeModel{ -// dedot: true, -// dedup: true, -// mode: MappingNone, -// } -// -// var docsBytes [][]byte -// for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { -// documentBytes, err := model.encodeMetricDataPoint( -// metrics.ResourceMetrics().At(0).Resource(), -// metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), -// metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), -// metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) -// require.NoError(t, err) -// docsBytes = append(docsBytes, documentBytes) -// } -// -// assert.Len(t, docsBytes, len(strings.Split(expectedMetricsEncoded, "\n"))) -// // Convert the byte arrays to strings and sort the docs to make the test deterministic. -// docs := make([]string, 0, len(docsBytes)) -// for _, docBytes := range docsBytes { -// docs = append(docs, string(docBytes)) -// } -// sort.Strings(docs) -// allDocsSorted := strings.Join(docs, "\n") -// -// // Test that the result matches the expected value. -// assert.Equal(t, expectedMetricsEncoded, allDocsSorted) -//} +func TestEncodeMetric(t *testing.T) { + // Prepare metrics to test. + metrics := createTestMetrics(t) + + // Encode the metrics. 
+ model := &encodeModel{ + dedot: true, + dedup: true, + mode: MappingECS, + } + + docs := make(map[uint32]objmodel.Document) + + var docsBytes [][]byte + for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { + err := model.upsertMetricDataPoint(docs, + metrics.ResourceMetrics().At(0).Resource(), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) + require.NoError(t, err) + } + + for _, doc := range docs { + bytes, err := model.encodeDocument(doc) + require.NoError(t, err) + docsBytes = append(docsBytes, bytes) + } + + allDocsSorted := docBytesToSortedString(docsBytes) + assert.Equal(t, expectedMetricsEncoded, allDocsSorted) +} + +func docBytesToSortedString(docsBytes [][]byte) string { + // Convert the byte arrays to strings and sort the docs to make the test deterministic. + docs := make([]string, len(docsBytes)) + for i, docBytes := range docsBytes { + docs[i] = string(docBytes) + } + sort.Strings(docs) + allDocsSorted := strings.Join(docs, "\n") + return allDocsSorted +} func createTestMetrics(t *testing.T) pmetric.Metrics { metricsUnmarshaler := &pmetric.JSONUnmarshaler{} From 17a0e2ba7948eae5a9681224d4e527ea6bbbbcb8 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:35:19 +0100 Subject: [PATCH 17/24] Clarify fallback behavior --- exporter/elasticsearchexporter/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index a52330ea0535..a7e004e27215 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -93,7 +93,7 @@ This can be customised through the following settings: - `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `logs-generic-default`. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`. 
⚠️ Note that metrics support is currently in development. @@ -101,13 +101,13 @@ This can be customised through the following settings: - `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. ⚠️ Note that metrics support is currently in development. - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `metrics-generic-default`. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`. - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name will be `traces-generic-default`. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format. - `enabled`(default=false): Enable/disable Logstash format compatibility. 
When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, From 8ac0cb1d507303d563afbd1000e2c53b068e4f24 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:41:25 +0100 Subject: [PATCH 18/24] Update changelog --- ...csearch-exporter-attribute-based-data-stream-routing.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml index b3fc3e5c1dca..1331f6756147 100644 --- a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml +++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: exporter/elasticsearch # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Add data stream routing; Add metrics grouping +note: Add data stream routing # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [33755, 33756] @@ -15,7 +15,8 @@ issues: [33755, 33756] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. -subtext: +subtext: `data_stream.dataset` and `data_stream.namespace` in attributes will be respected when `*_dynamic_index.enabled` config is true. + # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. From 1b7377d8a7b0533e8b7be0450f43e9b5c0ca97de Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:46:23 +0100 Subject: [PATCH 19/24] Update issues --- ...sticsearch-exporter-attribute-based-data-stream-routing.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml index 1331f6756147..d62ccb1dbdb0 100644 --- a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml +++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml @@ -10,7 +10,7 @@ component: exporter/elasticsearch note: Add data stream routing # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [33755, 33756] +issues: [33794, 33756] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. 
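The subtext above is terse; the routing it refers to resolves attributes in a fixed order — record over scope over resource for `data_stream.*` — before falling back to defaults. A standalone sketch against `pcommon` (illustrative only; `firstMatch` is an invented name, not the exporter's helper):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// firstMatch returns the value of key from the first map that contains it,
// mirroring the documented lookup order (record > scope > resource).
func firstMatch(key, fallback string, maps ...pcommon.Map) string {
	for _, m := range maps {
		if v, ok := m.Get(key); ok {
			return v.AsString()
		}
	}
	return fallback
}

func main() {
	record, scope, resource := pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap()
	resource.PutStr("data_stream.dataset", "from.resource")
	record.PutStr("data_stream.dataset", "from.record") // record attribute wins

	dataset := firstMatch("data_stream.dataset", "generic", record, scope, resource)
	namespace := firstMatch("data_stream.namespace", "default", record, scope, resource)
	fmt.Printf("logs-%s-%s\n", dataset, namespace) // prints: logs-from.record-default
}
```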
From 4b3b73b20e9c1eb273dec2375821f7f751eb5062 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 11:51:24 +0100 Subject: [PATCH 20/24] Try to fix changelog --- ...ticsearch-exporter-attribute-based-data-stream-routing.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml index d62ccb1dbdb0..33ade28d21ed 100644 --- a/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml +++ b/.chloggen/elasticsearch-exporter-attribute-based-data-stream-routing.yaml @@ -15,7 +15,8 @@ issues: [33794, 33756] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. -subtext: `data_stream.dataset` and `data_stream.namespace` in attributes will be respected when `*_dynamic_index.enabled` config is true. +subtext: | + `data_stream.dataset` and `data_stream.namespace` in attributes will be respected when config `*_dynamic_index.enabled` is true. # If your change doesn't affect end users or the exported elements of any package, From 1470c921d4051977ea189734fff88b015a7a43d0 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 12:11:30 +0100 Subject: [PATCH 21/24] Remove unused func --- exporter/elasticsearchexporter/attribute.go | 28 --------------------- 1 file changed, 28 deletions(-) diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 78d036bdb73d..6ff3bfc1f433 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -19,34 +19,6 @@ const ( defaultDataStreamTypeTraces = "traces" ) -// resource is higher priotized than record attribute -type attrGetter interface { - Attributes() pcommon.Map -} - -// retrieve attribute out of resource, scope, and record (span or log, if not found in resource) -// Deprecated: Use getFromAttributesNew instead. 
-func getFromAttributes(name string, resource, scope, record attrGetter) string { - var str string - val, exist := resource.Attributes().Get(name) - if !exist { - val, exist = scope.Attributes().Get(name) - if !exist { - val, exist = record.Attributes().Get(name) - if exist { - str = val.AsString() - } - } - if exist { - str = val.AsString() - } - } - if exist { - str = val.AsString() - } - return str -} - func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) { for _, attributeMap := range attributeMaps { if value, exists := attributeMap.Get(name); exists { From 88c6687ec9f224384f1db3da6bf5bc6c9d4607b3 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 12:17:58 +0100 Subject: [PATCH 22/24] Rename func --- exporter/elasticsearchexporter/attribute.go | 2 +- exporter/elasticsearchexporter/data_stream_router.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 6ff3bfc1f433..369e885e22cc 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -19,7 +19,7 @@ const ( defaultDataStreamTypeTraces = "traces" ) -func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) { +func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) { for _, attributeMap := range attributeMaps { if value, exists := attributeMap.Get(name); exists { return value.AsString(), true diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index a52c11dd991a..0368f6a1b958 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -28,12 +28,12 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin // 1. read data_stream.* from attributes // 2. read elasticsearch.index.* from attributes // 3. 
use default hardcoded data_stream.* - dataset, datasetExists := getFromAttributesNew(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr) - namespace, namespaceExists := getFromAttributesNew(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr) + dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr) + namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr) dataStreamMode := datasetExists || namespaceExists if !dataStreamMode { - prefix, prefixExists := getFromAttributesNew(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) - suffix, suffixExists := getFromAttributesNew(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) + prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) + suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) if prefixExists || suffixExists { return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } From 7bed5380c57bab8aae6f82a07e5caa9f70e99f2c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 12:36:25 +0100 Subject: [PATCH 23/24] Make linter happy --- .../elasticsearchexporter/exporter_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 93bd960f72a3..26aa49bf52d4 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -180,7 +180,7 @@ func TestExporterLogs(t *testing.T) { rec.Record(docs) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -209,7 +209,7 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - assert.Equal(t, "logs-record.dataset-resource.namespace", actionJsonToIndex(t, docs[0].Action)) + assert.Equal(t, "logs-record.dataset-resource.namespace", actionJSONToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -237,7 +237,7 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - assert.Contains(t, actionJsonToIndex(t, docs[0].Action), "not-used-index") + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), "not-used-index") return itemsAllOK(docs) }) @@ -262,7 +262,7 @@ func TestExporterLogs(t *testing.T) { rec.Record(docs) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Contains(t, actionJsonToIndex(t, docs[0].Action), expected) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), expected) return itemsAllOK(docs) }) @@ -477,7 +477,7 @@ func TestExporterMetrics(t *testing.T) { rec.Record(docs) expected := "resource.prefix-metrics.index-resource.suffix" - assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -506,7 +506,7 @@ func TestExporterMetrics(t *testing.T) { rec.Record(docs) expected := "metrics-resource.dataset-data.point.namespace" - assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -703,7 
+703,7 @@ func TestExporterTraces(t *testing.T) { rec.Record(docs) expected := "traces-span.dataset-default" - assert.Equal(t, expected, actionJsonToIndex(t, docs[0].Action)) + assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action)) return itemsAllOK(docs) }) @@ -731,7 +731,7 @@ func TestExporterTraces(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - assert.Contains(t, actionJsonToIndex(t, docs[0].Action), defaultCfg.TracesIndex) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), defaultCfg.TracesIndex) return itemsAllOK(docs) }) @@ -759,7 +759,7 @@ func TestExporterTraces(t *testing.T) { rec.Record(docs) expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Contains(t, actionJsonToIndex(t, docs[0].Action), expected) + assert.Contains(t, actionJSONToIndex(t, docs[0].Action), expected) return itemsAllOK(docs) }) @@ -952,7 +952,7 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } -func actionJsonToIndex(t *testing.T, actionJson json.RawMessage) string { +func actionJSONToIndex(t *testing.T, actionJson json.RawMessage) string { action := struct { Create struct { Index string `json:"_index"` From a61917fec2be7c145cc50ec371356d85571fce6b Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 28 Jun 2024 12:48:09 +0100 Subject: [PATCH 24/24] Make linter happy again --- exporter/elasticsearchexporter/exporter_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 26aa49bf52d4..754cfaa4675f 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -952,13 +952,13 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } -func actionJSONToIndex(t *testing.T, actionJson json.RawMessage) string { +func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string { action := struct { Create struct { Index string `json:"_index"` } `json:"create"` }{} - err := json.Unmarshal(actionJson, &action) + err := json.Unmarshal(actionJSON, &action) require.NoError(t, err) return action.Create.Index }
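For reference, the shape of the bulk action line that `actionJSONToIndex` decodes — the index name below is one of the expectations asserted in the tests, and the snippet is a sketch rather than exporter code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A bulk `create` action as produced for a data-stream-routed log record.
	actionJSON := []byte(`{"create":{"_index":"logs-record.dataset-resource.namespace"}}`)

	action := struct {
		Create struct {
			Index string `json:"_index"`
		} `json:"create"`
	}{}
	if err := json.Unmarshal(actionJSON, &action); err != nil {
		panic(err)
	}
	fmt.Println(action.Create.Index) // logs-record.dataset-resource.namespace
}
```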