Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Add parse_aws_vpc_flow_log processor #33656

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9f12d84
Filebeat - Add parse_aws_vpc_flow_log processor
andrewkroh Nov 12, 2022
6bcd888
Use yaml.v2 b/c it's in go.mod
andrewkroh Nov 12, 2022
26c01b1
Add changelog
andrewkroh Nov 12, 2022
3f3effb
Apply suggestions from code review
andrewkroh Nov 14, 2022
610e0cd
Add json encoding comment to String()
andrewkroh Nov 14, 2022
ab1dd4a
rename GoldenTestCase -> goldenTestCase
andrewkroh Nov 14, 2022
be1977f
Add ResetTimer
andrewkroh Nov 14, 2022
5897d05
Typo in timestampType
andrewkroh Nov 14, 2022
f427870
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
andrewkroh Nov 15, 2022
a22d80c
testing.TB -> *testing.T
andrewkroh Nov 15, 2022
7ab42c9
Store golden.json instead of golden.yml
andrewkroh Nov 15, 2022
66851a5
Refactor benchmarks
andrewkroh Nov 15, 2022
fde32b4
Clone strings.Fields and FieldsFunc
andrewkroh Nov 15, 2022
6a771b8
Hack strings.Fields to accept dst []string slice
andrewkroh Nov 15, 2022
bd22108
Replace word iterator with strings.Fields
andrewkroh Nov 15, 2022
1d35407
Allocate map based on expected field count
andrewkroh Nov 15, 2022
3a6e4cd
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
andrewkroh Nov 15, 2022
f91c155
Accept more than one format string
andrewkroh Nov 16, 2022
ee5924b
nolint errorlint on unit test
andrewkroh Nov 16, 2022
70bde35
Store []formatProcessor instead of []*formatProcessor
andrewkroh Nov 16, 2022
0ab19a4
Merge remote-tracking branch 'elastic/main' into feature/fb/parse-aws…
andrewkroh Nov 16, 2022
53f7cc9
Add event.action, Map protocol to network.transport
andrewkroh Nov 16, 2022
024b82b
Update ECS mapping in docs
andrewkroh Nov 16, 2022
c644868
Add event.type
andrewkroh Nov 16, 2022
e16f762
Update docs
andrewkroh Nov 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions x-pack/filebeat/processors/aws_vpcflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,32 @@ func (m *mode) String() string {

// config contains the configuration options for the processor.
type config struct {
Format string `config:"format" validate:"required"` // VPC flow log format.
Mode mode `config:"mode"` // Mode controls what fields are generated.
Field string `config:"field"` // Source field containing the VPC flow log message.
TargetField string `config:"target_field"` // Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures while parsing and transforming the flow log message.
ID string `config:"id"` // Instance ID for debugging purposes.
Format formats `config:"format" validate:"required"` // VPC flow log format. In config, it can accept a string or list of strings. Each format must have a unique number of fields to enable matching it to a flow log message.
Mode mode `config:"mode"` // Mode controls what fields are generated.
Field string `config:"field"` // Source field containing the VPC flow log message.
TargetField string `config:"target_field"` // Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures while parsing and transforming the flow log message.
ID string `config:"id"` // Instance ID for debugging purposes.
}

// Validate validates the config settings. It returns an error if the format
// string is invalid.
// Validate validates the format strings. Each format must have a unique number
// of fields.
func (c *config) Validate() error {
_, err := parseFormat(c.Format)
return err
counts := map[int]struct{}{}
for _, format := range c.Format {
fields, err := parseFormat(format)
if err != nil {
return err
}

_, found := counts[len(fields)]
if found {
return fmt.Errorf("each format must have a unique number of fields")
}
counts[len(fields)] = struct{}{}
}
return nil
}

func defaultConfig() config {
Expand Down Expand Up @@ -102,3 +114,27 @@ func parseFormat(format string) ([]vpcFlowField, error) {

return fields, nil
}

type formats []string

func (f *formats) Unpack(value interface{}) error {
switch v := value.(type) {
case string:
*f = []string{v}
case []string:
*f = v
case []interface{}:
list := make([]string, 0, len(v))
for _, ifc := range v {
s, ok := ifc.(string)
if !ok {
return fmt.Errorf("format values must be strings, got %T", ifc)
}
list = append(list, s)
}
*f = list
default:
return fmt.Errorf("format must be a string or list of strings, got %T", v)
}
return nil
}
28 changes: 27 additions & 1 deletion x-pack/filebeat/processors/aws_vpcflow/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ format: version srcaddr dstaddr srcport dstport protocol start end type packets
---
mode: invalid
format: version
`,
},
{
error: false,
yamlConfig: `
---
mode: ecs
format:
- version srcaddr dstaddr
- version srcaddr dstaddr srcport dstport protocol
`,
},
{
// Each format must have a unique token count.
error: true,
yamlConfig: `
---
mode: ecs
format:
- version srcaddr dstaddr
- srcport dstport protocol
`,
},
}
Expand All @@ -65,11 +86,16 @@ format: version
c := defaultConfig()
err := rawConfig.Unpack(&c)
if tc.error {
require.Error(t, err, "config: %v", tc.yamlConfig)
t.Log("Error:", err)
require.Error(t, err)
return
}
require.NoError(t, err)

// Make sure valid configs produce processors.
p, err := New(rawConfig)
require.NoError(t, err)
require.NotNil(t, p)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ The `parse_aws_vpc_flow_log` processor has the following configuration settings.
| Name | Required | Default | Description |
| `field` | no | `message` | Source field containing the VPC flow log message. |
| `target_field` | no | `aws.vpcflow` | Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location. |
| `format` | yes | | VPC flow log format. This supports VPC flow log fields from versions 2 through 5. |
| `format` | yes | | VPC flow log format. This supports VPC flow log fields from versions 2 through 5. It will accept a string or a list of strings. Each format must have a unique number of fields to enable matching it to a flow log message.|
| `mode` | no | `ecs` | Mode controls what fields are generated. The available options are `original`, `ecs`, and `ecs_and_original`. `original` generates the fields specified in the format string. `ecs` maps the original fields to ECS and removes the original fields that are mapped to ECS. `ecs_and_original` maps the original fields to ECS and retains all the original fields. |
| `ignore_missing` | no | false | Ignore missing source field. |
| `ignore_failure` | no | false | Ignore failures while parsing and transforming the flow log message. |
Expand Down
99 changes: 68 additions & 31 deletions x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ var (

type processor struct {
config
fields []vpcFlowField
log *logp.Logger
originalFieldCount int
expectedIPCount int
formats []*formatProcessor
andrewkroh marked this conversation as resolved.
Show resolved Hide resolved
}

// New constructs a new processor built from ucfg config.
Expand All @@ -58,35 +55,23 @@ func newParseAWSVPCFlowLog(c config) (*processor, error) {
log = log.With("instance_id", c.ID)
}

fields, err := parseFormat(c.Format)
if err != nil {
return nil, fmt.Errorf("failed to parse vpc flow log format: %w", err)
}

var ipCount int
for _, f := range fields {
if f.Type == ipType {
ipCount++
}
// Validate configs that did not pass through go-ucfg.
if err := c.Validate(); err != nil {
return nil, err
}

originalFieldCount := len(fields)
if c.Mode == ecsMode {
for _, f := range fields {
// If an ECS mapping exists then ECS mode will not include the
// original field.
if len(f.ECSMappings) > 0 {
originalFieldCount--
}
var formatProcessors []*formatProcessor
for _, f := range c.Format {
p, err := newFormatProcessor(c, log, f)
if err != nil {
return nil, err
}
formatProcessors = append(formatProcessors, p)
}

return &processor{
config: c,
fields: fields,
originalFieldCount: originalFieldCount,
expectedIPCount: ipCount,
log: log,
config: c,
formats: formatProcessors,
}, nil
}

Expand Down Expand Up @@ -124,14 +109,66 @@ func (p *processor) run(event *beat.Event) error {
}
substrings := dst[:n]

// Find the matching format based on substring count.
for _, format := range p.formats {
if len(format.fields) == n {
return format.process(substrings, event)
}
}
return errInvalidFormat
}

// formatProcessor processes an event using a single VPC flow log format.
type formatProcessor struct {
config
log *logp.Logger
fields []vpcFlowField
originalFieldCount int
expectedIPCount int
}

func newFormatProcessor(c config, log *logp.Logger, format string) (*formatProcessor, error) {
fields, err := parseFormat(format)
if err != nil {
return nil, fmt.Errorf("failed to parse vpc flow log format: %w", err)
}

originalFieldCount := len(fields)
if c.Mode == ecsMode {
for _, f := range fields {
// If an ECS mapping exists then ECS mode will not include the
// original field.
if len(f.ECSMappings) > 0 {
originalFieldCount--
}
}
}

var ipCount int
for _, f := range fields {
if f.Type == ipType {
ipCount++
}
}

return &formatProcessor{
config: c,
log: log,
fields: fields,
originalFieldCount: originalFieldCount,
expectedIPCount: ipCount,
}, nil
}

func (p *formatProcessor) process(substrings []string, event *beat.Event) error {
originalFields := make(mapstr.M, p.originalFieldCount)

var relatedIPs []string
if p.Mode > originalMode {
// Allocate space for the expected number of IPs assuming all are unique.
relatedIPs = make([]string, 0, p.expectedIPCount)
}

originalFields := make(mapstr.M, p.originalFieldCount)

// Iterate over the substrings in the source string and apply type
// conversion and then ECS mappings.
for i, word := range substrings {
Expand Down Expand Up @@ -175,12 +212,12 @@ func (p *processor) run(event *beat.Event) error {
}
}

if _, err = event.PutValue(p.TargetField, originalFields); err != nil {
if _, err := event.PutValue(p.TargetField, originalFields); err != nil {
return err
}

if len(relatedIPs) > 0 {
if _, err = event.PutValue("related.ip", relatedIPs); err != nil {
if _, err := event.PutValue("related.ip", relatedIPs); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
"testing"
"time"

"gopkg.in/yaml.v2"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-libs/mapstr"

Expand All @@ -31,14 +30,20 @@ const (
)

func TestProcessorRun(t *testing.T) {
t.Run("v5-mode-ecs_and_original", func(t *testing.T) {
t.Run("ecs_and_original-mode-v5-message", func(t *testing.T) {
c := defaultConfig()
c.Format = formatV5
c.Format = []string{
"version account-id", // Not a match.
formatV5,
}
c.Mode = ecsAndOriginalMode

p, err := newParseAWSVPCFlowLog(c)
require.NoError(t, err)

assert.Contains(t, p.String(), procName+"=")
assert.Contains(t, p.String(), formatV5)

evt := beat.Event{
Timestamp: time.Now().UTC(),
Fields: map[string]interface{}{
Expand Down Expand Up @@ -152,7 +157,7 @@ func TestGoldenFile(t *testing.T) {

t.Run(tc.Name, func(t *testing.T) {
c := defaultConfig()
c.Format = tc.Format
c.Format = []string{tc.Format}
if tc.Mode != nil {
c.Mode = *tc.Mode
}
Expand Down Expand Up @@ -260,7 +265,7 @@ func BenchmarkProcessorRun(b *testing.B) {
benchmark := benchmark
b.Run(benchmark.name, func(b *testing.B) {
c := defaultConfig()
c.Format = benchmark.format
c.Format = []string{benchmark.format}
c.Mode = benchmark.mode

p, err := newParseAWSVPCFlowLog(c)
Expand Down