Skip to content

Commit

Permalink
Pool buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny committed Jan 10, 2025
1 parent b15169d commit 5e523c5
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 86 deletions.
40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/bufferpol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"bytes"
"io"
"sync"
)

type BufferPool struct {
pool *sync.Pool
}

func NewBufferPool() *BufferPool {
return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}}
}

// NewPooledBuffer takes a buffer from the pool and wraps it in a
// PooledBuffer that returns it to the same pool once written out.
func (w *BufferPool) NewPooledBuffer() PooledBuffer {
	buf := w.pool.Get().(*bytes.Buffer)
	return PooledBuffer{Buffer: buf, pool: w.pool}
}

type PooledBuffer struct {
Buffer *bytes.Buffer
pool *sync.Pool
}

func (p PooledBuffer) recycle() {
p.Buffer.Reset()
p.pool.Put(p.Buffer)
}

func (p PooledBuffer) WriteTo(w io.Writer) (n int64, err error) {
defer p.recycle()
return bytes.NewReader(p.Buffer.Bytes()).WriteTo(w)
}
29 changes: 16 additions & 13 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -34,6 +33,8 @@ type elasticsearchExporter struct {

wg sync.WaitGroup // active sessions
bulkIndexer bulkIndexer

bufferPool *BufferPool
}

func newExporter(
Expand Down Expand Up @@ -67,6 +68,7 @@ func newExporter(
model: model,
logstashFormat: cfg.LogstashFormat,
otel: otel,
bufferPool: NewBufferPool(),
}
}

Expand Down Expand Up @@ -171,11 +173,12 @@ func (e *elasticsearchExporter) pushLogRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
buffer := e.bufferPool.NewPooledBuffer()
err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buffer.Buffer)
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
return bulkIndexerSession.Add(ctx, fIndex, buffer, nil)
}

func (e *elasticsearchExporter) pushMetricsData(
Expand Down Expand Up @@ -285,12 +288,13 @@ func (e *elasticsearchExporter) pushMetricsData(

for fIndex, groupedDataPoints := range groupedDataPointsByIndex {
for _, dataPoints := range groupedDataPoints {
docBytes, dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs)
buf := e.bufferPool.NewPooledBuffer()
dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer)
if err != nil {
errs = append(errs, err)
continue
}
if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), dynamicTemplates); err != nil {
if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand Down Expand Up @@ -405,11 +409,12 @@ func (e *elasticsearchExporter) pushTraceRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
buf := e.bufferPool.NewPooledBuffer()
err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer)
if err != nil {
return fmt.Errorf("failed to encode trace record: %w", err)
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
}

func (e *elasticsearchExporter) pushSpanEvent(
Expand All @@ -434,13 +439,11 @@ func (e *elasticsearchExporter) pushSpanEvent(
}
fIndex = formattedIndex
}
docBytes, err := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
if err != nil {
return err
}
if docBytes == nil {
buf := e.bufferPool.NewPooledBuffer()
e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer)
if buf.Buffer.Len() == 0 {
return nil
}

return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil)
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
}
64 changes: 30 additions & 34 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -77,12 +76,12 @@ var resourceAttrsToPreserve = map[string]bool{
var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode")

type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error)
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, *bytes.Buffer) error
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, *bytes.Buffer) error
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer)
hashDataPoint(dataPoint) uint32
encodeDocument(objmodel.Document) ([]byte, error)
encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error) ([]byte, map[string]string, error)
encodeDocument(objmodel.Document, *bytes.Buffer) error
encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error)
}

// encodeModel tries to keep the event as close to the original open telemetry semantics as is.
Expand Down Expand Up @@ -113,24 +112,22 @@ const (
attributeField = "attribute"
)

func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error {
var document objmodel.Document
switch m.mode {
case MappingECS:
document = m.encodeLogECSMode(resource, record, scope)
case MappingOTel:
return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record)
return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record, buf)
case MappingBodyMap:
return m.encodeLogBodyMapMode(record)
return m.encodeLogBodyMapMode(record, buf)
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
return buf.Bytes(), err
return document.Serialize(buf, m.dedot, m.mode == MappingOTel)
}

func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document {
Expand All @@ -154,13 +151,14 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
return document
}

func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) {
func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord, buf *bytes.Buffer) error {
body := record.Body()
if body.Type() != pcommon.ValueTypeMap {
return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type())
return fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type())
}

return jsoniter.Marshal(body.Map().AsRaw())
serializeMap(body.Map(), buf)
return nil
}

func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document {
Expand Down Expand Up @@ -205,16 +203,15 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
return document
}

func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) {
func (m *encodeModel) encodeDocument(document objmodel.Document, buf *bytes.Buffer) error {
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
err := document.Serialize(buf, m.dedot, m.mode == MappingOTel)
if err != nil {
return nil, err
return err
}
return buf.Bytes(), nil
return nil
}

// upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index
Expand All @@ -228,7 +225,7 @@ func (m *encodeModel) hashDataPoint(dp dataPoint) uint32 {
}
}

func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error) ([]byte, map[string]string, error) {
func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) {
dp0 := dataPoints[0]
var document objmodel.Document
encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve)
Expand All @@ -243,17 +240,17 @@ func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoi
}
document.AddAttribute(dp.Metric().Name(), value)
}
docBytes, err := m.encodeDocument(document)
err := m.encodeDocument(document, buf)

return docBytes, document.DynamicTemplates(), err
return document.DynamicTemplates(), err
}

func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error) ([]byte, map[string]string, error) {
func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) {
switch m.mode {
case MappingOTel:
return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors)
return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, buf)
default:
return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors)
return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors, buf)
}
}

Expand Down Expand Up @@ -489,19 +486,18 @@ func (dp numberDataPoint) Metric() pmetric.Metric {

var errInvalidNumberDataPoint = errors.New("invalid number data point")

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error {
var document objmodel.Document
switch m.mode {
case MappingOTel:
return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span)
return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span, buf)
default:
document = m.encodeSpanDefaultMode(resource, span, scope)
}
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
return buf.Bytes(), err
err := document.Serialize(buf, m.dedot, m.mode == MappingOTel)
return err
}

func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) objmodel.Document {
Expand All @@ -524,13 +520,13 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra
return document
}

func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) {
if m.mode != MappingOTel {
// Currently span events are stored separately only in OTel mapping mode.
// In other modes, they are stored within the span document.
return nil, nil
return
}
return serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent)
serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, buf)
}

func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) {
Expand Down
Loading

0 comments on commit 5e523c5

Please sign in to comment.