Skip to content

Commit

Permalink
Add new parser called include_message to filter messages (#32094)
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch authored Jun 27, 2022
1 parent 111cea0 commit c731661
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add `auth.oauth2.google.jwt_json` option to `httpjson` input. {pull}31750[31750]
- Add authentication fields to RabbitMQ module documents. {issue}31159[31159] {pull}31680[31680]
- Add template helper function for decoding hexadecimal strings. {pull}31886[31886]
- Add new `parser` called `include_message` to filter based on message contents. {issue}31794[31794] {pull}32094[32094]

*Auditbeat*

Expand Down
14 changes: 14 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,15 @@ filebeat.inputs:
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, no lines are dropped.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, all the lines are exported.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

### Prospector options
Expand Down Expand Up @@ -327,6 +331,16 @@ filebeat.inputs:
# be used.
#add_error_key: false

#### Filtering messages

# You can filter messsages in the parsers pipeline. Use this method if you would like to
# include or exclude lines before they are aggregated into multiline or the JSON contents
# are parsed.

#parsers:
#- include_message.patterns:
- ["WARN", "ERR"]

#### Multiline options

# Multiline can be used for log messages spanning multiple lines. This is common
Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ filebeat.inputs:

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
Expand Down
22 changes: 22 additions & 0 deletions filebeat/docs/inputs/input-filestream-reader-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,25 @@ The RFC 5424 format accepts the following forms of timestamps:
** `2003-10-11T22:14:15.123456-06:00`

Formats with an asterisk (*) are a non-standard allowance.

[float]
===== `include_message`

Use the `include_message` parser to filter messages in the parsers pipeline. Messages that
match the provided pattern are passed to the next parser, the others are dropped.

You should use `include_message` instead of `include_lines` if you would like to
control when the filtering happens. `include_lines` runs after the parsers, `include_message`
runs in the parsers pipeline.

*`patterns`*:: List of regexp patterns to match.

This example shows you how to include messages that start with the string ERR or WARN:

[source,yaml]
----
paths:
- "/var/log/containers/*.log"
parsers:
- include_message.patterns: ["^ERR", "^WARN"]
----
14 changes: 14 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,15 @@ filebeat.inputs:
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, no lines are dropped.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list. The include_lines is called before
# exclude_lines. By default, all the lines are exported.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

### Prospector options
Expand Down Expand Up @@ -734,6 +738,16 @@ filebeat.inputs:
# be used.
#add_error_key: false

#### Filtering messages

# You can filter messsages in the parsers pipeline. Use this method if you would like to
# include or exclude lines before they are aggregated into multiline or the JSON contents
# are parsed.

#parsers:
#- include_message.patterns:
- ["WARN", "ERR"]

#### Multiline options

# Multiline can be used for log messages spanning multiple lines. This is common
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ filebeat.inputs:

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
# Line filtering happens after the parsers pipeline. If you would like to filter lines
# before parsers, use include_message parser.
#include_lines: ['^ERR', '^WARN']

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
Expand Down
84 changes: 84 additions & 0 deletions libbeat/reader/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filter

import (
"context"
"io"

"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/match"
"github.com/elastic/go-concert/ctxtool"
)

type Config struct {
Patterns []match.Matcher `config:"patterns" validate:"required"`
}

func DefaultConfig() Config {
return Config{}
}

// FilterParser accepts a list of matchers to determine if a line
// should be kept or not. If one of the patterns matches the
// contents of the message, it is returned to the next reader.
// If not, the message is dropped.
type FilterParser struct {
ctx ctxtool.CancelContext
logger *logp.Logger
r reader.Reader
matchers []match.Matcher
}

func NewParser(r reader.Reader, c *Config) *FilterParser {
return &FilterParser{
ctx: ctxtool.WithCancelContext(context.Background()),
logger: logp.NewLogger("filter_parser"),
r: r,
matchers: c.Patterns,
}
}

func (p *FilterParser) Next() (reader.Message, error) {
for p.ctx.Err() == nil {
message, err := p.r.Next()
if err != nil {
return message, err
}
if p.matchAny(string(message.Content)) {
return message, err
}
p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content))
}
return reader.Message{}, io.EOF
}

func (p *FilterParser) matchAny(text string) bool {
for _, m := range p.matchers {
if m.MatchString(text) {
return true
}
}
return false
}

func (p *FilterParser) Close() error {
p.ctx.Cancel()
return p.r.Close()
}
130 changes: 130 additions & 0 deletions libbeat/reader/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filter

import (
"io"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/elastic-agent-libs/config"
)

func TestParser(t *testing.T) {
tests := map[string]struct {
config map[string]interface{}
input []reader.Message
expectedMessageContent [][]byte
}{
"keep all messages": {
config: map[string]interface{}{
"patterns": []string{"this matches*"},
},
input: []reader.Message{
{
Content: []byte("this matches"),
},
{
Content: []byte("this matches again"),
},
},
expectedMessageContent: [][]byte{
[]byte("this matches"),
[]byte("this matches again"),
},
},
"keep all messages with multiple patterns": {
config: map[string]interface{}{
"patterns": []string{"this matches*", "should match as well*"},
},
input: []reader.Message{
{
Content: []byte("this matches"),
},
{
Content: []byte("should match as well"),
},
},
expectedMessageContent: [][]byte{
[]byte("this matches"),
[]byte("should match as well"),
},
},
"keep one message": {
config: map[string]interface{}{
"patterns": []string{"this matches*"},
},
input: []reader.Message{
{
Content: []byte("this matches"),
},
{
Content: []byte("this does not match"),
},
},
expectedMessageContent: [][]byte{
[]byte("this matches"),
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
var c Config
cfg := config.MustNewConfigFrom(test.config)
err := cfg.Unpack(&c)
require.NoError(t, err)
r := NewParser(newTestReader(test.input), &c)

contents := make([][]byte, 0)
msg, err := r.Next()
for err == nil {
contents = append(contents, msg.Content)
msg, err = r.Next()
}
require.ElementsMatch(t, test.expectedMessageContent, contents)
})

}
}

type testReader struct {
msg []reader.Message
idx int
}

func newTestReader(input []reader.Message) reader.Reader {
return &testReader{
msg: input,
idx: 0,
}
}

func (r *testReader) Next() (reader.Message, error) {
if r.idx == len(r.msg) {
return reader.Message{}, io.EOF
}

m := r.msg[r.idx]
r.idx += 1
return m, nil
}

func (r *testReader) Close() error { return nil }
Loading

0 comments on commit c731661

Please sign in to comment.