Skip to content

Commit

Permalink
[prometheusremotewriteexporter] reduce allocations when serializing p…
Browse files Browse the repository at this point in the history
…rotobufs (open-telemetry#58)
  • Loading branch information
edma2 committed Sep 13, 2024
1 parent 0e2bea5 commit 5b202ff
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

type buffer struct {
protobuf *proto.Buffer
snappy []byte
}

// A reusable buffer pool for serializing protobufs and compressing them with Snappy.
var bufferPool = sync.Pool{
New: func() any {
return &buffer{
protobuf: proto.NewBuffer(nil),
snappy: nil,
}
},
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
Expand Down Expand Up @@ -271,14 +286,26 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
}

func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
buf := bufferPool.Get().(*buffer)
buf.protobuf.Reset()
defer bufferPool.Put(buf)

// Uses proto.Marshal to convert the WriteRequest into bytes array
data, errMarshal := proto.Marshal(writeReq)
errMarshal := buf.protobuf.Marshal(writeReq)
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
// Therefore we always let Snappy decide the size of the buffer.
compressedData := snappy.Encode(nil, data)
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
if maxCompressedLen > len(buf.snappy) {
if cap(buf.snappy) < maxCompressedLen {
buf.snappy = make([]byte, maxCompressedLen)
} else {
buf.snappy = buf.snappy[:maxCompressedLen]
}
}
compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes())

// executeFunc can be used for backoff and non backoff scenarios.
executeFunc := func() error {
Expand Down

0 comments on commit 5b202ff

Please sign in to comment.