From 5a5b20d6db9517c048f4fa188b89626712eedc2b Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Thu, 2 Feb 2017 13:30:42 +0100 Subject: [PATCH] Use the Beat version in the Ingest Node pipeline This adds the Beat version to the pipeline ID, which means that if we change the pipeline between versions, the new version will be used automatically. It also means that one can run different versions of the same Beat and the pipelines won't override each other. The pipelines are loaded automatically on the Beat start. Part of #3159. --- filebeat/beater/filebeat.go | 2 +- filebeat/fileset/fileset.go | 14 +++++++------- filebeat/fileset/fileset_test.go | 12 ++++++------ filebeat/fileset/modules.go | 9 +++++---- filebeat/fileset/modules_integration_test.go | 10 +++++----- filebeat/fileset/modules_test.go | 6 +++--- 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 8c2c6a5d03d3..b33b6cf9efef 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -37,7 +37,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, fmt.Errorf("Error reading config file: %v", err) } - moduleRegistry, err := fileset.NewModuleRegistry(config.Modules) + moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Version) if err != nil { return nil, err } diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index e723dd3ed769..0938399aeb60 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -51,7 +51,7 @@ func New( } // Read reads the manifest file and evaluates the variables. -func (fs *Fileset) Read() error { +func (fs *Fileset) Read(beatVersion string) error { var err error fs.manifest, err = fs.readManifest() if err != nil { @@ -63,7 +63,7 @@ func (fs *Fileset) Read() error { return err } - fs.pipelineID, err = fs.getPipelineID() + fs.pipelineID, err = fs.getPipelineID(beatVersion) if err != nil { return err } @@ -241,13 +241,13 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) { } // getPipelineID returns the Ingest Node pipeline ID -func (fs *Fileset) getPipelineID() (string, error) { +func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline) if err != nil { return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) } - return formatPipelineID(fs.mcfg.Module, fs.name, path), nil + return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil } func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) { @@ -266,12 +266,12 @@ func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interfac if err != nil { return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) } - return formatPipelineID(fs.mcfg.Module, fs.name, path), content, nil + return fs.pipelineID, content, nil } // formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch -func formatPipelineID(module, fileset, path string) string { - return fmt.Sprintf("%s-%s-%s", module, fileset, removeExt(filepath.Base(path))) +func formatPipelineID(module, fileset, path, beatVersion string) string { + return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path))) } // removeExt returns the file name without the extension. If no dot is found, diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 6c8bfbf57016..8ee770502641 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -149,7 +149,7 @@ func TestResolveVariable(t *testing.T) { func TestGetProspectorConfigNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read()) + assert.NoError(t, fs.Read("5.2.0")) cfg, err := fs.getProspectorConfig() assert.NoError(t, err) @@ -159,7 +159,7 @@ func TestGetProspectorConfigNginx(t *testing.T) { assert.True(t, cfg.HasField("pipeline")) pipelineID, err := cfg.String("pipeline", -1) assert.NoError(t, err) - assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Equal(t, "filebeat-5.2.0-nginx-access-with_plugins", pipelineID) } func TestGetProspectorConfigNginxOverrides(t *testing.T) { @@ -172,7 +172,7 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) { }) assert.NoError(t, err) - assert.NoError(t, fs.Read()) + assert.NoError(t, fs.Read("5.2.0")) cfg, err := fs.getProspectorConfig() assert.NoError(t, err) @@ -183,17 +183,17 @@ func TestGetProspectorConfigNginxOverrides(t *testing.T) { assert.True(t, cfg.HasField("pipeline")) pipelineID, err := cfg.String("pipeline", -1) assert.NoError(t, err) - assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Equal(t, "filebeat-5.2.0-nginx-access-with_plugins", pipelineID) } func TestGetPipelineNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - assert.NoError(t, fs.Read()) + assert.NoError(t, fs.Read("5.2.0")) pipelineID, content, err := fs.GetPipeline() assert.NoError(t, err) - assert.Equal(t, "nginx-access-with_plugins", pipelineID) + assert.Equal(t, "filebeat-5.2.0-nginx-access-with_plugins", pipelineID) assert.Contains(t, content, "description") assert.Contains(t, content, "processors") } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 2bdc3631e315..bed8fb93b3a6 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -18,7 +18,8 @@ type ModuleRegistry struct { // newModuleRegistry reads and loads the configured module into the registry. func newModuleRegistry(modulesPath string, moduleConfigs []ModuleConfig, - overrides *ModuleOverrides) (*ModuleRegistry, error) { + overrides *ModuleOverrides, + beatVersion string) (*ModuleRegistry, error) { var reg ModuleRegistry reg.registry = map[string]map[string]*Fileset{} @@ -53,7 +54,7 @@ func newModuleRegistry(modulesPath string, if err != nil { return nil, err } - err = fileset.Read() + err = fileset.Read(beatVersion) if err != nil { return nil, fmt.Errorf("Error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) } @@ -81,7 +82,7 @@ func newModuleRegistry(modulesPath string, } // NewModuleRegistry reads and loads the configured module into the registry. -func NewModuleRegistry(moduleConfigs []*common.Config) (*ModuleRegistry, error) { +func NewModuleRegistry(moduleConfigs []*common.Config, beatVersion string) (*ModuleRegistry, error) { modulesPath := paths.Resolve(paths.Home, "module") stat, err := os.Stat(modulesPath) @@ -106,7 +107,7 @@ func NewModuleRegistry(moduleConfigs []*common.Config) (*ModuleRegistry, error) if err != nil { return nil, err } - return newModuleRegistry(modulesPath, mcfgs, modulesOverrides) + return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatVersion) } func mcfgFromConfig(cfg *common.Config) (*ModuleConfig, error) { diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index acfc98d7b901..5ad23372d09e 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -35,8 +35,8 @@ func TestLoadPipeline(t *testing.T) { func TestSetupNginx(t *testing.T) { client := elasticsearch.GetTestingElasticsearch() - client.Request("DELETE", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil) - client.Request("DELETE", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-5.2.0-nginx-access-with_plugins", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-5.2.0-nginx-error-pipeline", "", nil, nil) modulesPath, err := filepath.Abs("../module") assert.NoError(t, err) @@ -45,14 +45,14 @@ func TestSetupNginx(t *testing.T) { {Module: "nginx"}, } - reg, err := newModuleRegistry(modulesPath, configs, nil) + reg, err := newModuleRegistry(modulesPath, configs, nil, "5.2.0") assert.NoError(t, err) err = reg.LoadPipelines(client) assert.NoError(t, err) - status, _, _ := client.Request("GET", "/_ingest/pipeline/nginx-access-with_plugins", "", nil, nil) + status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-5.2.0-nginx-access-with_plugins", "", nil, nil) assert.Equal(t, 200, status) - status, _, _ = client.Request("GET", "/_ingest/pipeline/nginx-error-pipeline", "", nil, nil) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-5.2.0-nginx-error-pipeline", "", nil, nil) assert.Equal(t, 200, status) } diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index a6f5a612ea19..db9169e5bc70 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -30,7 +30,7 @@ func TestNewModuleRegistry(t *testing.T) { {Module: "system"}, } - reg, err := newModuleRegistry(modulesPath, configs, nil) + reg, err := newModuleRegistry(modulesPath, configs, nil, "5.2.0") assert.NoError(t, err) assert.NotNil(t, reg) @@ -86,7 +86,7 @@ func TestNewModuleRegistryConfig(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil) + reg, err := newModuleRegistry(modulesPath, configs, nil, "5.2.0") assert.NoError(t, err) assert.NotNil(t, reg) @@ -335,7 +335,7 @@ func TestMissingModuleFolder(t *testing.T) { load(t, map[string]interface{}{"module": "nginx"}), } - reg, err := NewModuleRegistry(configs) + reg, err := NewModuleRegistry(configs, "5.2.0") assert.NoError(t, err) assert.NotNil(t, reg)