From 40c439a90c30495d386d59d0d3ed6ef15c5bcd11 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 20 May 2019 14:57:25 +0200 Subject: [PATCH] Fix goroutine leak on non-explicit finalization of log inputs (#12164) (#12201) If log inputs were finished because their context, or one of their ouleters have been finished, then it wasn't stopping its harvesters, leaking resources. (cherry picked from commit fe8d27d6857cc82f2dc13e15e92696125e8bc3f8) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/input.go | 33 +++++++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b2c1cb901ee..f394fda4347 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v7.0.0...7.0[Check the HEAD diff] - Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] +- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 52ab225a4e6..cb44e61f5bd 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -24,6 +24,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/elastic/beats/filebeat/channel" @@ -68,6 +69,7 @@ type Input struct { done chan struct{} numHarvesters atomic.Uint32 meta map[string]string + stopOnce sync.Once } // NewInput instantiates a new Log @@ -146,6 +148,8 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) cleanupNeeded = false + go p.stopWhenDone() + return p, nil } @@ -727,14 +731,27 @@ func (p *Input) Wait() { // Stop stops all harvesters and then stops the input func (p *Input) Stop() { - // Stop all harvesters - // In case the beatDone channel is closed, this will not wait for completion - // Otherwise Stop will wait until output is complete - p.harvesters.Stop() + p.stopOnce.Do(func() { + // Stop all harvesters + // In case the beatDone channel is closed, this will not wait for completion + // Otherwise Stop will wait until output is complete + p.harvesters.Stop() + + // close state updater + p.stateOutlet.Close() + + // stop all communication between harvesters and publisher pipeline + p.outlet.Close() + }) +} - // close state updater - p.stateOutlet.Close() +// stopWhenDone takes care of stopping the input if some of the contexts are done +func (p *Input) stopWhenDone() { + select { + case <-p.done: + case <-p.stateOutlet.Done(): + case <-p.outlet.Done(): + } - // stop all communication between harvesters and publisher pipeline - p.outlet.Close() + p.Wait() }