Skip to content

Commit

Permalink
[Heartbeat] Fix reload policy hash consistency, browser monitor clean…
Browse files Browse the repository at this point in the history
…up and semaphore release (#34697)

* Fix reload policy hash consistency, browser monitor cleanup and semaphore release

* Update x-pack/heartbeat/monitors/browser/project.go

* Add changelog

* Update heartbeat/scheduler/schedjob.go

---------

Co-authored-by: Andrew Cholakian <andrewvc@elastic.co>
  • Loading branch information
emilioalvap and andrewvc authored Mar 7, 2023
1 parent a8ab071 commit cbd1a38
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702]
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
- Fix integration hashing to prevent reloading all when updated. {pull}34697[34697]
- Fix release of job limit semaphore when context is cancelled. {pull}34697[34697]
- Fix bug where states.duration_ms was incorrect type. {pull}33563[33563]
- Fix handling of long UDP messages in UDP input. {issue}33836[33836] {pull}33837[33837]
- Fix browser monitor summary reporting as up when monitor is down. {issue}33374[33374] {pull}33819[33819]
Expand Down
10 changes: 4 additions & 6 deletions heartbeat/scheduler/schedjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func (sj *schedJob) run() (startedAt time.Time) {
sj.activeTasks.Inc()
if sj.jobLimitSem != nil {
err := sj.jobLimitSem.Acquire(sj.ctx, 1)
if err != nil {
// Defer release only if acquired
if err == nil {
defer sj.jobLimitSem.Release(1)
} else {
logp.L().Errorf("could not acquire semaphore: %w", err)
}
}
Expand Down Expand Up @@ -117,11 +120,6 @@ func (sj *schedJob) runTask(task TaskFunc) time.Time {
// irrelevant
go sj.runTask(cont)
}
// There is always at least 1 task (the current one), if that's all, then we know
// there are no other jobs active or pending, and we can release the jobLimitSem
if sj.jobLimitSem != nil && sj.activeTasks.Load() == 1 {
sj.jobLimitSem.Release(1)
}
}

return startedAt
Expand Down
13 changes: 12 additions & 1 deletion x-pack/heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@ var RootCmd *cmd.BeatsRootCmd
// heartbeatCfg is a callback registered via SetTransform that returns a Elastic Agent client.Unit
// configuration generated from a raw Elastic Agent config
func heartbeatCfg(rawIn *proto.UnitExpectedConfig, _ *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
configList, err := management.CreateReloadConfigFromInputs([]map[string]interface{}{rawIn.GetSource().AsMap()})
configList, err := management.CreateReloadConfigFromInputs(TransformRawIn(rawIn))
if err != nil {
return nil, fmt.Errorf("error creating reloader config: %w", err)
}
return configList, nil
}

// TransformRawIn removes unwanted fields to keep consistent hashing on reload()
func TransformRawIn(rawIn *proto.UnitExpectedConfig) []map[string]interface{} {
rawInput := []map[string]interface{}{rawIn.GetSource().AsMap()}

for _, p := range rawInput {
delete(p, "policy")
}

return rawInput
}

func init() {
management.ConfigTransform.SetTransform(heartbeatCfg)
settings := heartbeatCmd.HeartbeatSettings()
Expand Down
14 changes: 13 additions & 1 deletion x-pack/heartbeat/monitors/browser/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ type JourneyLister func(ctx context.Context, projectPath string, params mapstr.M
type Project struct {
rawCfg *config.C
projectCfg *Config
ctx context.Context
cancel context.CancelFunc
}

func NewProject(rawCfg *config.C) (*Project, error) {
// Global project context to cancel all jobs
// on close
ctx, cancel := context.WithCancel(context.Background())

s := &Project{
rawCfg: rawCfg,
projectCfg: DefaultConfig(),
ctx: ctx,
cancel: cancel,
}
err := rawCfg.Unpack(s.projectCfg)
if err != nil {
Expand Down Expand Up @@ -81,6 +89,9 @@ func (p *Project) Close() error {
p.projectCfg.Source.ActiveMemo.Close()
}

// Cancel running jobs ctxs
p.cancel()

return nil
}

Expand Down Expand Up @@ -127,8 +138,9 @@ func (p *Project) extraArgs() []string {

func (p *Project) jobs() []jobs.Job {
var j jobs.Job

isScript := p.projectCfg.Source.Inline != nil
ctx := context.WithValue(context.Background(), synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second)
ctx := context.WithValue(p.ctx, synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second)

if isScript {
src := p.projectCfg.Source.Inline.Script
Expand Down

0 comments on commit cbd1a38

Please sign in to comment.