From cbd1a38bafb47c0f5be131ccf0a0741f2d2ae878 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Alvarez=20Pi=C3=B1eiro?= <95703246+emilioalvap@users.noreply.github.com> Date: Tue, 7 Mar 2023 17:33:12 +0100 Subject: [PATCH] [Heartbeat] Fix reload policy hash consistency, browser monitor cleanup and semaphore release (#34697) * Fix reload policy hash consistency, browser monitor cleanup and semaphore release * Update x-pack/heartbeat/monitors/browser/project.go * Add changelog * Update heartbeat/scheduler/schedjob.go --------- Co-authored-by: Andrew Cholakian --- CHANGELOG.next.asciidoc | 2 ++ heartbeat/scheduler/schedjob.go | 10 ++++------ x-pack/heartbeat/cmd/root.go | 13 ++++++++++++- x-pack/heartbeat/monitors/browser/project.go | 14 +++++++++++++- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e8daa5248fd1..ec7ee6e4809c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -101,6 +101,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702] - Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723] +- Fix integration hashing to prevent reloading all when updated. {pull}34697[34697] +- Fix release of job limit semaphore when context is cancelled. {pull}34697[34697] - Fix bug where states.duration_ms was incorrect type. {pull}33563[33563] - Fix handling of long UDP messages in UDP input. {issue}33836[33836] {pull}33837[33837] - Fix browser monitor summary reporting as up when monitor is down. {issue}33374[33374] {pull}33819[33819] diff --git a/heartbeat/scheduler/schedjob.go b/heartbeat/scheduler/schedjob.go index 6123a039cccd..2a8172b95641 100644 --- a/heartbeat/scheduler/schedjob.go +++ b/heartbeat/scheduler/schedjob.go @@ -62,7 +62,10 @@ func (sj *schedJob) run() (startedAt time.Time) { sj.activeTasks.Inc() if sj.jobLimitSem != nil { err := sj.jobLimitSem.Acquire(sj.ctx, 1) - if err != nil { + // Defer release only if acquired + if err == nil { + defer sj.jobLimitSem.Release(1) + } else { logp.L().Errorf("could not acquire semaphore: %w", err) } } @@ -117,11 +120,6 @@ func (sj *schedJob) runTask(task TaskFunc) time.Time { // irrelevant go sj.runTask(cont) } - // There is always at least 1 task (the current one), if that's all, then we know - // there are no other jobs active or pending, and we can release the jobLimitSem - if sj.jobLimitSem != nil && sj.activeTasks.Load() == 1 { - sj.jobLimitSem.Release(1) - } } return startedAt diff --git a/x-pack/heartbeat/cmd/root.go b/x-pack/heartbeat/cmd/root.go index 6cf566ddc052..32e73104a01f 100644 --- a/x-pack/heartbeat/cmd/root.go +++ b/x-pack/heartbeat/cmd/root.go @@ -23,13 +23,24 @@ var RootCmd *cmd.BeatsRootCmd // heartbeatCfg is a callback registered via SetTransform that returns a Elastic Agent client.Unit // configuration generated from a raw Elastic Agent config func heartbeatCfg(rawIn *proto.UnitExpectedConfig, _ *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { - configList, err := management.CreateReloadConfigFromInputs([]map[string]interface{}{rawIn.GetSource().AsMap()}) + configList, err := management.CreateReloadConfigFromInputs(TransformRawIn(rawIn)) if err != nil { return nil, fmt.Errorf("error creating reloader config: %w", err) } return configList, nil } +// TransformRawIn removes unwanted fields to keep consistent hashing on reload() +func TransformRawIn(rawIn *proto.UnitExpectedConfig) []map[string]interface{} { + rawInput := []map[string]interface{}{rawIn.GetSource().AsMap()} + + for _, p := range rawInput { + delete(p, "policy") + } + + return rawInput +} + func init() { management.ConfigTransform.SetTransform(heartbeatCfg) settings := heartbeatCmd.HeartbeatSettings() diff --git a/x-pack/heartbeat/monitors/browser/project.go b/x-pack/heartbeat/monitors/browser/project.go index b7b37f0bb20c..f43f767e0215 100644 --- a/x-pack/heartbeat/monitors/browser/project.go +++ b/x-pack/heartbeat/monitors/browser/project.go @@ -27,12 +27,20 @@ type JourneyLister func(ctx context.Context, projectPath string, params mapstr.M type Project struct { rawCfg *config.C projectCfg *Config + ctx context.Context + cancel context.CancelFunc } func NewProject(rawCfg *config.C) (*Project, error) { + // Global project context to cancel all jobs + // on close + ctx, cancel := context.WithCancel(context.Background()) + s := &Project{ rawCfg: rawCfg, projectCfg: DefaultConfig(), + ctx: ctx, + cancel: cancel, } err := rawCfg.Unpack(s.projectCfg) if err != nil { @@ -81,6 +89,9 @@ func (p *Project) Close() error { p.projectCfg.Source.ActiveMemo.Close() } + // Cancel running jobs ctxs + p.cancel() + return nil } @@ -127,8 +138,9 @@ func (p *Project) extraArgs() []string { func (p *Project) jobs() []jobs.Job { var j jobs.Job + isScript := p.projectCfg.Source.Inline != nil - ctx := context.WithValue(context.Background(), synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second) + ctx := context.WithValue(p.ctx, synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second) if isScript { src := p.projectCfg.Source.Inline.Script