diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cef48adba16..1fb0d2fcfe1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix Google workspace pagination and document ID generation. {pull}33666[33666] - Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830] - Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654] +- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968] *Heartbeat* - Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723] diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index f958b593b2e..7cae8bcf586 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -177,18 +177,21 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub log.Info("process repeated request") var waitUntil time.Time for { - // We have a special-case wait for when we have a zero limit. - // x/time/rate allow a burst through even when the limit is zero - // so in order to ensure that we don't try until we are out of - // purgatory we calculate how long we should wait according to - // the retry after for a 429 and rate limit headers if we have - // a zero rate quota. See handleResponse below. if wait := time.Until(waitUntil); wait > 0 { + // We have a special-case wait for when we have a zero limit. + // x/time/rate allow a burst through even when the limit is zero + // so in order to ensure that we don't try until we are out of + // purgatory we calculate how long we should wait according to + // the retry after for a 429 and rate limit headers if we have + // a zero rate quota. See handleResponse below. select { case <-ctx.Done(): return ctx.Err() case <-time.After(wait): } + } else if err = ctx.Err(); err != nil { + // Otherwise exit if we have been cancelled. + return err } // Process a set of event requests. @@ -846,15 +849,13 @@ func newProgram(src, root string, client *http.Client, limiter *rate.Limiter, pa func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) { out, _, err := prg.ContextEval(ctx, input) + if e := ctx.Err(); e != nil { + err = e + } if err != nil { input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)} return input, fmt.Errorf("failed eval: %w", err) } - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{})) if err != nil {