Skip to content

Commit

Permalink
fix(cascadingfilterprocessor): prevent overriding metrics in cascadin…
Browse files Browse the repository at this point in the history
…g filter processor - add processor tag
  • Loading branch information
dmolenda-sumo committed Apr 25, 2022
1 parent 2dcb2a4 commit c877137
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 26 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- chore(deps): bump golang from 1.18 to 1.18.1 [#546][#546]
- chore: bump OT core to v0.49.0 [#550][#550]

### Fixed

- fix(cascadingfilterprocessor): prevent overriding metrics in cascading filter processor - add processor tag [#539][#539]

[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.48.0-sumo-0...main
[#546]: https://github.com/SumoLogic/sumologic-otel-collector/pull/546
[#550]: https://github.com/SumoLogic/sumologic-otel-collector/pull/550
[#553]: https://github.com/SumoLogic/sumologic-otel-collector/pull/553
[#539]: https://github.com/SumoLogic/sumologic-otel-collector/pull/539

## [v0.48.0-sumo-0]

Expand Down
14 changes: 11 additions & 3 deletions pkg/processor/cascadingfilterprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
tagPolicyKey, _ = tag.NewKey("policy")
tagCascadingFilterDecisionKey, _ = tag.NewKey("cascading_filter_decision")
tagPolicyDecisionKey, _ = tag.NewKey("policy_decision")
tagProcessorKey, _ = tag.NewKey("processor")

statDecisionLatencyMicroSec = stats.Int64("policy_decision_latency", "Latency (in microseconds) of a given filtering policy", "µs")
statOverallDecisionLatencyus = stats.Int64("cascading_filtering_batch_processing_latency", "Latency (in microseconds) of each run of the cascading filter timer", "µs")
Expand Down Expand Up @@ -64,70 +65,77 @@ func CascadingFilterMetricViews(level configtelemetry.Level) []*view.View {
Name: statOverallDecisionLatencyus.Name(),
Measure: statOverallDecisionLatencyus,
Description: statOverallDecisionLatencyus.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: latencyDistributionAggregation,
}

traceRemovalAgeView := &view.View{
Name: statTraceRemovalAgeSec.Name(),
Measure: statTraceRemovalAgeSec,
Description: statTraceRemovalAgeSec.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: ageDistributionAggregation,
}

lateSpanArrivalView := &view.View{
Name: statLateSpanArrivalAfterDecision.Name(),
Measure: statLateSpanArrivalAfterDecision,
Description: statLateSpanArrivalAfterDecision.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: ageDistributionAggregation,
}

countPolicyEvaluationErrorView := &view.View{
Name: statPolicyEvaluationErrorCount.Name(),
Measure: statPolicyEvaluationErrorCount,
Description: statPolicyEvaluationErrorCount.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: view.Sum(),
}

countFinalDecisionView := &view.View{
Name: statCascadingFilterDecision.Name(),
Measure: statCascadingFilterDecision,
Description: statCascadingFilterDecision.Description(),
TagKeys: []tag.Key{tagPolicyKey, tagCascadingFilterDecisionKey},
TagKeys: []tag.Key{tagProcessorKey, tagPolicyKey, tagCascadingFilterDecisionKey},
Aggregation: view.Sum(),
}

countPolicyDecisionsView := &view.View{
Name: statPolicyDecision.Name(),
Measure: statPolicyDecision,
Description: statPolicyDecision.Description(),
TagKeys: []tag.Key{tagPolicyKey, tagPolicyDecisionKey},
TagKeys: []tag.Key{tagProcessorKey, tagPolicyKey, tagPolicyDecisionKey},
Aggregation: view.Sum(),
}

policyLatencyView := &view.View{
Name: statDecisionLatencyMicroSec.Name(),
Measure: statDecisionLatencyMicroSec,
Description: statDecisionLatencyMicroSec.Description(),
TagKeys: []tag.Key{tagPolicyKey},
TagKeys: []tag.Key{tagProcessorKey, tagPolicyKey},
Aggregation: view.Sum(),
}

countTraceDroppedTooEarlyView := &view.View{
Name: statDroppedTooEarlyCount.Name(),
Measure: statDroppedTooEarlyCount,
Description: statDroppedTooEarlyCount.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: view.Sum(),
}
countTraceIDArrivalView := &view.View{
Name: statNewTraceIDReceivedCount.Name(),
Measure: statNewTraceIDReceivedCount,
Description: statNewTraceIDReceivedCount.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: view.Sum(),
}
trackTracesOnMemorylView := &view.View{
Name: statTracesOnMemoryGauge.Name(),
Measure: statTracesOnMemoryGauge,
Description: statTracesOnMemoryGauge.Description(),
TagKeys: []tag.Key{tagProcessorKey},
Aggregation: view.LastValue(),
}

Expand Down
101 changes: 84 additions & 17 deletions pkg/processor/cascadingfilterprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"math"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,6 +68,7 @@ type traceKey [16]byte
type cascadingFilterSpanProcessor struct {
ctx context.Context
nextConsumer consumer.Traces
instanceName string
start sync.Once
maxNumTraces uint64
traceAcceptRules []*TraceAcceptEvaluator
Expand Down Expand Up @@ -227,10 +229,10 @@ func newCascadingFilterSpanProcessor(logger *zap.Logger, nextConsumer consumer.T
}

// Build the span processor

cfsp := &cascadingFilterSpanProcessor{
ctx: ctx,
nextConsumer: nextConsumer,
instanceName: cfg.ProcessorSettings.ID().String(),
maxNumTraces: cfg.NumTraces,
maxSpansPerSecond: spansPerSecond,
logger: logger,
Expand Down Expand Up @@ -318,7 +320,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
}
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSampled)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagCascadingFilterDecisionKey, statusSampled),
},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
Expand All @@ -327,7 +332,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
} else {
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusExceededKey)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagCascadingFilterDecisionKey, statusExceededKey),
},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
Expand All @@ -340,7 +348,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
trace.FinalDecision = provisionalDecision
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusNotSampled)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagCascadingFilterDecisionKey, statusNotSampled),
},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
Expand All @@ -361,7 +372,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
if trace.FinalDecision == sampling.Sampled {
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceSampled)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceSampled),
},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
Expand All @@ -370,7 +384,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
} else {
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceExceeded)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceExceeded),
},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
Expand Down Expand Up @@ -411,11 +428,18 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
}
}

stats.Record(cfsp.ctx,
err := stats.RecordWithTags(cfsp.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statOverallDecisionLatencyus.M(int64(time.Since(startTime)/time.Microsecond)),
statDroppedTooEarlyCount.M(metrics.idNotFoundOnMapCount),
statPolicyEvaluationErrorCount.M(metrics.evaluateErrorCount),
statTracesOnMemoryGauge.M(int64(atomic.LoadUint64(&cfsp.numTracesOnMap))))
cfsp.logMetricsRecordErrorIfPresent(err, []string{
statOverallDecisionLatencyus.Name(),
statDroppedTooEarlyCount.Name(),
statPolicyEvaluationErrorCount.Name(),
statTracesOnMemoryGauge.Name(),
})

cfsp.logger.Debug("Sampling policy evaluation completed",
zap.Int("batch.len", batchLen),
Expand Down Expand Up @@ -467,7 +491,8 @@ func updateFilteringTag(traces pdata.Traces) {
func (cfsp *cascadingFilterSpanProcessor) shouldBeDropped(id pdata.TraceID, trace *sampling.TraceData) bool {
for _, dropRule := range cfsp.traceRejectRules {
if dropRule.Evaluator.ShouldDrop(id, trace) {
stats.Record(dropRule.ctx, statPolicyDecision.M(int64(1)))
err := stats.RecordWithTags(dropRule.ctx, []tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)}, statPolicyDecision.M(int64(1)))
cfsp.logMetricsRecordErrorIfPresent(err, []string{statPolicyDecision.Name()})
return true
}
}
Expand All @@ -485,9 +510,11 @@ func (cfsp *cascadingFilterSpanProcessor) makeProvisionalDecision(id pdata.Trace
for i, policy := range cfsp.traceAcceptRules {
policyEvaluateStartTime := time.Now()
decision := policy.Evaluator.Evaluate(id, trace)
stats.Record(
err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond)))
cfsp.logMetricsRecordErrorIfPresent(err, []string{statDecisionLatencyMicroSec.Name()})

trace.Decisions[i] = decision

Expand All @@ -503,7 +530,10 @@ func (cfsp *cascadingFilterSpanProcessor) makeProvisionalDecision(id pdata.Trace

err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusSampled)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagPolicyDecisionKey, statusSampled),
},
statPolicyDecision.M(int64(1)),
)
if err != nil {
Expand All @@ -518,7 +548,10 @@ func (cfsp *cascadingFilterSpanProcessor) makeProvisionalDecision(id pdata.Trace
}
err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusNotSampled)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagPolicyDecisionKey, statusNotSampled),
},
statPolicyDecision.M(int64(1)),
)
if err != nil {
Expand All @@ -531,7 +564,10 @@ func (cfsp *cascadingFilterSpanProcessor) makeProvisionalDecision(id pdata.Trace

err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusSecondChance)},
[]tag.Mutator{
tag.Insert(tagProcessorKey, cfsp.instanceName),
tag.Insert(tagPolicyDecisionKey, statusSecondChance),
},
statPolicyDecision.M(int64(1)),
)
if err != nil {
Expand Down Expand Up @@ -644,18 +680,38 @@ func (cfsp *cascadingFilterSpanProcessor) processTraces(ctx context.Context, res
cfsp.logger.Warn("Error sending late arrived spans to destination",
zap.Error(err))
}
stats.Record(cfsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)))
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)),
)
cfsp.logMetricsRecordErrorIfPresent(err, []string{statLateSpanArrivalAfterDecision.Name()})
case sampling.NotSampled:
stats.Record(cfsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)))
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)),
)
cfsp.logMetricsRecordErrorIfPresent(err, []string{statLateSpanArrivalAfterDecision.Name()})
case sampling.Dropped:
stats.Record(cfsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)))
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)),
)
cfsp.logMetricsRecordErrorIfPresent(err, []string{statLateSpanArrivalAfterDecision.Name()})
default:
cfsp.logger.Warn("Encountered unexpected sampling decision",
zap.Int("decision", int(finalDecision)))
}
}

stats.Record(cfsp.ctx, statNewTraceIDReceivedCount.M(newTraceIDs))
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statNewTraceIDReceivedCount.M(newTraceIDs),
)
cfsp.logMetricsRecordErrorIfPresent(err, []string{statNewTraceIDReceivedCount.Name()})
}

// func (cfsp *cascadingFilterSpanProcessor) GetCapabilities() component.ProcessorCapabilities {
Expand Down Expand Up @@ -689,7 +745,12 @@ func (cfsp *cascadingFilterSpanProcessor) dropTrace(traceID traceKey, deletionTi
return
}

stats.Record(cfsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)))
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagProcessorKey, cfsp.instanceName)},
statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)),
)
cfsp.logMetricsRecordErrorIfPresent(err, []string{statTraceRemovalAgeSec.Name()})
}

func prepareTraceBatch(rss pdata.ResourceSpans, spans []*pdata.Span) pdata.Traces {
Expand All @@ -704,6 +765,12 @@ func prepareTraceBatch(rss pdata.ResourceSpans, spans []*pdata.Span) pdata.Trace
return traceTd
}

// logMetricsRecordErrorIfPresent reports a failed metrics recording through the
// processor's logger. A nil err is a no-op; otherwise one error-level entry is
// emitted that lists every name in metricsNames (comma-separated) and carries
// err as a structured field.
func (cfsp *cascadingFilterSpanProcessor) logMetricsRecordErrorIfPresent(err error, metricsNames []string) {
	if err == nil {
		return
	}

	joinedNames := strings.Join(metricsNames, ", ")
	cfsp.logger.Error("failed to record the following metrics: "+joinedNames, zap.Error(err))
}

// tTicker interface allows easier testing of ticker related functionality used by cascadingfilterprocessor
type tTicker interface {
// Start sets the frequency of the ticker and starts the periodic calls to OnTick.
Expand Down
Loading

0 comments on commit c877137

Please sign in to comment.