diff --git a/.chloggen/elasticsearchexporter_otel-mode-doc-count.yaml b/.chloggen/elasticsearchexporter_mapping-hint.yaml similarity index 77% rename from .chloggen/elasticsearchexporter_otel-mode-doc-count.yaml rename to .chloggen/elasticsearchexporter_mapping-hint.yaml index 12668e7baa1f..3fa4662df777 100644 --- a/.chloggen/elasticsearchexporter_otel-mode-doc-count.yaml +++ b/.chloggen/elasticsearchexporter_mapping-hint.yaml @@ -7,15 +7,18 @@ change_type: enhancement component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Emit _doc_count for metric documents in OTel mode when data point attribute _doc_count is true +note: Implement elasticsearch.mapping.hints attribute handling for data points in OTel mapping mode # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [35348] +issues: [35479] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. subtext: + elasticsearch.mapping.hints takes a slice of strings. + `_doc_count` enables emitting `_doc_count` for the document. + `aggregate_metric_double` causes histogram or exponential histogram to be emitted as aggregate_metric_double. # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index e8990fc1a606..17d2dc8578f0 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -223,7 +223,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Sum().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(numberDataPoint{dp}); err != nil { + if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil { errs = append(errs, err) continue } @@ -232,7 +232,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Gauge().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(numberDataPoint{dp}); err != nil { + if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil { errs = append(errs, err) continue } @@ -245,7 +245,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.ExponentialHistogram().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(exponentialHistogramDataPoint{dp}); err != nil { + if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil { errs = append(errs, err) continue } @@ -258,7 +258,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Histogram().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(histogramDataPoint{dp}); err != nil { + if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil { errs = append(errs, err) continue } @@ -267,7 +267,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Summary().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(summaryDataPoint{dp}); err != nil { + if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil { errs = append(errs, err) continue } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index bb78458eada3..77ff8e2d0b47 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -1013,7 +1013,7 @@ func TestExporterMetrics(t *testing.T) { assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) - t.Run("otel mode _doc_count", func(t *testing.T) { + t.Run("otel mode _doc_count hint", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) @@ -1039,25 +1039,75 @@ func TestExporterMetrics(t *testing.T) { summaryDP.SetSum(1) summaryDP.SetCount(10) fillAttributeMap(summaryDP.Attributes(), map[string]any{ - "_doc_count": true, + "elasticsearch.mapping.hints": []string{"_doc_count"}, }) mustSendMetrics(t, exporter, metrics) - rec.WaitItems(2) + rec.WaitItems(1) expected := []itemRequest{ { - Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"attributes":{"_doc_count":true},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long","metrics.summary":"summary"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) + }) + + t.Run("otel mode aggregate_metric_double hint", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + metrics := pmetric.NewMetrics() + resourceMetric := metrics.ResourceMetrics().AppendEmpty() + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + + histogramMetric := scopeMetric.Metrics().AppendEmpty() + histogramMetric.SetName("histogram.summary") + fooHistogram := histogramMetric.SetEmptyHistogram() + fooHistogram.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + fooDp := fooHistogram.DataPoints().AppendEmpty() + fooDp.SetSum(1) + fooDp.SetCount(10) + fillAttributeMap(fooDp.Attributes(), map[string]any{ + "elasticsearch.mapping.hints": []string{"_doc_count", "aggregate_metric_double"}, + }) + + exphistogramMetric := scopeMetric.Metrics().AppendEmpty() + exphistogramMetric.SetName("exphistogram.summary") + fooExpHistogram := exphistogramMetric.SetEmptyExponentialHistogram() + fooExpHistogram.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + fooExpDp := fooExpHistogram.DataPoints().AppendEmpty() + fooExpDp.SetTimestamp(pcommon.Timestamp(time.Hour)) + fooExpDp.SetSum(1) + fooExpDp.SetCount(10) + fillAttributeMap(fooExpDp.Attributes(), map[string]any{ + "elasticsearch.mapping.hints": []string{"_doc_count", "aggregate_metric_double"}, + }) + + mustSendMetrics(t, exporter, metrics) + + rec.WaitItems(1) + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.histogram.summary":"summary"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"histogram.summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), }, { - Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.exphistogram.summary":"summary"}}}`), + Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"exphistogram.summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), }, } assertItemsEqual(t, expected, rec.Items(), false) - }) t.Run("publish summary", func(t *testing.T) { diff --git a/exporter/elasticsearchexporter/mapping_hint.go b/exporter/elasticsearchexporter/mapping_hint.go new file mode 100644 index 000000000000..5362335c7b35 --- /dev/null +++ b/exporter/elasticsearchexporter/mapping_hint.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "slices" + + "go.opentelemetry.io/collector/pdata/pcommon" +) + +const ( + mappingHintsAttrKey = "elasticsearch.mapping.hints" +) + +type mappingHint string + +const ( + hintAggregateMetricDouble mappingHint = "aggregate_metric_double" + hintDocCount mappingHint = "_doc_count" +) + +type mappingHintGetter struct { + hints []mappingHint +} + +func newMappingHintGetter(attr pcommon.Map) (g mappingHintGetter) { + v, ok := attr.Get(mappingHintsAttrKey) + if !ok || v.Type() != pcommon.ValueTypeSlice { + return + } + slice := v.Slice() + g.hints = slices.Grow(g.hints, slice.Len()) + for i := 0; i < slice.Len(); i++ { + g.hints = append(g.hints, mappingHint(slice.At(i).Str())) + } + return +} + +func (g mappingHintGetter) HasMappingHint(hint mappingHint) bool { + return slices.Contains(g.hints, hint) +} diff --git a/exporter/elasticsearchexporter/mapping_hint_test.go b/exporter/elasticsearchexporter/mapping_hint_test.go new file mode 100644 index 000000000000..00ff4f9b3117 --- /dev/null +++ b/exporter/elasticsearchexporter/mapping_hint_test.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestHasHint(t *testing.T) { + tests := []struct { + name string + attrsFunc func() pcommon.Map + hint mappingHint + want bool + }{ + { + name: "empty map", + attrsFunc: pcommon.NewMap, + hint: hintAggregateMetricDouble, + want: false, + }, + { + name: "bad type", + attrsFunc: func() pcommon.Map { + m := pcommon.NewMap() + m.PutBool(mappingHintsAttrKey, true) + return m + }, + hint: hintAggregateMetricDouble, + want: false, + }, + { + name: "bad inner type", + attrsFunc: func() pcommon.Map { + m := pcommon.NewMap() + s := m.PutEmptySlice(mappingHintsAttrKey) + s.AppendEmpty().SetBool(true) + return m + }, + hint: hintAggregateMetricDouble, + want: false, + }, + { + name: "hit", + attrsFunc: func() pcommon.Map { + m := pcommon.NewMap() + s := m.PutEmptySlice(mappingHintsAttrKey) + s.AppendEmpty().SetStr(string(hintAggregateMetricDouble)) + return m + }, + hint: hintAggregateMetricDouble, + want: true, + }, + { + name: "hit 2nd", + attrsFunc: func() pcommon.Map { + m := pcommon.NewMap() + s := m.PutEmptySlice(mappingHintsAttrKey) + s.AppendEmpty().SetStr(string(hintDocCount)) + s.AppendEmpty().SetStr(string(hintAggregateMetricDouble)) + return m + }, + hint: hintAggregateMetricDouble, + want: true, + }, + { + name: "miss", + attrsFunc: func() pcommon.Map { + m := pcommon.NewMap() + s := m.PutEmptySlice(mappingHintsAttrKey) + s.AppendEmpty().SetStr(string(hintDocCount)) + return m + }, + hint: hintAggregateMetricDouble, + want: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, newMappingHintGetter(tt.attrsFunc()).HasMappingHint(tt.hint)) + }) + } +} diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 4ad88c6d5483..ce4b9a3a22da 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -12,6 +12,7 @@ import ( "hash" "hash/fnv" "math" + "slices" "time" "go.opentelemetry.io/collector/pdata/pcommon" @@ -90,6 +91,7 @@ type dataPoint interface { Value() (pcommon.Value, error) DynamicTemplate(pmetric.Metric) string DocCount() uint64 + HasMappingHint(mappingHint) bool } const ( @@ -304,8 +306,7 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) } - // Emit _doc_count if data point contains attribute _doc_count: true - if val, ok := dp.Attributes().Get("_doc_count"); ok && val.Bool() { + if dp.HasMappingHint(hintDocCount) { docCount := dp.DocCount() document.AddInt("_doc_count", int64(docCount)) } @@ -332,6 +333,11 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob type summaryDataPoint struct { pmetric.SummaryDataPoint + mappingHintGetter +} + +func newSummaryDataPoint(dp pmetric.SummaryDataPoint) summaryDataPoint { + return summaryDataPoint{SummaryDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} } func (dp summaryDataPoint) Value() (pcommon.Value, error) { @@ -354,9 +360,22 @@ func (dp summaryDataPoint) DocCount() uint64 { type exponentialHistogramDataPoint struct { pmetric.ExponentialHistogramDataPoint + mappingHintGetter +} + +func newExponentialHistogramDataPoint(dp pmetric.ExponentialHistogramDataPoint) exponentialHistogramDataPoint { + return exponentialHistogramDataPoint{ExponentialHistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} } func (dp exponentialHistogramDataPoint) Value() (pcommon.Value, error) { + if dp.HasMappingHint(hintAggregateMetricDouble) { + vm := pcommon.NewValueMap() + m := vm.Map() + m.PutDouble("sum", dp.Sum()) + m.PutInt("value_count", int64(dp.Count())) + return vm, nil + } + counts, values := exphistogram.ToTDigest(dp.ExponentialHistogramDataPoint) vm := pcommon.NewValueMap() @@ -376,6 +395,9 @@ func (dp exponentialHistogramDataPoint) Value() (pcommon.Value, error) { } func (dp exponentialHistogramDataPoint) DynamicTemplate(_ pmetric.Metric) string { + if dp.HasMappingHint(hintAggregateMetricDouble) { + return "summary" + } return "histogram" } @@ -385,13 +407,28 @@ func (dp exponentialHistogramDataPoint) DocCount() uint64 { type histogramDataPoint struct { pmetric.HistogramDataPoint + mappingHintGetter +} + +func newHistogramDataPoint(dp pmetric.HistogramDataPoint) histogramDataPoint { + return histogramDataPoint{HistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} } func (dp histogramDataPoint) Value() (pcommon.Value, error) { + if dp.HasMappingHint(hintAggregateMetricDouble) { + vm := pcommon.NewValueMap() + m := vm.Map() + m.PutDouble("sum", dp.Sum()) + m.PutInt("value_count", int64(dp.Count())) + return vm, nil + } return histogramToValue(dp.HistogramDataPoint) } func (dp histogramDataPoint) DynamicTemplate(_ pmetric.Metric) string { + if dp.HasMappingHint(hintAggregateMetricDouble) { + return "summary" + } return "histogram" } @@ -449,6 +486,11 @@ func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) { type numberDataPoint struct { pmetric.NumberDataPoint + mappingHintGetter +} + +func newNumberDataPoint(dp pmetric.NumberDataPoint) numberDataPoint { + return numberDataPoint{NumberDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} } func (dp numberDataPoint) Value() (pcommon.Value, error) { @@ -557,6 +599,8 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr // Move them to the top of the document and remove them from the record document.AddAttribute(key, val) return true + case mappingHintsAttrKey: + return true } return false }) @@ -829,7 +873,7 @@ func metricECSHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp)) hasher.Write(timestampBuf) - mapHashExcludeDataStreamAttr(hasher, attributes) + mapHashExcludeReservedAttrs(hasher, attributes) return hasher.Sum32() } @@ -846,20 +890,23 @@ func metricOTelHash(dp dataPoint, scopeAttrs pcommon.Map, unit string) uint32 { hasher.Write([]byte(unit)) - mapHashExcludeDataStreamAttr(hasher, scopeAttrs) - mapHashExcludeDataStreamAttr(hasher, dp.Attributes()) + mapHashExcludeReservedAttrs(hasher, scopeAttrs) + mapHashExcludeReservedAttrs(hasher, dp.Attributes(), mappingHintsAttrKey) return hasher.Sum32() } -// mapHashExcludeDataStreamAttr is mapHash but ignoring DS attributes. -// It is useful for cases where index is already considered during routing and no need to be considered in hashing. -func mapHashExcludeDataStreamAttr(hasher hash.Hash, m pcommon.Map) { +// mapHashExcludeReservedAttrs is mapHash but ignoring some reserved attributes. +// e.g. index is already considered during routing and DS attributes do not need to be considered in hashing +func mapHashExcludeReservedAttrs(hasher hash.Hash, m pcommon.Map, extra ...string) { m.Range(func(k string, v pcommon.Value) bool { switch k { case dataStreamType, dataStreamDataset, dataStreamNamespace: return true } + if slices.Contains(extra, k) { + return true + } hasher.Write([]byte(k)) valueHash(hasher, v) diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 5ce8a04115e8..ec17db600f78 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -108,7 +108,7 @@ func TestEncodeMetric(t *testing.T) { metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), "", metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), - numberDataPoint{metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)}, + newNumberDataPoint(metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)), ) require.NoError(t, err) }