From 63fe81463544896825d05a7467498f41f952bcb3 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Tue, 7 Jul 2020 14:52:52 +0200 Subject: [PATCH] Cherry-pick #17655 to 7.x: Start to split filebeat/channel up (#19654) (cherry picked from commit bc8123a) --- filebeat/beater/channels.go | 91 ++++++++ filebeat/beater/crawler.go | 4 +- filebeat/beater/filebeat.go | 20 +- filebeat/channel/connector.go | 87 +------- filebeat/channel/factory.go | 62 +----- filebeat/channel/outlet.go | 8 +- filebeat/channel/runner.go | 201 ++++++++++++++++++ .../{connector_test.go => runner_test.go} | 38 ++-- libbeat/autodiscover/autodiscover.go | 4 +- libbeat/cfgfile/list.go | 4 +- libbeat/cfgfile/reload.go | 4 +- libbeat/publisher/pipetool/pipetool.go | 91 ++++++++ 12 files changed, 426 insertions(+), 188 deletions(-) create mode 100644 filebeat/channel/runner.go rename filebeat/channel/{connector_test.go => runner_test.go} (90%) create mode 100644 libbeat/publisher/pipetool/pipetool.go diff --git a/filebeat/beater/channels.go b/filebeat/beater/channels.go index 38874cf483a..de65fbf5c68 100644 --- a/filebeat/beater/channels.go +++ b/filebeat/beater/channels.go @@ -22,7 +22,9 @@ import ( "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/filebeat/registrar" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) type registrarLogger struct { @@ -41,6 +43,23 @@ type eventCounter struct { wg sync.WaitGroup } +// countingClient adds and substracts from a counter when events have been +// published, dropped or ACKed. The countingClient can be used to keep track of +// inflight events for a beat.Client instance. The counter is updated after the +// client has been disconnected from the publisher pipeline via 'Closed'. +type countingClient struct { + counter *eventCounter + client beat.Client +} + +type countingEventer struct { + wgEvents *eventCounter +} + +type combinedEventer struct { + a, b beat.ClientEventer +} + func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger { return ®istrarLogger{ done: make(chan struct{}), @@ -87,3 +106,75 @@ func (c *eventCounter) Done() { func (c *eventCounter) Wait() { c.wg.Wait() } + +// withPipelineEventCounter adds a counter to the pipeline that keeps track of +// all events published, dropped and ACKed by any active client. +// The type accepted by counter is compatible with sync.WaitGroup. +func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCounter) beat.PipelineConnector { + counterListener := &countingEventer{counter} + + pipeline = pipetool.WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) { + if evts := config.Events; evts != nil { + config.Events = &combinedEventer{evts, counterListener} + } else { + config.Events = counterListener + } + return config, nil + }) + + pipeline = pipetool.WithClientWrapper(pipeline, func(client beat.Client) beat.Client { + return &countingClient{ + counter: counter, + client: client, + } + }) + return pipeline +} + +func (c *countingClient) Publish(event beat.Event) { + c.counter.Add(1) + c.client.Publish(event) +} + +func (c *countingClient) PublishAll(events []beat.Event) { + c.counter.Add(len(events)) + c.client.PublishAll(events) +} + +func (c *countingClient) Close() error { + return c.client.Close() +} + +func (*countingEventer) Closing() {} +func (*countingEventer) Closed() {} +func (*countingEventer) Published() {} + +func (c *countingEventer) FilteredOut(_ beat.Event) {} +func (c *countingEventer) DroppedOnPublish(_ beat.Event) { + c.wgEvents.Done() +} + +func (c *combinedEventer) Closing() { + c.a.Closing() + c.b.Closing() +} + +func (c *combinedEventer) Closed() { + c.a.Closed() + c.b.Closed() +} + +func (c *combinedEventer) Published() { + c.a.Published() + c.b.Published() +} + +func (c *combinedEventer) FilteredOut(event beat.Event) { + c.a.FilteredOut(event) + c.b.FilteredOut(event) +} + +func (c *combinedEventer) DroppedOnPublish(event beat.Event) { + c.a.DroppedOnPublish(event) + c.b.DroppedOnPublish(event) +} diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index 8418f299e0d..842cb92d2d8 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -62,7 +62,7 @@ func newCrawler( // Start starts the crawler with all inputs func (c *crawler) Start( - pipeline beat.Pipeline, + pipeline beat.PipelineConnector, configInputs *common.Config, configModules *common.Config, ) error { @@ -111,7 +111,7 @@ func (c *crawler) Start( } func (c *crawler) startInput( - pipeline beat.Pipeline, + pipeline beat.PipelineConnector, config *common.Config, ) error { if !config.Enabled() { diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index ba8514b0365..71334e4da47 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -43,6 +43,7 @@ import ( "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" _ "github.com/elastic/beats/v7/filebeat/include" @@ -68,6 +69,7 @@ type Filebeat struct { config *cfg.Config moduleRegistry *fileset.ModuleRegistry done chan struct{} + pipeline beat.PipelineConnector } // New creates a new Filebeat pointer instance. @@ -169,7 +171,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config()) modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory) if fb.config.ConfigModules.Enabled() { - modulesLoader := cfgfile.NewReloader(b.Publisher, fb.config.ConfigModules) + modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules) modulesLoader.Load(modulesFactory) } @@ -328,8 +330,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } + fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend) + fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents) + outDone := make(chan struct{}) // outDone closes down all active pipeline connections - pipelineConnector := channel.NewOutletFactory(outDone, wgEvents, b.Info).Create + pipelineConnector := channel.NewOutletFactory(outDone).Create // Create a ES connection factory for dynamic modules pipeline loading var pipelineLoaderFactory fileset.PipelineLoaderFactory @@ -339,7 +344,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Warn(pipelinesWarning) } - inputLoader := input.NewRunnerFactory(pipelineConnector, registrar, fb.done) + inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, + input.NewRunnerFactory(pipelineConnector, registrar, fb.done)) moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) @@ -376,7 +382,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(b.Publisher, config.ConfigInput, config.ConfigModules) + err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() return fmt.Errorf("Failed to start crawler: %+v", err) @@ -393,17 +399,17 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } // Register reloadable list of inputs and modules - inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, b.Publisher) + inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline) reload.Register.MustRegisterList("filebeat.inputs", inputs) - modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, b.Publisher) + modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline) reload.Register.MustRegisterList("filebeat.modules", modules) var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { adiscover, err = autodiscover.NewAutodiscover( "filebeat", - b.Publisher, + fb.pipeline, cfgfile.MultiplexedRunnerFactory( cfgfile.MatchHasField("module", moduleLoader), cfgfile.MatchDefault(inputLoader), diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 7e39799b960..279c50c58b0 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -20,9 +20,6 @@ package channel import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/beats/v7/libbeat/processors" - "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" ) // ConnectorFunc is an adapter for using ordinary functions as Connector. @@ -48,97 +45,15 @@ func (c *pipelineConnector) Connect(cfg *common.Config) (Outleter, error) { } func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) { - config := inputOutletConfig{} - if err := cfg.Unpack(&config); err != nil { - return nil, err - } - - procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg) - if err != nil { - return nil, err - } - - setOptional := func(to common.MapStr, key string, value string) { - if value != "" { - to.Put(key, value) - } - } - - meta := clientCfg.Processing.Meta.Clone() - fields := clientCfg.Processing.Fields.Clone() - - serviceType := config.ServiceType - if serviceType == "" { - serviceType = config.Module - } - - setOptional(meta, "pipeline", config.Pipeline) - setOptional(fields, "fileset.name", config.Fileset) - setOptional(fields, "service.type", serviceType) - setOptional(fields, "input.type", config.Type) - if config.Module != "" { - event := common.MapStr{"module": config.Module} - if config.Fileset != "" { - event["dataset"] = config.Module + "." + config.Fileset - } - fields["event"] = event - } - - mode := clientCfg.PublishMode - if mode == beat.DefaultGuarantees { - mode = beat.GuaranteedSend - } - // connect with updated configuration - clientCfg.PublishMode = mode - clientCfg.Processing.EventMetadata = config.EventMetadata - clientCfg.Processing.Meta = meta - clientCfg.Processing.Fields = fields - clientCfg.Processing.Processor = procs - clientCfg.Processing.KeepNull = config.KeepNull - clientCfg.Processing.DisableHost = config.PublisherPipeline.DisableHost client, err := c.pipeline.ConnectWith(clientCfg) if err != nil { return nil, err } - outlet := newOutlet(client, c.parent.wgEvents) + outlet := newOutlet(client) if c.parent.done != nil { return CloseOnSignal(outlet, c.parent.done), nil } return outlet, nil } - -// processorsForConfig assembles the Processors for a pipelineConnector. -func processorsForConfig( - beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig, -) (*processors.Processors, error) { - procs := processors.NewList(nil) - - // Processor ordering is important: - // 1. Index configuration - if !config.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) - timestampFormat, err := - fmtstr.NewTimestampFormatString(&config.Index, staticFields) - if err != nil { - return nil, err - } - indexProcessor := add_formatted_index.New(timestampFormat) - procs.AddProcessor(indexProcessor) - } - - // 2. ClientConfig processors - if lst := clientCfg.Processing.Processor; lst != nil { - procs.AddProcessor(lst) - } - - // 3. User processors - userProcessors, err := processors.New(config.Processors) - if err != nil { - return nil, err - } - procs.AddProcessors(*userProcessors) - - return procs, nil -} diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index f0782767dc1..7962377856c 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -19,69 +19,17 @@ package channel import ( "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/beats/v7/libbeat/processors" ) type OutletFactory struct { done <-chan struct{} - - eventer beat.ClientEventer - wgEvents eventCounter - beatInfo beat.Info -} - -type eventCounter interface { - Add(n int) - Done() -} - -// clientEventer adjusts wgEvents if events are dropped during shutdown. -type clientEventer struct { - wgEvents eventCounter -} - -// inputOutletConfig defines common input settings -// for the publisher pipeline. -type inputOutletConfig struct { - // event processing - common.EventMetadata `config:",inline"` // Fields and tags to add to events. - Processors processors.PluginConfig `config:"processors"` - KeepNull bool `config:"keep_null"` - - PublisherPipeline struct { - DisableHost bool `config:"disable_host"` // Disable addition of host.name. - } `config:"publisher_pipeline"` - - // implicit event fields - Type string `config:"type"` // input.type - ServiceType string `config:"service.type"` // service.type - - // hidden filebeat modules settings - Module string `config:"_module_name"` // hidden setting - Fileset string `config:"_fileset_name"` // hidden setting - - // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - Index fmtstr.EventFormatString `config:"index"` // ES output index pattern } // NewOutletFactory creates a new outlet factory for // connecting an input to the publisher pipeline. -func NewOutletFactory( - done <-chan struct{}, - wgEvents eventCounter, - beatInfo beat.Info, -) *OutletFactory { +func NewOutletFactory(done <-chan struct{}) *OutletFactory { o := &OutletFactory{ - done: done, - wgEvents: wgEvents, - beatInfo: beatInfo, - } - - if wgEvents != nil { - o.eventer = &clientEventer{wgEvents} + done: done, } return o @@ -94,9 +42,3 @@ func NewOutletFactory( func (f *OutletFactory) Create(p beat.PipelineConnector) Connector { return &pipelineConnector{parent: f, pipeline: p} } - -func (e *clientEventer) Closing() {} -func (e *clientEventer) Closed() {} -func (e *clientEventer) Published() {} -func (e *clientEventer) FilteredOut(evt beat.Event) {} -func (e *clientEventer) DroppedOnPublish(evt beat.Event) { e.wgEvents.Done() } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index 3211a9d7293..fd5c9b12fc1 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -23,15 +23,13 @@ import ( ) type outlet struct { - wg eventCounter client beat.Client isOpen atomic.Bool done chan struct{} } -func newOutlet(client beat.Client, wg eventCounter) *outlet { +func newOutlet(client beat.Client) *outlet { o := &outlet{ - wg: wg, client: client, isOpen: atomic.MakeBool(true), done: make(chan struct{}), @@ -57,10 +55,6 @@ func (o *outlet) OnEvent(event beat.Event) bool { return false } - if o.wg != nil { - o.wg.Add(1) - } - o.client.Publish(event) // Note: race condition on shutdown: diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go new file mode 100644 index 00000000000..941e1cc8161 --- /dev/null +++ b/filebeat/channel/runner.go @@ -0,0 +1,201 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package channel + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" +) + +type onCreateFactory struct { + factory cfgfile.RunnerFactory + create onCreateWrapper +} + +type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config, *common.MapStrPointer) (cfgfile.Runner, error) + +// commonInputConfig defines common input settings +// for the publisher pipeline. +type commonInputConfig struct { + // event processing + common.EventMetadata `config:",inline"` // Fields and tags to add to events. + Processors processors.PluginConfig `config:"processors"` + KeepNull bool `config:"keep_null"` + + PublisherPipeline struct { + DisableHost bool `config:"disable_host"` // Disable addition of host.name. + } `config:"publisher_pipeline"` + + // implicit event fields + Type string `config:"type"` // input.type + ServiceType string `config:"service.type"` // service.type + + // hidden filebeat modules settings + Module string `config:"_module_name"` // hidden setting + Fileset string `config:"_fileset_name"` // hidden setting + + // Output meta data settings + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern +} + +func (f *onCreateFactory) CheckConfig(cfg *common.Config) error { + return f.factory.CheckConfig(cfg) +} + +func (f *onCreateFactory) Create( + pipeline beat.PipelineConnector, + cfg *common.Config, + meta *common.MapStrPointer, +) (cfgfile.Runner, error) { + return f.create(f.factory, pipeline, cfg, meta) +} + +// RunnerFactoryWithCommonInputSettings wraps a runner factory, such that all runners +// created by this factory have the same processing capabilities and related +// configuration file settings. +// +// Common settings ensured by this factory wrapper: +// - *fields*: common fields to be added to the pipeline +// - *fields_under_root*: select at which level to store the fields +// - *tags*: add additional tags to the events +// - *processors*: list of local processors to be added to the processing pipeline +// - *keep_null*: keep or remove 'null' from events to be published +// - *_module_name* (hidden setting): Add fields describing the module name +// - *_ fileset_name* (hiddrn setting): +// - *pipeline*: Configure the ES Ingest Node pipeline name to be used for events from this input +// - *index*: Configure the index name for events to be collected from this input +// - *type*: implicit event type +// - *service.type*: implicit event type +func RunnerFactoryWithCommonInputSettings(info beat.Info, f cfgfile.RunnerFactory) cfgfile.RunnerFactory { + return wrapRunnerCreate(f, + func( + f cfgfile.RunnerFactory, + pipeline beat.PipelineConnector, + cfg *common.Config, + meta *common.MapStrPointer, + ) (runner cfgfile.Runner, err error) { + pipeline, err = withClientConfig(info, pipeline, cfg) + if err != nil { + return nil, err + } + + return f.Create(pipeline, cfg, meta) + }) +} + +func wrapRunnerCreate(f cfgfile.RunnerFactory, edit onCreateWrapper) cfgfile.RunnerFactory { + return &onCreateFactory{factory: f, create: edit} +} + +// withClientConfig reads common Beat input instance configurations from the +// configuration object and ensure that the settings are applied to each client. +func withClientConfig( + beatInfo beat.Info, + pipeline beat.PipelineConnector, + cfg *common.Config, +) (beat.PipelineConnector, error) { + editor, err := newCommonConfigEditor(beatInfo, cfg) + if err != nil { + return nil, err + } + return pipetool.WithClientConfigEdit(pipeline, editor), nil +} + +func newCommonConfigEditor( + beatInfo beat.Info, + cfg *common.Config, +) (pipetool.ConfigEditor, error) { + config := commonInputConfig{} + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + var indexProcessor processors.Processor + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + + serviceType := config.ServiceType + if serviceType == "" { + serviceType = config.Module + } + + return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + meta := clientCfg.Processing.Meta.Clone() + fields := clientCfg.Processing.Fields.Clone() + + setOptional(meta, "pipeline", config.Pipeline) + setOptional(fields, "fileset.name", config.Fileset) + setOptional(fields, "service.type", serviceType) + setOptional(fields, "input.type", config.Type) + if config.Module != "" { + event := common.MapStr{"module": config.Module} + if config.Fileset != "" { + event["dataset"] = config.Module + "." + config.Fileset + } + fields["event"] = event + } + + // assemble the processors. Ordering is important. + // 1. add support for index configuration via processor + // 2. add processors added by the input that wants to connect + // 3. add locally configured processors from the 'processors' settings + procs := processors.NewList(nil) + if indexProcessor != nil { + procs.AddProcessor(indexProcessor) + } + if lst := clientCfg.Processing.Processor; lst != nil { + procs.AddProcessor(lst) + } + if userProcessors != nil { + procs.AddProcessors(*userProcessors) + } + + clientCfg.Processing.EventMetadata = config.EventMetadata + clientCfg.Processing.Meta = meta + clientCfg.Processing.Fields = fields + clientCfg.Processing.Processor = procs + clientCfg.Processing.KeepNull = config.KeepNull + clientCfg.Processing.DisableHost = config.PublisherPipeline.DisableHost + + return clientCfg, nil + }, nil +} + +func setOptional(to common.MapStr, key string, value string) { + if value != "" { + to.Put(key, value) + } +} diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/runner_test.go similarity index 90% rename from filebeat/channel/connector_test.go rename to filebeat/channel/runner_test.go index b75a0fe5eee..cf42a38e4b8 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/runner_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" @@ -110,16 +111,22 @@ func TestProcessorsForConfig(t *testing.T) { if test.event.Fields == nil { test.event.Fields = common.MapStr{} } - config, err := outletConfigFromString(test.configStr) + config, err := common.NewConfigFrom(test.configStr) if err != nil { t.Errorf("[%s] %v", description, err) continue } - processors, err := processorsForConfig(test.beatInfo, config, test.clientCfg) + + editor, err := newCommonConfigEditor(test.beatInfo, config) if err != nil { t.Errorf("[%s] %v", description, err) continue } + + clientCfg, err := editor(test.clientCfg) + require.NoError(t, err) + + processors := clientCfg.Processing.Processor processedEvent, err := processors.Run(&test.event) // We don't check if err != nil, because we are testing the final outcome // of running the processors, including when some of them fail. @@ -161,16 +168,21 @@ func TestProcessorsForConfigIsFlat(t *testing.T) { configStr := `processors: - add_fields: {fields: {testField: value}} - add_fields: {fields: {testField2: stuff}}` - config, err := outletConfigFromString(configStr) + config, err := common.NewConfigFrom(configStr) if err != nil { t.Fatal(err) } - processors, err := processorsForConfig( - beat.Info{}, config, beat.ClientConfig{}) + + editor, err := newCommonConfigEditor(beat.Info{}, config) if err != nil { t.Fatal(err) } - assert.Equal(t, 2, len(processors.List)) + + clientCfg, err := editor(beat.ClientConfig{}) + require.NoError(t, err) + + lst := clientCfg.Processing.Processor + assert.Equal(t, 2, len(lst.(*processors.Processors).List)) } // setRawIndex is a bare-bones processor to set the raw_index field to a @@ -192,20 +204,6 @@ func (p *setRawIndex) String() string { return fmt.Sprintf("set_raw_index=%v", p.indexStr) } -// Helper function to convert from YML input string to an unpacked -// inputOutletConfig -func outletConfigFromString(s string) (inputOutletConfig, error) { - config := inputOutletConfig{} - cfg, err := common.NewConfigFrom(s) - if err != nil { - return config, err - } - if err := cfg.Unpack(&config); err != nil { - return config, err - } - return config, nil -} - // makeProcessors wraps one or more bare Processor objects in Processors. func makeProcessors(procs ...processors.Processor) *processors.Processors { procList := processors.NewList(nil) diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index f67bf97db03..e26a2521c16 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -55,7 +55,7 @@ type EventConfigurer interface { // new modules when any configured providers does a match type Autodiscover struct { bus bus.Bus - defaultPipeline beat.Pipeline + defaultPipeline beat.PipelineConnector factory cfgfile.RunnerFactory configurer EventConfigurer providers []Provider @@ -69,7 +69,7 @@ type Autodiscover struct { // NewAutodiscover instantiates and returns a new Autodiscover manager func NewAutodiscover( name string, - pipeline beat.Pipeline, + pipeline beat.PipelineConnector, factory cfgfile.RunnerFactory, configurer EventConfigurer, config *Config, diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 514bfb36611..41f0ad7ab77 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -35,12 +35,12 @@ type RunnerList struct { runners map[uint64]Runner mutex sync.RWMutex factory RunnerFactory - pipeline beat.Pipeline + pipeline beat.PipelineConnector logger *logp.Logger } // NewRunnerList builds and returns a RunnerList -func NewRunnerList(name string, factory RunnerFactory, pipeline beat.Pipeline) *RunnerList { +func NewRunnerList(name string, factory RunnerFactory, pipeline beat.PipelineConnector) *RunnerList { return &RunnerList{ runners: map[uint64]Runner{}, factory: factory, diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 2d8729e881e..b4458e543b0 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -97,7 +97,7 @@ type Runner interface { // Reloader is used to register and reload modules type Reloader struct { - pipeline beat.Pipeline + pipeline beat.PipelineConnector config DynamicConfig path string done chan struct{} @@ -105,7 +105,7 @@ type Reloader struct { } // NewReloader creates new Reloader instance for the given config -func NewReloader(pipeline beat.Pipeline, cfg *common.Config) *Reloader { +func NewReloader(pipeline beat.PipelineConnector, cfg *common.Config) *Reloader { config := DefaultDynamicConfig cfg.Unpack(&config) diff --git a/libbeat/publisher/pipetool/pipetool.go b/libbeat/publisher/pipetool/pipetool.go new file mode 100644 index 00000000000..a05ec6e6022 --- /dev/null +++ b/libbeat/publisher/pipetool/pipetool.go @@ -0,0 +1,91 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipetool + +import "github.com/elastic/beats/v7/libbeat/beat" + +// connectEditPipeline modifies the client configuration using edit before calling +// edit. +type connectEditPipeline struct { + parent beat.PipelineConnector + edit ConfigEditor +} + +// ConfigEditor modifies the client configuration before connecting to a Pipeline. +type ConfigEditor func(beat.ClientConfig) (beat.ClientConfig, error) + +func (p *connectEditPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *connectEditPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + cfg, err := p.edit(cfg) + if err != nil { + return nil, err + } + return p.parent.ConnectWith(cfg) +} + +// wrapClientPipeline applies edit to the beat.Client returned by Connect and ConnectWith. +// The edit function can wrap the client to add additional functionality to clients +// that connect to the pipeline. +type wrapClientPipeline struct { + parent beat.PipelineConnector + wrapper ClientWrapper +} + +// ClientWrapper allows client instances to be wrapped. +type ClientWrapper func(beat.Client) beat.Client + +func (p *wrapClientPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *wrapClientPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + client, err := p.parent.ConnectWith(cfg) + if err == nil { + client = p.wrapper(client) + } + return client, err +} + +// WithClientConfigEdit creates a pipeline connector, that allows the +// beat.ClientConfig to be modified before connecting to the underlying +// pipeline. +// The edit function is applied before calling Connect or ConnectWith. +func WithClientConfigEdit(pipeline beat.PipelineConnector, edit ConfigEditor) beat.PipelineConnector { + return &connectEditPipeline{parent: pipeline, edit: edit} +} + +// WithDefaultGuarantee sets the default sending guarantee to `mode` if the +// beat.ClientConfig does not set the mode explicitly. +func WithDefaultGuarantees(pipeline beat.PipelineConnector, mode beat.PublishMode) beat.PipelineConnector { + return WithClientConfigEdit(pipeline, func(cfg beat.ClientConfig) (beat.ClientConfig, error) { + if cfg.PublishMode == beat.DefaultGuarantees { + cfg.PublishMode = mode + } + return cfg, nil + }) +} + +// WithClientWrapper calls wrap on beat.Client instance, after a successful +// call to `pipeline.Connect` or `pipeline.ConnectWith`. The wrap function can +// wrap the client to provide additional functionality. +func WithClientWrapper(pipeline beat.PipelineConnector, wrap ClientWrapper) beat.PipelineConnector { + return &wrapClientPipeline{parent: pipeline, wrapper: wrap} +}