diff --git a/.chloggen/bounded_duration_sampler.yaml b/.chloggen/bounded_duration_sampler.yaml new file mode 100644 index 000000000000..1b2bb15c62bd --- /dev/null +++ b/.chloggen/bounded_duration_sampler.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: adds optional upper bound duration for sampling + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26115] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.gitignore b/.gitignore index fd937a38f1b4..52616038f614 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ integration-coverage.html go.work* +/result diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index c589e17d4ab5..e5bc9953e73a 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -29,7 +29,7 @@ The following configuration options are required: Multiple policies exist today and it is straight forward to add more. These include: - `always_sample`: Sample all traces -- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between. +- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between. Supplying no upper bound will result in a policy sampling anything greater than `threshold_ms`. - `numeric_attribute`: Sample based on number attributes (resource and record) - `probabilistic`: Sample a percentage of traces. Read [a comparison with the Probabilistic Sampling Processor](#probabilistic-sampling-processor-compared-to-the-tail-sampling-processor-with-the-probabilistic-policy). - `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`) @@ -77,7 +77,7 @@ processors: { name: test-policy-2, type: latency, - latency: {threshold_ms: 5000} + latency: {threshold_ms: 5000, upper_threshold_ms: 10000} }, { name: test-policy-3, diff --git a/processor/tailsamplingprocessor/and_helper_test.go b/processor/tailsamplingprocessor/and_helper_test.go index f1cbe8a1e587..e46237ed95ed 100644 --- a/processor/tailsamplingprocessor/and_helper_test.go +++ b/processor/tailsamplingprocessor/and_helper_test.go @@ -30,7 +30,7 @@ func TestAndHelper(t *testing.T) { require.NoError(t, err) expected := sampling.NewAnd(zap.NewNop(), []sampling.PolicyEvaluator{ - sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100), + sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), }) assert.Equal(t, expected, actual) }) diff --git a/processor/tailsamplingprocessor/composite_helper_test.go b/processor/tailsamplingprocessor/composite_helper_test.go index bc7d3cc2053e..578f3be289b9 100644 --- a/processor/tailsamplingprocessor/composite_helper_test.go +++ b/processor/tailsamplingprocessor/composite_helper_test.go @@ -50,11 +50,11 @@ func TestCompositeHelper(t *testing.T) { expected := sampling.NewComposite(zap.NewNop(), 1000, []sampling.SubPolicyEvalParams{ { - Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100), + Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), MaxSpansPerSecond: 250, }, { - Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200), + Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200, 0), MaxSpansPerSecond: 500, }, }, sampling.MonotonicClock{}) diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 91ae169fd015..008c72c72073 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -87,6 +87,7 @@ type AndSubPolicyCfg struct { sharedPolicyCfg `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct } +// TraceStateCfg holds the common configuration for trace states. type TraceStateCfg struct { // Tag that the filter is going to be matching against. Key string `mapstructure:"key"` @@ -94,6 +95,7 @@ type TraceStateCfg struct { Values []string `mapstructure:"values"` } +// AndCfg holds the common configuration to all and policies. type AndCfg struct { SubPolicyCfg []AndSubPolicyCfg `mapstructure:"and_sub_policy"` } @@ -126,8 +128,10 @@ type PolicyCfg struct { // LatencyCfg holds the configurable settings to create a latency filter sampling policy // evaluator type LatencyCfg struct { - // ThresholdMs in milliseconds. + // Lower bound in milliseconds. Retaining original name for compatibility ThresholdMs int64 `mapstructure:"threshold_ms"` + // Upper bound in milliseconds. + UpperThresholdmsMs int64 `mapstructure:"upper_threshold_ms"` } // NumericAttributeCfg holds the configurable settings to create a numeric attribute filter diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index 760b4e9f6768..be87f47165c9 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -13,17 +13,19 @@ import ( ) type latency struct { - logger *zap.Logger - thresholdMs int64 + logger *zap.Logger + thresholdMs int64 + upperThresholdMs int64 } var _ PolicyEvaluator = (*latency)(nil) // NewLatency creates a policy evaluator sampling traces with a duration higher than a configured threshold -func NewLatency(settings component.TelemetrySettings, thresholdMs int64) PolicyEvaluator { +func NewLatency(settings component.TelemetrySettings, thresholdMs int64, upperThresholdMs int64) PolicyEvaluator { return &latency{ - logger: settings.Logger, - thresholdMs: thresholdMs, + logger: settings.Logger, + thresholdMs: thresholdMs, + upperThresholdMs: upperThresholdMs, } } @@ -47,6 +49,9 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *Trac } duration := maxTime.AsTime().Sub(minTime.AsTime()) - return duration.Milliseconds() >= l.thresholdMs + if l.upperThresholdMs == 0 { + return duration.Milliseconds() >= l.thresholdMs + } + return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs) }), nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/latency_test.go b/processor/tailsamplingprocessor/internal/sampling/latency_test.go index 839feddcbf9d..b5a2537edf65 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency_test.go @@ -15,7 +15,7 @@ import ( ) func TestEvaluate_Latency(t *testing.T) { - filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000) + filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 0) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) now := time.Now() @@ -71,6 +71,93 @@ func TestEvaluate_Latency(t *testing.T) { } } +func TestEvaluate_Bounded_Latency(t *testing.T) { + filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 10000) + + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + now := time.Now() + + cases := []struct { + Desc string + Spans []spanWithTimeAndDuration + Decision Decision + }{ + { + "trace duration shorter than lower bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 4500 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration is equal to lower bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 5000 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration is within lower and upper bounds", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 5001 * time.Millisecond, + }, + }, + Sampled, + }, + { + "trace duration is above upper bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 10001 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration equals upper bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 10000 * time.Millisecond, + }, + }, + Sampled, + }, + { + "total trace duration is longer than threshold but every single span is shorter", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 3000 * time.Millisecond, + }, + { + StartTime: now.Add(2500 * time.Millisecond), + Duration: 3000 * time.Millisecond, + }, + }, + Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + decision, err := filter.Evaluate(context.Background(), traceID, newTraceWithSpans(c.Spans)) + + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + type spanWithTimeAndDuration struct { StartTime time.Time Duration time.Duration diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index ae1c566886b6..75d5b55559c3 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -155,7 +155,7 @@ func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedP return sampling.NewAlwaysSample(settings), nil case Latency: lfCfg := cfg.LatencyCfg - return sampling.NewLatency(settings, lfCfg.ThresholdMs), nil + return sampling.NewLatency(settings, lfCfg.ThresholdMs, lfCfg.UpperThresholdmsMs), nil case NumericAttribute: nafCfg := cfg.NumericAttributeCfg return sampling.NewNumericAttributeFilter(settings, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue, nafCfg.InvertMatch), nil