[exporter/elasticsearch] Implement elasticsearch.mapping.hints attribute handling for data points in OTel mapping mode (#35479)

**Description:**
Supersedes #35348 

`elasticsearch.mapping.hints` takes a slice of strings.
`_doc_count` enables emitting `_doc_count` for the document.
`aggregate_metric_double` causes a histogram or exponential histogram to be emitted as `aggregate_metric_double`.
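
For illustration, a minimal sketch (not from this PR; the pdata calls are standard, everything else is hypothetical) of how a producer would attach these hints to a data point:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	dp := pmetric.NewHistogramDataPoint()
	// The hint attribute is a slice of strings under "elasticsearch.mapping.hints".
	hints := dp.Attributes().PutEmptySlice("elasticsearch.mapping.hints")
	hints.AppendEmpty().SetStr("_doc_count")              // emit _doc_count for the resulting document
	hints.AppendEmpty().SetStr("aggregate_metric_double") // map the histogram point to aggregate_metric_double
	fmt.Println(dp.Attributes().AsRaw())
}
```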

**Link to tracking Issue:**

**Testing:**

**Documentation:**
carsonip authored Oct 1, 2024
1 parent df53fdb commit d4aa284
Showing 7 changed files with 254 additions and 24 deletions.
@@ -7,15 +7,18 @@ change_type: enhancement
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Emit _doc_count for metric documents in OTel mode when data point attribute _doc_count is true
note: Implement elasticsearch.mapping.hints attribute handling for data points in OTel mapping mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35348]
issues: [35479]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
elasticsearch.mapping.hints takes a slice of strings.
`_doc_count` enables emitting `_doc_count` for the document.
`aggregate_metric_double` causes histogram or exponential histogram to be emitted as aggregate_metric_double.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
10 changes: 5 additions & 5 deletions exporter/elasticsearchexporter/exporter.go
@@ -223,7 +223,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(numberDataPoint{dp}); err != nil {
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
errs = append(errs, err)
continue
}
@@ -232,7 +232,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(numberDataPoint{dp}); err != nil {
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
errs = append(errs, err)
continue
}
@@ -245,7 +245,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(exponentialHistogramDataPoint{dp}); err != nil {
if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil {
errs = append(errs, err)
continue
}
@@ -258,7 +258,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(histogramDataPoint{dp}); err != nil {
if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil {
errs = append(errs, err)
continue
}
@@ -267,7 +267,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(summaryDataPoint{dp}); err != nil {
if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil {
errs = append(errs, err)
continue
}
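
For context, the change from composite literals such as `numberDataPoint{dp}` to constructors such as `newNumberDataPoint(dp)` suggests each data-point wrapper now precomputes its mapping hints from the point's attributes. A rough sketch under that assumption (the struct layout is hypothetical; only `mappingHintGetter` and `newMappingHintGetter` appear in this commit):

```go
package elasticsearchexporter

import "go.opentelemetry.io/collector/pdata/pmetric"

// Hypothetical wrapper: embed the pdata point and cache the parsed mapping
// hints so serialization can query them without re-reading attributes.
type numberDataPoint struct {
	pmetric.NumberDataPoint
	mappingHintGetter
}

func newNumberDataPoint(dp pmetric.NumberDataPoint) numberDataPoint {
	return numberDataPoint{
		NumberDataPoint:   dp,
		mappingHintGetter: newMappingHintGetter(dp.Attributes()),
	}
}
```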
66 changes: 58 additions & 8 deletions exporter/elasticsearchexporter/exporter_test.go
@@ -1013,7 +1013,7 @@ func TestExporterMetrics(t *testing.T) {
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("otel mode _doc_count", func(t *testing.T) {
t.Run("otel mode _doc_count hint", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
@@ -1039,25 +1039,75 @@
summaryDP.SetSum(1)
summaryDP.SetCount(10)
fillAttributeMap(summaryDP.Attributes(), map[string]any{
"_doc_count": true,
"elasticsearch.mapping.hints": []string{"_doc_count"},
})

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(2)
rec.WaitItems(1)
expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.summary":"summary"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"attributes":{"_doc_count":true},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long","metrics.summary":"summary"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("otel mode aggregate_metric_double hint", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

metrics := pmetric.NewMetrics()
resourceMetric := metrics.ResourceMetrics().AppendEmpty()
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

histogramMetric := scopeMetric.Metrics().AppendEmpty()
histogramMetric.SetName("histogram.summary")
fooHistogram := histogramMetric.SetEmptyHistogram()
fooHistogram.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
fooDp := fooHistogram.DataPoints().AppendEmpty()
fooDp.SetSum(1)
fooDp.SetCount(10)
fillAttributeMap(fooDp.Attributes(), map[string]any{
"elasticsearch.mapping.hints": []string{"_doc_count", "aggregate_metric_double"},
})

exphistogramMetric := scopeMetric.Metrics().AppendEmpty()
exphistogramMetric.SetName("exphistogram.summary")
fooExpHistogram := exphistogramMetric.SetEmptyExponentialHistogram()
fooExpHistogram.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
fooExpDp := fooExpHistogram.DataPoints().AppendEmpty()
fooExpDp.SetTimestamp(pcommon.Timestamp(time.Hour))
fooExpDp.SetSum(1)
fooExpDp.SetCount(10)
fillAttributeMap(fooExpDp.Attributes(), map[string]any{
"elasticsearch.mapping.hints": []string{"_doc_count", "aggregate_metric_double"},
})

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(1)
expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.histogram.summary":"summary"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"histogram.summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.exphistogram.summary":"summary"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"exphistogram.summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)

})

t.Run("publish summary", func(t *testing.T) {
42 changes: 42 additions & 0 deletions exporter/elasticsearchexporter/mapping_hint.go
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
	"slices"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

const (
	mappingHintsAttrKey = "elasticsearch.mapping.hints"
)

type mappingHint string

const (
	hintAggregateMetricDouble mappingHint = "aggregate_metric_double"
	hintDocCount              mappingHint = "_doc_count"
)

type mappingHintGetter struct {
	hints []mappingHint
}

func newMappingHintGetter(attr pcommon.Map) (g mappingHintGetter) {
	v, ok := attr.Get(mappingHintsAttrKey)
	if !ok || v.Type() != pcommon.ValueTypeSlice {
		return
	}
	slice := v.Slice()
	g.hints = slices.Grow(g.hints, slice.Len())
	for i := 0; i < slice.Len(); i++ {
		g.hints = append(g.hints, mappingHint(slice.At(i).Str()))
	}
	return
}

func (g mappingHintGetter) HasMappingHint(hint mappingHint) bool {
	return slices.Contains(g.hints, hint)
}
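
A hedged usage sketch of how serialization code might consult the getter (the function below is illustrative, not the exporter's real serializer; only `newMappingHintGetter`, `HasMappingHint`, and the hint constants come from this file):

```go
package elasticsearchexporter

import "go.opentelemetry.io/collector/pdata/pmetric"

// serializeHistogramDP shows where the hints would influence the emitted document.
func serializeHistogramDP(dp pmetric.HistogramDataPoint) map[string]any {
	doc := map[string]any{}
	hints := newMappingHintGetter(dp.Attributes())
	if hints.HasMappingHint(hintDocCount) {
		doc["_doc_count"] = dp.Count() // surface the point's count as Elasticsearch's _doc_count
	}
	if hints.HasMappingHint(hintAggregateMetricDouble) {
		// Encode the histogram as an aggregate_metric_double instead of explicit buckets.
		doc["metrics.histogram"] = map[string]any{"sum": dp.Sum(), "value_count": dp.Count()}
	}
	return doc
}
```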
88 changes: 88 additions & 0 deletions exporter/elasticsearchexporter/mapping_hint_test.go
@@ -0,0 +1,88 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestHasHint(t *testing.T) {
	tests := []struct {
		name      string
		attrsFunc func() pcommon.Map
		hint      mappingHint
		want      bool
	}{
		{
			name:      "empty map",
			attrsFunc: pcommon.NewMap,
			hint:      hintAggregateMetricDouble,
			want:      false,
		},
		{
			name: "bad type",
			attrsFunc: func() pcommon.Map {
				m := pcommon.NewMap()
				m.PutBool(mappingHintsAttrKey, true)
				return m
			},
			hint: hintAggregateMetricDouble,
			want: false,
		},
		{
			name: "bad inner type",
			attrsFunc: func() pcommon.Map {
				m := pcommon.NewMap()
				s := m.PutEmptySlice(mappingHintsAttrKey)
				s.AppendEmpty().SetBool(true)
				return m
			},
			hint: hintAggregateMetricDouble,
			want: false,
		},
		{
			name: "hit",
			attrsFunc: func() pcommon.Map {
				m := pcommon.NewMap()
				s := m.PutEmptySlice(mappingHintsAttrKey)
				s.AppendEmpty().SetStr(string(hintAggregateMetricDouble))
				return m
			},
			hint: hintAggregateMetricDouble,
			want: true,
		},
		{
			name: "hit 2nd",
			attrsFunc: func() pcommon.Map {
				m := pcommon.NewMap()
				s := m.PutEmptySlice(mappingHintsAttrKey)
				s.AppendEmpty().SetStr(string(hintDocCount))
				s.AppendEmpty().SetStr(string(hintAggregateMetricDouble))
				return m
			},
			hint: hintAggregateMetricDouble,
			want: true,
		},
		{
			name: "miss",
			attrsFunc: func() pcommon.Map {
				m := pcommon.NewMap()
				s := m.PutEmptySlice(mappingHintsAttrKey)
				s.AppendEmpty().SetStr(string(hintDocCount))
				return m
			},
			hint: hintAggregateMetricDouble,
			want: false,
		},
	}
	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			assert.Equal(t, tt.want, newMappingHintGetter(tt.attrsFunc()).HasMappingHint(tt.hint))
		})
	}
}