[Filebeat] Add parse_aws_vpc_flow_log processor #33656

Merged
9f12d84
Filebeat - Add parse_aws_vpc_flow_log processor
andrewkroh Nov 12, 2022
6bcd888
Use yaml.v2 b/c it's in go.mod
andrewkroh Nov 12, 2022
26c01b1
Add changelog
andrewkroh Nov 12, 2022
3f3effb
Apply suggestions from code review
andrewkroh Nov 14, 2022
610e0cd
Add json encoding comment to String()
andrewkroh Nov 14, 2022
ab1dd4a
rename GoldenTestCase -> goldenTestCase
andrewkroh Nov 14, 2022
be1977f
Add ResetTimer
andrewkroh Nov 14, 2022
5897d05
Typo in timestampType
andrewkroh Nov 14, 2022
f427870
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
andrewkroh Nov 15, 2022
a22d80c
testing.TB -> *testing.T
andrewkroh Nov 15, 2022
7ab42c9
Store golden.json instead of golden.yml
andrewkroh Nov 15, 2022
66851a5
Refactor benchmarks
andrewkroh Nov 15, 2022
fde32b4
Clone strings.Fields and FieldsFunc
andrewkroh Nov 15, 2022
6a771b8
Hack strings.Fields to accept dst []string slice
andrewkroh Nov 15, 2022
bd22108
Replace word iterator with strings.Fields
andrewkroh Nov 15, 2022
1d35407
Allocate map based on expected field count
andrewkroh Nov 15, 2022
3a6e4cd
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
andrewkroh Nov 15, 2022
f91c155
Accept more than one format string
andrewkroh Nov 16, 2022
ee5924b
nolint errorlint on unit test
andrewkroh Nov 16, 2022
70bde35
Store []formatProcessor instead of []*formatProcessor
andrewkroh Nov 16, 2022
0ab19a4
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
andrewkroh Nov 16, 2022
53f7cc9
Add event.action, Map protocol to network.transport
andrewkroh Nov 16, 2022
024b82b
Update ECS mapping in docs
andrewkroh Nov 16, 2022
c644868
Add event.type
andrewkroh Nov 16, 2022
e16f762
Update docs
andrewkroh Nov 16, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -156,6 +156,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]

*Auditbeat*

1 change: 1 addition & 0 deletions auditbeat/docs/index.asciidoc
@@ -22,6 +22,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:linux_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_script_processor:
:no_timestamp_processor:

1 change: 1 addition & 0 deletions heartbeat/docs/index.asciidoc
@@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:no_dashboards:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_timestamp_processor:

include::{libbeat-dir}/shared-beats-attributes.asciidoc[]
6 changes: 6 additions & 0 deletions libbeat/docs/processors-list.asciidoc
@@ -104,6 +104,9 @@ endif::[]
ifndef::no_rename_processor[]
* <<rename-fields,`rename`>>
endif::[]
ifndef::no_parse_aws_vpc_flow_log_processor[]
* <<processor-parse-aws-vpc-flow-log, `parse_aws_vpc_flow_log`>>
endif::[]
ifndef::no_script_processor[]
* <<processor-script,`script`>>
endif::[]
@@ -231,6 +234,9 @@ endif::[]
ifndef::no_rename_processor[]
include::{libbeat-processors-dir}/actions/docs/rename.asciidoc[]
endif::[]
ifndef::no_parse_aws_vpc_flow_log_processor[]
include::{x-filebeat-processors-dir}/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc[]
endif::[]
ifndef::no_script_processor[]
include::{libbeat-processors-dir}/script/docs/script.asciidoc[]
endif::[]
1 change: 1 addition & 0 deletions metricbeat/docs/index.asciidoc
@@ -28,6 +28,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_timestamp_processor:

:kubernetes_default_indexers: {docdir}/kubernetes-default-indexers-matchers.asciidoc
1 change: 1 addition & 0 deletions packetbeat/docs/index.asciidoc
@@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_script_processor:
:no_timestamp_processor:

1 change: 1 addition & 0 deletions winlogbeat/docs/index.asciidoc
@@ -20,6 +20,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_only:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:include_translate_sid_processor:
:export_pipeline:

1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 101 additions & 0 deletions x-pack/filebeat/processors/aws_vpcflow/config.go
@@ -0,0 +1,101 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws_vpcflow

import (
	"errors"
	"fmt"
	"strings"
)

// mode represents the processing mode (original, ecs, ecs_and_original).
type mode uint8

const (
	originalMode       mode = iota // originalMode generates the fields specified in the format string.
	ecsMode                        // ecsMode maps the original fields to ECS and removes the original field if it was mapped.
	ecsAndOriginalMode             // ecsAndOriginalMode maps the original fields to ECS and retains all the original fields.
)

var modeStrings = map[mode]string{
	originalMode:       "original",
	ecsMode:            "ecs",
	ecsAndOriginalMode: "ecs_and_original",
}

func (m *mode) Unpack(s string) error {
	for modeConst, modeStr := range modeStrings {
		if strings.EqualFold(modeStr, s) {
			*m = modeConst
			return nil
		}
	}
	return fmt.Errorf("invalid mode type <%v> for "+procName, s)
}

func (m *mode) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var str string
	if err := unmarshal(&str); err != nil {
		return err
	}
	return m.Unpack(str)
}

func (m *mode) String() string {
	if s, found := modeStrings[*m]; found {
		return s
	}
	return "unknown mode"
}

// config contains the configuration options for the processor.
type config struct {
	Format        string `config:"format" validate:"required"` // VPC flow log format.
	Mode          mode   `config:"mode"`                       // Mode controls what fields are generated.
	Field         string `config:"field"`                      // Source field containing the VPC flow log message.
	TargetField   string `config:"target_field"`               // Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location.
	IgnoreMissing bool   `config:"ignore_missing"`             // Ignore missing source field.
	IgnoreFailure bool   `config:"ignore_failure"`             // Ignore failures while parsing and transforming the flow log message.
	ID            string `config:"id"`                         // Instance ID for debugging purposes.
}

// Validate validates the config settings. It returns an error if the format
// string is invalid.
func (c *config) Validate() error {
	_, err := parseFormat(c.Format)
	return err
}

func defaultConfig() config {
	return config{
		Mode:        ecsMode,
		Field:       "message",
		TargetField: "aws.vpcflow",
	}
}

// parseFormat parses a VPC flow log format string and returns an ordered list
// of the expected fields.
func parseFormat(format string) ([]vpcFlowField, error) {
	tokens := strings.Fields(format)
	if len(tokens) == 0 {
		return nil, errors.New("format must contain at least one field")
	}

	fields := make([]vpcFlowField, 0, len(tokens))
	for _, token := range tokens {
		// Elastic uses underscores in field names rather than dashes.
		underscoreToken := strings.ReplaceAll(token, "-", "_")

		field, found := nameToFieldMap[underscoreToken]
		if !found {
			return nil, fmt.Errorf("unknown field %q", token)
		}

		fields = append(fields, field)
	}

	return fields, nil
}
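
For readers skimming the diff, the format handling in config.go can be tried in isolation. The following is a minimal standalone sketch, not the processor's code: `knownFields` is a hypothetical stand-in for `nameToFieldMap` (which is defined elsewhere in this PR and not shown here), and `parseFormatSketch` returns plain strings instead of `vpcFlowField` values.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// knownFields is a hypothetical stand-in for the processor's nameToFieldMap.
// Only a handful of VPC flow log fields are listed for illustration.
var knownFields = map[string]bool{
	"version":      true,
	"srcaddr":      true,
	"dstaddr":      true,
	"pkt_srcaddr":  true,
	"interface_id": true,
}

// parseFormatSketch splits the format string on whitespace, rewrites dashes
// to underscores, and rejects any token that is not a known field name.
func parseFormatSketch(format string) ([]string, error) {
	tokens := strings.Fields(format)
	if len(tokens) == 0 {
		return nil, errors.New("format must contain at least one field")
	}

	fields := make([]string, 0, len(tokens))
	for _, token := range tokens {
		name := strings.ReplaceAll(token, "-", "_")
		if !knownFields[name] {
			// Report the token as the user wrote it, dashes included.
			return nil, fmt.Errorf("unknown field %q", token)
		}
		fields = append(fields, name)
	}
	return fields, nil
}

func main() {
	fields, err := parseFormatSketch("version interface-id srcaddr pkt-srcaddr")
	fmt.Println(fields, err) // [version interface_id srcaddr pkt_srcaddr] <nil>

	_, err = parseFormatSketch("version bogus-field")
	fmt.Println(err) // unknown field "bogus-field"
}
```

Because `Validate` calls `parseFormat`, an unknown or empty format is rejected when the configuration is unpacked rather than when the first event is processed.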
75 changes: 75 additions & 0 deletions x-pack/filebeat/processors/aws_vpcflow/config_test.go
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws_vpcflow

import (
	"strconv"
	"testing"

	"github.com/stretchr/testify/require"

	conf "github.com/elastic/elastic-agent-libs/config"
)

func TestConfigUnpack(t *testing.T) {
	testCases := []struct {
		yamlConfig string
		error      bool
	}{
		{
			yamlConfig: `
---
mode: ecs_and_original
id: us-east-vpcflow
format: instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr
`,
		},
		{
			yamlConfig: `
---
mode: original
format: version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status
`,
		},
		{
			yamlConfig: `
---
mode: ecs
format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status
`,
		},
		{
			yamlConfig: `
---
mode: ecs
format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status
`,
		},
		{
			error: true,
			yamlConfig: `
---
mode: invalid
format: version
`,
		},
	}

	for i, tc := range testCases {
		tc := tc
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			rawConfig := conf.MustNewConfigFrom(tc.yamlConfig)

			c := defaultConfig()
			err := rawConfig.Unpack(&c)
			if tc.error {
				t.Log("Error:", err)
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
		})
	}
}
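
The last test case relies on `mode: invalid` being rejected during unpacking. As a rough illustration of that behavior without the config library, here is a standalone sketch of the case-insensitive mode lookup. `unpackMode` is a hypothetical helper that mirrors the logic of `(*mode).Unpack` in config.go; it omits `procName` from the error message because that constant is not part of this excerpt.

```go
package main

import (
	"fmt"
	"strings"
)

// mode mirrors the processing mode type from config.go.
type mode uint8

const (
	originalMode mode = iota
	ecsMode
	ecsAndOriginalMode
)

var modeStrings = map[mode]string{
	originalMode:       "original",
	ecsMode:            "ecs",
	ecsAndOriginalMode: "ecs_and_original",
}

// unpackMode is a hypothetical helper: it returns the mode whose name matches
// s ignoring case, or an error when no mode matches.
func unpackMode(s string) (mode, error) {
	for m, name := range modeStrings {
		if strings.EqualFold(name, s) {
			return m, nil
		}
	}
	return 0, fmt.Errorf("invalid mode type <%v>", s)
}

func main() {
	for _, in := range []string{"ECS", "ecs_and_original", "invalid"} {
		m, err := unpackMode(in)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q -> %s\n", in, modeStrings[m])
	}
}
```

Matching with `strings.EqualFold` means `ECS` and `ecs` select the same mode, while any other spelling produces an error that surfaces from `Unpack`.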