From 9932c8abdb4a1db4f37d6beddc444f7f7dd2b77f Mon Sep 17 00:00:00 2001 From: Kranti Date: Sun, 1 Aug 2021 21:37:20 +0530 Subject: [PATCH 1/5] 1306: Adding support for composite sampling policy to tailsampler --- processor/tailsamplingprocessor/README.md | 44 ++- .../tailsamplingprocessor/composite_helper.go | 74 ++++++ .../composite_helper_test.go | 82 ++++++ processor/tailsamplingprocessor/config.go | 37 +++ .../tailsamplingprocessor/config_test.go | 34 +++ .../internal/sampling/composite.go | 162 ++++++++++++ .../internal/sampling/composite_test.go | 250 ++++++++++++++++++ .../internal/sampling/time_provider.go | 34 +++ .../internal/sampling/time_provider_test.go | 26 ++ processor/tailsamplingprocessor/processor.go | 3 + .../testdata/tail_sampling_config.yaml | 37 +++ 11 files changed, 782 insertions(+), 1 deletion(-) create mode 100644 processor/tailsamplingprocessor/composite_helper.go create mode 100644 processor/tailsamplingprocessor/composite_helper_test.go create mode 100644 processor/tailsamplingprocessor/internal/sampling/composite.go create mode 100644 processor/tailsamplingprocessor/internal/sampling/composite_test.go create mode 100644 processor/tailsamplingprocessor/internal/sampling/time_provider.go create mode 100644 processor/tailsamplingprocessor/internal/sampling/time_provider_test.go diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index e4187a06a04e..4e076333af20 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -18,6 +18,11 @@ Multiple policies exist today and it is straight forward to add more. These incl - `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`) - `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported - `rate_limiting`: Sample based on rate +- `composite`: Sample based on a combination of above samplers, with ordering and rate allocation per sampler. Rate allocation allocates certain percentages of spans per policy order. + For example if we have set max_total_spans_per_second as 100 then we can set rate_allocation as follows + 1. test-composite-policy-1 = 50 % of max_total_spans_per_second = 50 spans_per_second + 2. test-composite-policy-1 = 25 % of max_total_spans_per_second = 25 spans_per_second + 3. To ensure remaining capacity is filled use always_sample as one of the policies The following configuration options can also be modified: - `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision @@ -67,7 +72,44 @@ processors: name: test-policy-7, type: rate_limiting, rate_limiting: {spans_per_second: 35} - } + }, + { + name: composite-policy-1, + type: composite, + composite: + { + max_total_spans_per_second: 1000, + policy_order: [test-composite-policy-1, test-composite-policy-2, test-composite-policy-3], + composite_sub_policy: + [ + { + name: test-composite-policy-1, + type: numeric_attribute, + numeric_attribute: {key: key1, min_value: 50, max_value: 100} + }, + { + name: test-composite-policy-2, + type: string_attribute, + string_attribute: {key: key2, values: [value1, value2]} + }, + { + name: test-composite-policy-3, + type: always_sample + } + ], + rate_allocation: + [ + { + policy: test-composite-policy-1, + percent: 50 + }, + { + policy: test-composite-policy-2, + percent: 25 + } + ] + } + }, ] ``` diff --git a/processor/tailsamplingprocessor/composite_helper.go b/processor/tailsamplingprocessor/composite_helper.go new file mode 100644 index 000000000000..f9d72ab30314 --- /dev/null +++ b/processor/tailsamplingprocessor/composite_helper.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tailsamplingprocessor + +import ( + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" +) + +func getNewCompositePolicy(logger *zap.Logger, config CompositeCfg) (sampling.PolicyEvaluator, error) { + var subPolicyEvalParams []sampling.SubPolicyEvalParams + rateAllocationsMap := getRateAllocationMap(config) + for i := range config.SubPolicyCfg { + policyCfg := &config.SubPolicyCfg[i] + policy, _ := getSubPolicyEvaluator(logger, policyCfg) + + evalParams := sampling.SubPolicyEvalParams{ + Evaluator: policy, + MaxSpansPerSecond: int64(rateAllocationsMap[policyCfg.Name]), + } + subPolicyEvalParams = append(subPolicyEvalParams, evalParams) + } + return sampling.NewComposite(logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}), nil +} + +// Apply rate allocations to the sub-policies +func getRateAllocationMap(config CompositeCfg) map[string]float64 { + rateAllocationsMap := make(map[string]float64) + maxTotalSPS := float64(config.MaxTotalSpansPerSecond) + // Default SPS determined by equally diving number of sub policies + defaultSPS := maxTotalSPS / float64(len(config.SubPolicyCfg)) + for i := 0; i < len(config.RateAllocation); i++ { + rAlloc := &config.RateAllocation[i] + rateAllocationsMap[rAlloc.Policy] = defaultSPS + if rAlloc.Percent > 0 { + rateAllocationsMap[rAlloc.Policy] = (float64(rAlloc.Percent) / 100) * maxTotalSPS + } + } + return rateAllocationsMap +} + +// Return instance of composite sub-policy +func getSubPolicyEvaluator(logger *zap.Logger, cfg *SubPolicyCfg) (sampling.PolicyEvaluator, error) { + switch cfg.Type { + case AlwaysSample: + return sampling.NewAlwaysSample(logger), nil + case NumericAttribute: + nafCfg := cfg.NumericAttributeCfg + return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil + case StringAttribute: + safCfg := cfg.StringAttributeCfg + return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize), nil + case RateLimiting: + rlfCfg := cfg.RateLimitingCfg + return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil + default: + return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type) + } +} diff --git a/processor/tailsamplingprocessor/composite_helper_test.go b/processor/tailsamplingprocessor/composite_helper_test.go new file mode 100644 index 000000000000..46b5a9f27bde --- /dev/null +++ b/processor/tailsamplingprocessor/composite_helper_test.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tailsamplingprocessor + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config" + "go.uber.org/zap" +) + +func TestCompositeHelper(t *testing.T) { + cfg := &Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + DecisionWait: 10 * time.Second, + NumTraces: 100, + ExpectedNewTracesPerSec: 10, + PolicyCfgs: []PolicyCfg{ + { + Name: "composite-policy-1", + Type: Composite, + CompositeCfg: CompositeCfg{ + MaxTotalSpansPerSecond: 1000, + PolicyOrder: []string{"test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3", "test-composite-policy-4", "test-composite-policy-5"}, + SubPolicyCfg: []SubPolicyCfg{ + { + Name: "test-composite-policy-1", + Type: NumericAttribute, + NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100}, + }, + { + Name: "test-composite-policy-2", + Type: StringAttribute, + StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}}, + }, + { + Name: "test-composite-policy-3", + Type: RateLimiting, + RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 10}, + }, + { + Name: "test-composite-policy-4", + Type: AlwaysSample, + }, + { + Name: "test-composite-policy-5", + }, + }, + RateAllocation: []RateAllocationCfg{ + { + Policy: "test-composite-policy-1", + Percent: 50, + }, + { + Policy: "test-composite-policy-2", + Percent: 25, + }, + }, + }, + }, + }, + } + rlfCfg := cfg.PolicyCfgs[0].CompositeCfg + composite, e := getNewCompositePolicy(zap.NewNop(), rlfCfg) + require.NotNil(t, composite) + require.Nil(t, e) + // TBD add more assertions +} diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 7b9311a4517a..5a4efbc3c0dc 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -38,8 +38,43 @@ const ( StringAttribute PolicyType = "string_attribute" // RateLimiting allows all traces until the specified limits are satisfied. RateLimiting PolicyType = "rate_limiting" + // Composite Composite allows defining composite policy + Composite PolicyType = "composite" ) +// SubPolicyCfg holds the common configuration to all policies under composite policy. +type SubPolicyCfg struct { + // Name given to the instance of the policy to make easy to identify it in metrics and logs. + Name string `mapstructure:"name"` + // Type of the policy this will be used to match the proper configuration of the policy. + Type PolicyType `mapstructure:"type"` + // Configs for numeric attribute filter sampling policy evaluator. + NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"` + // Configs for string attribute filter sampling policy evaluator. + StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"` + // Configs for rate limiting filter sampling policy evaluator. + RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"` + // Configs for latency filter sampling policy evaluator. + LatencyCfg LatencyCfg `mapstructure:"latency"` + // Configs for status code filter sampling policy evaluator. + StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"` +} + +// CompositeCfg holds the configurable settings to create a composite +// sampling policy evaluator. +type CompositeCfg struct { + MaxTotalSpansPerSecond int64 `mapstructure:"max_total_spans_per_second"` + PolicyOrder []string `mapstructure:"policy_order"` + SubPolicyCfg []SubPolicyCfg `mapstructure:"composite_sub_policy"` + RateAllocation []RateAllocationCfg `mapstructure:"rate_allocation"` +} + +// RateAllocationCfg used within composite policy +type RateAllocationCfg struct { + Policy string `mapstructure:"policy"` + Percent int64 `mapstructure:"percent"` +} + // PolicyCfg holds the common configuration to all policies. type PolicyCfg struct { // Name given to the instance of the policy to make easy to identify it in metrics and logs. @@ -56,6 +91,8 @@ type PolicyCfg struct { StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"` // Configs for rate limiting filter sampling policy evaluator. RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"` + // CompositeCfg for defining composite policy + CompositeCfg CompositeCfg `mapstructure:"composite"` } // LatencyCfg holds the configurable settings to create a latency filter sampling policy diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index fc596eff651d..a9301d62937b 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -73,6 +73,40 @@ func TestLoadConfig(t *testing.T) { Type: RateLimiting, RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35}, }, + { + Name: "composite-policy-1", + Type: Composite, + CompositeCfg: CompositeCfg{ + MaxTotalSpansPerSecond: 1000, + PolicyOrder: []string{"test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3"}, + SubPolicyCfg: []SubPolicyCfg{ + { + Name: "test-composite-policy-1", + Type: NumericAttribute, + NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100}, + }, + { + Name: "test-composite-policy-2", + Type: StringAttribute, + StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}}, + }, + { + Name: "test-composite-policy-3", + Type: AlwaysSample, + }, + }, + RateAllocation: []RateAllocationCfg{ + { + Policy: "test-composite-policy-1", + Percent: 50, + }, + { + Policy: "test-composite-policy-2", + Percent: 25, + }, + }, + }, + }, }, }) } diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go new file mode 100644 index 000000000000..0a06da458d54 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -0,0 +1,162 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sampling + +import ( + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +type subpolicy struct { + // the subpolicy evaluator + evaluator PolicyEvaluator + + // spans per second allocated to each subpolicy + allocatedSPS int64 + + // spans per second that each subpolicy sampled in this period + sampledSPS int64 +} + +// Composite evaluator and its internal data +type Composite struct { + // the subpolicy evaluators + subpolicies []*subpolicy + + // maximum total spans per second that must be sampled + maxTotalSPS int64 + + // current unix timestamp second + currentSecond int64 + + // The time provider (can be different from clock for testing purposes) + timeProvider TimeProvider + + logger *zap.Logger +} + +var _ PolicyEvaluator = (*Composite)(nil) + +// SubPolicyEvalParams defines the evaluator and max rate for a sub-policy +type SubPolicyEvalParams struct { + Evaluator PolicyEvaluator + MaxSpansPerSecond int64 +} + +// NewComposite creates a policy evaluator that samples all subpolicies. +func NewComposite( + logger *zap.Logger, + maxTotalSpansPerSecond int64, + subPolicyParams []SubPolicyEvalParams, + timeProvider TimeProvider, +) PolicyEvaluator { + + subpolicies := []*subpolicy{} + + for i := 0; i < len(subPolicyParams); i++ { + sub := &subpolicy{} + sub.evaluator = subPolicyParams[i].Evaluator + sub.allocatedSPS = subPolicyParams[i].MaxSpansPerSecond + + // We are just starting, so there is no previous input, set it to 0 + sub.sampledSPS = 0 + + subpolicies = append(subpolicies, sub) + } + + return &Composite{ + maxTotalSPS: maxTotalSpansPerSecond, + subpolicies: subpolicies, + timeProvider: timeProvider, + logger: logger, + } +} + +// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived +// after the sampling decision was already taken for the trace. +// This gives the evaluator a chance to log any message/metrics and/or update any +// related internal state. +func (c *Composite) OnLateArrivingSpans(Decision, []*pdata.Span) error { + c.logger.Debug("Spans are arriving late, decision is already made!!!") + return nil +} + +// Evaluate looks at the trace data and returns a corresponding SamplingDecision. +func (c *Composite) Evaluate(traceID pdata.TraceID, trace *TraceData) (Decision, error) { + // Rate limiting works by counting spans that are sampled during each 1 second + // time period. Until the total number of spans during a particular second + // exceeds the allocated number of spans-per-second the traces are sampled, + // once the limit is exceeded the traces are no longer sampled. The counter + // restarts at the beginning of each second. + // Current counters and rate limits are kept separately for each subpolicy. + + currSecond := c.timeProvider.getCurSecond() + if c.currentSecond != currSecond { + // This is a new second + c.currentSecond = currSecond + // Reset counters + for i := range c.subpolicies { + c.subpolicies[i].sampledSPS = 0 + } + } + + for _, sub := range c.subpolicies { + decision, err := sub.evaluator.Evaluate(traceID, trace) + if err != nil { + return Unspecified, err + } + + if decision == Sampled { + // The subpolicy made a decision to Sample. Now we need to make our decision. + + // Calculate resulting SPS counter if we decide to sample this trace + spansInSecondIfSampled := sub.sampledSPS + trace.SpanCount + + // Check if the rate will be within the allocated bandwidth. + if spansInSecondIfSampled <= sub.allocatedSPS && spansInSecondIfSampled <= c.maxTotalSPS { + sub.sampledSPS = spansInSecondIfSampled + + // Let the sampling happen + return Sampled, nil + } + + // We exceeded the rate limit. Don't sample this trace. + // Note that we will continue evaluating new incoming traces against + // allocated SPS, we do not update sub.sampledSPS here in order to give + // chance to another smaller trace to be accepted later. + return NotSampled, nil + } + } + + return NotSampled, nil +} + +// OnDroppedSpans is called when the trace needs to be dropped, due to memory +// pressure, before the decision_wait time has been reached. +func (c *Composite) OnDroppedSpans(pdata.TraceID, *TraceData) (Decision, error) { + // Here we have a number of possible solutions: + // 1. Random sample traces based on maxTotalSPS. + // 2. Perform full composite sampling logic by calling Composite.Evaluate(), essentially + // using partial trace data for sampling. + // 3. Sample everything. + // + // It seems that #2 may be the best choice from end user perspective, but + // it is not certain and it is also additional performance penalty when we are + // already under a memory (and possibly CPU) pressure situation. + // + // For now we are playing safe and go with #3. Investigating alternate options + // should be a future task. + return Sampled, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go new file mode 100644 index 000000000000..19242b034daa --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -0,0 +1,250 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package sampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +type FakeTimeProvider struct { + second int64 +} + +func (f FakeTimeProvider) getCurSecond() int64 { + return f.second +} + +var traceID = pdata.NewTraceID([16]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x52, 0x96, 0x9A, 0x89, 0x55, 0x57, 0x1A, 0x3F}) + +func createTrace() *TraceData { + trace := &TraceData{SpanCount: 1} + return trace +} + +func TestCompositeEvaluatorNotSampled(t *testing.T) { + + // Create 2 policies which do not match any trace + n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) + n2 := NewNumericAttributeFilter(zap.NewNop(), "tag", 200, 300) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{}) + + trace := createTrace() + + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + // None of the numeric filters should match since input trace data does not contain + // the "tag", so the decision should be NotSampled. + expected := NotSampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } +} + +func TestCompositeEvaluatorSampled(t *testing.T) { + + // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. + n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) + n2 := NewAlwaysSample(zap.NewNop()) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{}) + + trace := createTrace() + + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + // The second policy is AlwaysSample, so the decision should be Sampled. + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } +} + +func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { + + // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. + n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) + n2 := NewAlwaysSample(zap.NewNop()) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{}) + + for i := 1; i <= 3; i++ { + trace := createTrace() + + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + // The second policy is AlwaysSample, so the decision should be Sampled. + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } +} + +func TestCompositeEvaluatorThrottling(t *testing.T) { + + // Create only one subpolicy, with 100% Sampled policy. + n1 := NewAlwaysSample(zap.NewNop()) + timeProvider := &FakeTimeProvider{second: 0} + const totalSPS = 100 + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider) + + trace := createTrace() + + // First totalSPS traces should be 100% Sampled + for i := 0; i < totalSPS; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } + + // Now we hit the rate limit, so subsequent evaluations should result in 100% NotSampled + for i := 0; i < totalSPS; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := NotSampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } + + // Let the time advance by one second. + timeProvider.second++ + + // Subsequent sampling should be Sampled again because it is a new second. + for i := 0; i < totalSPS; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } +} + +func TestOnLateArrivingSpans_Composite(t *testing.T) { + n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) + n2 := NewAlwaysSample(zap.NewNop()) + timeProvider := &FakeTimeProvider{second: 0} + const totalSPS = 100 + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) + e := c.OnLateArrivingSpans(Sampled, nil) + assert.Nil(t, e) +} + +func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { + + n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) + n2 := NewAlwaysSample(zap.NewNop()) + timeProvider := &FakeTimeProvider{second: 0} + const totalSPS = 100 + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) + + trace := createTrace() + + // We have 2 subpolicies, so each should initially get half the bandwidth + + // First totalSPS/2 should be Sampled until we hit the rate limit + for i := 0; i < totalSPS/2; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } + + // Now we hit the rate limit for second subpolicy, so subsequent evaluations should result in NotSampled + for i := 0; i < totalSPS/2; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := NotSampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } + + // Let the time advance by one second. + timeProvider.second++ + + // It is a new second, so we should start sampling again. + for i := 0; i < totalSPS/2; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } + + // Now let's hit the hard limit and exceed the total by a factor of 2 + for i := 0; i < 2*totalSPS; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := NotSampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } + + // Let the time advance by one second. + timeProvider.second++ + + // It is a new second, so we should start sampling again. + for i := 0; i < totalSPS/2; i++ { + decision, err := c.Evaluate(traceID, trace) + if err != nil { + t.Fatalf("Failed to evaluate composite policy: %v", err) + } + + expected := Sampled + if decision != expected { + t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) + } + } +} diff --git a/processor/tailsamplingprocessor/internal/sampling/time_provider.go b/processor/tailsamplingprocessor/internal/sampling/time_provider.go new file mode 100644 index 000000000000..04f0f7374163 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/time_provider.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package sampling + +import ( + "time" +) + +// TimeProvider allows to get current Unix second +type TimeProvider interface { + getCurSecond() int64 +} + +// MonotonicClock provides monotonic real clock-based current Unix second. +// Use it when creating a NewComposite which should measure sample rates +// against a realtime clock (this is almost always what you want to do, +// the exception is usually only automated testing where you may want +// to have fake clocks). +type MonotonicClock struct{} + +func (c MonotonicClock) getCurSecond() int64 { + return time.Now().Unix() +} diff --git a/processor/tailsamplingprocessor/internal/sampling/time_provider_test.go b/processor/tailsamplingprocessor/internal/sampling/time_provider_test.go new file mode 100644 index 000000000000..d6f9c4a98d26 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/time_provider_test.go @@ -0,0 +1,26 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTimeProvider(t *testing.T) { + clock := MonotonicClock{} + assert.Greater(t, clock.getCurSecond(), int64(0)) +} diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index d82e3d690af3..6d75d1d8ac3a 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -132,6 +132,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval case RateLimiting: rlfCfg := cfg.RateLimitingCfg return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil + case Composite: + rlfCfg := cfg.CompositeCfg + return getNewCompositePolicy(logger, rlfCfg) default: return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type) } diff --git a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml index 04e1787f907c..c177c2499ee9 100644 --- a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml +++ b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml @@ -40,6 +40,43 @@ processors: type: rate_limiting, rate_limiting: {spans_per_second: 35} }, + { + name: composite-policy-1, + type: composite, + composite: + { + max_total_spans_per_second: 1000, + policy_order: [ test-composite-policy-1, test-composite-policy-2, test-composite-policy-3 ], + composite_sub_policy: + [ + { + name: test-composite-policy-1, + type: numeric_attribute, + numeric_attribute: { key: key1, min_value: 50, max_value: 100 } + }, + { + name: test-composite-policy-2, + type: string_attribute, + string_attribute: { key: key2, values: [ value1, value2 ] } + }, + { + name: test-composite-policy-3, + type: always_sample + } + ], + rate_allocation: + [ + { + policy: test-composite-policy-1, + percent: 50 + }, + { + policy: test-composite-policy-2, + percent: 25 + } + ] + } + }, ] service: From 17535bf7a4e40a01364f65a4271c751c7894085a Mon Sep 17 00:00:00 2001 From: vikrambe Date: Sun, 22 Aug 2021 21:07:44 +0530 Subject: [PATCH 2/5] Fixing review comments --- processor/tailsamplingprocessor/README.md | 2 +- processor/tailsamplingprocessor/composite_helper.go | 11 +++++------ .../internal/sampling/composite_test.go | 8 ++++---- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index ea820f27faae..8b78be20ef00 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -22,7 +22,7 @@ Multiple policies exist today and it is straight forward to add more. These incl - `composite`: Sample based on a combination of above samplers, with ordering and rate allocation per sampler. Rate allocation allocates certain percentages of spans per policy order. For example if we have set max_total_spans_per_second as 100 then we can set rate_allocation as follows 1. test-composite-policy-1 = 50 % of max_total_spans_per_second = 50 spans_per_second - 2. test-composite-policy-1 = 25 % of max_total_spans_per_second = 25 spans_per_second + 2. test-composite-policy-2 = 25 % of max_total_spans_per_second = 25 spans_per_second 3. To ensure remaining capacity is filled use always_sample as one of the policies The following configuration options can also be modified: diff --git a/processor/tailsamplingprocessor/composite_helper.go b/processor/tailsamplingprocessor/composite_helper.go index f9d72ab30314..ae5a7f06aa4a 100644 --- a/processor/tailsamplingprocessor/composite_helper.go +++ b/processor/tailsamplingprocessor/composite_helper.go @@ -25,9 +25,8 @@ import ( func getNewCompositePolicy(logger *zap.Logger, config CompositeCfg) (sampling.PolicyEvaluator, error) { var subPolicyEvalParams []sampling.SubPolicyEvalParams rateAllocationsMap := getRateAllocationMap(config) - for i := range config.SubPolicyCfg { - policyCfg := &config.SubPolicyCfg[i] - policy, _ := getSubPolicyEvaluator(logger, policyCfg) + for _, policyCfg := range config.SubPolicyCfg { + policy, _ := getSubPolicyEvaluator(logger, &policyCfg) evalParams := sampling.SubPolicyEvalParams{ Evaluator: policy, @@ -44,11 +43,11 @@ func getRateAllocationMap(config CompositeCfg) map[string]float64 { maxTotalSPS := float64(config.MaxTotalSpansPerSecond) // Default SPS determined by equally diving number of sub policies defaultSPS := maxTotalSPS / float64(len(config.SubPolicyCfg)) - for i := 0; i < len(config.RateAllocation); i++ { - rAlloc := &config.RateAllocation[i] - rateAllocationsMap[rAlloc.Policy] = defaultSPS + for _, rAlloc := range config.RateAllocation{ if rAlloc.Percent > 0 { rateAllocationsMap[rAlloc.Policy] = (float64(rAlloc.Percent) / 100) * maxTotalSPS + }else { + rateAllocationsMap[rAlloc.Policy] = defaultSPS } } return rateAllocationsMap diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index 19242b034daa..8df9eb215dfe 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -107,7 +107,7 @@ func TestCompositeEvaluatorThrottling(t *testing.T) { // Create only one subpolicy, with 100% Sampled policy. n1 := NewAlwaysSample(zap.NewNop()) timeProvider := &FakeTimeProvider{second: 0} - const totalSPS = 100 + const totalSPS = 10 c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider) trace := createTrace() @@ -159,10 +159,10 @@ func TestOnLateArrivingSpans_Composite(t *testing.T) { n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) n2 := NewAlwaysSample(zap.NewNop()) timeProvider := &FakeTimeProvider{second: 0} - const totalSPS = 100 + const totalSPS = 10 c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) e := c.OnLateArrivingSpans(Sampled, nil) - assert.Nil(t, e) + assert.NoError(t, e) } func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { @@ -170,7 +170,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) n2 := NewAlwaysSample(zap.NewNop()) timeProvider := &FakeTimeProvider{second: 0} - const totalSPS = 100 + const totalSPS = 10 c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) trace := createTrace() From 39808057e9fd15e2fd717351d5e45b8f3229c24d Mon Sep 17 00:00:00 2001 From: vikrambe Date: Sun, 22 Aug 2021 21:07:44 +0530 Subject: [PATCH 3/5] Fixing review comments --- processor/tailsamplingprocessor/README.md | 2 +- processor/tailsamplingprocessor/composite_helper.go | 11 +++++------ .../internal/sampling/composite_test.go | 8 ++++---- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index ea820f27faae..8b78be20ef00 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -22,7 +22,7 @@ Multiple policies exist today and it is straight forward to add more. These incl - `composite`: Sample based on a combination of above samplers, with ordering and rate allocation per sampler. Rate allocation allocates certain percentages of spans per policy order. For example if we have set max_total_spans_per_second as 100 then we can set rate_allocation as follows 1. test-composite-policy-1 = 50 % of max_total_spans_per_second = 50 spans_per_second - 2. test-composite-policy-1 = 25 % of max_total_spans_per_second = 25 spans_per_second + 2. test-composite-policy-2 = 25 % of max_total_spans_per_second = 25 spans_per_second 3. To ensure remaining capacity is filled use always_sample as one of the policies The following configuration options can also be modified: diff --git a/processor/tailsamplingprocessor/composite_helper.go b/processor/tailsamplingprocessor/composite_helper.go index f9d72ab30314..ae5a7f06aa4a 100644 --- a/processor/tailsamplingprocessor/composite_helper.go +++ b/processor/tailsamplingprocessor/composite_helper.go @@ -25,9 +25,8 @@ import ( func getNewCompositePolicy(logger *zap.Logger, config CompositeCfg) (sampling.PolicyEvaluator, error) { var subPolicyEvalParams []sampling.SubPolicyEvalParams rateAllocationsMap := getRateAllocationMap(config) - for i := range config.SubPolicyCfg { - policyCfg := &config.SubPolicyCfg[i] - policy, _ := getSubPolicyEvaluator(logger, policyCfg) + for _, policyCfg := range config.SubPolicyCfg { + policy, _ := getSubPolicyEvaluator(logger, &policyCfg) evalParams := sampling.SubPolicyEvalParams{ Evaluator: policy, @@ -44,11 +43,11 @@ func getRateAllocationMap(config CompositeCfg) map[string]float64 { maxTotalSPS := float64(config.MaxTotalSpansPerSecond) // Default SPS determined by equally diving number of sub policies defaultSPS := maxTotalSPS / float64(len(config.SubPolicyCfg)) - for i := 0; i < len(config.RateAllocation); i++ { - rAlloc := &config.RateAllocation[i] - rateAllocationsMap[rAlloc.Policy] = defaultSPS + for _, rAlloc := range config.RateAllocation{ if rAlloc.Percent > 0 { rateAllocationsMap[rAlloc.Policy] = (float64(rAlloc.Percent) / 100) * maxTotalSPS + }else { + rateAllocationsMap[rAlloc.Policy] = defaultSPS } } return rateAllocationsMap diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index 19242b034daa..8df9eb215dfe 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -107,7 +107,7 @@ func TestCompositeEvaluatorThrottling(t *testing.T) { // Create only one subpolicy, with 100% Sampled policy. n1 := NewAlwaysSample(zap.NewNop()) timeProvider := &FakeTimeProvider{second: 0} - const totalSPS = 100 + const totalSPS = 10 c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider) trace := createTrace() @@ -159,10 +159,10 @@ func TestOnLateArrivingSpans_Composite(t *testing.T) { n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) n2 := NewAlwaysSample(zap.NewNop()) timeProvider := &FakeTimeProvider{second: 0} - const totalSPS = 100 + const totalSPS = 10 c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) e := c.OnLateArrivingSpans(Sampled, nil) - assert.Nil(t, e) + assert.NoError(t, e) } func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { @@ -170,7 +170,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) n2 := NewAlwaysSample(zap.NewNop()) timeProvider := &FakeTimeProvider{second: 0} - const totalSPS = 100 + const totalSPS = 10 c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) trace := createTrace() From 1794b75381ddb073d6cc7a5c9cbd479e9ae1c43d Mon Sep 17 00:00:00 2001 From: vikrambe Date: Sun, 22 Aug 2021 21:48:09 +0530 Subject: [PATCH 4/5] Fixing review comments --- .../composite_helper_test.go | 2 +- processor/tailsamplingprocessor/config.go | 4 +- .../internal/sampling/composite_test.go | 84 +++++-------------- 3 files changed, 26 insertions(+), 64 deletions(-) diff --git a/processor/tailsamplingprocessor/composite_helper_test.go b/processor/tailsamplingprocessor/composite_helper_test.go index 46b5a9f27bde..39da7770aa1b 100644 --- a/processor/tailsamplingprocessor/composite_helper_test.go +++ b/processor/tailsamplingprocessor/composite_helper_test.go @@ -77,6 +77,6 @@ func TestCompositeHelper(t *testing.T) { rlfCfg := cfg.PolicyCfgs[0].CompositeCfg composite, e := getNewCompositePolicy(zap.NewNop(), rlfCfg) require.NotNil(t, composite) - require.Nil(t, e) + require.NoError(t, e) // TBD add more assertions } diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 7f6a3088242d..920b45d3ea12 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -40,7 +40,7 @@ const ( StringAttribute PolicyType = "string_attribute" // RateLimiting allows all traces until the specified limits are satisfied. RateLimiting PolicyType = "rate_limiting" - // Composite Composite allows defining composite policy + // Composite allows defining a composite policy, combining the other policies in one Composite PolicyType = "composite" ) @@ -95,7 +95,7 @@ type PolicyCfg struct { StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"` // Configs for rate limiting filter sampling policy evaluator. RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"` - // CompositeCfg for defining composite policy + // Configs for defining composite policy CompositeCfg CompositeCfg `mapstructure:"composite"` } diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index 8df9eb215dfe..821cbf752847 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -14,6 +14,7 @@ package sampling import ( + "github.com/stretchr/testify/require" "testing" "github.com/stretchr/testify/assert" @@ -46,16 +47,12 @@ func TestCompositeEvaluatorNotSampled(t *testing.T) { trace := createTrace() decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) // None of the numeric filters should match since input trace data does not contain // the "tag", so the decision should be NotSampled. expected := NotSampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } func TestCompositeEvaluatorSampled(t *testing.T) { @@ -68,15 +65,11 @@ func TestCompositeEvaluatorSampled(t *testing.T) { trace := createTrace() decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) // The second policy is AlwaysSample, so the decision should be Sampled. expected := Sampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { @@ -86,19 +79,16 @@ func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { n2 := NewAlwaysSample(zap.NewNop()) c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{}) - for i := 1; i <= 3; i++ { + for i := 1; i <= 10; i++ { trace := createTrace() decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + // The second policy is AlwaysSample, so the decision should be Sampled. expected := Sampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } } @@ -115,27 +105,19 @@ func TestCompositeEvaluatorThrottling(t *testing.T) { // First totalSPS traces should be 100% Sampled for i := 0; i < totalSPS; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := Sampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } // Now we hit the rate limit, so subsequent evaluations should result in 100% NotSampled for i := 0; i < totalSPS; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := NotSampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } // Let the time advance by one second. @@ -144,14 +126,10 @@ func TestCompositeEvaluatorThrottling(t *testing.T) { // Subsequent sampling should be Sampled again because it is a new second. for i := 0; i < totalSPS; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := Sampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } } @@ -180,9 +158,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { // First totalSPS/2 should be Sampled until we hit the rate limit for i := 0; i < totalSPS/2; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := Sampled if decision != expected { @@ -193,9 +169,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { // Now we hit the rate limit for second subpolicy, so subsequent evaluations should result in NotSampled for i := 0; i < totalSPS/2; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := NotSampled if decision != expected { @@ -209,27 +183,19 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { // It is a new second, so we should start sampling again. for i := 0; i < totalSPS/2; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := Sampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } // Now let's hit the hard limit and exceed the total by a factor of 2 for i := 0; i < 2*totalSPS; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := NotSampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } // Let the time advance by one second. @@ -238,13 +204,9 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { // It is a new second, so we should start sampling again. for i := 0; i < totalSPS/2; i++ { decision, err := c.Evaluate(traceID, trace) - if err != nil { - t.Fatalf("Failed to evaluate composite policy: %v", err) - } + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) expected := Sampled - if decision != expected { - t.Fatalf("Incorrect decision by composite policy evaluator: expected %v, actual %v", expected, decision) - } + assert.Equal(t, decision, expected) } } From 61df388e240065410bdab2a6b7e1b9842416b5f4 Mon Sep 17 00:00:00 2001 From: vikrambe Date: Sun, 29 Aug 2021 15:01:34 +0530 Subject: [PATCH 5/5] Adding additional testcases --- .../internal/sampling/composite_test.go | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index 821cbf752847..81cd390cb052 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -16,6 +16,7 @@ package sampling import ( "github.com/stretchr/testify/require" "testing" + "time" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/model/pdata" @@ -37,6 +38,34 @@ func createTrace() *TraceData { return trace } +func newTraceID() pdata.TraceID { + r := [16]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x52, 0x96, 0x9A, 0x89, 0x55, 0x57, 0x1A, 0x3F} + return pdata.NewTraceID(r) +} + +func newTraceWithKV(traceId pdata.TraceID, key string, val int64) *TraceData { + var traceBatches []pdata.Traces + traces := pdata.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.InstrumentationLibrarySpans().AppendEmpty() + span := ils.Spans().AppendEmpty() + span.SetTraceID(traceId) + span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + span.SetStartTimestamp(pdata.NewTimestampFromTime( + time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC), + )) + span.SetEndTimestamp(pdata.NewTimestampFromTime( + time.Date(2020, 1, 1, 12, 0, 16, 0, time.UTC), + )) + span.Attributes().InsertInt(key, val) + + traceBatches = append(traceBatches, traces) + return &TraceData{ + ReceivedBatches: traceBatches, + SpanCount: 1, + } +} + func TestCompositeEvaluatorNotSampled(t *testing.T) { // Create 2 policies which do not match any trace @@ -72,6 +101,45 @@ func TestCompositeEvaluatorSampled(t *testing.T) { assert.Equal(t, decision, expected) } +func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) { + + timeProvider := &FakeTimeProvider{second: 0} + + // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. + n1 := NewNumericAttributeFilter(zap.NewNop(), "tag", 0, 100) + n2 := NewAlwaysSample(zap.NewNop()) + c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider) + + traceId := newTraceID() + trace := newTraceWithKV(traceId, "tag", int64(10)) + + decision, err := c.Evaluate(traceId, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The first policy is NewNumericAttributeFilter and trace tag matches criteria, so the decision should be Sampled. + expected := Sampled + assert.Equal(t, decision, expected) + + traceId = newTraceID() + trace = newTraceWithKV(traceId, "tag", int64(11)) + + decision, err = c.Evaluate(traceId, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The first policy is NewNumericAttributeFilter and trace tag matches criteria, so the decision should be Sampled. + expected = NotSampled + assert.Equal(t, decision, expected) + + traceId = newTraceID() + trace = newTraceWithKV(traceId, "tag", int64(1001)) + decision, err = c.Evaluate(traceId, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The first policy fails as the tag value is higher than the range set where as the second policy is AlwaysSample, so the decision should be Sampled. + expected = Sampled + assert.Equal(t, decision, expected) +} + func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. @@ -85,7 +153,6 @@ func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { decision, err := c.Evaluate(traceID, trace) require.NoError(t, err, "Failed to evaluate composite policy: %v", err) - // The second policy is AlwaysSample, so the decision should be Sampled. expected := Sampled assert.Equal(t, decision, expected)