From ec0725874313fe9e9bb64e4e4869afd28fd08cd7 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Tue, 17 Oct 2023 09:08:04 -0700 Subject: [PATCH] [fanoutconsumer] [chore] Do not wrap one read-only consumer (#8689) A follow-up optimization after merging https://github.com/open-telemetry/opentelemetry-collector/pull/8634. There is no need to create a fanout consumer for only one read-only consumer. This introduces a behavior that closely resembles its previous state, before the introduction of the readonly/mutable states. --- internal/fanoutconsumer/logs.go | 5 +++++ internal/fanoutconsumer/logs_test.go | 6 ++++++ internal/fanoutconsumer/metrics.go | 5 +++++ internal/fanoutconsumer/metrics_test.go | 6 ++++++ internal/fanoutconsumer/traces.go | 5 +++++ internal/fanoutconsumer/traces_test.go | 6 ++++++ 6 files changed, 33 insertions(+) diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index e0f9f88df4c..2d1e0336dc0 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -22,6 +22,11 @@ import ( // - Clones only to the consumer that needs to mutate the data. // - If all consumers needs to mutate the data one will get the original mutable data. func NewLogs(lcs []consumer.Logs) consumer.Logs { + // Don't wrap if there is only one non-mutating consumer. + if len(lcs) == 1 && !lcs[0].Capabilities().MutatesData { + return lcs[0] + } + lc := &logsConsumer{} for i := 0; i < len(lcs); i++ { if lcs[i].Capabilities().MutatesData { diff --git a/internal/fanoutconsumer/logs_test.go b/internal/fanoutconsumer/logs_test.go index 5ae01b6b423..c2cd5c42bd9 100644 --- a/internal/fanoutconsumer/logs_test.go +++ b/internal/fanoutconsumer/logs_test.go @@ -20,6 +20,12 @@ import ( "go.opentelemetry.io/collector/pdata/plog" ) +func TestLogsNotMultiplexing(t *testing.T) { + nop := consumertest.NewNop() + lfc := NewLogs([]consumer.Logs{nop}) + assert.Same(t, nop, lfc) +} + func TestLogsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.LogsSink) p2 := new(consumertest.LogsSink) diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index 13ea0efe0cf..ddc3761240b 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -20,6 +20,11 @@ import ( // - Clones only to the consumer that needs to mutate the data. // - If all consumers needs to mutate the data one will get the original mutable data. func NewMetrics(mcs []consumer.Metrics) consumer.Metrics { + // Don't wrap if there is only one non-mutating consumer. + if len(mcs) == 1 && !mcs[0].Capabilities().MutatesData { + return mcs[0] + } + mc := &metricsConsumer{} for i := 0; i < len(mcs); i++ { if mcs[i].Capabilities().MutatesData { diff --git a/internal/fanoutconsumer/metrics_test.go b/internal/fanoutconsumer/metrics_test.go index bbf86990944..a58a7480c92 100644 --- a/internal/fanoutconsumer/metrics_test.go +++ b/internal/fanoutconsumer/metrics_test.go @@ -20,6 +20,12 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) +func TestMetricsNotMultiplexing(t *testing.T) { + nop := consumertest.NewNop() + mfc := NewMetrics([]consumer.Metrics{nop}) + assert.Same(t, nop, mfc) +} + func TestMetricsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.MetricsSink) p2 := new(consumertest.MetricsSink) diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index bb6c30ae84e..c8d0871d0a2 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -20,6 +20,11 @@ import ( // - Clones only to the consumer that needs to mutate the data. // - If all consumers needs to mutate the data one will get the original mutable data. func NewTraces(tcs []consumer.Traces) consumer.Traces { + // Don't wrap if there is only one non-mutating consumer. + if len(tcs) == 1 && !tcs[0].Capabilities().MutatesData { + return tcs[0] + } + tc := &tracesConsumer{} for i := 0; i < len(tcs); i++ { if tcs[i].Capabilities().MutatesData { diff --git a/internal/fanoutconsumer/traces_test.go b/internal/fanoutconsumer/traces_test.go index ceda83ecebb..872f4986ef2 100644 --- a/internal/fanoutconsumer/traces_test.go +++ b/internal/fanoutconsumer/traces_test.go @@ -20,6 +20,12 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) +func TestTracesNotMultiplexing(t *testing.T) { + nop := consumertest.NewNop() + tfc := NewTraces([]consumer.Traces{nop}) + assert.Same(t, nop, tfc) +} + func TestTracesMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.TracesSink) p2 := new(consumertest.TracesSink)