Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Fix reload policy hash consistency, browser monitor cleanup and semaphore release #34697

Merged
merged 8 commits into from
Mar 7, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702]
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
- Fix integration hashing to prevent reloading all when updated. {pull}34697[34697]
- Fix release of job limit semaphore when context is cancelled. {pull}34697[34697]
- Fix bug where states.duration_ms was incorrect type. {pull}33563[33563]
- Fix handling of long UDP messages in UDP input. {issue}33836[33836] {pull}33837[33837]
- Fix browser monitor summary reporting as up when monitor is down. {issue}33374[33374] {pull}33819[33819]
Expand Down
10 changes: 4 additions & 6 deletions heartbeat/scheduler/schedjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func (sj *schedJob) run() (startedAt time.Time) {
sj.activeTasks.Inc()
if sj.jobLimitSem != nil {
err := sj.jobLimitSem.Acquire(sj.ctx, 1)
if err != nil {
// Defer release only if acquired
if err == nil {
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
defer sj.jobLimitSem.Release(1)
} else {
logp.L().Errorf("could not acquire semaphore: %w", err)
}
}
Expand Down Expand Up @@ -117,11 +120,6 @@ func (sj *schedJob) runTask(task TaskFunc) time.Time {
// irrelevant
go sj.runTask(cont)
}
// There is always at least 1 task (the current one), if that's all, then we know
// there are no other jobs active or pending, and we can release the jobLimitSem
if sj.jobLimitSem != nil && sj.activeTasks.Load() == 1 {
sj.jobLimitSem.Release(1)
}
}

return startedAt
Expand Down
13 changes: 12 additions & 1 deletion x-pack/heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@ var RootCmd *cmd.BeatsRootCmd
// heartbeatCfg is a callback registered via SetTransform that returns a Elastic Agent client.Unit
// configuration generated from a raw Elastic Agent config
func heartbeatCfg(rawIn *proto.UnitExpectedConfig, _ *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
configList, err := management.CreateReloadConfigFromInputs([]map[string]interface{}{rawIn.GetSource().AsMap()})
configList, err := management.CreateReloadConfigFromInputs(TransformRawIn(rawIn))
if err != nil {
return nil, fmt.Errorf("error creating reloader config: %w", err)
}
return configList, nil
}

// TransformRawIn removes unwanted fields to keep consistent hashing on reload()
func TransformRawIn(rawIn *proto.UnitExpectedConfig) []map[string]interface{} {
rawInput := []map[string]interface{}{rawIn.GetSource().AsMap()}

for _, p := range rawInput {
delete(p, "policy")
}

return rawInput
}

func init() {
management.ConfigTransform.SetTransform(heartbeatCfg)
settings := heartbeatCmd.HeartbeatSettings()
Expand Down
14 changes: 13 additions & 1 deletion x-pack/heartbeat/monitors/browser/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ type JourneyLister func(ctx context.Context, projectPath string, params mapstr.M
type Project struct {
rawCfg *config.C
projectCfg *Config
ctx context.Context
cancel context.CancelFunc
}

func NewProject(rawCfg *config.C) (*Project, error) {
// Global project context to cancel all jobs
// on close
ctx, cancel := context.WithCancel(context.Background())

s := &Project{
rawCfg: rawCfg,
projectCfg: DefaultConfig(),
ctx: ctx,
cancel: cancel,
}
err := rawCfg.Unpack(s.projectCfg)
if err != nil {
Expand Down Expand Up @@ -81,6 +89,9 @@ func (p *Project) Close() error {
p.projectCfg.Source.ActiveMemo.Close()
}

// Cancel running jobs ctxs
p.cancel()

return nil
}

Expand Down Expand Up @@ -127,8 +138,9 @@ func (p *Project) extraArgs() []string {

func (p *Project) jobs() []jobs.Job {
var j jobs.Job

isScript := p.projectCfg.Source.Inline != nil
ctx := context.WithValue(context.Background(), synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second)
ctx := context.WithValue(p.ctx, synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second)

if isScript {
src := p.projectCfg.Source.Inline.Script
Expand Down