Skip to content

Commit

Permalink
🐛 handle concurrency issue with updating ruleCTX with scopes (#770)
Browse files Browse the repository at this point in the history
* Adding context propagation so that rule workers when processing A rule
can attach their span to the correct parent

Signed-off-by: Shawn Hurley <shawn@hurley.page>
  • Loading branch information
shawn-hurley authored Feb 6, 2025
1 parent 80e9439 commit 175fd3f
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"sync/atomic"

"go.lsp.dev/uri"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"

"github.com/cbroglie/mustache"
"github.com/go-logr/logr"
Expand All @@ -31,11 +33,12 @@ type RuleEngine interface {
}

type ruleMessage struct {
rule Rule
ruleSetName string
ctx ConditionContext
scope Scope
returnChan chan response
rule Rule
ruleSetName string
conditionContext ConditionContext
scope Scope
returnChan chan response
carrier propagation.TextMapCarrier
}

type response struct {
Expand Down Expand Up @@ -125,18 +128,21 @@ func (r *ruleEngine) Stop() {
}

func processRuleWorker(ctx context.Context, ruleMessages chan ruleMessage, logger logr.Logger, wg *sync.WaitGroup) {
prop := otel.GetTextMapPropagator()
for {
select {
case m := <-ruleMessages:
logger.V(5).Info("taking rule", "ruleset", m.ruleSetName, "rule", m.rule.RuleID)
newLogger := logger.WithValues("ruleID", m.rule.RuleID)
//We createa new rule context for a every rule run, here we need to apply the scope
m.ctx.Template = make(map[string]ChainTemplate)
m.conditionContext.Template = make(map[string]ChainTemplate)
if m.scope != nil {
m.scope.AddToContext(&m.ctx)
m.scope.AddToContext(&m.conditionContext)
}
logger.Info("Adding Carrier span info to context")
ctx = prop.Extract(ctx, m.carrier)

bo, err := processRule(ctx, m.rule, m.ctx, newLogger)
bo, err := processRule(ctx, m.rule, m.conditionContext, newLogger)
logger.V(5).Info("finished rule", "found", len(bo.Incidents), "error", err, "rule", m.rule.RuleID)
m.returnChan <- response{
ConditionResponse: bo,
Expand Down Expand Up @@ -190,6 +196,9 @@ func (r *ruleEngine) RunRulesScoped(ctx context.Context, ruleSets []RuleSet, sco
}
r.logger.Info("added scopes to condition context", "scopes", scopes, "conditionContext", conditionContext)
}
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
r.logger.Info("inject span info", "carrier", carrier)
ctx, cancelFunc := context.WithCancel(ctx)

taggingRules, otherRules, mapRuleSets := r.filterRules(ruleSets, selectors...)
Expand Down Expand Up @@ -264,10 +273,13 @@ func (r *ruleEngine) RunRulesScoped(ctx context.Context, ruleSets []RuleSet, sco
}()

for _, rule := range otherRules {
newContext := ruleContext.Copy()
newContext.RuleID = rule.rule.RuleID
wg.Add(1)
rule.returnChan = ret
rule.ctx = ruleContext
rule.conditionContext = newContext
rule.scope = scopes
rule.carrier = carrier
r.ruleProcessing <- rule
}
r.logger.V(5).Info("All rules added buffer, waiting for engine to complete", "size", len(otherRules))
Expand Down Expand Up @@ -450,11 +462,9 @@ func processRule(ctx context.Context, rule Rule, ruleCtx ConditionContext, log l
ctx, span := tracing.StartNewSpan(
ctx, "process-rule", attribute.Key("rule").String(rule.RuleID))
defer span.End()
newContext := ruleCtx.Copy()
newContext.RuleID = rule.RuleID
// Here is what a worker should run when getting a rule.
// For now, lets not fan out the running of conditions.
return rule.When.Evaluate(ctx, log, newContext)
return rule.When.Evaluate(ctx, log, ruleCtx)

}

Expand Down

0 comments on commit 175fd3f

Please sign in to comment.