Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Data stream routing based on data_stream.* attributes #33794

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6ceef08
[exporter/elasticsearch] route based on data stream attributes
andrzej-stencel Jun 3, 2024
dc8e980
add issue number to changelog entry
andrzej-stencel Jun 25, 2024
cb88ebc
fill in missing data stream attributes
andrzej-stencel Jun 26, 2024
7f41b41
make gotidy
andrzej-stencel Jun 26, 2024
2aa1c72
Add back metrics grouping logic; Fix missing scope DS attrs
carsonip Jun 27, 2024
ad4a019
Update changelog
carsonip Jun 27, 2024
1eb99c5
Fix scope override bug
carsonip Jun 27, 2024
d134ef7
Remove ability to override DS type
carsonip Jun 27, 2024
cf382bc
Refactor tests
carsonip Jun 27, 2024
fcbdd95
Update exporter/elasticsearchexporter/model.go
carsonip Jun 28, 2024
608d88e
Rename to data_stream_router.go
carsonip Jun 28, 2024
0567fb1
Remove distinction between data_stream and prefix_suffix mode
carsonip Jun 28, 2024
aa93bde
Update README
carsonip Jun 28, 2024
b17fb7c
Merge branch 'main' into route-on-data-stream-attributes-with-grouping
carsonip Jun 28, 2024
1a5e9ba
Remove mode config completely
carsonip Jun 28, 2024
2283d74
Remove other ref to modes
carsonip Jun 28, 2024
d2f7d37
Fix tests
carsonip Jun 28, 2024
17a0e2b
Clarify fallback behavior
carsonip Jun 28, 2024
8ac0cb1
Update changelog
carsonip Jun 28, 2024
1b7377d
Update issues
carsonip Jun 28, 2024
4b3b73b
Try to fix changelog
carsonip Jun 28, 2024
1470c92
Remove unused func
carsonip Jun 28, 2024
88c6687
Rename func
carsonip Jun 28, 2024
7bed538
Make linter happy
carsonip Jun 28, 2024
a61917f
Make linter happy again
carsonip Jun 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data stream routing

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33794, 33756]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
`data_stream.dataset` and `data_stream.namespace` in attributes will be respected when config `*_dynamic_index.enabled` is true.


# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
48 changes: 26 additions & 22 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).

## Configuration options

Expand Down Expand Up @@ -83,39 +83,43 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
### Elasticsearch document routing

Telemetry data will be written to signal specific data streams by default:
logs to `logs-generic-default`, and traces to `traces-generic-default`.
logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
This can be customised through the following settings:

- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
The default value is `logs-generic-default`.

- `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
- `logs_dynamic_index` (optional):
takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)
- `enabled`(default=false): Enable/Disable dynamic index for log records
- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.

- `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.
- `metrics_dynamic_index` (optional):
takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `metrics_index`.

- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=false): Enable/Disable dynamic index for metrics
- `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
- `traces_dynamic_index` (optional):
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
- `enabled`(default=false): Enable/Disable dynamic index for trace spans
- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
- `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.

- `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.

- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
- `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
e.g: If `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

### Elasticsearch document mapping

The Elasticsearch exporter supports several document schemas and preprocessing
behaviours, which may be configured throug the following settings:
behaviours, which may be configured through the following settings:

- `mapping`: Events are encoded to JSON. The `mapping` allows users to
configure additional mapping rules.
Expand All @@ -141,7 +145,7 @@ behaviours, which may be configured throug the following settings:

In ECS mapping mode, the Elastisearch Exporter attempts to map fields from
[OpenTelemetry Semantic Conventions][SemConv] (version 1.22.0) to [Elastic Common Schema][ECS].
This mode may be used for compatibility with existing dashboards that work with with ECS.
This mode may be used for compatibility with existing dashboards that work with ECS.

### Elasticsearch ingest pipeline

Expand Down
46 changes: 13 additions & 33 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,23 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
)

// resource is higher priotized than record attribute
type attrGetter interface {
Attributes() pcommon.Map
}

// retrieve attribute out of resource, scope, and record (span or log, if not found in resource)
// Deprecated: Use getFromAttributesNew instead.
func getFromAttributes(name string, resource, scope, record attrGetter) string {
var str string
val, exist := resource.Attributes().Get(name)
if !exist {
val, exist = scope.Attributes().Get(name)
if !exist {
val, exist = record.Attributes().Get(name)
if exist {
str = val.AsString()
}
}
if exist {
str = val.AsString()
}
}
if exist {
str = val.AsString()
}
return str
}

func getFromAttributesNew(name string, defaultValue string, attributeMaps ...pcommon.Map) string {
func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) {
for _, attributeMap := range attributeMaps {
if value, exists := attributeMap.Get(name); exists {
return value.AsString()
return value.AsString(), true
}
}
return defaultValue
return defaultValue, false
}
57 changes: 42 additions & 15 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,21 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
},
TracesIndex: "trace_index",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -110,12 +119,21 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down Expand Up @@ -163,12 +181,21 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
MetricsIndex: "my_metric_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
Expand Down
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func(
pcommon.Map,
pcommon.Map,
pcommon.Map,
string,
) string {
return func(
recordAttr pcommon.Map,
scopeAttr pcommon.Map,
resourceAttr pcommon.Map,
fIndex string,
) string {
// Order:
// 1. read data_stream.* from attributes
// 2. read elasticsearch.index.* from attributes
// 3. use default hardcoded data_stream.*
dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr)
dataStreamMode := datasetExists || namespaceExists
if !dataStreamMode {
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
if prefixExists || suffixExists {
return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}
}
recordAttr.PutStr(dataStreamDataset, dataset)
recordAttr.PutStr(dataStreamNamespace, namespace)
recordAttr.PutStr(dataStreamType, defaultDSType)
andrzej-stencel marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Sprintf("%s-%s-%s", defaultDSType, dataset, namespace)
}
}

// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes and prefix/suffix attributes.
// This function may mutate record attributes.
func routeLogRecord(
record plog.LogRecord,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
) string {
route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
// This function may mutate record attributes.
func routeDataPoint(
dataPoint pmetric.NumberDataPoint,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
) string {
route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
}

// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
// This function may mutate record attributes.
func routeSpan(
span ptrace.Span,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
) string {
route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
}
Loading