From fdcd174a3968c4c65ca83ea131c7fadcbac9f03c Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Mon, 27 Jan 2020 12:01:44 +0100 Subject: [PATCH 01/25] Adjust test cases first --- x-pack/metricbeat/metricbeat.reference.yml | 50 ------------------- .../module/activemq/_meta/config.yml | 30 ----------- .../module/activemq/broker/manifest.yml | 20 ++++++++ .../module/activemq/queue/manifest.yml | 10 ++++ .../module/activemq/topic/manifest.yml | 10 ++++ .../metricbeat/module/ibmmq/_meta/config.yml | 20 -------- .../metricbeat/module/ibmmq/qmgr/manifest.yml | 20 ++++++++ .../modules.d/activemq.yml.disabled | 30 ----------- .../metricbeat/modules.d/ibmmq.yml.disabled | 20 -------- 9 files changed, 60 insertions(+), 150 deletions(-) diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 9e94fc905b21..eb06b910b71d 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -143,36 +143,6 @@ metricbeat.modules: path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } #------------------------------ Aerospike Module ------------------------------ - module: aerospike @@ -517,26 +487,6 @@ metricbeat.modules: # the options for this metricset are also available here. metrics_path: /metrics - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } - #-------------------------------- Istio Module -------------------------------- # Istio mesh. To collect all all Mixer-generated metrics - module: istio diff --git a/x-pack/metricbeat/module/activemq/_meta/config.yml b/x-pack/metricbeat/module/activemq/_meta/config.yml index cddafae3352d..08791c5dc069 100644 --- a/x-pack/metricbeat/module/activemq/_meta/config.yml +++ b/x-pack/metricbeat/module/activemq/_meta/config.yml @@ -5,33 +5,3 @@ path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } diff --git a/x-pack/metricbeat/module/activemq/broker/manifest.yml b/x-pack/metricbeat/module/activemq/broker/manifest.yml index b266e4a054de..8b2746182329 100644 --- a/x-pack/metricbeat/module/activemq/broker/manifest.yml +++ b/x-pack/metricbeat/module/activemq/broker/manifest.yml @@ -27,3 +27,23 @@ input: field: messages.count - attr: TotalProducerCount field: producers.count +processors: + - script: + lang: javascript + source: > + function process(event) { + var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") + if (broker_memory_broker_pct != null) { + event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) + } + + var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") + if (broker_memory_temlep_pct != null) { + event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) + } + + var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") + if (broker_memory_store_pct != null) { + event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) + } + } diff --git a/x-pack/metricbeat/module/activemq/queue/manifest.yml b/x-pack/metricbeat/module/activemq/queue/manifest.yml index 5521f5e68862..84ea800aa45c 100644 --- a/x-pack/metricbeat/module/activemq/queue/manifest.yml +++ b/x-pack/metricbeat/module/activemq/queue/manifest.yml @@ -35,3 +35,13 @@ input: field: messages.enqueue.time.min - attr: ProducerCount field: producers.count +processors: + - script: + lang: javascript + source: > + function process(event) { + var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") + if (queue_memory_broker_pct != null) { + event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) + } + } diff --git a/x-pack/metricbeat/module/activemq/topic/manifest.yml b/x-pack/metricbeat/module/activemq/topic/manifest.yml index 7f9bc9f3ae5e..a2e64f0868f6 100644 --- a/x-pack/metricbeat/module/activemq/topic/manifest.yml +++ b/x-pack/metricbeat/module/activemq/topic/manifest.yml @@ -33,3 +33,13 @@ input: field: messages.enqueue.time.min - attr: ProducerCount field: producers.count +processors: + - script: + lang: javascript + source: > + function process(event) { + var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") + if (topic_memory_broker_pct != null) { + event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) + } + } diff --git a/x-pack/metricbeat/module/ibmmq/_meta/config.yml b/x-pack/metricbeat/module/ibmmq/_meta/config.yml index 2f8973d97302..f16580b07afe 100644 --- a/x-pack/metricbeat/module/ibmmq/_meta/config.yml +++ b/x-pack/metricbeat/module/ibmmq/_meta/config.yml @@ -6,23 +6,3 @@ # This module uses the Prometheus collector metricset, all # the options for this metricset are also available here. metrics_path: /metrics - - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } diff --git a/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml b/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml index ec802f1ca1b3..f55504521d9a 100644 --- a/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml +++ b/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml @@ -4,3 +4,23 @@ input: metricset: collector defaults: metrics_path: /metrics + +# The custom processor is responsible for filtering Prometheus metrics +# not stricly related to the IBM MQ domain, e.g. system load, process, +# metrics HTTP server. +processors: + - script: + lang: javascript + source: > + function process(event) { + var metrics = event.Get("prometheus.metrics"); + Object.keys(metrics).forEach(function(key) { + if (!(key.match(/^ibmmq_.*$/))) { + event.Delete("prometheus.metrics." + key); + } + }); + metrics = event.Get("prometheus.metrics"); + if (Object.keys(metrics).length == 0) { + event.Cancel(); + } + } diff --git a/x-pack/metricbeat/modules.d/activemq.yml.disabled b/x-pack/metricbeat/modules.d/activemq.yml.disabled index 8759b7cbe524..33716db01c9d 100644 --- a/x-pack/metricbeat/modules.d/activemq.yml.disabled +++ b/x-pack/metricbeat/modules.d/activemq.yml.disabled @@ -8,33 +8,3 @@ path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } diff --git a/x-pack/metricbeat/modules.d/ibmmq.yml.disabled b/x-pack/metricbeat/modules.d/ibmmq.yml.disabled index 93d77367a972..a2fdf552f1ca 100644 --- a/x-pack/metricbeat/modules.d/ibmmq.yml.disabled +++ b/x-pack/metricbeat/modules.d/ibmmq.yml.disabled @@ -9,23 +9,3 @@ # This module uses the Prometheus collector metricset, all # the options for this metricset are also available here. metrics_path: /metrics - - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } From b13980c905beb7ce497422aaec0e6e13a9c93564 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Tue, 28 Jan 2020 20:01:49 +0100 Subject: [PATCH 02/25] Setup processors for light modules --- metricbeat/beater/metricbeat.go | 2 +- metricbeat/mb/lightmodules.go | 20 +++++++++ metricbeat/mb/module/factory.go | 41 ++++++++++++------- metricbeat/mb/module/publish.go | 31 +++++++++++++- metricbeat/mb/module/runner.go | 31 +++++++++++--- metricbeat/mb/registry.go | 32 +++++++++++++++ .../module/activemq/docker-compose.yml | 2 +- 7 files changed, 135 insertions(+), 24 deletions(-) diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 8d27d8066664..695bf5f96109 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -227,7 +227,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { return err } - r := module.NewRunner(client, m.module) + r := module.NewRunnerForStaticModule(client, m.module) r.Start() wg.Add(1) go func() { diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index 51269c363942..ca2c9ddacbb3 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -150,6 +150,26 @@ type lightModuleConfig struct { MetricSets []string `config:"metricsets"` } +func (s *LightModulesSource) UnpackMetricSetConfiguration(r *Register, moduleName string , metricSetName string, + cfg interface{}) error { + + modulePath, found := s.findModulePath(moduleName) + if !found { + return fmt.Errorf("module '%s' not found", moduleName) + } + + manifestPath := filepath.Join(filepath.Dir(modulePath), metricSetName, manifestYML) + config, err := common.LoadFile(manifestPath) + if err != nil { + return errors.Wrapf(err, "failed to load metricset manifest from '%s'", manifestPath) + } + + if err := config.Unpack(cfg); err != nil { + return errors.Wrapf(err, "failed to parse metricset manifest from '%s'", manifestPath) + } + return nil +} + // LightModule contains the definition of a light module type LightModule struct { Name string diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 8661bee3d035..9fcb67e5c021 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -18,7 +18,7 @@ package module import ( - "github.com/joeshaw/multierror" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" @@ -43,27 +43,38 @@ func NewFactory(beatInfo beat.Info, options ...Option) *Factory { // Create creates a new metricbeat module runner reporting events to the passed pipeline. func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - var errs multierror.Errors - - connector, err := NewConnector(r.beatInfo, p, c, meta) - if err != nil { - errs = append(errs, err) - } w, err := NewWrapper(c, mb.Registry, r.options...) if err != nil { - errs = append(errs, err) - } - - if err := errs.Err(); err != nil { return nil, err } - client, err := connector.Connect() - if err != nil { - return nil, err + clients := map[string]beat.Client{} + + for _, msw := range w.MetricSets() { + processorsForMetricSet, err := mb.Registry.ProcessorsForMetricSet(msw.Module().Name(), msw.MetricSet.Name()) + if err != nil { + return nil, errors.Wrapf(err, "reading metricset processors failed (module: %s, metricset: %s)", + msw.Module().Name(), msw.MetricSet.Name()) + } + + connector, err := NewConnector(r.beatInfo, p, c, meta) + if err != nil { + return nil, err + } + + for _, p := range processorsForMetricSet.List { + connector.processors.AddProcessor(p) + } + + client, err := connector.Connect() + if err != nil { + return nil, err + } + + clients[msw.MetricSet.Name()] = client } - mr := NewRunner(client, w) + mr := NewRunner(clients, w) return mr, nil } diff --git a/metricbeat/mb/module/publish.go b/metricbeat/mb/module/publish.go index 40ce01549fe4..374db628cdbe 100644 --- a/metricbeat/mb/module/publish.go +++ b/metricbeat/mb/module/publish.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/logp" ) // PublishChannels publishes the events read from each channel to the given @@ -31,14 +32,40 @@ import ( // and are fully read. To stop the method immediately, close the channels and // close the publisher client to ensure that publishing does not block. This // may result is some events being discarded. -func PublishChannels(client beat.Client, cs ...<-chan beat.Event) { +func PublishChannels(clients map[string]beat.Client, cs ...<-chan beat.Event) { var wg sync.WaitGroup // output publishes values from c until c is closed, then calls wg.Done. output := func(c <-chan beat.Event) { defer wg.Done() + logger := logp.NewLogger("PublishChannels") + + sink, staticModule := clients["@static"] + for event := range c { - client.Publish(event) + if staticModule { + sink.Publish(event) + continue + } + + v, err := event.Fields.GetValue("metricset.name") + if err != nil { + logger.Errorf("Error occurred while retrieving key 'metricset': %v", err) + continue + } + + metricSetName, ok := v.(string) + if !ok { + logger.Error("Non-string type of 'metricset'") + continue + } + + if _, ok := clients[metricSetName]; !ok { + logger.Errorf("Non-registered metricset client (name: %s)", metricSetName) + continue + } + + clients[metricSetName].Publish(event) } } diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index c60a009caea3..171ccb9ac1af 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -19,6 +19,7 @@ package module import ( "fmt" + "github.com/elastic/beats/libbeat/logp" "sync" "github.com/elastic/beats/libbeat/beat" @@ -55,21 +56,35 @@ type Runner interface { // NewRunner returns a Runner facade. The events generated by // the Module will be published to a new publisher.Client generated from the // pubClientFactory. -func NewRunner(client beat.Client, mod *Wrapper) Runner { +func NewRunner(clients map[string]beat.Client, mod *Wrapper) Runner { return &runner{ + logger: logp.NewLogger("runner"), done: make(chan struct{}), mod: mod, - client: client, + clients: clients, + } +} + +// NewRunner returns a Runner facade. The events generated by +// the Module will be published to a new publisher.Client generated from the +// pubClientFactory. +func NewRunnerForStaticModule(client beat.Client, mod *Wrapper) Runner { + return &runner{ + logger: logp.NewLogger("runner"), + done: make(chan struct{}), + mod: mod, + clients: map[string]beat.Client{"@static": client}, } } type runner struct { + logger *logp.Logger done chan struct{} wg sync.WaitGroup startOnce sync.Once stopOnce sync.Once mod *Wrapper - client beat.Client + clients map[string]beat.Client } func (mr *runner) Start() { @@ -79,7 +94,7 @@ func (mr *runner) Start() { moduleList.Add(mr.mod.Name()) go func() { defer mr.wg.Done() - PublishChannels(mr.client, output) + PublishChannels(mr.clients, output) }() }) } @@ -87,7 +102,13 @@ func (mr *runner) Start() { func (mr *runner) Stop() { mr.stopOnce.Do(func() { close(mr.done) - mr.client.Close() + + for metricSetName, client := range mr.clients { + err := client.Close() + if err != nil { + mr.logger.Errorf("Error occurred while stopping runner (name: %s): %v", metricSetName, err) + } + } mr.wg.Wait() moduleList.Remove(mr.mod.Name()) }) diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index d57d1999a5e6..16ad9c7586ef 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -19,6 +19,7 @@ package mb import ( "fmt" + "github.com/elastic/beats/libbeat/processors" "sort" "strings" "sync" @@ -121,6 +122,7 @@ type ModulesSource interface { HasMetricSet(module, name string) bool MetricSetRegistration(r *Register, module, name string) (MetricSetRegistration, error) ModulesInfo(r *Register) string + UnpackMetricSetConfiguration(r *Register, moduleName string, metricSetName string, cfg interface{}) error } // NewRegister creates and returns a new Register. @@ -362,6 +364,36 @@ func (r *Register) MetricSets(module string) []string { return metricsets } +func (r *Register) ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + var procs *processors.Processors + + moduleName = strings.ToLower(moduleName) + metricSetName = strings.ToLower(metricSetName) + + metricSets, exists := r.metricSets[moduleName] + if exists { + _, exists := metricSets[metricSetName] + if exists { + return procs, nil // Standard metric sets don't have processor definitions. + } + } + + if source := r.secondarySource; source != nil && source.HasMetricSet(moduleName, metricSetName) { + config := struct { + Processors processors.PluginConfig `config:"processors"` + }{} + err := source.UnpackMetricSetConfiguration(r, moduleName, metricSetName, &config) + if err != nil { + return nil, err + } + return processors.New(config.Processors) + } + return procs, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, metricSetName, moduleName) +} + // SetSecondarySource sets an additional source of modules func (r *Register) SetSecondarySource(source ModulesSource) { r.lock.Lock() diff --git a/x-pack/metricbeat/module/activemq/docker-compose.yml b/x-pack/metricbeat/module/activemq/docker-compose.yml index 63f32bb0b81d..1e1472c652ed 100644 --- a/x-pack/metricbeat/module/activemq/docker-compose.yml +++ b/x-pack/metricbeat/module/activemq/docker-compose.yml @@ -8,5 +8,5 @@ services: args: ACTIVEMQ_VERSION: ${ACTIVEMQ_VERSION:-5.15.9} ports: - - 8161 + - 8161:8161 - 61613 From e589187c8bec84d954028cf6c6c3de2c4059f9da Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Tue, 28 Jan 2020 20:13:04 +0100 Subject: [PATCH 03/25] Fix: comments, mage check --- metricbeat/mb/lightmodules.go | 3 ++- metricbeat/mb/module/publish.go | 4 +++- metricbeat/mb/module/runner.go | 21 +++++++++++---------- metricbeat/mb/registry.go | 3 ++- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index ca2c9ddacbb3..3cf82499efec 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -150,7 +150,8 @@ type lightModuleConfig struct { MetricSets []string `config:"metricsets"` } -func (s *LightModulesSource) UnpackMetricSetConfiguration(r *Register, moduleName string , metricSetName string, +// UnpackMetricSetConfiguration deserializes metricset configuration to the given structure. +func (s *LightModulesSource) UnpackMetricSetConfiguration(r *Register, moduleName string, metricSetName string, cfg interface{}) error { modulePath, found := s.findModulePath(moduleName) diff --git a/metricbeat/mb/module/publish.go b/metricbeat/mb/module/publish.go index 374db628cdbe..25e75b3da357 100644 --- a/metricbeat/mb/module/publish.go +++ b/metricbeat/mb/module/publish.go @@ -24,6 +24,8 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +const staticModuleSink = "@static" + // PublishChannels publishes the events read from each channel to the given // publisher client. If the publisher client blocks for any reason then events // will not be read from the given channels. @@ -40,7 +42,7 @@ func PublishChannels(clients map[string]beat.Client, cs ...<-chan beat.Event) { defer wg.Done() logger := logp.NewLogger("PublishChannels") - sink, staticModule := clients["@static"] + sink, staticModule := clients[staticModuleSink] for event := range c { if staticModule { diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 171ccb9ac1af..938edf77e267 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -19,9 +19,10 @@ package module import ( "fmt" - "github.com/elastic/beats/libbeat/logp" "sync" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/monitoring" ) @@ -54,26 +55,26 @@ type Runner interface { } // NewRunner returns a Runner facade. The events generated by -// the Module will be published to a new publisher.Client generated from the +// the Module will be published to one of new publisher.Clients generated from the // pubClientFactory. func NewRunner(clients map[string]beat.Client, mod *Wrapper) Runner { return &runner{ - logger: logp.NewLogger("runner"), - done: make(chan struct{}), - mod: mod, + logger: logp.NewLogger("runner"), + done: make(chan struct{}), + mod: mod, clients: clients, } } -// NewRunner returns a Runner facade. The events generated by +// NewRunnerForStaticModule returns a Runner facade. The events generated by // the Module will be published to a new publisher.Client generated from the // pubClientFactory. func NewRunnerForStaticModule(client beat.Client, mod *Wrapper) Runner { return &runner{ - logger: logp.NewLogger("runner"), - done: make(chan struct{}), - mod: mod, - clients: map[string]beat.Client{"@static": client}, + logger: logp.NewLogger("runner"), + done: make(chan struct{}), + mod: mod, + clients: map[string]beat.Client{staticModuleSink: client}, } } diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index 16ad9c7586ef..2541505b9100 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -19,11 +19,12 @@ package mb import ( "fmt" - "github.com/elastic/beats/libbeat/processors" "sort" "strings" "sync" + "github.com/elastic/beats/libbeat/processors" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/logp" From 0c4f502e9037250eb919d0ca762f140ae5e24b81 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 09:30:51 +0100 Subject: [PATCH 04/25] Adjust code --- metricbeat/mb/module/connector.go | 12 ++++++++++++ metricbeat/mb/module/example_test.go | 2 +- metricbeat/mb/module/factory.go | 14 +------------- metricbeat/mb/module/runner_test.go | 2 +- metricbeat/mb/registry.go | 7 +++---- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index b85aecc93285..4d5365fb03cd 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -23,6 +23,8 @@ import ( "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/processors/add_formatted_index" + "github.com/elastic/beats/metricbeat/mb" + "github.com/pkg/errors" ) // Connector configures and establishes a beat.Client for publishing events @@ -70,6 +72,16 @@ func NewConnector( }, nil } +func (c *Connector) UseMetricSetProcessors(r *mb.Register, moduleName, metricSetName string) error { + metricSetProcessors, err := mb.Registry.ProcessorsForMetricSet(moduleName, metricSetName) + if err != nil { + return errors.Wrapf(err, "reading metricset processors failed (module: %s, metricset: %s)", + moduleName, metricSetName) + } + c.processors.AddProcessors(*metricSetProcessors) + return nil +} + func (c *Connector) Connect() (beat.Client, error) { return c.pipeline.ConnectWith(beat.ClientConfig{ Processing: beat.ProcessingConfig{ diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index e5fc7e81e830..6638056032cb 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -143,7 +143,7 @@ func ExampleRunner() { } // Create the Runner facade. - runner := module.NewRunner(client, m) + runner := module.NewRunnerForStaticModule(client, m) // Start the module and have it publish to a new publisher.Client. runner.Start() diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 9fcb67e5c021..89d9349bf404 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -18,8 +18,6 @@ package module import ( - "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -51,26 +49,16 @@ func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP clients := map[string]beat.Client{} for _, msw := range w.MetricSets() { - processorsForMetricSet, err := mb.Registry.ProcessorsForMetricSet(msw.Module().Name(), msw.MetricSet.Name()) - if err != nil { - return nil, errors.Wrapf(err, "reading metricset processors failed (module: %s, metricset: %s)", - msw.Module().Name(), msw.MetricSet.Name()) - } - connector, err := NewConnector(r.beatInfo, p, c, meta) if err != nil { return nil, err } - - for _, p := range processorsForMetricSet.List { - connector.processors.AddProcessor(p) - } + connector.UseMetricSetProcessors(mb.Registry, msw.Name(), msw.MetricSet.Name()) client, err := connector.Connect() if err != nil { return nil, err } - clients[msw.MetricSet.Name()] = client } diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index 140e6333400d..a71bed7975ea 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -49,7 +49,7 @@ func TestRunner(t *testing.T) { } // Create the Runner facade. - runner := module.NewRunner(factory(), m) + runner := module.NewRunnerForStaticModule(factory(), m) // Start the module and have it publish to a new publisher.Client. runner.Start() diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index 2541505b9100..fb2f605d9233 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -365,12 +365,11 @@ func (r *Register) MetricSets(module string) []string { return metricsets } +// ProcessorsForMetricSet returns a list of processors defined in manifest of the registered metricset. func (r *Register) ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) { r.lock.RLock() defer r.lock.RUnlock() - var procs *processors.Processors - moduleName = strings.ToLower(moduleName) metricSetName = strings.ToLower(metricSetName) @@ -378,7 +377,7 @@ func (r *Register) ProcessorsForMetricSet(moduleName, metricSetName string) (*pr if exists { _, exists := metricSets[metricSetName] if exists { - return procs, nil // Standard metric sets don't have processor definitions. + return nil, nil // Standard metric sets don't have processor definitions. } } @@ -392,7 +391,7 @@ func (r *Register) ProcessorsForMetricSet(moduleName, metricSetName string) (*pr } return processors.New(config.Processors) } - return procs, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, metricSetName, moduleName) + return nil, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, metricSetName, moduleName) } // SetSecondarySource sets an additional source of modules From 6e7953f7c2e30c1f1b3b7e7797d79562c3e1ef29 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 09:34:33 +0100 Subject: [PATCH 05/25] Fix: missing comment --- metricbeat/mb/module/connector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 4d5365fb03cd..a17978796d88 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -72,6 +72,7 @@ func NewConnector( }, nil } +// UseMetricSetProcessors appends processors defined in metricset configuration to the connector properties. func (c *Connector) UseMetricSetProcessors(r *mb.Register, moduleName, metricSetName string) error { metricSetProcessors, err := mb.Registry.ProcessorsForMetricSet(moduleName, metricSetName) if err != nil { From 5399adc978f4f43e5d17fc325a4bd110de68ccb8 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 09:46:28 +0100 Subject: [PATCH 06/25] Fix: mage check --- metricbeat/docs/modules/activemq.asciidoc | 30 ----------------------- metricbeat/docs/modules/ibmmq.asciidoc | 20 --------------- metricbeat/mb/module/connector.go | 3 ++- 3 files changed, 2 insertions(+), 51 deletions(-) diff --git a/metricbeat/docs/modules/activemq.asciidoc b/metricbeat/docs/modules/activemq.asciidoc index 3bd03addf9cc..8f3e0551fd16 100644 --- a/metricbeat/docs/modules/activemq.asciidoc +++ b/metricbeat/docs/modules/activemq.asciidoc @@ -35,36 +35,6 @@ metricbeat.modules: path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } ---- [float] diff --git a/metricbeat/docs/modules/ibmmq.asciidoc b/metricbeat/docs/modules/ibmmq.asciidoc index 4912139f9dc1..911659d37d47 100644 --- a/metricbeat/docs/modules/ibmmq.asciidoc +++ b/metricbeat/docs/modules/ibmmq.asciidoc @@ -55,26 +55,6 @@ metricbeat.modules: # This module uses the Prometheus collector metricset, all # the options for this metricset are also available here. metrics_path: /metrics - - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } ---- It also supports the options described in <>. diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index a17978796d88..f77b95d26c8a 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -18,13 +18,14 @@ package module import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/processors/add_formatted_index" "github.com/elastic/beats/metricbeat/mb" - "github.com/pkg/errors" ) // Connector configures and establishes a beat.Client for publishing events From 4b852cf72ccbd0bd295889c325aa655c66212421 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 12:11:23 +0100 Subject: [PATCH 07/25] Test light modules --- metricbeat/mb/lightmodules_test.go | 29 +++++++++++++++++-- .../unpack/metricset/manifest.yml | 8 +++++ .../testdata/lightmodules/unpack/module.yml | 3 ++ 3 files changed, 37 insertions(+), 3 deletions(-) create mode 100644 metricbeat/mb/testdata/lightmodules/unpack/metricset/manifest.yml create mode 100644 metricbeat/mb/testdata/lightmodules/unpack/module.yml diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index 4278adc049b0..aed0ae80cf7d 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -23,11 +23,13 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/add_id" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestLightModulesAsModuleSource checks that registry correctly lists @@ -302,6 +304,27 @@ func TestNewModulesCallModuleFactory(t *testing.T) { assert.True(t, called, "module factory must be called if registered") } +func TestUnpackMetricSetConfiguration(t *testing.T) { + logp.TestingSetup() + + r := NewRegister() + r.MustAddMetricSet("foo", "bar", newMetricSetWithOption) + source := NewLightModulesSource("testdata/lightmodules") + r.SetSecondarySource(source) + + config := struct { + Processors processors.PluginConfig `config:"processors"` + }{} + err := source.UnpackMetricSetConfiguration(r, "unpack", "metricset", &config) + require.NoError(t, err) + + p, err := processors.New(config.Processors) + require.NoError(t, err) + + assert.Len(t, p.List, 1) + assert.Equal(t, "add_id=[target_field=[@metadata.id]]", p.List[0].String()) +} + type metricSetWithOption struct { BaseMetricSet Option string diff --git a/metricbeat/mb/testdata/lightmodules/unpack/metricset/manifest.yml b/metricbeat/mb/testdata/lightmodules/unpack/metricset/manifest.yml new file mode 100644 index 000000000000..63984b9bb3d9 --- /dev/null +++ b/metricbeat/mb/testdata/lightmodules/unpack/metricset/manifest.yml @@ -0,0 +1,8 @@ +default: true +input: + module: foo + metricset: bar + defaults: + option: test +processors: + - add_id: diff --git a/metricbeat/mb/testdata/lightmodules/unpack/module.yml b/metricbeat/mb/testdata/lightmodules/unpack/module.yml new file mode 100644 index 000000000000..74828a9c80f0 --- /dev/null +++ b/metricbeat/mb/testdata/lightmodules/unpack/module.yml @@ -0,0 +1,3 @@ +name: service +metricsets: +- metricset From f94010369b924f6890a4f9ff0d3a713bd02dd6e7 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 12:42:50 +0100 Subject: [PATCH 08/25] Test connector --- metricbeat/mb/lightmodules_test.go | 24 ++++++++++- metricbeat/mb/module/connector.go | 9 ++-- metricbeat/mb/module/connector_test.go | 41 +++++++++++++++++++ .../testdata/lightmodules/unpack/module.yml | 3 +- .../unpack/noprocessors/manifest.yml | 6 +++ .../manifest.yml | 0 6 files changed, 77 insertions(+), 6 deletions(-) create mode 100644 metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml rename metricbeat/mb/testdata/lightmodules/unpack/{metricset => withprocessors}/manifest.yml (100%) diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index aed0ae80cf7d..de9ad0546979 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -304,7 +304,7 @@ func TestNewModulesCallModuleFactory(t *testing.T) { assert.True(t, called, "module factory must be called if registered") } -func TestUnpackMetricSetConfiguration(t *testing.T) { +func TestUnpackMetricSetConfiguration_ProcessorsDefined(t *testing.T) { logp.TestingSetup() r := NewRegister() @@ -315,7 +315,7 @@ func TestUnpackMetricSetConfiguration(t *testing.T) { config := struct { Processors processors.PluginConfig `config:"processors"` }{} - err := source.UnpackMetricSetConfiguration(r, "unpack", "metricset", &config) + err := source.UnpackMetricSetConfiguration(r, "unpack", "withprocessors", &config) require.NoError(t, err) p, err := processors.New(config.Processors) @@ -325,6 +325,26 @@ func TestUnpackMetricSetConfiguration(t *testing.T) { assert.Equal(t, "add_id=[target_field=[@metadata.id]]", p.List[0].String()) } +func TestUnpackMetricSetConfiguration_ProcessorsUndefined(t *testing.T) { + logp.TestingSetup() + + r := NewRegister() + r.MustAddMetricSet("foo", "bar", newMetricSetWithOption) + source := NewLightModulesSource("testdata/lightmodules") + r.SetSecondarySource(source) + + config := struct { + Processors processors.PluginConfig `config:"processors"` + }{} + err := source.UnpackMetricSetConfiguration(r, "unpack", "noprocessors", &config) + require.NoError(t, err) + + p, err := processors.New(config.Processors) + require.NoError(t, err) + + assert.Len(t, p.List, 0) +} + type metricSetWithOption struct { BaseMetricSet Option string diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index f77b95d26c8a..0637d4759318 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/processors/add_formatted_index" - "github.com/elastic/beats/metricbeat/mb" ) // Connector configures and establishes a beat.Client for publishing events @@ -50,6 +49,10 @@ type connectorConfig struct { common.EventMetadata `config:",inline"` // Fields and tags to add to events. } +type metricSetRegister interface { + ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) +} + func NewConnector( beatInfo beat.Info, pipeline beat.Pipeline, c *common.Config, dynFields *common.MapStrPointer, @@ -74,8 +77,8 @@ func NewConnector( } // UseMetricSetProcessors appends processors defined in metricset configuration to the connector properties. -func (c *Connector) UseMetricSetProcessors(r *mb.Register, moduleName, metricSetName string) error { - metricSetProcessors, err := mb.Registry.ProcessorsForMetricSet(moduleName, metricSetName) +func (c *Connector) UseMetricSetProcessors(r metricSetRegister, moduleName, metricSetName string) error { + metricSetProcessors, err := r.ProcessorsForMetricSet(moduleName, metricSetName) if err != nil { return errors.Wrapf(err, "reading metricset processors failed (module: %s, metricset: %s)", moduleName, metricSetName) diff --git a/metricbeat/mb/module/connector_test.go b/metricbeat/mb/module/connector_test.go index 40789374bf81..773a160b64f2 100644 --- a/metricbeat/mb/module/connector_test.go +++ b/metricbeat/mb/module/connector_test.go @@ -18,13 +18,16 @@ package module import ( + "errors" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" ) func TestProcessorsForConfig(t *testing.T) { @@ -91,6 +94,44 @@ func TestProcessorsForConfig(t *testing.T) { } } +type fakeMetricSetRegister struct { + success bool +} + +func (fmsr *fakeMetricSetRegister) ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) { + if !fmsr.success { + return nil, errors.New("failure") + } + + procs := new(processors.Processors) + procs.List = []processors.Processor{nil, nil} + return procs, nil +} + +func TestUseMetricSetProcessors_ReadingProcessorsFailed(t *testing.T) { + r := new(fakeMetricSetRegister) + + var connector Connector + err := connector.UseMetricSetProcessors(r, "module", "metricset") + require.Error(t, err) + require.Nil(t, connector.processors) +} + +func TestUseMetricSetProcessors_ReadingProcessorsSucceeded(t *testing.T) { + r := &fakeMetricSetRegister{ + success: true, + } + + connector := Connector{ + processors: &processors.Processors{ + List: []processors.Processor{}, + }, + } + err := connector.UseMetricSetProcessors(r, "module", "metricset") + require.Nil(t, err) + require.Len(t, connector.processors.List, 2) +} + // Helper function to convert from YML input string to an unpacked // connectorConfig func connectorConfigFromString(s string) (connectorConfig, error) { diff --git a/metricbeat/mb/testdata/lightmodules/unpack/module.yml b/metricbeat/mb/testdata/lightmodules/unpack/module.yml index 74828a9c80f0..d86e9c4ae450 100644 --- a/metricbeat/mb/testdata/lightmodules/unpack/module.yml +++ b/metricbeat/mb/testdata/lightmodules/unpack/module.yml @@ -1,3 +1,4 @@ name: service metricsets: -- metricset +- withprocessors +- noprocessors diff --git a/metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml b/metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml new file mode 100644 index 000000000000..5291cac44e68 --- /dev/null +++ b/metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml @@ -0,0 +1,6 @@ +default: true +input: + module: foo + metricset: bar + defaults: + option: test diff --git a/metricbeat/mb/testdata/lightmodules/unpack/metricset/manifest.yml b/metricbeat/mb/testdata/lightmodules/unpack/withprocessors/manifest.yml similarity index 100% rename from metricbeat/mb/testdata/lightmodules/unpack/metricset/manifest.yml rename to metricbeat/mb/testdata/lightmodules/unpack/withprocessors/manifest.yml From 1b9a48f33cb5af63bdb2a71ea2a5d82ec36ff6e8 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 13:39:21 +0100 Subject: [PATCH 09/25] Test runner --- metricbeat/mb/module/connector_test.go | 2 +- metricbeat/mb/module/runner_test.go | 63 +++++++++++++++++++++----- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/metricbeat/mb/module/connector_test.go b/metricbeat/mb/module/connector_test.go index 773a160b64f2..9ca3e48cd17e 100644 --- a/metricbeat/mb/module/connector_test.go +++ b/metricbeat/mb/module/connector_test.go @@ -128,7 +128,7 @@ func TestUseMetricSetProcessors_ReadingProcessorsSucceeded(t *testing.T) { }, } err := connector.UseMetricSetProcessors(r, "module", "metricset") - require.Nil(t, err) + require.NoError(t, err) require.Len(t, connector.processors.List, 2) } diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index a71bed7975ea..8e124b2047f0 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -20,33 +20,31 @@ package module_test import ( + "strings" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" pubtest "github.com/elastic/beats/libbeat/publisher/testing" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" - - "github.com/stretchr/testify/assert" ) -func TestRunner(t *testing.T) { - pubClient, factory := newPubClientFactory() +func TestRunnerForStaticModule(t *testing.T) { + pubClient, factory := newPubClientFactoryForStaticModule() config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, "metricsets": []string{eventFetcherName}, }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create a new Wrapper based on the configuration. m, err := module.NewWrapper(config, mb.Registry, module.WithMetricSetInfo()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create the Runner facade. runner := module.NewRunnerForStaticModule(factory(), m) @@ -61,10 +59,51 @@ func TestRunner(t *testing.T) { runner.Stop() } -// newPubClientFactory returns a new ChanClient and a function that returns +// newPubClientFactoryForStaticModule returns a new ChanClient and a function that returns // the same Client when invoked. This simulates the return value of // Publisher.Connect. -func newPubClientFactory() (*pubtest.ChanClient, func() beat.Client) { +func newPubClientFactoryForStaticModule() (*pubtest.ChanClient, func() beat.Client) { client := pubtest.NewChanClient(10) return client, func() beat.Client { return client } } + +func TestRunner(t *testing.T) { + pubClients, factory := newPubClientFactory() + + config, err := common.NewConfigFrom(map[string]interface{}{ + "module": moduleName, + "metricsets": []string{eventFetcherName, reportingFetcherName}, + }) + require.NoError(t, err) + + // Create a new Wrapper based on the configuration. + m, err := module.NewWrapper(config, mb.Registry, module.WithMetricSetInfo()) + require.NoError(t, err) + + // Create the Runner facade. + runner := module.NewRunner(factory(), m) + + // Start the module and have it publish to a new publisher.Client. + runner.Start() + + assert.NotNil(t, <-pubClients[0].Channel) + assert.NotNil(t, <-pubClients[1].Channel) + + // Stop the module. This blocks until all MetricSets in the Module have + // stopped and the publisher.Client is closed. + runner.Stop() +} + +// newPubClientFactory returns new ChanClients and a function that returns +// the same Clients when invoked. This simulates the return value of +// Publisher.Connect. +func newPubClientFactory() ([]*pubtest.ChanClient, func() map[string]beat.Client) { + firstClient := pubtest.NewChanClient(10) + secondClient := pubtest.NewChanClient(10) + return []*pubtest.ChanClient{firstClient, secondClient}, func() map[string]beat.Client { + return map[string]beat.Client{ + strings.ToLower(eventFetcherName): firstClient, + strings.ToLower(reportingFetcherName): secondClient, + } + } +} From 500104ae278c4078db5f961072f1ea20761d6e8c Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 13:43:44 +0100 Subject: [PATCH 10/25] Fix: ToLower --- metricbeat/mb/module/factory.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 89d9349bf404..e39b81e23a06 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -18,6 +18,8 @@ package module import ( + "strings" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -59,7 +61,7 @@ func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP if err != nil { return nil, err } - clients[msw.MetricSet.Name()] = client + clients[strings.ToLower(msw.MetricSet.Name())] = client } mr := NewRunner(clients, w) From 18c98aa0f63c06a90210840423a5fe51075ada45 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 17:05:47 +0100 Subject: [PATCH 11/25] Adjust code after review --- metricbeat/mb/lightmetricset.go | 2 ++ metricbeat/mb/lightmodules.go | 26 ++++++------------- metricbeat/mb/lightmodules_test.go | 41 ------------------------------ metricbeat/mb/registry.go | 13 +++------- 4 files changed, 13 insertions(+), 69 deletions(-) diff --git a/metricbeat/mb/lightmetricset.go b/metricbeat/mb/lightmetricset.go index eb0ec0749184..37052601ad84 100644 --- a/metricbeat/mb/lightmetricset.go +++ b/metricbeat/mb/lightmetricset.go @@ -18,6 +18,7 @@ package mb import ( + "github.com/elastic/beats/libbeat/processors" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -33,6 +34,7 @@ type LightMetricSet struct { MetricSet string `config:"metricset" validate:"required"` Defaults interface{} `config:"defaults"` } `config:"input" validate:"required"` + Processors processors.PluginConfig `config:"processors"` } // Registration obtains a metric set registration for this light metric set, this registration diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index 3cf82499efec..aa0b78d7d82c 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -19,6 +19,7 @@ package mb import ( "fmt" + "github.com/elastic/beats/libbeat/processors" "io/ioutil" "os" "path/filepath" @@ -150,25 +151,14 @@ type lightModuleConfig struct { MetricSets []string `config:"metricsets"` } -// UnpackMetricSetConfiguration deserializes metricset configuration to the given structure. -func (s *LightModulesSource) UnpackMetricSetConfiguration(r *Register, moduleName string, metricSetName string, - cfg interface{}) error { - - modulePath, found := s.findModulePath(moduleName) - if !found { - return fmt.Errorf("module '%s' not found", moduleName) - } - - manifestPath := filepath.Join(filepath.Dir(modulePath), metricSetName, manifestYML) - config, err := common.LoadFile(manifestPath) - if err != nil { - return errors.Wrapf(err, "failed to load metricset manifest from '%s'", manifestPath) - } - - if err := config.Unpack(cfg); err != nil { - return errors.Wrapf(err, "failed to parse metricset manifest from '%s'", manifestPath) +// ProcessorsForMetricSet returns processors defined for the light metricset. +func (s *LightModulesSource) ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) { + module, err := s.loadModule(r, moduleName) + metricSet, ok := module.MetricSets[metricSetName] + if !ok { + return nil, errors.Wrapf(err, "unknown metricset '%s' in module '%s'", metricSetName, moduleName) } - return nil + return processors.New(metricSet.Processors) } // LightModule contains the definition of a light module diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index de9ad0546979..f24fab1c0aae 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -304,47 +304,6 @@ func TestNewModulesCallModuleFactory(t *testing.T) { assert.True(t, called, "module factory must be called if registered") } -func TestUnpackMetricSetConfiguration_ProcessorsDefined(t *testing.T) { - logp.TestingSetup() - - r := NewRegister() - r.MustAddMetricSet("foo", "bar", newMetricSetWithOption) - source := NewLightModulesSource("testdata/lightmodules") - r.SetSecondarySource(source) - - config := struct { - Processors processors.PluginConfig `config:"processors"` - }{} - err := source.UnpackMetricSetConfiguration(r, "unpack", "withprocessors", &config) - require.NoError(t, err) - - p, err := processors.New(config.Processors) - require.NoError(t, err) - - assert.Len(t, p.List, 1) - assert.Equal(t, "add_id=[target_field=[@metadata.id]]", p.List[0].String()) -} - -func TestUnpackMetricSetConfiguration_ProcessorsUndefined(t *testing.T) { - logp.TestingSetup() - - r := NewRegister() - r.MustAddMetricSet("foo", "bar", newMetricSetWithOption) - source := NewLightModulesSource("testdata/lightmodules") - r.SetSecondarySource(source) - - config := struct { - Processors processors.PluginConfig `config:"processors"` - }{} - err := source.UnpackMetricSetConfiguration(r, "unpack", "noprocessors", &config) - require.NoError(t, err) - - p, err := processors.New(config.Processors) - require.NoError(t, err) - - assert.Len(t, p.List, 0) -} - type metricSetWithOption struct { BaseMetricSet Option string diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index fb2f605d9233..2fc769a4a20c 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -123,7 +123,7 @@ type ModulesSource interface { HasMetricSet(module, name string) bool MetricSetRegistration(r *Register, module, name string) (MetricSetRegistration, error) ModulesInfo(r *Register) string - UnpackMetricSetConfiguration(r *Register, moduleName string, metricSetName string, cfg interface{}) error + ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) } // NewRegister creates and returns a new Register. @@ -381,15 +381,8 @@ func (r *Register) ProcessorsForMetricSet(moduleName, metricSetName string) (*pr } } - if source := r.secondarySource; source != nil && source.HasMetricSet(moduleName, metricSetName) { - config := struct { - Processors processors.PluginConfig `config:"processors"` - }{} - err := source.UnpackMetricSetConfiguration(r, moduleName, metricSetName, &config) - if err != nil { - return nil, err - } - return processors.New(config.Processors) + if source := r.secondarySource; source != nil { + return source.ProcessorsForMetricSet(r, moduleName, metricSetName) } return nil, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, metricSetName, moduleName) } From 8b4c4ab5837f41c32bff156f812581aa37b0d656 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 17:07:41 +0100 Subject: [PATCH 12/25] Fix: mage check --- metricbeat/mb/lightmetricset.go | 3 ++- metricbeat/mb/lightmodules.go | 3 ++- metricbeat/mb/lightmodules_test.go | 1 - 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/metricbeat/mb/lightmetricset.go b/metricbeat/mb/lightmetricset.go index 37052601ad84..07fc92e0a71a 100644 --- a/metricbeat/mb/lightmetricset.go +++ b/metricbeat/mb/lightmetricset.go @@ -18,9 +18,10 @@ package mb import ( - "github.com/elastic/beats/libbeat/processors" "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/common" ) diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index aa0b78d7d82c..e6f53c0bc1c1 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -19,12 +19,13 @@ package mb import ( "fmt" - "github.com/elastic/beats/libbeat/processors" "io/ioutil" "os" "path/filepath" "strings" + "github.com/elastic/beats/libbeat/processors" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index f24fab1c0aae..c4c7ce891e2e 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/processors" _ "github.com/elastic/beats/libbeat/processors/add_id" "github.com/stretchr/testify/assert" From fe499b31b0d6c2aec579f5be2998fca740830db5 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Wed, 29 Jan 2020 18:07:51 +0100 Subject: [PATCH 13/25] Adjust code after review --- metricbeat/beater/metricbeat.go | 2 +- metricbeat/mb/module/connector.go | 12 +++- metricbeat/mb/module/example_test.go | 2 +- metricbeat/mb/module/factory.go | 19 +++--- metricbeat/mb/module/publish.go | 33 +--------- metricbeat/mb/module/runner.go | 36 ++-------- metricbeat/mb/module/runner_group.go | 60 +++++++++++++++++ metricbeat/mb/module/runner_test.go | 65 ++++--------------- metricbeat/mb/module/wrapper.go | 19 ++++++ .../module/activemq/broker/manifest.yml | 2 +- 10 files changed, 125 insertions(+), 125 deletions(-) create mode 100644 metricbeat/mb/module/runner_group.go diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 695bf5f96109..8d27d8066664 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -227,7 +227,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { return err } - r := module.NewRunnerForStaticModule(client, m.module) + r := module.NewRunner(client, m.module) r.Start() wg.Add(1) go func() { diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 0637d4759318..15456c160a8e 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -83,7 +83,17 @@ func (c *Connector) UseMetricSetProcessors(r metricSetRegister, moduleName, metr return errors.Wrapf(err, "reading metricset processors failed (module: %s, metricset: %s)", moduleName, metricSetName) } - c.processors.AddProcessors(*metricSetProcessors) + + if metricSetProcessors == nil || len(metricSetProcessors.List) == 0 { + return nil // no processors are defined + } + + procs := processors.NewList(nil) + procs.AddProcessors(*metricSetProcessors) + for _, p := range c.processors.List { + procs.AddProcessor(p) + } + c.processors = procs return nil } diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index 6638056032cb..e5fc7e81e830 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -143,7 +143,7 @@ func ExampleRunner() { } // Create the Runner facade. - runner := module.NewRunnerForStaticModule(client, m) + runner := module.NewRunner(client, m) // Start the module and have it publish to a new publisher.Client. runner.Start() diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index e39b81e23a06..2afc72b4b07f 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -18,8 +18,6 @@ package module import ( - "strings" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -43,29 +41,32 @@ func NewFactory(beatInfo beat.Info, options ...Option) *Factory { // Create creates a new metricbeat module runner reporting events to the passed pipeline. func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - w, err := NewWrapper(c, mb.Registry, r.options...) + module, metricSets, err := mb.NewModule(c, mb.Registry) if err != nil { return nil, err } - clients := map[string]beat.Client{} + var runners []Runner + for _, metricSet := range metricSets { + wrapper, err := NewWrapperForMetricSet(module, metricSet, r.options...) + if err != nil { + return nil, err + } - for _, msw := range w.MetricSets() { connector, err := NewConnector(r.beatInfo, p, c, meta) if err != nil { return nil, err } - connector.UseMetricSetProcessors(mb.Registry, msw.Name(), msw.MetricSet.Name()) + connector.UseMetricSetProcessors(mb.Registry, module.Name(), metricSet.Name()) client, err := connector.Connect() if err != nil { return nil, err } - clients[strings.ToLower(msw.MetricSet.Name())] = client + runners = append(runners, NewRunner(client, wrapper)) } - mr := NewRunner(clients, w) - return mr, nil + return newRunnerGroup(runners), nil } // CheckConfig checks if a config is valid or not diff --git a/metricbeat/mb/module/publish.go b/metricbeat/mb/module/publish.go index 25e75b3da357..40ce01549fe4 100644 --- a/metricbeat/mb/module/publish.go +++ b/metricbeat/mb/module/publish.go @@ -21,11 +21,8 @@ import ( "sync" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/logp" ) -const staticModuleSink = "@static" - // PublishChannels publishes the events read from each channel to the given // publisher client. If the publisher client blocks for any reason then events // will not be read from the given channels. @@ -34,40 +31,14 @@ const staticModuleSink = "@static" // and are fully read. To stop the method immediately, close the channels and // close the publisher client to ensure that publishing does not block. This // may result is some events being discarded. -func PublishChannels(clients map[string]beat.Client, cs ...<-chan beat.Event) { +func PublishChannels(client beat.Client, cs ...<-chan beat.Event) { var wg sync.WaitGroup // output publishes values from c until c is closed, then calls wg.Done. output := func(c <-chan beat.Event) { defer wg.Done() - logger := logp.NewLogger("PublishChannels") - - sink, staticModule := clients[staticModuleSink] - for event := range c { - if staticModule { - sink.Publish(event) - continue - } - - v, err := event.Fields.GetValue("metricset.name") - if err != nil { - logger.Errorf("Error occurred while retrieving key 'metricset': %v", err) - continue - } - - metricSetName, ok := v.(string) - if !ok { - logger.Error("Non-string type of 'metricset'") - continue - } - - if _, ok := clients[metricSetName]; !ok { - logger.Errorf("Non-registered metricset client (name: %s)", metricSetName) - continue - } - - clients[metricSetName].Publish(event) + client.Publish(event) } } diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 938edf77e267..c60a009caea3 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -21,8 +21,6 @@ import ( "fmt" "sync" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/monitoring" ) @@ -55,37 +53,23 @@ type Runner interface { } // NewRunner returns a Runner facade. The events generated by -// the Module will be published to one of new publisher.Clients generated from the -// pubClientFactory. -func NewRunner(clients map[string]beat.Client, mod *Wrapper) Runner { - return &runner{ - logger: logp.NewLogger("runner"), - done: make(chan struct{}), - mod: mod, - clients: clients, - } -} - -// NewRunnerForStaticModule returns a Runner facade. The events generated by // the Module will be published to a new publisher.Client generated from the // pubClientFactory. -func NewRunnerForStaticModule(client beat.Client, mod *Wrapper) Runner { +func NewRunner(client beat.Client, mod *Wrapper) Runner { return &runner{ - logger: logp.NewLogger("runner"), - done: make(chan struct{}), - mod: mod, - clients: map[string]beat.Client{staticModuleSink: client}, + done: make(chan struct{}), + mod: mod, + client: client, } } type runner struct { - logger *logp.Logger done chan struct{} wg sync.WaitGroup startOnce sync.Once stopOnce sync.Once mod *Wrapper - clients map[string]beat.Client + client beat.Client } func (mr *runner) Start() { @@ -95,7 +79,7 @@ func (mr *runner) Start() { moduleList.Add(mr.mod.Name()) go func() { defer mr.wg.Done() - PublishChannels(mr.clients, output) + PublishChannels(mr.client, output) }() }) } @@ -103,13 +87,7 @@ func (mr *runner) Start() { func (mr *runner) Stop() { mr.stopOnce.Do(func() { close(mr.done) - - for metricSetName, client := range mr.clients { - err := client.Close() - if err != nil { - mr.logger.Errorf("Error occurred while stopping runner (name: %s): %v", metricSetName, err) - } - } + mr.client.Close() mr.wg.Wait() moduleList.Remove(mr.mod.Name()) }) diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go new file mode 100644 index 000000000000..34d485046198 --- /dev/null +++ b/metricbeat/mb/module/runner_group.go @@ -0,0 +1,60 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "strings" + "sync" +) + +type runnerGroup struct { + runners []Runner + + startOnce sync.Once + stopOnce sync.Once +} + +func newRunnerGroup(runners []Runner) Runner { + return &runnerGroup{ + runners: runners, + } +} + +func (rg *runnerGroup) Start() { + rg.startOnce.Do(func() { + for _, runner := range rg.runners { + runner.Start() + } + }) +} + +func (rg *runnerGroup) Stop() { + rg.stopOnce.Do(func() { + for _, runner := range rg.runners { + runner.Stop() + } + }) +} + +func (rg *runnerGroup) String() string { + var entries []string + for _, runner := range rg.runners { + entries = append(entries, runner.String()) + } + return "RunnerGroup: " + strings.Join(entries, ", ") +} diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index 8e124b2047f0..140e6333400d 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -20,34 +20,36 @@ package module_test import ( - "strings" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" pubtest "github.com/elastic/beats/libbeat/publisher/testing" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" + + "github.com/stretchr/testify/assert" ) -func TestRunnerForStaticModule(t *testing.T) { - pubClient, factory := newPubClientFactoryForStaticModule() +func TestRunner(t *testing.T) { + pubClient, factory := newPubClientFactory() config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, "metricsets": []string{eventFetcherName}, }) - require.NoError(t, err) + if err != nil { + t.Fatal(err) + } // Create a new Wrapper based on the configuration. m, err := module.NewWrapper(config, mb.Registry, module.WithMetricSetInfo()) - require.NoError(t, err) + if err != nil { + t.Fatal(err) + } // Create the Runner facade. - runner := module.NewRunnerForStaticModule(factory(), m) + runner := module.NewRunner(factory(), m) // Start the module and have it publish to a new publisher.Client. runner.Start() @@ -59,51 +61,10 @@ func TestRunnerForStaticModule(t *testing.T) { runner.Stop() } -// newPubClientFactoryForStaticModule returns a new ChanClient and a function that returns +// newPubClientFactory returns a new ChanClient and a function that returns // the same Client when invoked. This simulates the return value of // Publisher.Connect. -func newPubClientFactoryForStaticModule() (*pubtest.ChanClient, func() beat.Client) { +func newPubClientFactory() (*pubtest.ChanClient, func() beat.Client) { client := pubtest.NewChanClient(10) return client, func() beat.Client { return client } } - -func TestRunner(t *testing.T) { - pubClients, factory := newPubClientFactory() - - config, err := common.NewConfigFrom(map[string]interface{}{ - "module": moduleName, - "metricsets": []string{eventFetcherName, reportingFetcherName}, - }) - require.NoError(t, err) - - // Create a new Wrapper based on the configuration. - m, err := module.NewWrapper(config, mb.Registry, module.WithMetricSetInfo()) - require.NoError(t, err) - - // Create the Runner facade. - runner := module.NewRunner(factory(), m) - - // Start the module and have it publish to a new publisher.Client. - runner.Start() - - assert.NotNil(t, <-pubClients[0].Channel) - assert.NotNil(t, <-pubClients[1].Channel) - - // Stop the module. This blocks until all MetricSets in the Module have - // stopped and the publisher.Client is closed. - runner.Stop() -} - -// newPubClientFactory returns new ChanClients and a function that returns -// the same Clients when invoked. This simulates the return value of -// Publisher.Connect. -func newPubClientFactory() ([]*pubtest.ChanClient, func() map[string]beat.Client) { - firstClient := pubtest.NewChanClient(10) - secondClient := pubtest.NewChanClient(10) - return []*pubtest.ChanClient{firstClient, secondClient}, func() map[string]beat.Client { - return map[string]beat.Client{ - strings.ToLower(eventFetcherName): firstClient, - strings.ToLower(reportingFetcherName): secondClient, - } - } -} diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index e0bd7d12dbda..eaf4c2700c87 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -105,6 +105,25 @@ func NewWrapper(config *common.Config, r *mb.Register, options ...Option) (*Wrap return wrapper, nil } +// NewWrapperForMetricSet creates a wrapper for the selected module and metricset. +func NewWrapperForMetricSet(module mb.Module, metricSet mb.MetricSet, options ...Option) (*Wrapper, error) { + wrapper := &Wrapper{ + Module: module, + metricSets: make([]*metricSetWrapper, 1), + } + + for _, applyOption := range options { + applyOption(wrapper) + } + + wrapper.metricSets[0] = &metricSetWrapper{ + MetricSet: metricSet, + module: wrapper, + stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), + } + return wrapper, nil +} + // Wrapper methods // Start starts the Module's MetricSet workers which are responsible for diff --git a/x-pack/metricbeat/module/activemq/broker/manifest.yml b/x-pack/metricbeat/module/activemq/broker/manifest.yml index 8b2746182329..bb62a9adc13b 100644 --- a/x-pack/metricbeat/module/activemq/broker/manifest.yml +++ b/x-pack/metricbeat/module/activemq/broker/manifest.yml @@ -38,7 +38,7 @@ processors: } var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temlep_pct != null) { + if (broker_memory_temp_pct != null) { event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) } From a232a7740cc54bcd66b1b0b8eb02280a785ae9e6 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 07:54:58 +0100 Subject: [PATCH 14/25] Adjust code after review --- metricbeat/mb/lightmodules.go | 6 ++- metricbeat/mb/lightmodules_test.go | 6 +-- metricbeat/mb/module/wrapper.go | 40 +++++++------------ metricbeat/mb/registry.go | 19 +++++---- .../module/activemq/docker-compose.yml | 2 +- 5 files changed, 31 insertions(+), 42 deletions(-) diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index e6f53c0bc1c1..632912a21227 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -24,12 +24,11 @@ import ( "path/filepath" "strings" - "github.com/elastic/beats/libbeat/processors" - "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) const ( @@ -155,6 +154,9 @@ type lightModuleConfig struct { // ProcessorsForMetricSet returns processors defined for the light metricset. func (s *LightModulesSource) ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) { module, err := s.loadModule(r, moduleName) + if err != nil { + return nil, errors.Wrapf(err, "reading processors for metricset '%s' in module '%s' failed", metricSetName, moduleName) + } metricSet, ok := module.MetricSets[metricSetName] if !ok { return nil, errors.Wrapf(err, "unknown metricset '%s' in module '%s'", metricSetName, moduleName) diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index c4c7ce891e2e..ecc6f718e597 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -23,12 +23,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" _ "github.com/elastic/beats/libbeat/processors/add_id" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // TestLightModulesAsModuleSource checks that registry correctly lists diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index eaf4c2700c87..dfb547dcd27d 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -78,48 +78,36 @@ type stats struct { events *monitoring.Int // Total events published. } -// NewWrapper create a new Module and its associated MetricSets based -// on the given configuration. +// NewWrapper creates a new module and its associated metricsets based on the given configuration. func NewWrapper(config *common.Config, r *mb.Register, options ...Option) (*Wrapper, error) { - module, metricsets, err := mb.NewModule(config, r) + module, metricSets, err := mb.NewModule(config, r) if err != nil { return nil, err } - - wrapper := &Wrapper{ - Module: module, - metricSets: make([]*metricSetWrapper, len(metricsets)), - } - for _, applyOption := range options { - applyOption(wrapper) - } - - for i, ms := range metricsets { - wrapper.metricSets[i] = &metricSetWrapper{ - MetricSet: ms, - module: wrapper, - stats: getMetricSetStats(wrapper.Name(), ms.Name()), - } - } - - return wrapper, nil + return createWrapper(module, metricSets, options...) } // NewWrapperForMetricSet creates a wrapper for the selected module and metricset. func NewWrapperForMetricSet(module mb.Module, metricSet mb.MetricSet, options ...Option) (*Wrapper, error) { + return createWrapper(module, []mb.MetricSet{metricSet}, options...) +} + +func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Option) (*Wrapper, error) { wrapper := &Wrapper{ Module: module, - metricSets: make([]*metricSetWrapper, 1), + metricSets: make([]*metricSetWrapper, len(metricSets)), } for _, applyOption := range options { applyOption(wrapper) } - wrapper.metricSets[0] = &metricSetWrapper{ - MetricSet: metricSet, - module: wrapper, - stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), + for i, metricSet := range metricSets { + wrapper.metricSets[i] = &metricSetWrapper{ + MetricSet: metricSet, + module: wrapper, + stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), + } } return wrapper, nil } diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index 2fc769a4a20c..f280b2d1970c 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -23,11 +23,10 @@ import ( "strings" "sync" - "github.com/elastic/beats/libbeat/processors" - "github.com/pkg/errors" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) const initialSize = 20 // initialSize specifies the initial size of the Register. @@ -123,7 +122,7 @@ type ModulesSource interface { HasMetricSet(module, name string) bool MetricSetRegistration(r *Register, module, name string) (MetricSetRegistration, error) ModulesInfo(r *Register) string - ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) + ProcessorsForMetricSet(r *Register, module, name string) (*processors.Processors, error) } // NewRegister creates and returns a new Register. @@ -366,25 +365,25 @@ func (r *Register) MetricSets(module string) []string { } // ProcessorsForMetricSet returns a list of processors defined in manifest of the registered metricset. -func (r *Register) ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) { +func (r *Register) ProcessorsForMetricSet(module, name string) (*processors.Processors, error) { r.lock.RLock() defer r.lock.RUnlock() - moduleName = strings.ToLower(moduleName) - metricSetName = strings.ToLower(metricSetName) + module = strings.ToLower(module) + name = strings.ToLower(name) - metricSets, exists := r.metricSets[moduleName] + metricSets, exists := r.metricSets[module] if exists { - _, exists := metricSets[metricSetName] + _, exists := metricSets[name] if exists { return nil, nil // Standard metric sets don't have processor definitions. } } if source := r.secondarySource; source != nil { - return source.ProcessorsForMetricSet(r, moduleName, metricSetName) + return source.ProcessorsForMetricSet(r, module, name) } - return nil, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, metricSetName, moduleName) + return nil, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, name, module) } // SetSecondarySource sets an additional source of modules diff --git a/x-pack/metricbeat/module/activemq/docker-compose.yml b/x-pack/metricbeat/module/activemq/docker-compose.yml index 1e1472c652ed..63f32bb0b81d 100644 --- a/x-pack/metricbeat/module/activemq/docker-compose.yml +++ b/x-pack/metricbeat/module/activemq/docker-compose.yml @@ -8,5 +8,5 @@ services: args: ACTIVEMQ_VERSION: ${ACTIVEMQ_VERSION:-5.15.9} ports: - - 8161:8161 + - 8161 - 61613 From fe1d63cd387eb528958e3ee90b658d00ea60a3ed Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 08:16:42 +0100 Subject: [PATCH 15/25] Fix: check error --- metricbeat/mb/module/factory.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 2afc72b4b07f..08aafb36875f 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -57,7 +57,11 @@ func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP if err != nil { return nil, err } - connector.UseMetricSetProcessors(mb.Registry, module.Name(), metricSet.Name()) + + err = connector.UseMetricSetProcessors(mb.Registry, module.Name(), metricSet.Name()) + if err != nil { + return nil, err + } client, err := connector.Connect() if err != nil { From 3185a0c42f13fd26c649b0fb5462c59824149625 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 08:18:06 +0100 Subject: [PATCH 16/25] Fix: imports --- metricbeat/mb/lightmetricset.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metricbeat/mb/lightmetricset.go b/metricbeat/mb/lightmetricset.go index 07fc92e0a71a..dee1d0afb7ba 100644 --- a/metricbeat/mb/lightmetricset.go +++ b/metricbeat/mb/lightmetricset.go @@ -20,9 +20,8 @@ package mb import ( "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" ) // LightMetricSet contains the definition of a non-registered metric set From b717463bd4d483284b41f33eec77e19d291ff113 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 10:55:04 +0100 Subject: [PATCH 17/25] Increase test coverage --- metricbeat/mb/lightmodules.go | 2 +- metricbeat/mb/lightmodules_test.go | 16 +++++ metricbeat/mb/module/runner_group.go | 2 + metricbeat/mb/module/runner_group_test.go | 88 +++++++++++++++++++++++ metricbeat/mb/module/wrapper_test.go | 73 ++++++++++++------- 5 files changed, 153 insertions(+), 28 deletions(-) create mode 100644 metricbeat/mb/module/runner_group_test.go diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index 632912a21227..0f0fb9f23cfa 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -159,7 +159,7 @@ func (s *LightModulesSource) ProcessorsForMetricSet(r *Register, moduleName stri } metricSet, ok := module.MetricSets[metricSetName] if !ok { - return nil, errors.Wrapf(err, "unknown metricset '%s' in module '%s'", metricSetName, moduleName) + return nil, fmt.Errorf("unknown metricset '%s' in module '%s'", metricSetName, moduleName) } return processors.New(metricSet.Processors) } diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index ecc6f718e597..e987a8eba7e3 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -303,6 +303,22 @@ func TestNewModulesCallModuleFactory(t *testing.T) { assert.True(t, called, "module factory must be called if registered") } +func TestProcessorsForMetricSet_UnknownModule(t *testing.T) { + r := NewRegister() + source := NewLightModulesSource("testdata/lightmodules") + procs, err := source.ProcessorsForMetricSet(r, "nonexisting", "fake") + require.Error(t, err) + require.Nil(t, procs) +} + +func TestProcessorsForMetricSet_UnknownMetricSet(t *testing.T) { + r := NewRegister() + source := NewLightModulesSource("testdata/lightmodules") + procs, err := source.ProcessorsForMetricSet(r, "unpack", "nonexisting") + require.Error(t, err) + require.Nil(t, procs) +} + type metricSetWithOption struct { BaseMetricSet Option string diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index 34d485046198..326e995654e6 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,6 +23,8 @@ import ( ) type runnerGroup struct { + Runner + runners []Runner startOnce sync.Once diff --git a/metricbeat/mb/module/runner_group_test.go b/metricbeat/mb/module/runner_group_test.go new file mode 100644 index 000000000000..8c07cc749357 --- /dev/null +++ b/metricbeat/mb/module/runner_group_test.go @@ -0,0 +1,88 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common/atomic" +) + +const ( + fakeRunnersNum = 3 + fakeRunnerName = "fakeRunner" +) + +type fakeRunner struct { + id int + + startCounter *atomic.Int + stopCounter *atomic.Int +} + +func (fr *fakeRunner) Start() { + if fr.startCounter != nil { + fr.startCounter.Inc() + } +} + +func (fr *fakeRunner) Stop() { + if fr.stopCounter != nil { + fr.stopCounter.Inc() + } +} + +func (fr *fakeRunner) String() string { + return fmt.Sprintf("%s-%d", fakeRunnerName, fr.id) +} + +func TestStartStop(t *testing.T) { + startCounter := atomic.NewInt(0) + stopCounter := atomic.NewInt(0) + + var runners []Runner + for i := 0; i < fakeRunnersNum; i++ { + runners = append(runners, &fakeRunner{ + id: i, + startCounter: startCounter, + stopCounter: stopCounter, + }) + } + + runnerGroup := newRunnerGroup(runners) + runnerGroup.Start() + + runnerGroup.Stop() + + assert.Equal(t, fakeRunnersNum, startCounter.Load()) + assert.Equal(t, fakeRunnersNum, stopCounter.Load()) +} + +func TestString(t *testing.T) { + var runners []Runner + for i := 0; i < fakeRunnersNum; i++ { + runners = append(runners, &fakeRunner{ + id: i, + }) + } + runnerGroup := newRunnerGroup(runners) + assert.Equal(t, "RunnerGroup: fakeRunner-0, fakeRunner-1, fakeRunner-2", runnerGroup.String()) +} diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index 8901937fadc4..6bd8285e8d32 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -23,11 +23,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" - - "github.com/stretchr/testify/assert" ) const ( @@ -107,24 +108,18 @@ func newFakePushMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { func newTestRegistry(t testing.TB) *mb.Register { r := mb.NewRegister() - if err := r.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher); err != nil { - t.Fatal(err) - } - if err := r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher); err != nil { - t.Fatal(err) - } - if err := r.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet); err != nil { - t.Fatal(err) - } - + err := r.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher) + require.NoError(t, err) + err = r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher) + require.NoError(t, err) + err = r.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet) + require.NoError(t, err) return r } func newConfig(t testing.TB, moduleConfig interface{}) *common.Config { config, err := common.NewConfigFrom(moduleConfig) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) return config } @@ -139,9 +134,7 @@ func TestWrapperOfEventFetcher(t *testing.T) { }) m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) output := m.Start(done) @@ -172,9 +165,7 @@ func TestWrapperOfReportingFetcher(t *testing.T) { }) m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) output := m.Start(done) @@ -205,9 +196,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) { }) m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) output := m.Start(done) @@ -254,9 +243,7 @@ func TestPeriodIsAddedToEvent(t *testing.T) { }) m, err := module.NewWrapper(config, registry, module.WithMetricSetInfo()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) defer close(done) @@ -270,3 +257,35 @@ func TestPeriodIsAddedToEvent(t *testing.T) { }) } } + +func TestNewWrapperForMetricSet(t *testing.T) { + hosts := []string{"alpha"} + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{eventFetcherName}, + "hosts": hosts, + }) + + aModule, metricSets, err := mb.NewModule(c, newTestRegistry(t)) + require.NoError(t, err) + + m, err := module.NewWrapperForMetricSet(aModule, metricSets[0], module.WithMetricSetInfo()) + require.NoError(t, err) + + done := make(chan struct{}) + output := m.Start(done) + + <-output + close(done) + + // Validate that the channel is closed after receiving the event. + select { + case _, ok := <-output: + if !ok { + // Channel is closed. + return + } else { + assert.Fail(t, "received unexpected event") + } + } +} From b60d8d545c64d27d5bfab6e48095ff4c1dd554e3 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 11:39:01 +0100 Subject: [PATCH 18/25] Add unit tests --- metricbeat/mb/lightmodules_test.go | 9 ++++ metricbeat/mb/registry.go | 2 +- metricbeat/mb/registry_test.go | 86 ++++++++++++++---------------- 3 files changed, 51 insertions(+), 46 deletions(-) diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index e987a8eba7e3..a1bb999f8b1c 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -319,6 +319,15 @@ func TestProcessorsForMetricSet_UnknownMetricSet(t *testing.T) { require.Nil(t, procs) } +func TestProcessorsForMetricSet_ProcessorsRead(t *testing.T) { + r := NewRegister() + source := NewLightModulesSource("testdata/lightmodules") + procs, err := source.ProcessorsForMetricSet(r, "unpack", "withprocessors") + require.NoError(t, err) + require.NotNil(t, procs) + require.Len(t, procs.List, 1) +} + type metricSetWithOption struct { BaseMetricSet Option string diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index f280b2d1970c..c7b13b3a60bc 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -376,7 +376,7 @@ func (r *Register) ProcessorsForMetricSet(module, name string) (*processors.Proc if exists { _, exists := metricSets[name] if exists { - return nil, nil // Standard metric sets don't have processor definitions. + return processors.NewList(nil), nil // Standard metricsets don't have processor definitions. } } diff --git a/metricbeat/mb/registry_test.go b/metricbeat/mb/registry_test.go index eb0a6a32fdab..f4ba90f4dd40 100644 --- a/metricbeat/mb/registry_test.go +++ b/metricbeat/mb/registry_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -52,9 +53,7 @@ func TestAddModuleNilFactory(t *testing.T) { func TestAddModuleDuplicateName(t *testing.T) { registry := NewRegister() err := registry.AddModule(moduleName, fakeModuleFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = registry.AddModule(moduleName, fakeModuleFactory) if assert.Error(t, err) { @@ -65,9 +64,7 @@ func TestAddModuleDuplicateName(t *testing.T) { func TestAddModule(t *testing.T) { registry := NewRegister() err := registry.AddModule(moduleName, fakeModuleFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) factory, found := registry.modules[moduleName] assert.True(t, found, "module not found") assert.NotNil(t, factory, "factory fuction is nil") @@ -100,9 +97,7 @@ func TestAddMetricSetNilFactory(t *testing.T) { func TestAddMetricSetDuplicateName(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) if assert.Error(t, err) { @@ -113,9 +108,7 @@ func TestAddMetricSetDuplicateName(t *testing.T) { func TestAddMetricSet(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) f, found := registry.metricSets[moduleName][metricSetName] assert.True(t, found, "metricset not found") assert.NotNil(t, f, "factory function is nil") @@ -139,14 +132,10 @@ func TestMetricSetFactory(t *testing.T) { t.Run("without HostParser", func(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Equal(t, metricSetName, reg.Name) assert.NotNil(t, reg.Factory) assert.Nil(t, reg.HostParser) @@ -158,14 +147,10 @@ func TestMetricSetFactory(t *testing.T) { registry := NewRegister() hostParser := func(Module, string) (HostData, error) { return HostData{}, nil } err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory, hostParser) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.NotNil(t, reg.HostParser) // Can't compare functions in Go so just check for non-nil. }) @@ -173,14 +158,10 @@ func TestMetricSetFactory(t *testing.T) { registry := NewRegister() hostParser := func(Module, string) (HostData, error) { return HostData{}, nil } err := registry.addMetricSet(moduleName, metricSetName, fakeMetricSetFactory, WithHostParser(hostParser)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.NotNil(t, reg.HostParser) // Can't compare functions in Go so just check for non-nil. }) @@ -189,14 +170,10 @@ func TestMetricSetFactory(t *testing.T) { registry := NewRegister() err := registry.addMetricSet(moduleName, metricSetName, fakeMetricSetFactory, WithNamespace(ns)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Equal(t, metricSetName, reg.Name) assert.NotNil(t, reg.Factory) assert.Nil(t, reg.HostParser) @@ -208,23 +185,17 @@ func TestMetricSetFactory(t *testing.T) { func TestDefaultMetricSet(t *testing.T) { registry := NewRegister() err := registry.addMetricSet(moduleName, metricSetName, fakeMetricSetFactory, DefaultMetricSet()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) names, err := registry.DefaultMetricSets(moduleName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Contains(t, names, metricSetName) } func TestMetricSetQuery(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) metricsets := registry.MetricSets(moduleName) assert.Equal(t, len(metricsets), 1) @@ -242,3 +213,28 @@ func TestModuleQuery(t *testing.T) { assert.Equal(t, len(modules), 1) assert.Equal(t, modules[0], moduleName) } + +func TestProcessorsForMetricSet_StandardMetricSet(t *testing.T) { + registry := NewRegister() + err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) + procs, err := registry.ProcessorsForMetricSet(moduleName, metricSetName) + require.NotNil(t, procs) + require.Empty(t, procs.List) + require.NoError(t, err) +} + +func TestProcessorsForMetricSet_UndefinedSecondarySource(t *testing.T) { + registry := NewRegister() + procs, err := registry.ProcessorsForMetricSet(moduleName, metricSetName) + require.Nil(t, procs) + require.Error(t, err) +} + +func TestProcessorsForMetricSet_FromSource(t *testing.T) { + registry := NewRegister() + registry.SetSecondarySource(NewLightModulesSource("testdata/lightmodules")) + procs, err := registry.ProcessorsForMetricSet("unpack", "withprocessors") + require.NoError(t, err) + require.NotNil(t, procs) + require.Len(t, procs.List, 1) +} From 8bc10f04122d2db4f806aac61ab9211ad4cd61bb Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 11:41:32 +0100 Subject: [PATCH 19/25] Fix: hound --- metricbeat/mb/module/wrapper_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index 6bd8285e8d32..b83db1ff6bf8 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -282,10 +282,8 @@ func TestNewWrapperForMetricSet(t *testing.T) { select { case _, ok := <-output: if !ok { - // Channel is closed. - return - } else { - assert.Fail(t, "received unexpected event") + return // Channel is closed. } + assert.Fail(t, "received unexpected event") } } From 9e39363bd65505be71938fd6b6a1b8b8627ac061 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 14:09:44 +0100 Subject: [PATCH 20/25] beater: use factory --- metricbeat/beater/metricbeat.go | 71 +++++++++++---------------------- 1 file changed, 23 insertions(+), 48 deletions(-) diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 8d27d8066664..f2a9fbc75576 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -20,18 +20,16 @@ package beater import ( "sync" - "github.com/elastic/beats/libbeat/common/reload" - "github.com/elastic/beats/libbeat/management" - "github.com/elastic/beats/libbeat/paths" - - "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/management" + "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" @@ -44,8 +42,8 @@ import ( // Metricbeat implements the Beater interface for metricbeat. type Metricbeat struct { - done chan struct{} // Channel used to initiate shutdown. - modules []staticModule // Active list of modules. + done chan struct{} // Channel used to initiate shutdown. + runners []module.Runner // Active list of module runners. config Config autodiscover *autodiscover.Autodiscover @@ -53,11 +51,6 @@ type Metricbeat struct { moduleOptions []module.Option } -type staticModule struct { - connector *module.Connector - module *module.Wrapper -} - // Option specifies some optional arguments used for configuring the behavior // of the Metricbeat framework. type Option func(mb *Metricbeat) @@ -162,46 +155,28 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe moduleOptions := append( []module.Option{module.WithMaxStartDelay(config.MaxStartDelay)}, metricbeat.moduleOptions...) - var errs multierror.Errors + + factory := module.NewFactory(b.Info, moduleOptions...) + for _, moduleCfg := range config.Modules { if !moduleCfg.Enabled() { continue } - failed := false - - connector, err := module.NewConnector(b.Info, b.Publisher, moduleCfg, nil) - if err != nil { - errs = append(errs, err) - failed = true - } - - module, err := module.NewWrapper(moduleCfg, mb.Registry, moduleOptions...) + runner, err := factory.Create(b.Publisher, moduleCfg, nil) if err != nil { - errs = append(errs, err) - failed = true - } - - if failed { - continue + return nil, err } - metricbeat.modules = append(metricbeat.modules, staticModule{ - connector: connector, - module: module, - }) + metricbeat.runners = append(metricbeat.runners, runner) } - if err := errs.Err(); err != nil { - return nil, err - } - if len(metricbeat.modules) == 0 && !dynamicCfgEnabled { + if len(metricbeat.runners) == 0 && !dynamicCfgEnabled { return nil, mb.ErrAllModulesDisabled } if config.Autodiscover != nil { var err error - factory := module.NewFactory(b.Info, metricbeat.moduleOptions...) adapter := autodiscover.NewFactoryAdapter(factory) metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) if err != nil { @@ -220,20 +195,16 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe func (bt *Metricbeat) Run(b *beat.Beat) error { var wg sync.WaitGroup - // Static modules (metricbeat.modules) - for _, m := range bt.modules { - client, err := m.connector.Connect() - if err != nil { - return err - } - - r := module.NewRunner(client, m.module) + // Static modules (metricbeat.runners) + for _, r := range bt.runners { r.Start() wg.Add(1) + + thatRunner := r go func() { defer wg.Done() <-bt.done - r.Stop() + thatRunner.Stop() }() } @@ -293,8 +264,12 @@ func (bt *Metricbeat) Stop() { // under dynamic config settings. func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) { var modules []*module.Wrapper - for _, m := range bt.modules { - modules = append(modules, m.module) + for _, moduleCfg := range bt.config.Modules { + module, err := module.NewWrapper(moduleCfg, mb.Registry, nil) + if err != nil { + return nil, err + } + modules = append(modules, module) } // Add dynamic modules From 282744140bab0a9147e7defb5de3747d84e86079 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 14:49:18 +0100 Subject: [PATCH 21/25] Beater: modules --- metricbeat/beater/metricbeat.go | 39 +--------------- metricbeat/mb/module/configuration.go | 65 +++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 37 deletions(-) create mode 100644 metricbeat/mb/module/configuration.go diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index f2a9fbc75576..f7f79f7e42a0 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -260,42 +260,7 @@ func (bt *Metricbeat) Stop() { close(bt.done) } -// Modules return a list of all configured modules, including anyone present -// under dynamic config settings. +// Modules return a list of all configured modules. func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) { - var modules []*module.Wrapper - for _, moduleCfg := range bt.config.Modules { - module, err := module.NewWrapper(moduleCfg, mb.Registry, nil) - if err != nil { - return nil, err - } - modules = append(modules, module) - } - - // Add dynamic modules - if bt.config.ConfigModules.Enabled() { - config := cfgfile.DefaultDynamicConfig - bt.config.ConfigModules.Unpack(&config) - - modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") - if err != nil { - return nil, errors.Wrap(err, "initialization error") - } - - for _, file := range modulesManager.ListEnabled() { - confs, err := cfgfile.LoadList(file.Path) - if err != nil { - return nil, errors.Wrap(err, "error loading config files") - } - for _, conf := range confs { - m, err := module.NewWrapper(conf, mb.Registry, bt.moduleOptions...) - if err != nil { - return nil, errors.Wrap(err, "module initialization error") - } - modules = append(modules, m) - } - } - } - - return modules, nil + return module.ConfiguredModules(bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions) } diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go new file mode 100644 index 000000000000..2ccd0bd99aca --- /dev/null +++ b/metricbeat/mb/module/configuration.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" +) + +// ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings. +func ConfiguredModules(modulesData []*common.Config, configModulesData *common.Config, moduleOptions []Option) ([]*Wrapper, error) { + var modules []*Wrapper + + for _, moduleCfg := range modulesData { + module, err := NewWrapper(moduleCfg, mb.Registry, nil) + if err != nil { + return nil, err + } + modules = append(modules, module) + } + + // Add dynamic modules + if configModulesData.Enabled() { + config := cfgfile.DefaultDynamicConfig + configModulesData.Unpack(&config) + + modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") + if err != nil { + return nil, errors.Wrap(err, "initialization error") + } + + for _, file := range modulesManager.ListEnabled() { + confs, err := cfgfile.LoadList(file.Path) + if err != nil { + return nil, errors.Wrap(err, "error loading config files") + } + for _, conf := range confs { + m, err := NewWrapper(conf, mb.Registry, moduleOptions...) + if err != nil { + return nil, errors.Wrap(err, "module initialization error") + } + modules = append(modules, m) + } + } + } + return modules, nil +} From cf816f3c7b6b4c8ef0412e016c68791c38014ddc Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 16:16:42 +0100 Subject: [PATCH 22/25] Fix: system tests --- metricbeat/mb/module/runner_group.go | 2 +- metricbeat/mb/module/runner_group_test.go | 2 +- metricbeat/tests/system/test_autodiscover.py | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index 326e995654e6..b402b4dc6559 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -58,5 +58,5 @@ func (rg *runnerGroup) String() string { for _, runner := range rg.runners { entries = append(entries, runner.String()) } - return "RunnerGroup: " + strings.Join(entries, ", ") + return "RunnerGroup{" + strings.Join(entries, ", ") + "}" } diff --git a/metricbeat/mb/module/runner_group_test.go b/metricbeat/mb/module/runner_group_test.go index 8c07cc749357..cbf2f310a87f 100644 --- a/metricbeat/mb/module/runner_group_test.go +++ b/metricbeat/mb/module/runner_group_test.go @@ -84,5 +84,5 @@ func TestString(t *testing.T) { }) } runnerGroup := newRunnerGroup(runners) - assert.Equal(t, "RunnerGroup: fakeRunner-0, fakeRunner-1, fakeRunner-2", runnerGroup.String()) + assert.Equal(t, "RunnerGroup{fakeRunner-0, fakeRunner-1, fakeRunner-2}", runnerGroup.String()) } diff --git a/metricbeat/tests/system/test_autodiscover.py b/metricbeat/tests/system/test_autodiscover.py index ae135a8f7fc5..db627c0516fe 100644 --- a/metricbeat/tests/system/test_autodiscover.py +++ b/metricbeat/tests/system/test_autodiscover.py @@ -41,12 +41,12 @@ def test_docker(self): docker_client.images.pull('memcached:latest') container = docker_client.containers.run('memcached:latest', detach=True) - self.wait_until(lambda: self.log_contains('Starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: RunnerGroup{memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: RunnerGroup{memcached')) output = self.read_output_json() proc.check_kill_and_wait() @@ -85,12 +85,12 @@ def test_docker_labels(self): } container = docker_client.containers.run('memcached:latest', labels=labels, detach=True) - self.wait_until(lambda: self.log_contains('Starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: RunnerGroup{memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: RunnerGroup{memcached')) output = self.read_output_json() proc.check_kill_and_wait() @@ -136,12 +136,12 @@ def test_config_appender(self): } container = docker_client.containers.run('memcached:latest', labels=labels, detach=True) - self.wait_until(lambda: self.log_contains('Starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: RunnerGroup{memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: RunnerGroup{memcached')) output = self.read_output_json() proc.check_kill_and_wait() From e82663b3c66d1a53c4abb0f853d2fbb3230892be Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 17:58:14 +0100 Subject: [PATCH 23/25] Fix: implements interface --- metricbeat/mb/module/runner_group.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index b402b4dc6559..e242a0281c85 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,14 +23,14 @@ import ( ) type runnerGroup struct { - Runner - runners []Runner startOnce sync.Once stopOnce sync.Once } +var _ Runner = new(runnerGroup) + func newRunnerGroup(runners []Runner) Runner { return &runnerGroup{ runners: runners, From 91b4896e1057d6b53b67132213c901bcb4889745 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 30 Jan 2020 19:40:14 +0100 Subject: [PATCH 24/25] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 628ce5512a97..ac6efd631ad2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `key/value` mode for SQL module. {issue}15770[15770] {pull]15845[15845] - Add STAN dashboard {pull}15654[15654] - Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822] +- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] *Packetbeat* From d61ca03e1b8570d1575a5fe549d0bdff5dea25ec Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Fri, 31 Jan 2020 12:32:52 +0100 Subject: [PATCH 25/25] Verify if processors are setup --- x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml | 4 ++++ x-pack/metricbeat/module/ibmmq/test_ibmmq.py | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml b/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml index f55504521d9a..21f660d8482a 100644 --- a/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml +++ b/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml @@ -14,6 +14,10 @@ processors: source: > function process(event) { var metrics = event.Get("prometheus.metrics"); + if (metrics == null) { + event.Cancel(); + return; + } Object.keys(metrics).forEach(function(key) { if (!(key.match(/^ibmmq_.*$/))) { event.Delete("prometheus.metrics." + key); diff --git a/x-pack/metricbeat/module/ibmmq/test_ibmmq.py b/x-pack/metricbeat/module/ibmmq/test_ibmmq.py index c882860d6bda..436f15aa7dba 100644 --- a/x-pack/metricbeat/module/ibmmq/test_ibmmq.py +++ b/x-pack/metricbeat/module/ibmmq/test_ibmmq.py @@ -33,3 +33,8 @@ def test_qmgr(self): self.assert_fields_are_documented(evt) self.assertIn("prometheus", evt.keys(), evt) self.assertIn("metrics", evt["prometheus"].keys(), evt) + self.assertGreater(len(evt["prometheus"]["metrics"].keys()), 0) + + # Verify if processors are correctly setup. + for metric in evt["prometheus"]["metrics"].keys(): + assert metric.startswith("ibmmq_")