From 5e523c5a19966ab3f09c3a087dfb6f6ad40f21a3 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 10 Jan 2025 17:51:41 +0100 Subject: [PATCH] Pool buffers --- exporter/elasticsearchexporter/bufferpool.go | 40 ++++++++++++ exporter/elasticsearchexporter/exporter.go | 29 +++++---- exporter/elasticsearchexporter/model.go | 64 +++++++++---------- exporter/elasticsearchexporter/model_test.go | 52 +++++++++------ .../elasticsearchexporter/pdata_serializer.go | 38 +++++------ .../pdata_serializer_test.go | 4 +- 6 files changed, 141 insertions(+), 86 deletions(-) create mode 100644 exporter/elasticsearchexporter/bufferpool.go diff --git a/exporter/elasticsearchexporter/bufferpool.go b/exporter/elasticsearchexporter/bufferpool.go new file mode 100644 index 000000000000..257aaaf09e0f --- /dev/null +++ b/exporter/elasticsearchexporter/bufferpool.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "bytes" + "io" + "sync" +) + +type BufferPool struct { + pool *sync.Pool +} + +func NewBufferPool() *BufferPool { + return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}} +} + +func (w *BufferPool) NewPooledBuffer() PooledBuffer { + return PooledBuffer{ + Buffer: w.pool.Get().(*bytes.Buffer), + pool: w.pool, + } +} + +type PooledBuffer struct { + Buffer *bytes.Buffer + pool *sync.Pool +} + +func (p PooledBuffer) recycle() { + p.Buffer.Reset() + p.pool.Put(p.Buffer) +} + +func (p PooledBuffer) WriteTo(w io.Writer) (n int64, err error) { + defer p.recycle() + return bytes.NewReader(p.Buffer.Bytes()).WriteTo(w) +} diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index dadc5e30e526..d1410761f336 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -4,7 +4,6 @@ 
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( - "bytes" "context" "errors" "fmt" @@ -34,6 +33,8 @@ type elasticsearchExporter struct { wg sync.WaitGroup // active sessions bulkIndexer bulkIndexer + + bufferPool *BufferPool } func newExporter( @@ -67,6 +68,7 @@ func newExporter( model: model, logstashFormat: cfg.LogstashFormat, otel: otel, + bufferPool: NewBufferPool(), } } @@ -171,11 +173,12 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } - document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + buffer := e.bufferPool.NewPooledBuffer() + err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buffer.Buffer) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + return bulkIndexerSession.Add(ctx, fIndex, buffer, nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -285,12 +288,13 @@ func (e *elasticsearchExporter) pushMetricsData( for fIndex, groupedDataPoints := range groupedDataPointsByIndex { for _, dataPoints := range groupedDataPoints { - docBytes, dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs) + buf := e.bufferPool.NewPooledBuffer() + dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer) if err != nil { errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), dynamicTemplates); err != nil { + if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -405,11 +409,12 @@ func (e *elasticsearchExporter) pushTraceRecord( fIndex = formattedIndex } - document, err := 
e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) + buf := e.bufferPool.NewPooledBuffer() + err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer) if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + return bulkIndexerSession.Add(ctx, fIndex, buf, nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -434,13 +439,11 @@ func (e *elasticsearchExporter) pushSpanEvent( } fIndex = formattedIndex } - docBytes, err := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) - if err != nil { - return err - } - if docBytes == nil { + buf := e.bufferPool.NewPooledBuffer() + e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer) + if buf.Buffer.Len() == 0 { return nil } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil) + return bulkIndexerSession.Add(ctx, fIndex, buf, nil) } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 90a2194f3f5d..88bcf6e7bdf0 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -16,7 +16,6 @@ import ( "strings" "time" - jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -77,12 +76,12 @@ var resourceAttrsToPreserve = map[string]bool{ var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode") type mappingModel interface { - encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) - encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error) - encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, 
span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) + encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, *bytes.Buffer) error + encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, *bytes.Buffer) error + encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) hashDataPoint(dataPoint) uint32 - encodeDocument(objmodel.Document) ([]byte, error) - encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error) ([]byte, map[string]string, error) + encodeDocument(objmodel.Document, *bytes.Buffer) error + encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. 
@@ -113,24 +112,22 @@ const ( attributeField = "attribute" ) -func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { +func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error { var document objmodel.Document switch m.mode { case MappingECS: document = m.encodeLogECSMode(resource, record, scope) case MappingOTel: - return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record) + return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record, buf) case MappingBodyMap: - return m.encodeLogBodyMapMode(record) + return m.encodeLogBodyMapMode(record, buf) default: document = m.encodeLogDefaultMode(resource, record, scope) } // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false document.Dedup(m.mode != MappingOTel) - var buf bytes.Buffer - err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) - return buf.Bytes(), err + return document.Serialize(buf, m.dedot, m.mode == MappingOTel) } func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { @@ -154,13 +151,14 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo return document } -func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { +func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord, buf *bytes.Buffer) error { body := record.Body() if body.Type() != pcommon.ValueTypeMap { - return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type()) + return fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type()) } - return jsoniter.Marshal(body.Map().AsRaw()) + serializeMap(body.Map(), buf) + return nil } func (m 
*encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { @@ -205,16 +203,15 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo return document } -func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) { +func (m *encodeModel) encodeDocument(document objmodel.Document, buf *bytes.Buffer) error { // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false document.Dedup(m.mode != MappingOTel) - var buf bytes.Buffer - err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) + err := document.Serialize(buf, m.dedot, m.mode == MappingOTel) if err != nil { - return nil, err + return err } - return buf.Bytes(), nil + return nil } // upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index @@ -228,7 +225,7 @@ func (m *encodeModel) hashDataPoint(dp dataPoint) uint32 { } } -func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error) ([]byte, map[string]string, error) { +func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { dp0 := dataPoints[0] var document objmodel.Document encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve) @@ -243,17 +240,17 @@ func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoi } document.AddAttribute(dp.Metric().Name(), value) } - docBytes, err := m.encodeDocument(document) + err := m.encodeDocument(document, buf) - return docBytes, document.DynamicTemplates(), err + return document.DynamicTemplates(), err } -func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints 
[]dataPoint, validationErrors *[]error) ([]byte, map[string]string, error) { +func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { switch m.mode { case MappingOTel: - return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors) + return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, buf) default: - return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors) + return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors, buf) } } @@ -489,19 +486,18 @@ func (dp numberDataPoint) Metric() pmetric.Metric { var errInvalidNumberDataPoint = errors.New("invalid number data point") -func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { +func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error { var document objmodel.Document switch m.mode { case MappingOTel: - return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span) + return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span, buf) default: document = m.encodeSpanDefaultMode(resource, span, scope) } // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false document.Dedup(m.mode != MappingOTel) - var buf bytes.Buffer - err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) - return buf.Bytes(), err + err := document.Serialize(buf, m.dedot, m.mode == MappingOTel) + return err } func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope 
pcommon.InstrumentationScope) objmodel.Document { @@ -524,13 +520,13 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra return document } -func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { +func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) { if m.mode != MappingOTel { // Currently span events are stored separately only in OTel mapping mode. // In other modes, they are stored within the span document. - return nil, nil + return } - return serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent) + serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, buf) } func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 4067a136d798..0f9b27ba2d2d 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -56,9 +56,10 @@ var ( func TestEncodeSpan(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceSpans() - spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "") + var buf bytes.Buffer + err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "", &buf) assert.NoError(t, err) - assert.Equal(t, expectedSpanBody, string(spanByte)) + assert.Equal(t, expectedSpanBody, buf.String()) } func 
TestEncodeLog(t *testing.T) { @@ -66,26 +67,29 @@ func TestEncodeLog(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceLogs() td.ScopeLogs().At(0).LogRecords().At(0).SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Date(2023, 4, 19, 3, 4, 5, 6, time.UTC))) - logByte, err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl()) + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) assert.NoError(t, err) - assert.Equal(t, expectedLogBody, string(logByte)) + assert.Equal(t, expectedLogBody, buf.String()) }) t.Run("both timestamp and observedTimestamp empty", func(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceLogs() - logByte, err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl()) + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) assert.NoError(t, err) - assert.Equal(t, expectedLogBodyWithEmptyTimestamp, string(logByte)) + assert.Equal(t, expectedLogBodyWithEmptyTimestamp, buf.String()) }) t.Run("dedot true", func(t *testing.T) { model := &encodeModel{dedot: true} td := mockResourceLogs() td.Resource().Attributes().PutStr("foo.bar", "baz") - logByte, err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl()) + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) require.NoError(t, err) - require.Equal(t, 
expectedLogBodyDeDottedWithEmptyTimestamp, string(logByte)) + require.Equal(t, expectedLogBodyDeDottedWithEmptyTimestamp, buf.String()) }) } @@ -118,9 +122,12 @@ func TestEncodeMetric(t *testing.T) { } for _, dataPoints := range groupedDataPoints { - bytes, _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, nil) + var buf bytes.Buffer + errors := make([]error, 0) + _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, &errors, &buf) + require.Empty(t, errors, err) require.NoError(t, err) - docsBytes = append(docsBytes, bytes) + docsBytes = append(docsBytes, buf.Bytes()) } allDocsSorted := docBytesToSortedString(docsBytes) @@ -338,10 +345,11 @@ func TestEncodeLogECSModeDuplication(t *testing.T) { mode: MappingECS, dedot: true, } - doc, err := m.encodeLog(resource, "", record, scope, "") + var buf bytes.Buffer + err = m.encodeLog(resource, "", record, scope, "", &buf) require.NoError(t, err) - assert.Equal(t, want, string(doc)) + assert.Equal(t, want, buf.String()) } func TestEncodeLogECSMode(t *testing.T) { @@ -1116,7 +1124,8 @@ func TestEncodeLogOtelMode(t *testing.T) { // This sets the data_stream values default or derived from the record/scope/resources routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name()) - b, err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL) + var buf bytes.Buffer + err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL, &buf) require.NoError(t, err) want := tc.rec @@ -1125,7 +1134,7 @@ func TestEncodeLogOtelMode(t *testing.T) { } var got OTelRecord - err = json.Unmarshal(b, &got) + err = json.Unmarshal(buf.Bytes(), &got) require.NoError(t, err) @@ -1248,9 +1257,11 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { td := mockResourceLogs() td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo", 
"scalar") td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.bar", "baz") - encoded, err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "") + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", &buf) assert.NoError(t, err) + encoded := buf.Bytes() assert.True(t, gjson.ValidBytes(encoded)) assert.False(t, gjson.GetBytes(encoded, "Attributes\\.foo").Exists()) fooValue := gjson.GetBytes(encoded, "Attributes\\.foo\\.value") @@ -1260,9 +1271,11 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { // If there is an attribute named "foo.value", then "foo" would be omitted rather than renamed. td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.value", "foovalue") - encoded, err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "") + buf = bytes.Buffer{} + err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", &buf) assert.NoError(t, err) + encoded = buf.Bytes() assert.False(t, gjson.GetBytes(encoded, "Attributes\\.foo").Exists()) fooValue = gjson.GetBytes(encoded, "Attributes\\.foo\\.value") assert.Equal(t, "foovalue", fooValue.Str) @@ -1289,7 +1302,8 @@ func TestEncodeLogBodyMapMode(t *testing.T) { bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) m := encodeModel{} - got, err := m.encodeLogBodyMapMode(logRecord) + var buf bytes.Buffer + err := m.encodeLogBodyMapMode(logRecord, &buf) require.NoError(t, err) require.JSONEq(t, `{ @@ -1299,11 +1313,11 @@ func TestEncodeLogBodyMapMode(t *testing.T) { "key.a": "a", "key.a.b": "b", "pi": 3.14 - }`, string(got)) + }`, buf.String()) // invalid body map logRecord.Body().SetEmptySlice() - _, err = m.encodeLogBodyMapMode(logRecord) + err = m.encodeLogBodyMapMode(logRecord, &bytes.Buffer{}) require.Error(t, err) require.ErrorIs(t, err, 
ErrInvalidTypeForBodyMapMode) } diff --git a/exporter/elasticsearchexporter/pdata_serializer.go b/exporter/elasticsearchexporter/pdata_serializer.go index ec3cd1caea94..f0697ea7ae1f 100644 --- a/exporter/elasticsearchexporter/pdata_serializer.go +++ b/exporter/elasticsearchexporter/pdata_serializer.go @@ -17,14 +17,13 @@ import ( const tsLayout = "2006-01-02T15:04:05.000000000Z" -func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error) ([]byte, map[string]string, error) { +func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { if len(dataPoints) == 0 { - return nil, nil, nil + return nil, nil } dp0 := dataPoints[0] - var buf bytes.Buffer - v := json.NewVisitor(&buf) + v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. 
v.SetExplicitRadixPoint(true) @@ -40,7 +39,7 @@ func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope writeScope(v, scope, scopeSchemaURL, true) dynamicTemplates := serializeDataPoints(v, dataPoints, validationErrors) _ = v.OnObjectFinished() - return buf.Bytes(), dynamicTemplates, nil + return dynamicTemplates, nil } func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErrors *[]error) map[string]string { @@ -76,10 +75,8 @@ func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErro return dynamicTemplates } -func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent) ([]byte, error) { - var buf bytes.Buffer - - v := json.NewVisitor(&buf) +func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, buf *bytes.Buffer) { + v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. v.SetExplicitRadixPoint(true) @@ -103,13 +100,10 @@ func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, sco writeResource(v, resource, resourceSchemaURL, false) writeScope(v, scope, scopeSchemaURL, false) _ = v.OnObjectFinished() - return buf.Bytes(), nil } -func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span) ([]byte, error) { - var buf bytes.Buffer - - v := json.NewVisitor(&buf) +func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, buf *bytes.Buffer) error { + v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. 
// This is required to generate the correct dynamic mapping in ES. v.SetExplicitRadixPoint(true) @@ -132,7 +126,7 @@ func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pc writeResource(v, resource, resourceSchemaURL, false) writeScope(v, scope, scopeSchemaURL, false) _ = v.OnObjectFinished() - return buf.Bytes(), nil + return nil } func writeStatus(v *json.Visitor, status ptrace.Status) { @@ -160,10 +154,16 @@ func writeSpanLinks(v *json.Visitor, span ptrace.Span) { _ = v.OnArrayFinished() } -func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord) ([]byte, error) { - var buf bytes.Buffer +func serializeMap(m pcommon.Map, buf *bytes.Buffer) { + v := json.NewVisitor(buf) + // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. + // This is required to generate the correct dynamic mapping in ES. + v.SetExplicitRadixPoint(true) + writeMap(v, m, false) +} - v := json.NewVisitor(&buf) +func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, buf *bytes.Buffer) error { + v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. 
v.SetExplicitRadixPoint(true) @@ -193,7 +193,7 @@ func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pco writeScope(v, scope, scopeSchemaURL, false) writeLogBody(v, record, isEvent) _ = v.OnObjectFinished() - return buf.Bytes(), nil + return nil } func writeDataStream(v *json.Visitor, attributes pcommon.Map) { diff --git a/exporter/elasticsearchexporter/pdata_serializer_test.go b/exporter/elasticsearchexporter/pdata_serializer_test.go index ff901da27a1c..1aa61d0ccb7a 100644 --- a/exporter/elasticsearchexporter/pdata_serializer_test.go +++ b/exporter/elasticsearchexporter/pdata_serializer_test.go @@ -119,10 +119,12 @@ func TestSerializeLog(t *testing.T) { record := scopeLogs.LogRecords().AppendEmpty() tt.logCustomizer(resourceLogs.Resource(), scopeLogs.Scope(), record) - logBytes, err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record) + var buf bytes.Buffer + err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, &buf) if (err != nil) != tt.wantErr { t.Errorf("serializeLog() error = %v, wantErr %v", err, tt.wantErr) } + logBytes := buf.Bytes() eventAsJSON := string(logBytes) var result any decoder := json.NewDecoder(bytes.NewBuffer(logBytes))