diff --git a/processor/tailsamplingprocessor/internal/sampling/duration.go b/processor/tailsamplingprocessor/internal/sampling/duration.go new file mode 100644 index 000000000000..d7c6ee82cdb2 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/duration.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +type duration struct { + logger *zap.Logger + lowerThresholdMs int64 + upperThresholdMs int64 +} + +var _ PolicyEvaluator = (*duration)(nil) + +// NewDuration creates a policy evaluator sampling traces with a duration higher than a configured threshold +func NewDuration(settings component.TelemetrySettings, lowerThresholdMs int64, upperThresholdMs int64) PolicyEvaluator { + return &duration{ + logger: settings.Logger, + lowerThresholdMs: lowerThresholdMs, + upperThresholdMs: upperThresholdMs, + } +} + +// Evaluate looks at the trace data and returns a corresponding SamplingDecision. +func (l *duration) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *TraceData) (Decision, error) { + l.logger.Debug("Evaluating spans in duration filter") + + traceData.Lock() + defer traceData.Unlock() + batches := traceData.ReceivedBatches + + var minTime pcommon.Timestamp + var maxTime pcommon.Timestamp + + return hasSpanWithCondition(batches, func(span ptrace.Span) bool { + if minTime == 0 || span.StartTimestamp() < minTime { + minTime = span.StartTimestamp() + } + if maxTime == 0 || span.EndTimestamp() > maxTime { + maxTime = span.EndTimestamp() + } + + duration := maxTime.AsTime().Sub(minTime.AsTime()) + return (l.lowerThresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs) + }), nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/duration_test.go b/processor/tailsamplingprocessor/internal/sampling/duration_test.go new file mode 100644 index 000000000000..f804b866fc6f --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/duration_test.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestEvaluate_Duration(t *testing.T) { + filter := NewDuration(componenttest.NewNopTelemetrySettings(), 5000, 10000) + + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + now := time.Now() + + cases := []struct { + Desc string + Spans []spanWithTimeAndBoundedDuration + Decision Decision + }{ + { + "trace duration shorter than lower bound", + []spanWithTimeAndBoundedDuration{ + { + StartTime: now, + Duration: 4500 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration is equal to lower bound", + []spanWithTimeAndBoundedDuration{ + { + StartTime: now, + Duration: 5000 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration is within lower and upper bounds", + []spanWithTimeAndBoundedDuration{ + { + StartTime: now, + Duration: 5001 * time.Millisecond, + }, + }, + Sampled, + }, + { + "trace duration is above upper bound", + []spanWithTimeAndBoundedDuration{ + { + StartTime: now, + Duration: 10001 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration equals upper bound", + []spanWithTimeAndBoundedDuration{ + { + StartTime: now, + Duration: 10000 * time.Millisecond, + }, + }, + Sampled, + }, + { + "total trace duration is longer than threshold but every single span is shorter", + []spanWithTimeAndBoundedDuration{ + { + StartTime: now, + Duration: 3000 * time.Millisecond, + }, + { + StartTime: now.Add(2500 * time.Millisecond), + Duration: 3000 * time.Millisecond, + }, + }, + Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + decision, err := filter.Evaluate(context.Background(), traceID, newTraceWithBoundedSpans(c.Spans)) + + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + +type spanWithTimeAndBoundedDuration struct { + StartTime time.Time + Duration time.Duration +} + +func newTraceWithBoundedSpans(spans []spanWithTimeAndBoundedDuration) *TraceData { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + for _, s := range spans { + span := ils.Spans().AppendEmpty() + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(s.StartTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(s.StartTime.Add(s.Duration))) + } + + return &TraceData{ + ReceivedBatches: traces, + } +}