Skip to content

Commit

Permalink
Add test for the processor re-use issue (#34870)
Browse files Browse the repository at this point in the history
It's a follow-up to #34761

This test makes sure that none of the critical configuration fields
are re-used between instances of the pipeline client.

(cherry picked from commit 3d917c8)
  • Loading branch information
rdner authored and mergify[bot] committed Mar 21, 2023
1 parent 6c07355 commit c893981
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 1 deletion.
2 changes: 1 addition & 1 deletion filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C) (
// - *processors*: list of local processors to be added to the processing pipeline
// - *keep_null*: keep or remove 'null' from events to be published
// - *_module_name* (hidden setting): Add fields describing the module name
// - *_ fileset_name* (hiddrn setting):
// - *_ fileset_name* (hidden setting):
// - *pipeline*: Configure the ES Ingest Node pipeline name to be used for events from this input
// - *index*: Configure the index name for events to be collected from this input
// - *type*: implicit event type
Expand Down
126 changes: 126 additions & 0 deletions filebeat/channel/runner_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"

conf "github.com/elastic/elastic-agent-libs/config"

"github.com/stretchr/testify/require"
)

type runnerFactoryMock struct {
clientCount int
cfgs []beat.ClientConfig
}

func (r *runnerFactoryMock) Create(p beat.PipelineConnector, config *conf.C) (cfgfile.Runner, error) {
// When using the connector multiple times to create a client
// it's using the same editor function for creating a new client
// with a modified configuration that includes predefined processing.
// This is why we must make sure nothing is re-used from one client to another.
for i := 0; i < r.clientCount; i++ {
client, err := p.ConnectWith(beat.ClientConfig{})
if err != nil {
return nil, err
}

// storing the config that the client was created with
// it's needed for the `Assert` later
r.cfgs = append(r.cfgs, client.(*clientMock).cfg)
}
return &struct {
cfgfile.Runner
}{}, nil
}

func (runnerFactoryMock) CheckConfig(config *conf.C) error {
return nil
}

// Assert runs various checks for the clients created by the wrapped pipeline connector
// We check that the processing configuration does not reference the same addresses as before,
// re-using some parts of the processing configuration will result in various issues, such as:
// * closing processors multiple times
// * using closed processors
// * modifiying an object shared by multiple pipeline clients
func (r runnerFactoryMock) Assert(t *testing.T) {
t.Helper()

// we need to make sure `Assert` is called after `Create`
require.Len(t, r.cfgs, r.clientCount)

t.Run("new processing configuration each time", func(t *testing.T) {
for i, c1 := range r.cfgs {
for j, c2 := range r.cfgs {
if i == j {
continue
}

require.NotSamef(t, c1.Processing, c2.Processing, "processing configuration cannot be re-used")
require.NotSamef(t, c1.Processing.Meta, c2.Processing.Meta, "`Processing.Meta` cannot be re-used")
require.NotSamef(t, c1.Processing.Fields, c2.Processing.Fields, "`Processing.Fields` cannot be re-used")
require.NotSamef(t, c1.Processing.Processor, c2.Processing.Processor, "`Processing.Processor` cannot be re-used")
}
}
})

t.Run("new processors each time", func(t *testing.T) {
var processors []beat.Processor
for _, c := range r.cfgs {
processors = append(processors, c.Processing.Processor.All()...)
}

require.NotEmptyf(t, processors, "for this test the list of processors cannot be empty")

for i, p1 := range processors {
for j, p2 := range processors {
if i == j {
continue
}

require.NotSamef(t, p1, p2, "processors must not be re-used")
}
}
})
}

type clientMock struct {
cfg beat.ClientConfig
}

func (clientMock) Publish(beat.Event) {}
func (clientMock) PublishAll([]beat.Event) {}
func (clientMock) Close() error { return nil }

type pipelineConnectorMock struct{}

func (pipelineConnectorMock) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
client := &clientMock{
cfg: cfg,
}
return client, nil
}

func (pipelineConnectorMock) Connect() (beat.Client, error) {
return &clientMock{}, nil
}
38 changes: 38 additions & 0 deletions filebeat/channel/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -211,3 +213,39 @@ func makeProcessors(procs ...processors.Processor) *processors.Processors {
procList.List = procs
return procList
}

func TestRunnerFactoryWithCommonInputSettings(t *testing.T) {

// we use `add_kubernetes_metadata` and `add_cloud_metadata`
// for testing because initially the problem we've discovered
// was visible with these 2 processors.
configYAML := `
processors:
- add_kubernetes_metadata: ~
- add_cloud_metadata: ~
keep_null: true
publisher_pipeline:
disable_host: true
type: "filestream"
service.type: "module"
pipeline: "test"
index: "%{[fields.log_type]}-%{[agent.version]}-%{+yyyy.MM.dd}"
`
cfg, err := conf.NewConfigWithYAML([]byte(configYAML), configYAML)
require.NoError(t, err)

b := beat.Info{} // not important for the test
rf := &runnerFactoryMock{
clientCount: 3, // we will create 3 clients from the wrapped pipeline
}
pcm := &pipelineConnectorMock{} // creates mock pipeline clients and will get wrapped

rfwc := RunnerFactoryWithCommonInputSettings(b, rf)

// create a wrapped runner, our mock runner will
// create the given amount of clients here using the wrapped pipeline connector.
_, err = rfwc.Create(pcm, cfg)
require.NoError(t, err)

rf.Assert(t)
}

0 comments on commit c893981

Please sign in to comment.