diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1868cadc85f..c69ba380072 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.5.4...6.5[Check the HEAD diff] *Affecting all Beats* - Enforce validation for the Central Management access token. {issue}9621[9621] +- Start autodiscover consumers before producers. {pull}7926[7926] *Auditbeat* diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 75b545ee461..69d57f0eedc 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -103,11 +103,16 @@ func (a *Autodiscover) Start() { logp.Info("Starting autodiscover manager") a.listener = a.bus.Subscribe(a.adapter.EventFilter()...) + // It is important to start the worker first before starting the producer. + // In hosts that have large number of workloads, it is easy to have an initial + // sync of workloads to have a count that is greater than 100 (which is the size + // of the bounded Go channel. Starting the providers before the consumer would + // result in the channel filling up and never allowing the worker to start up. + go a.worker() + for _, provider := range a.providers { provider.Start() } - - go a.worker() } func (a *Autodiscover) worker() { diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 27b1bd338c8..6bcad2f226f 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -70,6 +70,7 @@ type watcher struct { k8sResourceFactory func() k8s.Resource items func() []k8s.Resource handler ResourceEventHandler + logger *logp.Logger } // NewWatcher initializes the watcher client to provide a events handler for @@ -82,6 +83,7 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Wa lastResourceVersion: "0", ctx: ctx, stop: cancel, + logger: logp.NewLogger("kubernetes"), } switch resource.(type) { // add resource type which you want to support watching here @@ -184,10 +186,12 @@ func (w *watcher) sync() error { return err } + w.logger.Debugf("Got %v items from the resource sync", len(w.items())) for _, item := range w.items() { w.onAdd(item) } + w.logger.Debugf("Done syncing %v items from the resource sync", len(w.items())) // Store last version w.lastResourceVersion = w.resourceList.GetMetadata().GetResourceVersion()