Feature: Dissect processor (#6925)
Implement the Dissect Tokenizer from `logstash-filter-dissect`[1]

This tokenizer allows you to define string patterns and extract the
relevant information. It also permits some string manipulation when
extracting the keys.

Example tokenizer:

```yaml
tokenizer: "%{at} - [%{machine}] %{code} - %{message}"
message: "10/10/2017 - [wopr] 1 - oh fire fire!"
result:
  at: "10/10/2017"
  machine: "wopr"
  code: "1"
  message: "of fire fire!"
```

Here `%{?key}` captures a value without emitting it as a field, and `%{&key}`
uses that captured value as the field name:

```yaml
tokenizer: "%{?key} %{&key}"
message: "hello world"
result:
  hello: "world"
```

Example configuration:

```yaml
processors:
 - dissect:
    tokenizer: "%{key1} - %{key2}"
    field: "message"
    target_prefix: "extracted"
```

Dissect supports a few more features, sketched below:

- Indirect fields
- Append
- Skip fields
- Greedy padding for CSV files
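
Rough sketches of the append and greedy padding modifiers (field names are
illustrative; appended values are joined with a space by default):

```yaml
tokenizer: "%{+name} %{+name}"
message: "john smith"
result:
  name: "john smith"
```

```yaml
tokenizer: "%{id->} %{state}"
message: "42        running"
result:
  id: "42"
  state: "running"
```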

[1]: https://github.com/logstash-plugins/logstash-filter-dissect
ph authored and ruflin committed May 29, 2018
1 parent 7abc684 commit 81c8671
Showing 23 changed files with 1,426 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add default_fields to Elasticsearch template when connecting to Elasticsearch >= 7.0. {pull}7015[7015]
- Add support for loading a template.json file directly instead of using fields.yml. {pull}7039[7039]
- Add support for keyword multifields in field.yml. {pull}7131[7131]
- Add dissect processor. {pull}6925[6925]

*Auditbeat*

8 changes: 8 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -250,6 +250,14 @@ auditbeat.modules:
# - from: "a"
# to: "b"
#
# The following example tokenizes the string into fields:
#
#processors:
#- dissect:
# tokenizer: "%{key1} - %{key2}"
# field: "message"
# target_prefix: "dissect"
#
# The following example enriches each event with metadata from the cloud
# provider about the host machine. It works on EC2, GCE, DigitalOcean,
# Tencent Cloud, and Alibaba Cloud.
8 changes: 8 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -886,6 +886,14 @@ filebeat.inputs:
# - from: "a"
# to: "b"
#
# The following example tokenizes the string into fields:
#
#processors:
#- dissect:
# tokenizer: "%{key1} - %{key2}"
# field: "message"
# target_prefix: "dissect"
#
# The following example enriches each event with metadata from the cloud
# provider about the host machine. It works on EC2, GCE, DigitalOcean,
# Tencent Cloud, and Alibaba Cloud.
77 changes: 77 additions & 0 deletions filebeat/tests/system/test_processors.py
@@ -115,3 +115,80 @@ def test_condition(self):
assert "beat.name" in output
assert "message" in output
assert "test" in output["message"]

def test_dissect_good_tokenizer(self):
"""
Check dissect with a good tokenizer
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"dissect": {
"tokenizer": "\"%{key} world\"",
"field": "message",
"target_prefix": "extracted"
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("Hello world\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp"],
)[0]
assert output["extracted.key"] == "Hello"

def test_dissect_defaults(self):
"""
Check dissect defaults
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"dissect": {
"tokenizer": "\"%{key} world\"",
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("Hello world\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp"],
)[0]
assert output["dissect.key"] == "Hello"

def test_dissect_bad_tokenizer(self):
"""
Check dissect with a bad tokenizer
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"dissect": {
"tokenizer": "\"not %{key} world\"",
"field": "message",
"target_prefix": "extracted"
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("Hello world\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp"],
)[0]
assert "extracted.key" not in output
assert output["message"] == "Hello world"
8 changes: 8 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -359,6 +359,14 @@ heartbeat.scheduler:
# - from: "a"
# to: "b"
#
# The following example tokenizes the string into fields:
#
#processors:
#- dissect:
# tokenizer: "%{key1} - %{key2}"
# field: "message"
# target_prefix: "dissect"
#
# The following example enriches each event with metadata from the cloud
# provider about the host machine. It works on EC2, GCE, DigitalOcean,
# Tencent Cloud, and Alibaba Cloud.
8 changes: 8 additions & 0 deletions libbeat/_meta/config.reference.yml
@@ -145,6 +145,14 @@
# - from: "a"
# to: "b"
#
# The following example tokenizes the string into fields:
#
#processors:
#- dissect:
# tokenizer: "%{key1} - %{key2}"
# field: "message"
# target_prefix: "dissect"
#
# The following example enriches each event with metadata from the cloud
# provider about the host machine. It works on EC2, GCE, DigitalOcean,
# Tencent Cloud, and Alibaba Cloud.
1 change: 1 addition & 0 deletions libbeat/cmd/instance/beat.go
@@ -54,6 +54,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_locale"
_ "github.com/elastic/beats/libbeat/processors/dissect"

// Register autodiscover providers
_ "github.com/elastic/beats/libbeat/autodiscover/providers/docker"
32 changes: 32 additions & 0 deletions libbeat/docs/processors-using.asciidoc
@@ -47,6 +47,7 @@ The supported processors are:
* <<add-kubernetes-metadata,`add_kubernetes_metadata`>>
* <<add-docker-metadata,`add_docker_metadata`>>
* <<add-host-metadata,`add_host_metadata`>>
* <<dissect,`dissect`>>

[[conditions]]
==== Conditions
@@ -761,3 +762,34 @@ The fields added to the event look as follows:
-------------------------------------------------------------------------------

NOTE: The host information is refreshed every 5 minutes.

[[dissect]]
=== Dissect strings

The dissect processor tokenizes incoming strings using defined patterns.

[source,yaml]
-------
processors:
- dissect:
tokenizer: "%{key1} %{key2}"
field: "message"
target_prefix: "dissect"
-------
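
For example, given an event whose `message` field contains `"hello world"`, the configuration
above would add the extracted keys under the `dissect` prefix, roughly:

[source,yaml]
-------
message: "hello world"
dissect:
  key1: "hello"
  key2: "world"
-------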

The `dissect` processor has the following configuration settings:

`field`:: (Optional) The event field to tokenize. Default is `message`.

`target_prefix`:: (Optional) The name of the field where the extracted values will be stored. When an
empty string is defined, the processor creates the keys at the root of the event. Default is
`dissect`. When the target key already exists in the event, the processor won't replace it and will
log an error; you need to either drop or rename the key before using dissect.
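
For example, to write the extracted keys at the root of the event instead (a sketch based on the
configuration above):

[source,yaml]
-------
processors:
- dissect:
   tokenizer: "%{key1} %{key2}"
   field: "message"
   target_prefix: ""
-------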

For tokenization to be successful, all keys must be found and extracted; if any of them cannot be
found, an error is logged and the original event is not modified.

NOTE: A key can contain any characters except the reserved suffix or prefix modifiers: `/`, `&`, `+`,
and `?`.

See <<conditions>> for a list of supported conditions.
1 change: 1 addition & 0 deletions libbeat/processors/dissect/.gitignore
@@ -0,0 +1 @@
dissect_tests.json
25 changes: 25 additions & 0 deletions libbeat/processors/dissect/config.go
@@ -0,0 +1,25 @@
package dissect

type config struct {
Tokenizer *tokenizer `config:"tokenizer"`
Field string `config:"field"`
TargetPrefix string `config:"target_prefix"`
}

var defaultConfig = config{
Field: "message",
TargetPrefix: "dissect",
}

// tokenizer adds validation at the unpack level for this specific field.
type tokenizer = Dissector

// Unpack a tokenizer into a dissector; this triggers the normal validation of the dissector.
func (t *tokenizer) Unpack(v string) error {
d, err := New(v)
if err != nil {
return err
}
*t = *d
return nil
}
43 changes: 43 additions & 0 deletions libbeat/processors/dissect/config_test.go
@@ -0,0 +1,43 @@
package dissect

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
)

func TestTokenizerType(t *testing.T) {
t.Run("valid", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "%{value1}",
"field": "message",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.NoError(t, err) {
return
}
})

t.Run("invalid", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "%value1}",
"field": "message",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.Error(t, err) {
return
}
})
}
32 changes: 32 additions & 0 deletions libbeat/processors/dissect/const.go
@@ -0,0 +1,32 @@
package dissect

import (
"errors"
"regexp"
)

var (
// delimiterRE tokenizes the tokenizer pattern into a walkable list of
// extracted delimiter + key pairs. For example, the string:
// ` %{key}, %{key/2}`
// becomes:
// [["", "key" ], [", ", "key/2"]]
delimiterRE = regexp.MustCompile("(?s)(.*?)%\\{([^}]*?)}")
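// suffixRE parses a key's suffix: an optional /N append ordinal (one or
// two digits) and an optional -> greedy modifier.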
suffixRE = regexp.MustCompile("(.+?)(/(\\d{1,2}))?(->)?$")

skipFieldPrefix = "?"
appendFieldPrefix = "+"
indirectFieldPrefix = "&"
appendIndirectPrefix = "+&"
indirectAppendPrefix = "&+"
greedySuffix = "->"

defaultJoinString = " "

errParsingFailure = errors.New("parsing failure")
errInvalidTokenizer = errors.New("invalid dissect tokenizer")
errEmpty = errors.New("empty string provided")
errMixedPrefixIndirectAppend = errors.New("mixed prefix `&+`")
errMixedPrefixAppendIndirect = errors.New("mixed prefix `+&`")
errEmptyKey = errors.New("empty key")
)