[pdata] Enable the pdata mutation safeguards in the fanout consumers
This required introducing an extra API to get the pdata mutability state:
- p[metric|trace|log].[Metrics|Traces|Logs].IsReadOnly()
dmitryax committed Oct 7, 2023
1 parent 5361583 commit d6855ae
Showing 12 changed files with 261 additions and 116 deletions.
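For orientation, here is a minimal sketch (not part of the commit message or diff) of the read-only surface on plog.Logs; per the changelog entries below, pmetric.Metrics and ptrace.Traces carry the same pair of methods:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	ld := plog.NewLogs()
	ld.ResourceLogs().AppendEmpty() // mutations are fine while the pdata is still mutable

	fmt.Println(ld.IsReadOnly()) // false

	ld.MarkReadOnly()
	fmt.Println(ld.IsReadOnly()) // true
	// ld.ResourceLogs().AppendEmpty() // any further mutation would now panic
}
```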
26 changes: 26 additions & 0 deletions .chloggen/enable-mutation-assertions.yaml
@@ -0,0 +1,26 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: fanoutconsumer

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable runtime assertions to catch incorrect pdata mutations in components that claim not to mutate pdata.

# One or more tracking issues or pull requests related to the change
issues: [6794]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This change enables the runtime assertions that catch unintentional pdata mutations in components
that claim not to mutate pdata. Without these assertions, runtime errors may still occur, but they are
thrown by unrelated components, which makes them very difficult to troubleshoot.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
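To illustrate what these assertions catch (a hypothetical component, not taken from the repository): a consumer that advertises MutatesData: false but still mutates the shared pdata now panics at the offending call site once the data has been marked read-only, instead of silently corrupting data handed to a sibling consumer.

```go
package example

import (
	"context"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/plog"
)

// badConsumer is a hypothetical component that claims to be non-mutating.
type badConsumer struct{}

func (badConsumer) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: false}
}

func (badConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
	// If the fanout consumer shared ld after it was marked read-only,
	// this mutation panics here, pointing directly at the offending component.
	ld.ResourceLogs().AppendEmpty()
	return nil
}
```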
16 changes: 12 additions & 4 deletions .chloggen/pdata-mutation-assertions.yaml
@@ -5,7 +5,7 @@ change_type: enhancement
component: pdata

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
-note: Introduce runtime assertions to catch incorrect pdata mutations
+note: Introduce API to control pdata mutability.

# One or more tracking issues or pull requests related to the change
issues: [6794]
@@ -14,6 +14,14 @@ issues: [6794]
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
-This change introduces an option to enable runtime assertions to catch unintentional pdata mutations in components
-that are claimed as non-mutating pdata. Without these assertions, runtime errors may still occur, but thrown by
-unrelated components, making it very difficult to troubleshoot.
+This change introduces new API pdata methods to control the mutability:
+- p[metric|trace|log].[Metrics|Traces|Logs].MarkReadOnly() - marks the pdata as read-only. Any subsequent
+  mutations will result in a panic.
+- p[metric|trace|log].[Metrics|Traces|Logs].IsReadOnly() - returns true if the pdata is marked as read-only.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
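One way a downstream component might consume this API (a sketch with a hypothetical helper name; the pattern mirrors the cloning done in the fanout consumer below): clone only when the incoming pdata is read-only, and mutate in place otherwise.

```go
package example

import "go.opentelemetry.io/collector/pdata/plog"

// ensureMutable is a hypothetical helper: it returns ld itself when mutation is safe,
// or a deep copy when an upstream has marked the pdata read-only.
func ensureMutable(ld plog.Logs) plog.Logs {
	if ld.IsReadOnly() {
		mutable := plog.NewLogs()
		ld.CopyTo(mutable)
		return mutable
	}
	return ld
}
```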
58 changes: 27 additions & 31 deletions internal/fanoutconsumer/logs.go
@@ -20,36 +20,22 @@ import (
// NewLogs wraps multiple log consumers in a single one.
// It fanouts the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
-// - If all consumers needs to mutate the data one will get the original data.
+// - If all consumers needs to mutate the data one will get the original mutable data.
func NewLogs(lcs []consumer.Logs) consumer.Logs {
-    if len(lcs) == 1 {
-        // Don't wrap if no need to do it.
-        return lcs[0]
-    }
-    var pass []consumer.Logs
-    var clone []consumer.Logs
-    for i := 0; i < len(lcs)-1; i++ {
-        if !lcs[i].Capabilities().MutatesData {
-            pass = append(pass, lcs[i])
+    lc := &logsConsumer{}
+    for i := 0; i < len(lcs); i++ {
+        if lcs[i].Capabilities().MutatesData {
+            lc.mutable = append(lc.mutable, lcs[i])
        } else {
-            clone = append(clone, lcs[i])
+            lc.readonly = append(lc.readonly, lcs[i])
        }
    }
-    // Give the original data to the last consumer if no other read-only consumer,
-    // otherwise put it in the right bucket. Never share the same data between
-    // a mutating and a non-mutating consumer since the non-mutating consumer may process
-    // data async and the mutating consumer may change the data before that.
-    if len(pass) == 0 || !lcs[len(lcs)-1].Capabilities().MutatesData {
-        pass = append(pass, lcs[len(lcs)-1])
-    } else {
-        clone = append(clone, lcs[len(lcs)-1])
-    }
-    return &logsConsumer{pass: pass, clone: clone}
+    return lc
}

type logsConsumer struct {
-    pass []consumer.Logs
-    clone []consumer.Logs
+    mutable []consumer.Logs
+    readonly []consumer.Logs
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
@@ -59,17 +45,27 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
    var errs error
-    // Initially pass to clone exporter to avoid the case where the optimization of sending
-    // the incoming data to a mutating consumer is used that may change the incoming data before
-    // cloning.
-    for _, lc := range lsc.clone {
-        clonedLogs := plog.NewLogs()
-        ld.CopyTo(clonedLogs)
-        errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))
+
+    // Clone the data before sending to mutable consumers.
+    // The only exception is the last consumer which is allowed to mutate the data only if there are no
+    // other non-mutating consumers and the data is mutable. Never share the same data between
+    // a mutating and a non-mutating consumer since the non-mutating consumer may process
+    // data async and the mutating consumer may change the data before that.
+    for i, lc := range lsc.mutable {
+        if i < len(lsc.mutable)-1 || ld.IsReadOnly() || len(lsc.readonly) > 0 {
+            clonedLogs := plog.NewLogs()
+            ld.CopyTo(clonedLogs)
+            errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))
+        } else {
+            errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
+        }
    }
-    for _, lc := range lsc.pass {
+
+    // Send the data as is to read-only consumers.
+    for _, lc := range lsc.readonly {
        errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
    }
+
    return errs
}

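Roughly how the new bucketing plays out (a sketch reusing consumertest plus the package-local test helpers mutatingLogsSink and testdata that appear in the tests below; assume it runs inside this package's tests):

```go
mutating := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} // Capabilities().MutatesData == true
readonly := new(consumertest.LogsSink)                              // Capabilities().MutatesData == false

fan := NewLogs([]consumer.Logs{mutating, readonly})

ld := testdata.GenerateLogs(1)
_ = fan.ConsumeLogs(context.Background(), ld)

// Because a read-only consumer is present, the mutating consumer receives a clone,
// while the read-only consumer gets the original ld as-is. Only when every consumer
// mutates and ld is still mutable does the last mutating consumer receive ld itself.
```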
43 changes: 37 additions & 6 deletions internal/fanoutconsumer/logs_test.go
@@ -20,12 +20,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

-func TestLogsNotMultiplexing(t *testing.T) {
-    nop := consumertest.NewNop()
-    lfc := NewLogs([]consumer.Logs{nop})
-    assert.Same(t, nop, lfc)
-}

func TestLogsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.LogsSink)
p2 := new(consumertest.LogsSink)
@@ -93,6 +87,43 @@ func TestLogsMultiplexingMutating(t *testing.T) {
assert.EqualValues(t, ld, p3.AllLogs()[1])
}

func TestReadOnlyLogsMultiplexingMutating(t *testing.T) {
p1 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
p2 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
ldOrig := testdata.GenerateLogs(1)
ld := testdata.GenerateLogs(1)
ld.MarkReadOnly()

for i := 0; i < 2; i++ {
err := lfc.ConsumeLogs(context.Background(), ld)
if err != nil {
t.Errorf("Wanted nil got error")
return
}
}

// All consumers should receive the cloned data.

assert.True(t, ld != p1.AllLogs()[0])
assert.True(t, ld != p1.AllLogs()[1])
assert.EqualValues(t, ldOrig, p1.AllLogs()[0])
assert.EqualValues(t, ldOrig, p1.AllLogs()[1])

assert.True(t, ld != p2.AllLogs()[0])
assert.True(t, ld != p2.AllLogs()[1])
assert.EqualValues(t, ldOrig, p2.AllLogs()[0])
assert.EqualValues(t, ldOrig, p2.AllLogs()[1])

assert.True(t, ld != p3.AllLogs()[0])
assert.True(t, ld != p3.AllLogs()[1])
assert.EqualValues(t, ldOrig, p3.AllLogs()[0])
assert.EqualValues(t, ldOrig, p3.AllLogs()[1])
}

func TestLogsMultiplexingMixLastMutating(t *testing.T) {
p1 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
p2 := new(consumertest.LogsSink)
58 changes: 27 additions & 31 deletions internal/fanoutconsumer/metrics.go
@@ -18,36 +18,22 @@ import (
// NewMetrics wraps multiple metrics consumers in a single one.
// It fanouts the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
-// - If all consumers needs to mutate the data one will get the original data.
+// - If all consumers needs to mutate the data one will get the original mutable data.
func NewMetrics(mcs []consumer.Metrics) consumer.Metrics {
-    if len(mcs) == 1 {
-        // Don't wrap if no need to do it.
-        return mcs[0]
-    }
-    var pass []consumer.Metrics
-    var clone []consumer.Metrics
-    for i := 0; i < len(mcs)-1; i++ {
-        if !mcs[i].Capabilities().MutatesData {
-            pass = append(pass, mcs[i])
+    mc := &metricsConsumer{}
+    for i := 0; i < len(mcs); i++ {
+        if mcs[i].Capabilities().MutatesData {
+            mc.mutable = append(mc.mutable, mcs[i])
        } else {
-            clone = append(clone, mcs[i])
+            mc.readonly = append(mc.readonly, mcs[i])
        }
    }
-    // Give the original data to the last consumer if no other read-only consumer,
-    // otherwise put it in the right bucket. Never share the same data between
-    // a mutating and a non-mutating consumer since the non-mutating consumer may process
-    // data async and the mutating consumer may change the data before that.
-    if len(pass) == 0 || !mcs[len(mcs)-1].Capabilities().MutatesData {
-        pass = append(pass, mcs[len(mcs)-1])
-    } else {
-        clone = append(clone, mcs[len(mcs)-1])
-    }
-    return &metricsConsumer{pass: pass, clone: clone}
+    return mc
}

type metricsConsumer struct {
-    pass []consumer.Metrics
-    clone []consumer.Metrics
+    mutable []consumer.Metrics
+    readonly []consumer.Metrics
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
@@ -57,17 +43,27 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
    var errs error
-    // Initially pass to clone exporter to avoid the case where the optimization of sending
-    // the incoming data to a mutating consumer is used that may change the incoming data before
-    // cloning.
-    for _, mc := range msc.clone {
-        clonedMetrics := pmetric.NewMetrics()
-        md.CopyTo(clonedMetrics)
-        errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))
+
+    // Clone the data before sending to mutable consumers.
+    // The only exception is the last consumer which is allowed to mutate the data only if there are no
+    // other non-mutating consumers and the data is mutable. Never share the same data between
+    // a mutating and a non-mutating consumer since the non-mutating consumer may process
+    // data async and the mutating consumer may change the data before that.
+    for i, mc := range msc.mutable {
+        if i < len(msc.mutable)-1 || md.IsReadOnly() || len(msc.readonly) > 0 {
+            clonedMetrics := pmetric.NewMetrics()
+            md.CopyTo(clonedMetrics)
+            errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))
+        } else {
+            errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
+        }
    }
-    for _, mc := range msc.pass {
+
+    // Send the data as is to read-only consumers.
+    for _, mc := range msc.readonly {
        errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
    }
+
    return errs
}

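The metrics fan-out mirrors the logs path one-to-one, so the same clone-or-share decision can be expressed for pmetric.Metrics (a sketch with a hypothetical helper, not code from this commit; it ignores the "last mutating consumer" optimization above):

```go
package example

import "go.opentelemetry.io/collector/pdata/pmetric"

// cloneIfShared is a hypothetical helper: hand the original md to a mutating consumer
// only when no read-only consumer shares it and md has not been marked read-only.
func cloneIfShared(md pmetric.Metrics, hasReadonlyConsumers bool) pmetric.Metrics {
	if md.IsReadOnly() || hasReadonlyConsumers {
		cloned := pmetric.NewMetrics()
		md.CopyTo(cloned)
		return cloned
	}
	return md
}
```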
43 changes: 37 additions & 6 deletions internal/fanoutconsumer/metrics_test.go
@@ -20,12 +20,6 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

-func TestMetricsNotMultiplexing(t *testing.T) {
-    nop := consumertest.NewNop()
-    mfc := NewMetrics([]consumer.Metrics{nop})
-    assert.Same(t, nop, mfc)
-}

func TestMetricsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.MetricsSink)
p2 := new(consumertest.MetricsSink)
@@ -93,6 +87,43 @@ func TestMetricsMultiplexingMutating(t *testing.T) {
assert.EqualValues(t, md, p3.AllMetrics()[1])
}

func TestReadOnlyMetricsMultiplexingMixFirstMutating(t *testing.T) {
p1 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}
p2 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
mdOrig := testdata.GenerateMetrics(1)
md := testdata.GenerateMetrics(1)
md.MarkReadOnly()

for i := 0; i < 2; i++ {
err := mfc.ConsumeMetrics(context.Background(), md)
if err != nil {
t.Errorf("Wanted nil got error")
return
}
}

// All consumers should receive the cloned data.

assert.True(t, md != p1.AllMetrics()[0])
assert.True(t, md != p1.AllMetrics()[1])
assert.EqualValues(t, mdOrig, p1.AllMetrics()[0])
assert.EqualValues(t, mdOrig, p1.AllMetrics()[1])

assert.True(t, md != p2.AllMetrics()[0])
assert.True(t, md != p2.AllMetrics()[1])
assert.EqualValues(t, mdOrig, p2.AllMetrics()[0])
assert.EqualValues(t, mdOrig, p2.AllMetrics()[1])

assert.True(t, md != p3.AllMetrics()[0])
assert.True(t, md != p3.AllMetrics()[1])
assert.EqualValues(t, mdOrig, p3.AllMetrics()[0])
assert.EqualValues(t, mdOrig, p3.AllMetrics()[1])
}

func TestMetricsMultiplexingMixLastMutating(t *testing.T) {
p1 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}
p2 := new(consumertest.MetricsSink)