Filebeat modules: Machine Learning jobs (elastic#4506)
This adds support for loading ML configurations (job + datafeed) from the Filebeat modules.
An example ML configuration is added to the Nginx Filebeat module. This sample applies
ML anomaly detection to the response codes.

The loading is implemented as part of the `setup` command and of the `--setup` flag.

If a job configuration with the same ID already exists, it is not overwritten, because deleting
jobs could potentially delete user data. Users who want to upgrade should delete the jobs
manually in the UI.
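In practice the jobs can be loaded either at startup with the `--setup` flag or on demand with the `setup` command, e.g. `filebeat setup --modules=nginx --machine-learning`, as exercised by the system test included in this change.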
tsg authored and exekias committed Jun 23, 2017
1 parent 766ff5b commit 1213483
Showing 19 changed files with 533 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Add experimental Redis module. {pull}4441[4441]
- Nginx module: use the first not-private IP address as the remote_ip. {pull}4417[4417]
- Load Ingest Node pipelines when the Elasticsearch connection is established, instead of only once at startup. {pull}4479[4479]
- Add support for loading X-Pack Machine Learning configurations from the modules, and add sample configurations for the Nginx module. {pull}4506[4506]

*Heartbeat*

41 changes: 36 additions & 5 deletions filebeat/beater/filebeat.go
@@ -1,11 +1,12 @@
package beater

import (
"errors"
"flag"
"fmt"
"sync"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
@@ -67,7 +68,12 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

if !config.ConfigProspector.Enabled() && !haveEnabledProspectors {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
if !b.InSetupCmd {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
} else {
// in the `setup` command, log this only as a warning
logp.Warn("Setup called, but no modules enabled.")
}
}

if *once && config.ConfigProspector.Enabled() {
@@ -79,12 +85,19 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: &config,
moduleRegistry: moduleRegistry,
}

// register `setup` callback for ML jobs
if !moduleRegistry.Empty() {
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
}
return fb, nil
}

// modulesSetup is called when modules are configured to do the initial
// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) modulesSetup(b *beat.Beat) error {
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" +
@@ -104,13 +117,31 @@ func (fb *Filebeat) modulesSetup(b *beat.Beat) error {
return nil
}

func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")

esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
" modules because the Elasticsearch output is not configured/enabled.")
return nil
}

esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}

return fb.moduleRegistry.LoadML(esClient)
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config

if !fb.moduleRegistry.Empty() {
err = fb.modulesSetup(b)
err = fb.loadModulesPipelines(b)
if err != nil {
return err
}
1 change: 1 addition & 0 deletions filebeat/cmd/root.go
@@ -24,4 +24,5 @@ func init() {
RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
RootCmd.ConfigTest.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
RootCmd.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
}
30 changes: 25 additions & 5 deletions filebeat/fileset/fileset.go
@@ -17,6 +17,7 @@ import (
"text/template"

"github.com/elastic/beats/libbeat/common"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
)

// Fileset struct is the representation of a fileset.
@@ -74,11 +75,16 @@ func (fs *Fileset) Read(beatVersion string) error {
// manifest structure is the representation of the manifest.yml file from the
// fileset.
type manifest struct {
ModuleVersion string `config:"module_version"`
Vars []map[string]interface{} `config:"var"`
IngestPipeline string `config:"ingest_pipeline"`
Prospector string `config:"prospector"`
Requires struct {
ModuleVersion string `config:"module_version"`
Vars []map[string]interface{} `config:"var"`
IngestPipeline string `config:"ingest_pipeline"`
Prospector string `config:"prospector"`
MachineLearning []struct {
Name string `config:"name"`
Job string `config:"job"`
Datafeed string `config:"datafeed"`
} `config:"machine_learning"`
Requires struct {
Processors []ProcessorRequirement `config:"processors"`
} `config:"requires"`
}
@@ -310,3 +316,17 @@ func removeExt(path string) string {
func (fs *Fileset) GetRequiredProcessors() []ProcessorRequirement {
return fs.manifest.Requires.Processors
}

// GetMLConfigs returns the list of machine-learning configurations declared
// by this fileset.
func (fs *Fileset) GetMLConfigs() []mlimporter.MLConfig {
var mlConfigs []mlimporter.MLConfig
for _, ml := range fs.manifest.MachineLearning {
mlConfigs = append(mlConfigs, mlimporter.MLConfig{
ID: fmt.Sprintf("filebeat-%s-%s-%s", fs.mcfg.Module, fs.name, ml.Name),
JobPath: filepath.Join(fs.modulePath, fs.name, ml.Job),
DatafeedPath: filepath.Join(fs.modulePath, fs.name, ml.Datafeed),
})
}
return mlConfigs
}
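
As an aside (not part of the diff): a minimal, hypothetical sketch of how the ID built in GetMLConfigs expands for the Nginx fileset added below. The module, fileset, and ML config names are the ones from the manifest, and the resulting ID is the one the system test asserts on.

package main

import "fmt"

func main() {
	// Illustration only: GetMLConfigs formats the job ID as
	// "filebeat-<module>-<fileset>-<ml name>".
	module, fileset, mlName := "nginx", "access", "response_code"
	id := fmt.Sprintf("filebeat-%s-%s-%s", module, fileset, mlName)
	fmt.Println(id) // prints: filebeat-nginx-access-response_code
}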
29 changes: 28 additions & 1 deletion filebeat/fileset/modules.go
@@ -2,15 +2,17 @@ package fileset

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
"github.com/elastic/beats/libbeat/paths"
)

@@ -418,6 +420,31 @@ func interpretError(initialErr error, body []byte) error {
return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available
func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
haveXpack, err := mlimporter.HaveXpackML(esClient)
if err != nil {
return errors.Errorf("Error checking if xpack is available: %v", err)
}
if !haveXpack {
logp.Warn("Xpack Machine Learning is not enabled")
return nil
}

for module, filesets := range reg.registry {
for name, fileset := range filesets {
for _, mlConfig := range fileset.GetMLConfigs() {
err = mlimporter.ImportMachineLearningJob(esClient, &mlConfig)
if err != nil {
return errors.Errorf("Error loading ML config from %s/%s: %v", module, name, err)
}
}
}
}

return nil
}

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
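
Side note: the libbeat/ml-importer package itself is not among the files shown here. The following is only a rough sketch of the surface that LoadML and GetMLConfigs appear to rely on, inferred from their call sites in this diff; the parameter type is a placeholder and the real package may differ.

// Sketch only: names and shapes inferred from the call sites above, not from
// the actual libbeat/ml-importer sources.
package mlimporter

// EsClient stands in for the Elasticsearch client type actually passed in
// (the diff hands over the client returned by elasticsearch.NewConnectedClient).
type EsClient interface{}

// MLConfig ties a generated ID to a fileset's job and datafeed definition files.
type MLConfig struct {
	ID           string
	JobPath      string
	DatafeedPath string
}

// HaveXpackML reports whether X-Pack Machine Learning is available on the cluster.
func HaveXpackML(esClient EsClient) (bool, error) {
	// Stubbed: the real implementation would query the cluster's X-Pack info.
	return false, nil
}

// ImportMachineLearningJob loads one job/datafeed pair, leaving an existing job
// with the same ID untouched (per the commit message above).
func ImportMachineLearningJob(esClient EsClient, cfg *MLConfig) error {
	// Stubbed: the real implementation would create the job and datafeed.
	return nil
}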
44 changes: 44 additions & 0 deletions filebeat/module/nginx/access/machine_learning/datafeed_response_code.json
@@ -0,0 +1,44 @@
{
"job_id": "JOB_ID",
"query_delay": "60s",
"frequency": "60s",
"indexes": [
"filebeat-*"
],
"types": [
"_default_",
"log"
],
"query": {
"match_all": {
"boost": 1
}
},
"aggregations": {
"buckets": {
"date_histogram": {
"field": "@timestamp",
"interval": 3600000,
"offset": 0,
"order": {
"_key": "asc"
},
"keyed": false,
"min_doc_count": 0
},
"aggregations": {
"@timestamp": {
"max": {
"field": "@timestamp"
}
},
"nginx.access.response_code": {
"terms": {
"field": "nginx.access.response_code",
"size": 10000
}
}
}
}
}
}
23 changes: 23 additions & 0 deletions filebeat/module/nginx/access/machine_learning/response_code.json
@@ -0,0 +1,23 @@
{
"description" : "Anomaly detector for changes in event rates of nginx.access.response_code responses",
"analysis_config" : {
"bucket_span": "1h",
"summary_count_field_name": "doc_count",
"detectors": [
{
"detector_description": "Event rate for nginx.access.response_code",
"function": "count",
"detector_rules": [],
"partition_field_name": "nginx.access.response_code"
}
],
"influencers": ["nginx.access.response_code"]
},
"data_description": {
"time_field": "@timestamp",
"time_format": "epoch_ms"
},
"model_plot_config": {
"enabled": true
}
}
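
Reading the two JSON files together: the datafeed pre-aggregates the filebeat-* indices into hourly buckets (the 3600000 ms date_histogram interval) per nginx.access.response_code, and the job consumes those buckets with a matching 1h bucket_span, treating doc_count as the summary count field, so the count detector effectively models the event rate of each response code.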
5 changes: 5 additions & 0 deletions filebeat/module/nginx/access/manifest.yml
@@ -12,6 +12,11 @@ var:
ingest_pipeline: ingest/default.json
prospector: config/nginx-access.yml

machine_learning:
- name: response_code
job: machine_learning/response_code.json
datafeed: machine_learning/datafeed_response_code.json

requires.processors:
- name: user_agent
plugin: ingest-user-agent
36 changes: 36 additions & 0 deletions filebeat/tests/system/test_modules.py
@@ -190,3 +190,39 @@ def search_objects():
assert len(objects) == 1
o = objects[0]
assert o["x-pipeline"] == "test-pipeline"

@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_setup_machine_learning_nginx(self):
"""
Tests that setup works and loads the Nginx machine learning job configurations.
"""
self.init()
# generate a minimal configuration
cfgfile = os.path.join(self.working_dir, "filebeat.yml")
self.render_config_template(
template_name="filebeat_modules",
output=cfgfile,
index_name=self.index_name,
elasticsearch_url=self.elasticsearch_url)

cmd = [
self.filebeat, "-systemTest",
"-e", "-d", "*",
"-c", cfgfile,
"setup", "--modules=nginx", "--machine-learning"]

output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=subprocess.STDOUT,
bufsize=0).wait()

jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")
assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"])

datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])
32 changes: 31 additions & 1 deletion libbeat/beat/beat.go
@@ -97,13 +97,20 @@ type Beater interface {
// the beat its run-loop.
type Creator func(*Beat, *common.Config) (Beater, error)

// SetupMLCallback can be used by the Beat to register MachineLearning configurations
// for the enabled modules.
type SetupMLCallback func(*Beat) error

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Info common.BeatInfo // beat metadata.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher publisher.Publisher // Publisher

SetupMLCallback SetupMLCallback // setup callback for ML job configs
InSetupCmd bool // this is set to true when the `setup` command is called
}

// BeatConfig struct contains the basic configuration of every beat
@@ -290,6 +297,12 @@ func (b *Beat) launch(bt Creator) error {
if err != nil {
return err
}
if b.SetupMLCallback != nil && *setup {
err = b.SetupMLCallback(b)
if err != nil {
return err
}
}

logp.Info("%s start running.", b.Info.Beat)
defer logp.Info("%s stopped.", b.Info.Beat)
@@ -322,13 +335,22 @@ func (b *Beat) TestConfig(bt Creator) error {
}

// Setup registers ES index template and kibana dashboards
func (b *Beat) Setup(template, dashboards bool) error {
func (b *Beat) Setup(bt Creator, template, dashboards, machineLearning bool) error {
return handleError(func() error {
err := b.init()
if err != nil {
return err
}

// Tell the beat that we're in the setup command
b.InSetupCmd = true

// Create beater to give it the opportunity to set loading callbacks
_, err = b.createBeater(bt)
if err != nil {
return err
}

if template {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
Expand Down Expand Up @@ -365,6 +387,14 @@ func (b *Beat) Setup(template, dashboards bool) error {
fmt.Println("Loaded dashboards")
}

if machineLearning && b.SetupMLCallback != nil {
err = b.SetupMLCallback(b)
if err != nil {
return err
}
fmt.Println("Loaded machine learning job configurations")
}

return nil
}())
}
2 changes: 1 addition & 1 deletion libbeat/cmd/root.go
@@ -48,7 +48,7 @@ func GenRootCmdWithRunFlags(name, version string, beatCreator beat.Creator, runF
rootCmd.Use = name

rootCmd.RunCmd = genRunCmd(name, version, beatCreator, runFlags)
rootCmd.SetupCmd = genSetupCmd(name, version)
rootCmd.SetupCmd = genSetupCmd(name, version, beatCreator)
rootCmd.VersionCmd = genVersionCmd(name, version)
rootCmd.ConfigTest = genConfigTestCmd(name, version, beatCreator)
