From 838902e90d5a943e0f60eaaca3065e11a8e0147e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 21 Mar 2023 20:34:39 +0100 Subject: [PATCH] [7.17](backport #34870) Add test for the processor re-use issue (#34879) * Add test for the processor re-use issue (#34870) It's a follow-up to https://github.com/elastic/beats/pull/34761 This test makes sure that none of the critical configuration fields are re-used between instances of the pipeline client. (cherry picked from commit 3d917c851952e5e20a4e29c7ea0ceb871a5aec67) # Conflicts: # filebeat/channel/runner.go # filebeat/channel/runner_test.go * Resolve conflicts --------- Co-authored-by: Denis --- filebeat/channel/runner.go | 2 +- filebeat/channel/runner_mock_test.go | 125 +++++++++++++++++++++++++++ filebeat/channel/runner_test.go | 39 +++++++++ 3 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 filebeat/channel/runner_mock_test.go diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index ca6a1cda3fab..6d3ed39268da 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -78,7 +78,7 @@ func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *common.Co // - *processors*: list of local processors to be added to the processing pipeline // - *keep_null*: keep or remove 'null' from events to be published // - *_module_name* (hidden setting): Add fields describing the module name -// - *_ fileset_name* (hiddrn setting): +// - *_ fileset_name* (hidden setting): // - *pipeline*: Configure the ES Ingest Node pipeline name to be used for events from this input // - *index*: Configure the index name for events to be collected from this input // - *type*: implicit event type diff --git a/filebeat/channel/runner_mock_test.go b/filebeat/channel/runner_mock_test.go new file mode 100644 index 000000000000..2e2259d750cf --- /dev/null +++ b/filebeat/channel/runner_mock_test.go @@ -0,0 +1,125 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package channel + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/stretchr/testify/require" +) + +type runnerFactoryMock struct { + clientCount int + cfgs []beat.ClientConfig +} + +func (r *runnerFactoryMock) Create(p beat.PipelineConnector, config *common.Config) (cfgfile.Runner, error) { + // When using the connector multiple times to create a client + // it's using the same editor function for creating a new client + // with a modified configuration that includes predefined processing. + // This is why we must make sure nothing is re-used from one client to another. + for i := 0; i < r.clientCount; i++ { + client, err := p.ConnectWith(beat.ClientConfig{}) + if err != nil { + return nil, err + } + + // storing the config that the client was created with + // it's needed for the `Assert` later + r.cfgs = append(r.cfgs, client.(*clientMock).cfg) + } + return &struct { + cfgfile.Runner + }{}, nil +} + +func (runnerFactoryMock) CheckConfig(config *common.Config) error { + return nil +} + +// Assert runs various checks for the clients created by the wrapped pipeline connector +// We check that the processing configuration does not reference the same addresses as before, +// re-using some parts of the processing configuration will result in various issues, such as: +// * closing processors multiple times +// * using closed processors +// * modifiying an object shared by multiple pipeline clients +func (r runnerFactoryMock) Assert(t *testing.T) { + t.Helper() + + // we need to make sure `Assert` is called after `Create` + require.Len(t, r.cfgs, r.clientCount) + + t.Run("new processing configuration each time", func(t *testing.T) { + for i, c1 := range r.cfgs { + for j, c2 := range r.cfgs { + if i == j { + continue + } + + require.NotSamef(t, c1.Processing, c2.Processing, "processing configuration cannot be re-used") + require.NotSamef(t, c1.Processing.Meta, c2.Processing.Meta, "`Processing.Meta` cannot be re-used") + require.NotSamef(t, c1.Processing.Fields, c2.Processing.Fields, "`Processing.Fields` cannot be re-used") + require.NotSamef(t, c1.Processing.Processor, c2.Processing.Processor, "`Processing.Processor` cannot be re-used") + } + } + }) + + t.Run("new processors each time", func(t *testing.T) { + var processors []beat.Processor + for _, c := range r.cfgs { + processors = append(processors, c.Processing.Processor.All()...) + } + + require.NotEmptyf(t, processors, "for this test the list of processors cannot be empty") + + for i, p1 := range processors { + for j, p2 := range processors { + if i == j { + continue + } + + require.NotSamef(t, p1, p2, "processors must not be re-used") + } + } + }) +} + +type clientMock struct { + cfg beat.ClientConfig +} + +func (clientMock) Publish(beat.Event) {} +func (clientMock) PublishAll([]beat.Event) {} +func (clientMock) Close() error { return nil } + +type pipelineConnectorMock struct{} + +func (pipelineConnectorMock) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + client := &clientMock{ + cfg: cfg, + } + return client, nil +} + +func (pipelineConnectorMock) Connect() (beat.Client, error) { + return &clientMock{}, nil +} diff --git a/filebeat/channel/runner_test.go b/filebeat/channel/runner_test.go index cf42a38e4b8e..0531c5a8dfa5 100644 --- a/filebeat/channel/runner_test.go +++ b/filebeat/channel/runner_test.go @@ -30,6 +30,9 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" + + _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata" ) func TestProcessorsForConfig(t *testing.T) { @@ -210,3 +213,39 @@ func makeProcessors(procs ...processors.Processor) *processors.Processors { procList.List = procs return procList } + +func TestRunnerFactoryWithCommonInputSettings(t *testing.T) { + + // we use `add_kubernetes_metadata` and `add_cloud_metadata` + // for testing because initially the problem we've discovered + // was visible with these 2 processors. + configYAML := ` +processors: + - add_kubernetes_metadata: ~ + - add_cloud_metadata: ~ +keep_null: true +publisher_pipeline: + disable_host: true +type: "filestream" +service.type: "module" +pipeline: "test" +index: "%{[fields.log_type]}-%{[agent.version]}-%{+yyyy.MM.dd}" +` + cfg, err := common.NewConfigWithYAML([]byte(configYAML), configYAML) + require.NoError(t, err) + + b := beat.Info{} // not important for the test + rf := &runnerFactoryMock{ + clientCount: 3, // we will create 3 clients from the wrapped pipeline + } + pcm := &pipelineConnectorMock{} // creates mock pipeline clients and will get wrapped + + rfwc := RunnerFactoryWithCommonInputSettings(b, rf) + + // create a wrapped runner, our mock runner will + // create the given amount of clients here using the wrapped pipeline connector. + _, err = rfwc.Create(pcm, cfg) + require.NoError(t, err) + + rf.Assert(t) +}