From cc3938c852020cf169e2a2e22c90bd048c391161 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 13 Jun 2023 16:05:27 -0700 Subject: [PATCH 1/7] [exporter/splunkhec] apply multimetric metric merge for the whole batch --- .../splunkhec-exporter-multimetric-batch.yaml | 20 ++++++ exporter/splunkhecexporter/client.go | 65 ++++++++++++++++++- 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 .chloggen/splunkhec-exporter-multimetric-batch.yaml diff --git a/.chloggen/splunkhec-exporter-multimetric-batch.yaml b/.chloggen/splunkhec-exporter-multimetric-batch.yaml new file mode 100644 index 000000000000..6f7cea1e252e --- /dev/null +++ b/.chloggen/splunkhec-exporter-multimetric-batch.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23365] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 99e8fb01208a..8f9f05768365 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -254,6 +254,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS // Parsing metric record to Splunk event. events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger) + tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) if c.config.UseMultiMetricFormat { merged, err := mergeEventsToMultiMetricFormat(events) @@ -298,6 +299,62 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS return iterState{done: true}, permanentErrors } +func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) { + var permanentErrors []error + jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) + defer jsonStreamPool.Put(jsonStream) + + var events []*splunk.Event + + for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { + rm := metrics.ResourceMetrics().At(i) + for j := is.library; j < rm.ScopeMetrics().Len(); j++ { + is.library = 0 // Reset library index for next resource. + sm := rm.ScopeMetrics().At(j) + for k := is.record; k < sm.Metrics().Len(); k++ { + is.record = 0 // Reset record index for next library. + metric := sm.Metrics().At(k) + + // Parsing metric record to Splunk event. + events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...) 
+ } + } + } + + merged, err := mergeEventsToMultiMetricFormat(events) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error merging events: %w", err))) + } + + tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) + for _, event := range merged { + // JSON encoding event and writing to buffer. + b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream) + if jsonErr != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) + continue + } + tempBuf.Write(b) + } + + // Continue adding events to buffer up to capacity. + b := tempBuf.Bytes() + _, err = buf.Write(b) + if err == nil { + return iterState{done: true}, permanentErrors + } + if errors.Is(err, errOverCapacity) { + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) + } else { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", err))) + } + return iterState{done: true}, permanentErrors +} + func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) { var permanentErrors []error jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) @@ -356,7 +413,13 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric for !is.done { buf.Reset() - latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, buf, is) + var latestIterState iterState + var batchPermanentErrors []error + if c.config.UseMultiMetricFormat { + latestIterState, batchPermanentErrors = c.fillMetricsBufferMultiMetrics(md, buf, is) + } else { + latestIterState, batchPermanentErrors = c.fillMetricsBuffer(md, buf, is) + } permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { From c30679aa26b6da04e95e4edfba6cc335b74ad78a Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 13 Jun 2023 17:23:26 -0700 Subject: [PATCH 2/7] with benchmarks --- exporter/splunkhecexporter/client.go | 128 ++++++++-------- exporter/splunkhecexporter/client_test.go | 144 +++++++++++++++++- .../splunkhecexporter/metricdata_to_splunk.go | 3 +- 3 files changed, 206 insertions(+), 69 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 8f9f05768365..0b57d93c3354 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -4,7 +4,6 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" import ( - "bytes" "context" "errors" "fmt" @@ -243,56 +242,48 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) - for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { + var events []*splunk.Event + + for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) - for j := is.library; j < rm.ScopeMetrics().Len(); j++ { - is.library = 0 // Reset library index for next resource. 
+ for j := 0; j < rm.ScopeMetrics().Len(); j++ { sm := rm.ScopeMetrics().At(j) - for k := is.record; k < sm.Metrics().Len(); k++ { - is.record = 0 // Reset record index for next library. + for k := 0; k < sm.Metrics().Len(); k++ { metric := sm.Metrics().At(k) // Parsing metric record to Splunk event. - events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger) - - tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) - if c.config.UseMultiMetricFormat { - merged, err := mergeEventsToMultiMetricFormat(events) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error merging events: %w", err))) - } else { - events = merged - } - } - for _, event := range events { - // JSON encoding event and writing to buffer. - b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) - continue - } - tempBuf.Write(b) - } + events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...) + } + } + } - // Continue adding events to buffer up to capacity. - b := tempBuf.Bytes() - _, err := buf.Write(b) - if err == nil { - continue - } - if errors.Is(err, errOverCapacity) { - if !buf.Empty() { - return iterState{i, j, k, false}, permanentErrors - } - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ - " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) - return iterState{i, j, k + 1, false}, permanentErrors - } - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error writing the event: %w", err))) + for i := is.record; i < len(events); i++ { + event := events[i] + // JSON encoding event and writing to buffer. + b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream) + if jsonErr != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr))) + continue + } + _, err := buf.Write(b) + if errors.Is(err, errOverCapacity) { + if !buf.Empty() { + return iterState{ + record: i, + done: false, + }, permanentErrors } + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) + return iterState{ + record: i, + done: false, + }, permanentErrors + } else if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", err))) + } } @@ -306,13 +297,11 @@ func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buff var events []*splunk.Event - for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { + for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) - for j := is.library; j < rm.ScopeMetrics().Len(); j++ { - is.library = 0 // Reset library index for next resource. + for j := 0; j < rm.ScopeMetrics().Len(); j++ { sm := rm.ScopeMetrics().At(j) - for k := is.record; k < sm.Metrics().Len(); k++ { - is.record = 0 // Reset record index for next library. + for k := 0; k < sm.Metrics().Len(); k++ { metric := sm.Metrics().At(k) // Parsing metric record to Splunk event. 
@@ -327,31 +316,36 @@ func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buff "error merging events: %w", err))) } - tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) - for _, event := range merged { + for i := is.record; i < len(merged); i++ { + event := merged[i] // JSON encoding event and writing to buffer. b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream) if jsonErr != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr))) continue } - tempBuf.Write(b) - } + _, err = buf.Write(b) + if errors.Is(err, errOverCapacity) { + if !buf.Empty() { + return iterState{ + record: i, + done: false, + }, permanentErrors + } + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) + return iterState{ + record: i, + done: false, + }, permanentErrors + } else if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", err))) - // Continue adding events to buffer up to capacity. - b := tempBuf.Bytes() - _, err = buf.Write(b) - if err == nil { - return iterState{done: true}, permanentErrors - } - if errors.Is(err, errOverCapacity) { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ - " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) - } else { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error writing the event: %w", err))) + } } + return iterState{done: true}, permanentErrors } diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 708cea958542..bee77c1a4ab4 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -85,7 +85,7 @@ func createMetricsData(resourcesNum, dataPointsNum int) pmetric.Metrics { tsUnix := time.Unix(int64(count), int64(count)*time.Millisecond.Nanoseconds()) ilm := rm.ScopeMetrics().AppendEmpty() metric := ilm.Metrics().AppendEmpty() - metric.SetName("gauge_double_with_dims") + metric.SetName(fmt.Sprintf("gauge_double_with_dims_%d", j)) doublePt := metric.SetEmptyGauge().DataPoints().AppendEmpty() doublePt.SetTimestamp(pcommon.NewTimestampFromTime(tsUnix)) doublePt.SetDoubleValue(doubleVal) @@ -1609,6 +1609,148 @@ func benchPushLogData(b *testing.B, numResources int, numRecords int, bufSize ui } } +// 10 resources, 10 records, 1Kb max HEC batch: 17 HEC batches +func Benchmark_pushMetricData_10_10_1024(b *testing.B) { + benchPushMetricData(b, 10, 10, 1024, false, false) +} + +// 10 resources, 10 records, 8Kb max HEC batch: 2 HEC batches +func Benchmark_pushMetricData_10_10_8K(b *testing.B) { + benchPushMetricData(b, 10, 10, 8*1024, false, false) +} + +// 10 resources, 10 records, 1Mb max HEC batch: 1 HEC batch +func Benchmark_pushMetricData_10_10_2M(b *testing.B) { + benchPushMetricData(b, 10, 10, 2*1024*1024, false, false) +} + +// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch +func Benchmark_pushMetricData_10_200_2M(b *testing.B) { + benchPushMetricData(b, 10, 
200, 2*1024*1024, false, false)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_100_200_2M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, false, false)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_100_200_5M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, false, false)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_compressed_10_10_1024(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, true, false)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_8K(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, true, false)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_2M(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, true, false)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_200_2M(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, true, false)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_100_200_2M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, true, false)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_100_200_5M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, true, false)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 17 HEC batches
+func Benchmark_pushMetricData_10_10_1024_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, false, true)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_10_10_8K_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, false, true)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_10_10_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, false, true)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_10_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, false, true)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_100_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, false, true)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_100_200_5M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, false, true)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_compressed_10_10_1024_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, true, true)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_8K_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, true, true)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, true, true)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, true, true)
+} + +// 100 resources, 200 records, 2Mb max HEC batch: 1 HEC batch +func Benchmark_pushMetricData_compressed_100_200_2M_MultiMetric(b *testing.B) { + benchPushMetricData(b, 100, 200, 2*1024*1024, true, true) +} + +// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batches +func Benchmark_pushMetricData_compressed_100_200_5M_MultiMetric(b *testing.B) { + benchPushMetricData(b, 100, 200, 5*1024*1024, true, true) +} + +func benchPushMetricData(b *testing.B, numResources int, numRecords int, bufSize uint, compressionEnabled bool, useMultiMetricFormat bool) { + config := NewFactory().CreateDefaultConfig().(*Config) + config.MaxContentLengthMetrics = bufSize + config.DisableCompression = !compressionEnabled + config.UseMultiMetricFormat = useMultiMetricFormat + c := newLogsClient(exportertest.NewNopCreateSettings(), config) + c.hecWorker = &mockHecWorker{} + exp, err := exporterhelper.NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), config, + c.pushMetricsData) + require.NoError(b, err) + + metrics := createMetricsData(numResources, numRecords) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := exp.ConsumeMetrics(context.Background(), metrics) + require.NoError(b, err) + } +} + func BenchmarkConsumeLogsRejected(b *testing.B) { config := NewFactory().CreateDefaultConfig().(*Config) config.DisableCompression = true diff --git a/exporter/splunkhecexporter/metricdata_to_splunk.go b/exporter/splunkhecexporter/metricdata_to_splunk.go index 09b557107dcf..01bf25e9b274 100644 --- a/exporter/splunkhecexporter/metricdata_to_splunk.go +++ b/exporter/splunkhecexporter/metricdata_to_splunk.go @@ -278,10 +278,11 @@ func mergeEventsToMultiMetricFormat(events []*splunk.Event) ([]*splunk.Event, er hashes := map[uint32]*splunk.Event{} hasher := fnv.New32a() var merged []*splunk.Event + marshaler := jsoniter.ConfigCompatibleWithStandardLibrary for _, e := range events { cloned := copyEventWithoutValues(e) - marshaler := jsoniter.ConfigCompatibleWithStandardLibrary + data, err := marshaler.Marshal(cloned) if err != nil { return nil, err From fcee839d8205f358b063ca60d783221ec6aa40e7 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 13 Jun 2023 17:59:32 -0700 Subject: [PATCH 3/7] do not use subMetrics since we're not tracking pmetric data in iterState anymore --- exporter/splunkhecexporter/client.go | 45 +------------------ .../testdata/hec_metric_event.json | 2 +- 2 files changed, 2 insertions(+), 45 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 0b57d93c3354..e11a1c84d4b7 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -417,7 +417,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { - return consumererror.NewMetrics(err, subMetrics(md, is)) + return consumererror.NewMetrics(err, md) } } is = latestIterState @@ -500,49 +500,6 @@ func subLogs(src plog.Logs, state iterState) plog.Logs { return dst } -// subMetrics returns a subset of metrics starting from the state. 
-func subMetrics(src pmetric.Metrics, state iterState) pmetric.Metrics { - if state.empty() { - return src - } - - dst := pmetric.NewMetrics() - resources := src.ResourceMetrics() - resourcesSub := dst.ResourceMetrics() - - for i := state.resource; i < resources.Len(); i++ { - newSub := resourcesSub.AppendEmpty() - resources.At(i).Resource().CopyTo(newSub.Resource()) - - libraries := resources.At(i).ScopeMetrics() - librariesSub := newSub.ScopeMetrics() - - j := 0 - if i == state.resource { - j = state.library - } - for ; j < libraries.Len(); j++ { - lib := libraries.At(j) - - newLibSub := librariesSub.AppendEmpty() - lib.Scope().CopyTo(newLibSub.Scope()) - - metrics := lib.Metrics() - metricsSub := newLibSub.Metrics() - - k := 0 - if i == state.resource && j == state.library { - k = state.record - } - for ; k < metrics.Len(); k++ { - metrics.At(k).CopyTo(metricsSub.AppendEmpty()) - } - } - } - - return dst -} - func subTraces(src ptrace.Traces, state iterState) ptrace.Traces { if state.empty() { return src diff --git a/exporter/splunkhecexporter/testdata/hec_metric_event.json b/exporter/splunkhecexporter/testdata/hec_metric_event.json index 20858fa9fe33..d9b0a0652a6c 100644 --- a/exporter/splunkhecexporter/testdata/hec_metric_event.json +++ b/exporter/splunkhecexporter/testdata/hec_metric_event.json @@ -5,7 +5,7 @@ "k/n1": "vn1", "k/r0": "vr0", "k/r1": "vr1", - "metric_name:gauge_double_with_dims": 1234.5678, + "metric_name:gauge_double_with_dims_0": 1234.5678, "metric_type": "Gauge", "k0": "v0", "k1": "v1", From 610a58d9a5b5bdbb4f168d32d6fa4fd4eb12fdff Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 13 Jun 2023 18:11:23 -0700 Subject: [PATCH 4/7] do not retry on event too large or last item --- exporter/splunkhecexporter/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index e11a1c84d4b7..4f0369a88719 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -277,8 +277,8 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{ - record: i, - done: false, + record: i + 1, + done: i+1 != len(events), }, permanentErrors } else if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( @@ -336,8 +336,8 @@ func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buff fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{ - record: i, - done: false, + record: i + 1, + done: i+1 != len(merged), }, permanentErrors } else if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( From 6f7bff9bda59563441d81ed733c076a62665077a Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 13 Jun 2023 18:34:04 -0700 Subject: [PATCH 5/7] add a bit more coverage --- exporter/splunkhecexporter/client_test.go | 145 +++++++++++++++------- 1 file changed, 100 insertions(+), 45 deletions(-) diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index bee77c1a4ab4..0f9ee2bde5a9 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -207,7 +207,7 
@@ func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(c.statusCode) } -func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum int, t *testing.T) ([]receivedRequest, error) { +func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum int, useMultiMetricsFormat bool, t *testing.T) ([]receivedRequest, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { panic(err) @@ -216,6 +216,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i factory := NewFactory() cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector" cfg.Token = "1234-1234" + cfg.UseMultiMetricFormat = useMultiMetricsFormat rr := make(chan receivedRequest) capture := CapturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression} @@ -249,7 +250,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i return requests, nil } case <-time.After(5 * time.Second): - if len(requests) == 0 { + if len(requests) == 0 && expectedBatchesNum != 0 { err = errors.New("timeout") } return requests, err @@ -955,7 +956,7 @@ func TestReceiveMetricEvent(t *testing.T) { cfg := NewFactory().CreateDefaultConfig().(*Config) cfg.DisableCompression = true - actual, err := runMetricsExport(cfg, metrics, 1, t) + actual, err := runMetricsExport(cfg, metrics, 1, false, t) assert.Len(t, actual, 1) assert.NoError(t, err) @@ -993,7 +994,7 @@ func TestReceiveMetrics(t *testing.T) { md := createMetricsData(1, 3) cfg := NewFactory().CreateDefaultConfig().(*Config) cfg.DisableCompression = true - actual, err := runMetricsExport(cfg, md, 1, t) + actual, err := runMetricsExport(cfg, md, 1, false, t) assert.Len(t, actual, 1) assert.NoError(t, err) msg := string(actual[0].body) @@ -1031,6 +1032,27 @@ func TestReceiveBatchedMetrics(t *testing.T) { numBatches: 1, }, }, + { + name: "one metric event too large to fit in a batch", + metrics: func() pmetric.Metrics { + m := pmetric.NewMetrics() + metric := m.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + g := metric.SetEmptyGauge() + g.DataPoints().AppendEmpty().SetIntValue(32) + metric.SetName(repeatableString(256)) + return m + }(), + conf: func() *Config { + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.MaxContentLengthMetrics = 20 + cfg.DisableCompression = true + return cfg + }(), + want: wantType{ + batches: [][]string{}, + numBatches: 0, + }, + }, { name: "1 metric event per payload (configured max content length is same as event size)", metrics: createMetricsData(1, 4), @@ -1130,55 +1152,65 @@ func TestReceiveBatchedMetrics(t *testing.T) { } for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - got, err := runMetricsExport(test.conf, test.metrics, test.want.numBatches, t) - - require.NoError(t, err) - require.Len(t, got, test.want.numBatches) + testFn := func(multiMetric bool) func(*testing.T) { + return func(t *testing.T) { + got, err := runMetricsExport(test.conf, test.metrics, test.want.numBatches, multiMetric, t) - for i, batch := range test.want.batches { - require.NotZero(t, got[i]) - if test.conf.MaxContentLengthMetrics != 0 { - require.True(t, int(test.conf.MaxContentLengthMetrics) > len(got[i].body)) - } - if test.want.compressed { - validateCompressedContains(t, batch, got[i].body) - } else { - found := false + require.NoError(t, err) + require.Len(t, got, test.want.numBatches) - for _, expected := 
range batch { - if strings.Contains(string(got[i].body), expected) { - found = true - break + for i, batch := range test.want.batches { + require.NotZero(t, got[i]) + if test.conf.MaxContentLengthMetrics != 0 { + require.True(t, int(test.conf.MaxContentLengthMetrics) > len(got[i].body)) + } + if test.want.compressed { + validateCompressedContains(t, batch, got[i].body) + } else { + found := false + + for _, expected := range batch { + if strings.Contains(string(got[i].body), expected) { + found = true + break + } } + assert.True(t, found, "%s did not match any expected batch", string(got[i].body)) } - assert.True(t, found, "%s did not match any expected batch", string(got[i].body)) } - } - // ensure all events are sent out - for i := 1; i < test.metrics.MetricCount(); i++ { - eventFound := false - for _, batch := range got { - batchBody := batch.body - if test.want.compressed { - z, err := gzip.NewReader(bytes.NewReader(batchBody)) - require.NoError(t, err) - batchBody, err = io.ReadAll(z) - z.Close() - require.NoError(t, err) - } - time := float64(i) + 0.001*float64(i) - if strings.Contains(string(batchBody), fmt.Sprintf(`"time":%g`, time)) { - if eventFound { - t.Errorf("metric event %d found in multiple batches", i) + if test.want.numBatches == 0 { + assert.Equal(t, 0, len(got)) + return + } + + // ensure all events are sent out + for i := 1; i < test.metrics.MetricCount(); i++ { + eventFound := false + for _, batch := range got { + batchBody := batch.body + if test.want.compressed { + z, err := gzip.NewReader(bytes.NewReader(batchBody)) + require.NoError(t, err) + batchBody, err = io.ReadAll(z) + z.Close() + require.NoError(t, err) + } + time := float64(i) + 0.001*float64(i) + if strings.Contains(string(batchBody), fmt.Sprintf(`"time":%g`, time)) { + if eventFound { + t.Errorf("metric event %d found in multiple batches", i) + } + eventFound = true } - eventFound = true } + assert.Truef(t, eventFound, "metric event %d not found in any batch", i) } - assert.Truef(t, eventFound, "metric event %d not found in any batch", i) } - }) + } + t.Run(test.name, testFn(false)) + t.Run(test.name+"_MultiMetric", testFn(true)) + } } @@ -1198,6 +1230,23 @@ func Test_PushMetricsData_Histogram_NaN_Sum(t *testing.T) { assert.NoError(t, permanentErrors) } +func Test_PushMetricsData_Histogram_NaN_Sum_MultiMetric(t *testing.T) { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + ilm := rm.ScopeMetrics().AppendEmpty() + histogram := ilm.Metrics().AppendEmpty() + histogram.SetName("histogram_with_empty_sum") + dp := histogram.SetEmptyHistogram().DataPoints().AppendEmpty() + dp.SetSum(math.NaN()) + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.UseMultiMetricFormat = true + c := newMetricsClient(exportertest.NewNopCreateSettings(), cfg) + c.hecWorker = &mockHecWorker{} + + permanentErrors := c.pushMetricsDataInBatches(context.Background(), metrics, map[string]string{}) + assert.NoError(t, permanentErrors) +} + func Test_PushMetricsData_Summary_NaN_Sum(t *testing.T) { metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() @@ -1217,7 +1266,7 @@ func Test_PushMetricsData_Summary_NaN_Sum(t *testing.T) { func TestReceiveMetricsWithCompression(t *testing.T) { cfg := NewFactory().CreateDefaultConfig().(*Config) cfg.MaxContentLengthMetrics = 1800 - request, err := runMetricsExport(cfg, createMetricsData(1, 100), 1, t) + request, err := runMetricsExport(cfg, createMetricsData(1, 100), 1, false, t) assert.NoError(t, err) assert.Equal(t, "gzip", 
request[0].headers.Get("Content-Encoding")) assert.NotEqual(t, "", request) @@ -1280,7 +1329,13 @@ func TestInvalidLogs(t *testing.T) { func TestInvalidMetrics(t *testing.T) { cfg := NewFactory().CreateDefaultConfig().(*Config) - _, err := runMetricsExport(cfg, pmetric.NewMetrics(), 1, t) + _, err := runMetricsExport(cfg, pmetric.NewMetrics(), 1, false, t) + assert.Error(t, err) +} + +func TestInvalidMetricsMultiMetric(t *testing.T) { + cfg := NewFactory().CreateDefaultConfig().(*Config) + _, err := runMetricsExport(cfg, pmetric.NewMetrics(), 1, true, t) assert.Error(t, err) } From e066ad73e11b147fc6345d359d7e50c6bae5c2b7 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 13 Jun 2023 21:38:46 -0700 Subject: [PATCH 6/7] code review changes --- exporter/splunkhecexporter/client.go | 203 ++++++++++++++++++--------- 1 file changed, 135 insertions(+), 68 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 4f0369a88719..23ca22d46408 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -4,6 +4,7 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" import ( + "bytes" "context" "errors" "fmt" @@ -96,6 +97,9 @@ func (c *client) pushMetricsData( } } + if c.config.UseMultiMetricFormat { + return c.pushMultiMetricsDataInBatches(ctx, md, localHeaders) + } return c.pushMetricsDataInBatches(ctx, md, localHeaders) } @@ -242,89 +246,66 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) - var events []*splunk.Event - - for i := 0; i < metrics.ResourceMetrics().Len(); i++ { + for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) - for j := 0; j < rm.ScopeMetrics().Len(); j++ { + for j := is.library; j < rm.ScopeMetrics().Len(); j++ { + is.library = 0 // Reset library index for next resource. sm := rm.ScopeMetrics().At(j) - for k := 0; k < sm.Metrics().Len(); k++ { + for k := is.record; k < sm.Metrics().Len(); k++ { + is.record = 0 // Reset record index for next library. metric := sm.Metrics().At(k) // Parsing metric record to Splunk event. - events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...) - } - } - } + events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger) + tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) + for _, event := range events { + // JSON encoding event and writing to buffer. + b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) + continue + } + tempBuf.Write(b) + } - for i := is.record; i < len(events); i++ { - event := events[i] - // JSON encoding event and writing to buffer. - b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream) - if jsonErr != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr))) - continue - } - _, err := buf.Write(b) - if errors.Is(err, errOverCapacity) { - if !buf.Empty() { - return iterState{ - record: i, - done: false, - }, permanentErrors + // Continue adding events to buffer up to capacity. 
+ b := tempBuf.Bytes() + _, err := buf.Write(b) + if err == nil { + continue + } + if errors.Is(err, errOverCapacity) { + if !buf.Empty() { + return iterState{i, j, k, false}, permanentErrors + } + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) + return iterState{i, j, k + 1, false}, permanentErrors + } + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", err))) } - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ - " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) - return iterState{ - record: i + 1, - done: i+1 != len(events), - }, permanentErrors - } else if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error writing the event: %w", err))) - } } return iterState{done: true}, permanentErrors } -func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) { +func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffer, is iterState) (iterState, []error) { var permanentErrors []error jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) defer jsonStreamPool.Put(jsonStream) - var events []*splunk.Event - - for i := 0; i < metrics.ResourceMetrics().Len(); i++ { - rm := metrics.ResourceMetrics().At(i) - for j := 0; j < rm.ScopeMetrics().Len(); j++ { - sm := rm.ScopeMetrics().At(j) - for k := 0; k < sm.Metrics().Len(); k++ { - metric := sm.Metrics().At(k) - - // Parsing metric record to Splunk event. - events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...) - } - } - } - - merged, err := mergeEventsToMultiMetricFormat(events) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error merging events: %w", err))) - } - - for i := is.record; i < len(merged); i++ { - event := merged[i] + for i := is.record; i < len(events); i++ { + event := events[i] // JSON encoding event and writing to buffer. b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream) if jsonErr != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr))) continue } - _, err = buf.Write(b) + _, err := buf.Write(b) if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{ @@ -337,12 +318,11 @@ func (c *client) fillMetricsBufferMultiMetrics(metrics pmetric.Metrics, buf buff " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{ record: i + 1, - done: i+1 != len(merged), + done: i+1 != len(events), }, permanentErrors } else if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) - } } @@ -396,6 +376,53 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState return iterState{done: true}, permanentErrors } +// pushMultiMetricsDataInBatches sends batches of Splunk multi-metric events in JSON format. +// The batch content length is restricted to MaxContentLengthMetrics. +// md metrics are parsed to Splunk events. 
+func (c *client) pushMultiMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error { + buf := c.bufferPool.get() + defer c.bufferPool.put(buf) + is := iterState{} + + var permanentErrors []error + var events []*splunk.Event + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rm := md.ResourceMetrics().At(i) + for j := 0; j < rm.ScopeMetrics().Len(); j++ { + sm := rm.ScopeMetrics().At(j) + for k := 0; k < sm.Metrics().Len(); k++ { + metric := sm.Metrics().At(k) + + // Parsing metric record to Splunk event. + events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...) + } + } + } + + merged, err := mergeEventsToMultiMetricFormat(events) + if err != nil { + return consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err)) + } + + for !is.done { + buf.Reset() + var latestIterState iterState + var batchPermanentErrors []error + + latestIterState, batchPermanentErrors = c.fillMetricsBufferMultiMetrics(merged, buf, is) + permanentErrors = append(permanentErrors, batchPermanentErrors...) + if !buf.Empty() { + if err := c.postEvents(ctx, buf, headers); err != nil { + return consumererror.NewMetrics(err, md) + } + } + + is = latestIterState + } + + return multierr.Combine(permanentErrors...) +} + // pushMetricsDataInBatches sends batches of Splunk events in JSON format. // The batch content length is restricted to MaxContentLengthMetrics. // md metrics are parsed to Splunk events. @@ -409,17 +436,14 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric buf.Reset() var latestIterState iterState var batchPermanentErrors []error - if c.config.UseMultiMetricFormat { - latestIterState, batchPermanentErrors = c.fillMetricsBufferMultiMetrics(md, buf, is) - } else { - latestIterState, batchPermanentErrors = c.fillMetricsBuffer(md, buf, is) - } + latestIterState, batchPermanentErrors = c.fillMetricsBuffer(md, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { - return consumererror.NewMetrics(err, md) + return consumererror.NewMetrics(err, subMetrics(md, is)) } } + is = latestIterState } @@ -500,6 +524,49 @@ func subLogs(src plog.Logs, state iterState) plog.Logs { return dst } +// subMetrics returns a subset of metrics starting from the state. 
+func subMetrics(src pmetric.Metrics, state iterState) pmetric.Metrics { + if state.empty() { + return src + } + + dst := pmetric.NewMetrics() + resources := src.ResourceMetrics() + resourcesSub := dst.ResourceMetrics() + + for i := state.resource; i < resources.Len(); i++ { + newSub := resourcesSub.AppendEmpty() + resources.At(i).Resource().CopyTo(newSub.Resource()) + + libraries := resources.At(i).ScopeMetrics() + librariesSub := newSub.ScopeMetrics() + + j := 0 + if i == state.resource { + j = state.library + } + for ; j < libraries.Len(); j++ { + lib := libraries.At(j) + + newLibSub := librariesSub.AppendEmpty() + lib.Scope().CopyTo(newLibSub.Scope()) + + metrics := lib.Metrics() + metricsSub := newLibSub.Metrics() + + k := 0 + if i == state.resource && j == state.library { + k = state.record + } + for ; k < metrics.Len(); k++ { + metrics.At(k).CopyTo(metricsSub.AppendEmpty()) + } + } + } + + return dst +} + func subTraces(src ptrace.Traces, state iterState) ptrace.Traces { if state.empty() { return src From 87ec0af012387e1a5cc80d630f779f1839cc0c1e Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 14 Jun 2023 00:09:37 -0700 Subject: [PATCH 7/7] code review --- exporter/splunkhecexporter/client.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 23ca22d46408..d14f725931e9 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -406,10 +406,8 @@ func (c *client) pushMultiMetricsDataInBatches(ctx context.Context, md pmetric.M for !is.done { buf.Reset() - var latestIterState iterState - var batchPermanentErrors []error - latestIterState, batchPermanentErrors = c.fillMetricsBufferMultiMetrics(merged, buf, is) + latestIterState, batchPermanentErrors := c.fillMetricsBufferMultiMetrics(merged, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { @@ -434,9 +432,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric for !is.done { buf.Reset() - var latestIterState iterState - var batchPermanentErrors []error - latestIterState, batchPermanentErrors = c.fillMetricsBuffer(md, buf, is) + latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil {
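
The multi-metric path added by this series only runs when the exporter's multi-metric mode is switched on; otherwise pushMetricsDataInBatches keeps the original one-event-per-data-point behaviour. A minimal collector configuration fragment for exercising the new path is sketched below. It assumes the YAML keys mirror the exporter's Config fields (UseMultiMetricFormat -> use_multi_metric_format, MaxContentLengthMetrics -> max_content_length_metrics); the token and endpoint values are placeholders, not real credentials.

exporters:
  splunk_hec:
    # Placeholder token and endpoint - replace with real values.
    token: "00000000-0000-0000-0000-000000000000"
    endpoint: "https://splunk.example.com:8088/services/collector"
    # Merge metric data points into Splunk multi-metric events; with this
    # patch series the merge is applied across the whole batch.
    use_multi_metric_format: true
    # Cap each HEC request body (2 MiB here, matching the 2M benchmarks above).
    max_content_length_metrics: 2097152

With the flag off, the exporter batches per metric event exactly as before, which is why patch 6 keeps pushMetricsDataInBatches and pushMultiMetricsDataInBatches as separate code paths selected in pushMetricsData.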