diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 23099a4fd43..3b23f426dc6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v6.7.2...6.x[Check the HEAD diff] - Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] +- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 6d5cf35ee69..c03427e4a4d 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -24,6 +24,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/elastic/beats/filebeat/channel" @@ -68,6 +69,7 @@ type Input struct { done chan struct{} numHarvesters atomic.Uint32 meta map[string]string + stopOnce sync.Once } // NewInput instantiates a new Log @@ -146,6 +148,8 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) cleanupNeeded = false + go p.stopWhenDone() + return p, nil } @@ -727,14 +731,27 @@ func (p *Input) Wait() { // Stop stops all harvesters and then stops the input func (p *Input) Stop() { - // Stop all harvesters - // In case the beatDone channel is closed, this will not wait for completion - // Otherwise Stop will wait until output is complete - p.harvesters.Stop() + p.stopOnce.Do(func() { + // Stop all harvesters + // In case the beatDone channel is closed, this will not wait for completion + // Otherwise Stop will wait until output is complete + p.harvesters.Stop() + + // close state updater + p.stateOutlet.Close() + + // stop all communication between harvesters and publisher pipeline + p.outlet.Close() + }) +} - // close state updater - p.stateOutlet.Close() +// stopWhenDone takes care of stopping the input if some of the contexts are done +func (p *Input) stopWhenDone() { + select { + case <-p.done: + case <-p.stateOutlet.Done(): + case <-p.outlet.Done(): + } - // stop all communication between harvesters and publisher pipeline - p.outlet.Close() + p.Wait() }