From 4a166ce674620daddd7aa89e02f39acb20ce4d91 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 16 Nov 2022 13:41:14 -0500 Subject: [PATCH] [Filebeat] Add parse_aws_vpc_flow_log processor (#33656) This is a processor for parsing AWS VPC flow logs. It requires a user specified log format. It can populate the original flow log fields, ECS fields, or both. Usage: ```yaml processors: - parse_aws_vpc_flow_log: format: version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status - community_id: ~ ``` Benchmark: ``` goos: darwin goarch: arm64 pkg: github.com/elastic/beats/v7/x-pack/filebeat/processors/aws_vpcflow BenchmarkProcessorRun/original-mode-v5-message-10 2810948 2138 ns/op 2836 B/op 31 allocs/op BenchmarkProcessorRun/ecs-mode-v5-message-10 1914754 3107 ns/op 1908 B/op 41 allocs/op BenchmarkProcessorRun/ecs_and_original-mode-v5-message-10 1693279 3538 ns/op 3076 B/op 41 allocs/op ``` Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com> (cherry picked from commit 1a86e4259a6df2aeccbbcc704f1cad06904947cb) --- CHANGELOG.next.asciidoc | 1 + auditbeat/docs/index.asciidoc | 1 + heartbeat/docs/index.asciidoc | 1 + libbeat/docs/processors-list.asciidoc | 6 + metricbeat/docs/index.asciidoc | 1 + packetbeat/docs/index.asciidoc | 1 + winlogbeat/docs/index.asciidoc | 1 + x-pack/filebeat/include/list.go | 1 + .../filebeat/processors/aws_vpcflow/config.go | 140 +++++ .../processors/aws_vpcflow/config_test.go | 101 ++++ .../docs/parse_aws_vpc_flow_log.asciidoc | 218 ++++++++ .../aws_vpcflow/internal/strings/strings.go | 133 +++++ .../internal/strings/strings_test.go | 156 ++++++ .../processors/aws_vpcflow/mapping.go | 295 ++++++++++ .../aws_vpcflow/parse_aws_vpc_flow_log.go | 245 +++++++++ .../parse_aws_vpc_flow_log_test.go | 291 ++++++++++ .../testdata/aws-vpc-flow-logs.yml | 75 +++ .../testdata/custom-nat-gateway.golden.json | 206 +++++++ .../testdata/default-v2-format.golden.json | 303 +++++++++++ 
.../service-name-path-direction.golden.json | 140 +++++ .../testdata/tcp-flag-sequence.golden.json | 511 ++++++++++++++++++ .../testdata/transit-gateway.golden.json | 228 ++++++++ .../testdata/v5-fields-ecs.golden.json | 74 +++ .../v5-fields-ecs_and_original.golden.json | 89 +++ .../testdata/v5-fields-original.golden.json | 41 ++ .../filebeat/processors/aws_vpcflow/types.go | 83 +++ x-pack/functionbeat/docs/index.asciidoc | 1 + 27 files changed, 3343 insertions(+) create mode 100644 x-pack/filebeat/processors/aws_vpcflow/config.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/config_test.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc create mode 100644 x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings_test.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/mapping.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log_test.go create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/aws-vpc-flow-logs.yml create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/custom-nat-gateway.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/default-v2-format.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/service-name-path-direction.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/tcp-flag-sequence.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/transit-gateway.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs_and_original.golden.json create mode 100644 
x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-original.golden.json create mode 100644 x-pack/filebeat/processors/aws_vpcflow/types.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 46c084c1fd0c..854f87404352 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -157,6 +157,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Improve httpjson documentation for split processor. {pull}33473[33473] - Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499] - Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456] +- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656] - Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658] - Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673] - Add Common Expression Language input. 
{pull}31233[31233] diff --git a/auditbeat/docs/index.asciidoc b/auditbeat/docs/index.asciidoc index 56f3d7cc7560..6afd1de2b6a5 100644 --- a/auditbeat/docs/index.asciidoc +++ b/auditbeat/docs/index.asciidoc @@ -22,6 +22,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :linux_os: :no_decode_cef_processor: :no_decode_csv_fields_processor: +:no_parse_aws_vpc_flow_log_processor: :no_script_processor: :no_timestamp_processor: diff --git a/heartbeat/docs/index.asciidoc b/heartbeat/docs/index.asciidoc index 144e71918b81..d323e65baf2a 100644 --- a/heartbeat/docs/index.asciidoc +++ b/heartbeat/docs/index.asciidoc @@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :no_dashboards: :no_decode_cef_processor: :no_decode_csv_fields_processor: +:no_parse_aws_vpc_flow_log_processor: :no_timestamp_processor: include::{libbeat-dir}/shared-beats-attributes.asciidoc[] diff --git a/libbeat/docs/processors-list.asciidoc b/libbeat/docs/processors-list.asciidoc index c09ad5ac3a35..19d352816506 100644 --- a/libbeat/docs/processors-list.asciidoc +++ b/libbeat/docs/processors-list.asciidoc @@ -104,6 +104,9 @@ endif::[] ifndef::no_rename_processor[] * <> endif::[] +ifndef::no_parse_aws_vpc_flow_log_processor[] +* <> +endif::[] ifndef::no_script_processor[] * <> endif::[] @@ -231,6 +234,9 @@ endif::[] ifndef::no_rename_processor[] include::{libbeat-processors-dir}/actions/docs/rename.asciidoc[] endif::[] +ifndef::no_parse_aws_vpc_flow_log_processor[] +include::{x-filebeat-processors-dir}/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc[] +endif::[] ifndef::no_script_processor[] include::{libbeat-processors-dir}/script/docs/script.asciidoc[] endif::[] diff --git a/metricbeat/docs/index.asciidoc b/metricbeat/docs/index.asciidoc index 7dedaf6bda55..0b7937787f2f 100644 --- a/metricbeat/docs/index.asciidoc +++ b/metricbeat/docs/index.asciidoc @@ -28,6 +28,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :win_os: :no_decode_cef_processor: 
:no_decode_csv_fields_processor: +:no_parse_aws_vpc_flow_log_processor: :no_timestamp_processor: :kubernetes_default_indexers: {docdir}/kubernetes-default-indexers-matchers.asciidoc diff --git a/packetbeat/docs/index.asciidoc b/packetbeat/docs/index.asciidoc index 7ff9a667c6d2..0f66d2dde5d0 100644 --- a/packetbeat/docs/index.asciidoc +++ b/packetbeat/docs/index.asciidoc @@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :win_os: :no_decode_cef_processor: :no_decode_csv_fields_processor: +:no_parse_aws_vpc_flow_log_processor: :no_script_processor: :no_timestamp_processor: diff --git a/winlogbeat/docs/index.asciidoc b/winlogbeat/docs/index.asciidoc index 97c1023d4c26..80ed3ef23451 100644 --- a/winlogbeat/docs/index.asciidoc +++ b/winlogbeat/docs/index.asciidoc @@ -20,6 +20,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :win_only: :no_decode_cef_processor: :no_decode_csv_fields_processor: +:no_parse_aws_vpc_flow_log_processor: :include_translate_sid_processor: :export_pipeline: diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index c1382ae36275..f7c35308ed32 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -65,5 +65,6 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/module/zoom" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/zscaler" _ "github.com/elastic/beats/v7/x-pack/filebeat/processors/add_nomad_metadata" + _ "github.com/elastic/beats/v7/x-pack/filebeat/processors/aws_vpcflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/processors/decode_cef" ) diff --git a/x-pack/filebeat/processors/aws_vpcflow/config.go b/x-pack/filebeat/processors/aws_vpcflow/config.go new file mode 100644 index 000000000000..39616d49a6d1 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/config.go @@ -0,0 +1,140 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package aws_vpcflow + +import ( + "errors" + "fmt" + "strings" +) + +// mode represents the processing mode (original, ecs, ecs_and_original). +type mode uint8 + +const ( + originalMode mode = iota // originalMode generates the fields specified in the format string. + ecsMode // ecsMode maps the original fields to ECS and removes the original field if it was mapped. + ecsAndOriginalMode // ecsAndOriginalMode maps the original fields to ECS and retains all the original fields. +) + +var modeStrings = map[mode]string{ + originalMode: "original", + ecsMode: "ecs", + ecsAndOriginalMode: "ecs_and_original", +} + +func (m *mode) Unpack(s string) error { + for modeConst, modeStr := range modeStrings { + if strings.EqualFold(modeStr, s) { + *m = modeConst + return nil + } + } + return fmt.Errorf("invalid mode type %q for "+procName, s) +} + +func (m *mode) UnmarshalYAML(unmarshal func(interface{}) error) error { + var str string + if err := unmarshal(&str); err != nil { + return err + } + return m.Unpack(str) +} + +func (m *mode) String() string { + if m == nil { + return "" + } + if s, found := modeStrings[*m]; found { + return s + } + return "unknown mode" +} + +// config contains the configuration options for the processor. +type config struct { + Format formats `config:"format" validate:"required"` // VPC flow log format. In config, it can accept a string or list of strings. Each format must have a unique number of fields to enable matching it to a flow log message. + Mode mode `config:"mode"` // Mode controls what fields are generated. + Field string `config:"field"` // Source field containing the VPC flow log message. + TargetField string `config:"target_field"` // Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location. 
+ IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field. + IgnoreFailure bool `config:"ignore_failure"` // Ignore failures while parsing and transforming the flow log message. + ID string `config:"id"` // Instance ID for debugging purposes. +} + +// Validate validates the format strings. Each format must have a unique number +// of fields. +func (c *config) Validate() error { + counts := map[int]struct{}{} + for _, format := range c.Format { + fields, err := parseFormat(format) + if err != nil { + return err + } + + _, found := counts[len(fields)] + if found { + return fmt.Errorf("each format must have a unique number of fields") + } + counts[len(fields)] = struct{}{} + } + return nil +} + +func defaultConfig() config { + return config{ + Mode: ecsMode, + Field: "message", + TargetField: "aws.vpcflow", + } +} + +// parseFormat parses VPC flow log format string and returns an ordered list of +// the expected fields. +func parseFormat(format string) ([]vpcFlowField, error) { + tokens := strings.Fields(format) + if len(tokens) == 0 { + return nil, errors.New("format must contain at lease one field") + } + + fields := make([]vpcFlowField, 0, len(tokens)) + for _, token := range tokens { + // Elastic uses underscores in field names rather than dashes. 
+ underscoreToken := strings.ReplaceAll(token, "-", "_") + + field, found := nameToFieldMap[underscoreToken] + if !found { + return nil, fmt.Errorf("unknown field %q", token) + } + + fields = append(fields, field) + } + + return fields, nil +} + +type formats []string + +func (f *formats) Unpack(value interface{}) error { + switch v := value.(type) { + case string: + *f = []string{v} + case []string: + *f = v + case []interface{}: + list := make([]string, 0, len(v)) + for _, ifc := range v { + s, ok := ifc.(string) + if !ok { + return fmt.Errorf("format values must be strings, got %T", ifc) + } + list = append(list, s) + } + *f = list + default: + return fmt.Errorf("format must be a string or list of strings, got %T", v) + } + return nil +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/config_test.go b/x-pack/filebeat/processors/aws_vpcflow/config_test.go new file mode 100644 index 000000000000..83c2db01e2d4 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/config_test.go @@ -0,0 +1,101 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package aws_vpcflow + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestConfigUnpack(t *testing.T) { + testCases := []struct { + yamlConfig string + error bool + }{ + { + yamlConfig: ` +--- +mode: ecs_and_original +id: us-east-vpcflow +format: instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr +`, + }, + { + yamlConfig: ` +--- +mode: original +format: version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status +`, + }, + { + yamlConfig: ` +--- +mode: ecs +format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status +`, + }, + { + yamlConfig: ` +--- +mode: ecs +format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status +`, + }, + { + error: true, + yamlConfig: ` +--- +mode: invalid +format: version +`, + }, + { + error: false, + yamlConfig: ` +--- +mode: ecs +format: + - version srcaddr dstaddr + - version srcaddr dstaddr srcport dstport protocol +`, + }, + { + // Each format must have a unique token count. 
+ error: true, + yamlConfig: ` +--- +mode: ecs +format: + - version srcaddr dstaddr + - srcport dstport protocol +`, + }, + } + + for i, tc := range testCases { + tc := tc + t.Run(strconv.Itoa(i), func(t *testing.T) { + rawConfig := conf.MustNewConfigFrom(tc.yamlConfig) + + c := defaultConfig() + err := rawConfig.Unpack(&c) + if tc.error { + require.Error(t, err, "config: %v", tc.yamlConfig) + t.Log("Error:", err) + return + } + require.NoError(t, err) + + // Make sure valid configs produce processors. + p, err := New(rawConfig) + require.NoError(t, err) + require.NotNil(t, p) + }) + } +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc b/x-pack/filebeat/processors/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc new file mode 100644 index 000000000000..9c9b1f4539cb --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc @@ -0,0 +1,218 @@ +[[processor-parse-aws-vpc-flow-log]] +[role="xpack"] +=== Parse AWS VPC Flow Log + +++++ +parse_aws_vpc_flow_log +++++ + +The `parse_aws_vpc_flow_log` processor decodes AWS VPC Flow log messages. + +Below is an example configuration that decodes the `message` field using the +default version 2 VPC flow log format. + +[source,yaml] +---- +processors: + - parse_aws_vpc_flow_log: + format: version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status + field: message +---- + +The `parse_aws_vpc_flow_log` processor has the following configuration settings. + +.Parse AWS VPC Flow Log options +[options="header"] +|====== +| Name | Required | Default | Description | +| `field` | no | `message` | Source field containing the VPC flow log message. | +| `target_field` | no | `aws.vpcflow` | Target field for the VPC flow log object. This applies only to the original VPC flow log fields. ECS fields are written to the standard location. | +| `format` | yes | | VPC flow log format. 
This supports VPC flow log fields from versions 2 through 5. It will accept a string or a list of strings. Each format must have a unique number of fields to enable matching it to a flow log message.| +| `mode` | no | `ecs` | Mode controls what fields are generated. The available options are `original`, `ecs`, and `ecs_and_original`. `original` generates the fields specified in the format string. `ecs` maps the original fields to ECS and removes the original fields that are mapped to ECS. `ecs_and_original` maps the original fields to ECS and retains all the original fields. | +| `ignore_missing` | no | false | Ignore missing source field. | +| `ignore_failure` | no | false | Ignore failures while parsing and transforming the flow log message. | +| `id` | no | | Instance ID for debugging purposes. | +|====== + +[float] +=== Modes + +[float] +==== Original + +This mode returns the same fields found in the `format` string. It will drop any +fields whose value is a dash (`-`). It converts the strings into the appropriate +data types. These are the known field names and their data types. + +NOTE: The AWS VPC flow field names use underscores instead of dashes within +Filebeat. You may configure the `format` using field names that contain either. 
+ +[options="header"] +|====== +| VPC Flow Log Field | Data Type | +| account_id | string | +| action | string | +| az_id | string | +| bytes | long | +| dstaddr | ip | +| dstport | integer | +| end | timestamp | +| flow_direction | string | +| instance_id | string | +| interface_id | string | +| log_status | string | +| packets | long | +| pkt_dst_aws_service | string | +| pkt_dstaddr | ip | +| pkt_src_aws_service | string | +| pkt_srcaddr | ip | +| protocol | integer | +| region | string | +| srcaddr | ip | +| srcport | integer | +| start | timestamp | +| sublocation_id | string | +| sublocation_type | string | +| subnet_id | string | +| tcp_flags | integer | +| tcp_flags_array* | integer | +| traffic_path | integer | +| type | string | +| version | integer | +| vpc_id | string | +|====== + +[float] +==== ECS + +This mode maps the original VPC flow log fields into their associated Elastic +Common Schema (ECS) fields. It removes the original fields that were mapped to +ECS to reduce duplication. These are the field associations. There may be some +transformations applied to derive the ECS field. 
+ +[options="header"] +|====== +| VPC Flow Log Field | ECS Field | +| account_id | cloud.account.id | +| action | event.outcome | +| action | event.action | +| action | event.type | +| az_id | cloud.availability_zone | +| bytes | network.bytes | +| bytes | source.bytes | +| dstaddr | destination.address | +| dstaddr | destination.ip | +| dstport | destination.port | +| end | @timestamp | +| end | event.end | +| flow_direction | network.direction | +| instance_id | cloud.instance.id | +| packets | network.packets | +| packets | source.packets | +| protocol | network.iana_number | +| protocol | network.transport | +| region | cloud.region | +| srcaddr | network.type | +| srcaddr | source.address | +| srcaddr | source.ip | +| srcport | source.port | +| start | event.start | +|====== + +[float] +==== ECS and Original + +This mode maps the fields into ECS and retains all the original fields. Below +is an example document produced using `ecs_and_original` mode. + +[source,json] +---- +{ + "@timestamp": "2021-03-26T03:29:09Z", + "aws": { + "vpcflow": { + "account_id": "64111117617", + "action": "REJECT", + "az_id": "use1-az5", + "bytes": 1, + "dstaddr": "10.200.0.0", + "dstport": 33004, + "end": "2021-03-26T03:29:09Z", + "flow_direction": "ingress", + "instance_id": "i-0axxxxxx1ad77", + "interface_id": "eni-069xxxxxb7a490", + "log_status": "OK", + "packets": 52, + "pkt_dst_aws_service": "CLOUDFRONT", + "pkt_dstaddr": "10.200.0.80", + "pkt_src_aws_service": "AMAZON", + "pkt_srcaddr": "89.160.20.156", + "protocol": 17, + "region": "us-east-1", + "srcaddr": "89.160.20.156", + "srcport": 50041, + "start": "2021-03-26T03:28:12Z", + "sublocation_id": "fake-id", + "sublocation_type": "wavelength", + "subnet_id": "subnet-02d645xxxxxxxdbc0", + "tcp_flags": 1, + "tcp_flags_array": [ + "fin" + ], + "traffic_path": 1, + "type": "IPv4", + "version": 5, + "vpc_id": "vpc-09676f97xxxxxb8a7" + } + }, + "cloud": { + "account": { + "id": "64111117617" + }, + "availability_zone": "use1-az5", 
+ "instance": { + "id": "i-0axxxxxx1ad77" + }, + "region": "us-east-1" + }, + "destination": { + "address": "10.200.0.0", + "ip": "10.200.0.0", + "port": 33004 + }, + "event": { + "action": "reject", + "end": "2021-03-26T03:29:09Z", + "outcome": "failure", + "start": "2021-03-26T03:28:12Z", + "type": [ + "connection", + "denied" + ] + }, + "message": "5 64111117617 eni-069xxxxxb7a490 89.160.20.156 10.200.0.0 50041 33004 17 52 1 1616729292 1616729349 REJECT OK vpc-09676f97xxxxxb8a7 subnet-02d645xxxxxxxdbc0 i-0axxxxxx1ad77 1 IPv4 89.160.20.156 10.200.0.80 us-east-1 use1-az5 wavelength fake-id AMAZON CLOUDFRONT ingress 1", + "network": { + "bytes": 1, + "direction": "ingress", + "iana_number": "17", + "packets": 52, + "transport": "udp", + "type": "ipv4" + }, + "related": { + "ip": [ + "89.160.20.156", + "10.200.0.0", + "10.200.0.80" + ] + }, + "source": { + "address": "89.160.20.156", + "bytes": 1, + "ip": "89.160.20.156", + "packets": 52, + "port": 50041 + } +} +---- + diff --git a/x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings.go b/x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings.go new file mode 100644 index 000000000000..1aede98cd492 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings.go @@ -0,0 +1,133 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package strings + +import ( + "errors" + "unicode" + "unicode/utf8" +) + +var errTooManySubstrings = errors.New("len of dst slice is less than the number of substrings") + +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} + +// Fields splits the string s around each instance of one or more consecutive white space +// characters, as defined by unicode.IsSpace, returning a slice of substrings of s or an +// empty slice if s contains only white space. +// +// It writes at most len(dst) substrings from s into dst. It returns the number +// of substrings copied and an error if there were more than len(dst) substrings. +func Fields(dst []string, s string) (int, error) { + // First count the fields. + // This is an exact count if s is ASCII, otherwise it is an approximation. + n := 0 + wasSpace := 1 + // setBits is used to track which bits are set in the bytes of s. + setBits := uint8(0) + for i := 0; i < len(s); i++ { + r := s[i] + setBits |= r + isSpace := int(asciiSpace[r]) + n += wasSpace & ^isSpace + wasSpace = isSpace + } + + if setBits >= utf8.RuneSelf { + // Some runes in the input string are not ASCII. + return fieldsFunc(dst, s, unicode.IsSpace) + } + // ASCII fast path + na := 0 + fieldStart := 0 + i := 0 + // Skip spaces in the front of the input. + for i < len(s) && asciiSpace[s[i]] != 0 { + i++ + } + fieldStart = i + for i < len(s) { + if asciiSpace[s[i]] == 0 { + i++ + continue + } + if na >= len(dst) { + return na, errTooManySubstrings + } + dst[na] = s[fieldStart:i] + na++ + i++ + // Skip spaces in between fields. + for i < len(s) && asciiSpace[s[i]] != 0 { + i++ + } + fieldStart = i + } + if fieldStart < len(s) { // Last field might end at EOF. + if na >= len(dst) { + return na, errTooManySubstrings + } + dst[na] = s[fieldStart:] + na++ + } + return na, nil +} + +// fieldsFunc splits the string s at each run of Unicode code points c satisfying f(c) +// and returns an array of slices of s. If all code points in s satisfy f(c) or the +// string is empty, an empty slice is returned. 
+// +// FieldsFunc makes no guarantees about the order in which it calls f(c) +// and assumes that f always returns the same value for a given c. +func fieldsFunc(dst []string, s string, f func(rune) bool) (int, error) { + // A span is used to record a slice of s of the form s[start:end]. + // The start index is inclusive and the end index is exclusive. + type span struct { + start int + end int + } + spans := make([]span, 0, len(dst)) + + // Find the field start and end indices. + // Doing this in a separate pass (rather than slicing the string s + // and collecting the result substrings right away) is significantly + // more efficient, possibly due to cache effects. + start := -1 // valid span start if >= 0 + for end, rune := range s { + if f(rune) { + if start >= 0 { + if len(spans) < len(dst) { + spans = append(spans, span{start, end}) + } else { + break + } + // Set start to a negative value. + // Note: using -1 here consistently and reproducibly + // slows down this code by a several percent on amd64. + start = ^start + } + } else { + if start < 0 { + start = end + } + } + } + + // Last field might end at EOF. + if start >= 0 && len(spans) < len(dst) { + spans = append(spans, span{start, len(s)}) + } + + // Create strings from recorded field indices. + for i, span := range spans { + dst[i] = s[span.start:span.end] + } + + return len(spans), nil +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings_test.go b/x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings_test.go new file mode 100644 index 000000000000..dc111f168c0b --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/internal/strings/strings_test.go @@ -0,0 +1,156 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package strings + +import ( + "testing" + "unicode" +) + +func eq(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + if a[i] != b[i] { + return false + } + } + return true +} + +var faces = "☺☻☹" + +type FieldsTest struct { + s string + a []string +} + +var fieldstests = []FieldsTest{ + {"", []string{}}, + {" ", []string{}}, + {" \t ", []string{}}, + {"\u2000", []string{}}, + {" abc ", []string{"abc"}}, + {"1 2 3 4", []string{"1", "2", "3", "4"}}, + {"1 2 3 4", []string{"1", "2", "3", "4"}}, + {"1\t\t2\t\t3\t4", []string{"1", "2", "3", "4"}}, + {"1\u20002\u20013\u20024", []string{"1", "2", "3", "4"}}, + {"\u2000\u2001\u2002", []string{}}, + {"\n™\t™\n", []string{"™", "™"}}, + {"\n\u20001™2\u2000 \u2001 ™", []string{"1™2", "™"}}, + {"\n1\uFFFD \uFFFD2\u20003\uFFFD4", []string{"1\uFFFD", "\uFFFD2", "3\uFFFD4"}}, + {"1\xFF\u2000\xFF2\xFF \xFF", []string{"1\xFF", "\xFF2\xFF", "\xFF"}}, + {faces, []string{faces}}, +} + +func TestFields(t *testing.T) { + var dst [4]string + for _, tt := range fieldstests { + n, err := Fields(dst[:], tt.s) + if err != nil { + t.Fatal(err) + } + if !eq(dst[:n], tt.a) { + t.Errorf("Fields(%q) = %v; want %v", tt.s, dst[:n], tt.a) + continue + } + if len(tt.a) != n { + t.Errorf("Return count n = %d; want %d", n, len(tt.a)) + } + } + + // Smaller + var smallDst [2]string + for _, tt := range fieldstests { + n, err := Fields(smallDst[:], tt.s) + if err == errTooManySubstrings { //nolint:errorlint // errTooManySubstrings is never wrapped. + if len(tt.a) > len(smallDst) { + continue + } + } + if err != nil { + t.Fatal(err) + } + + if !eq(smallDst[:n], tt.a[:n]) { + t.Errorf("Fields(%q) = %v; want %v", tt.s, smallDst[:n], tt.a) + continue + } + } +} + +var FieldsFuncTests = []FieldsTest{ + {"", []string{}}, + {"XX", []string{}}, + {"XXhiXXX", []string{"hi"}}, + {"aXXbXXXcX", []string{"a", "b", "c"}}, +} + +//nolint:errorlint // errTooManySubstrings is never wrapped. 
+func TestFieldsFunc(t *testing.T) { + var dst [4]string + for _, tt := range fieldstests { + n, err := fieldsFunc(dst[:], tt.s, unicode.IsSpace) + if err != nil { + t.Fatal(err) + } + if !eq(dst[:n], tt.a) { + t.Errorf("FieldsFunc(%q, unicode.IsSpace) = %v; want %v", tt.s, dst, tt.a) + continue + } + if len(tt.a) != n { + t.Errorf("Return count n = %d; want %d", n, len(tt.a)) + } + } + pred := func(c rune) bool { return c == 'X' } + for _, tt := range FieldsFuncTests { + n, err := fieldsFunc(dst[:], tt.s, pred) + if err != nil { + t.Fatal(err) + } + if !eq(dst[:n], tt.a) { + t.Errorf("FieldsFunc(%q) = %v, want %v", tt.s, dst[:n], tt.a) + } + if len(tt.a) != n { + t.Errorf("Return count n = %d; want %d", n, len(tt.a)) + } + } + + // Smaller + var smallDst [2]string + for _, tt := range fieldstests { + n, err := Fields(smallDst[:], tt.s) + if err == errTooManySubstrings { + if len(tt.a) > len(smallDst) { + continue + } + } + if err != nil { + t.Fatal(err) + } + + if !eq(smallDst[:n], tt.a[:n]) { + t.Errorf("Fields(%q) = %v; want %v", tt.s, smallDst[:n], tt.a) + continue + } + } + for _, tt := range FieldsFuncTests { + n, err := fieldsFunc(smallDst[:], tt.s, pred) + if err == errTooManySubstrings { + if len(tt.a) > len(smallDst) { + continue + } + } + if err != nil { + t.Fatal(err) + } + + if !eq(smallDst[:n], tt.a[:n]) { + t.Errorf("Fields(%q) = %v; want %v", tt.s, smallDst[:n], tt.a) + continue + } + } +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/mapping.go b/x-pack/filebeat/processors/aws_vpcflow/mapping.go new file mode 100644 index 000000000000..ce4679f89e6b --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/mapping.go @@ -0,0 +1,295 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package aws_vpcflow + +import ( + "math/bits" + "strconv" + "strings" + + "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/beats/v7/libbeat/beat" +) + +type vpcFlowField struct { + Name string // Name of the VPC flow field that is added to our events. + Type dataType // Data type to convert the string into. + Enrich func(originalFields mapstr.M, value interface{}) // Optional enrichment function to add new derived fields into the 'target_field' namespace. + ECSMappings []ecsFieldMapping // List of ECS fields to create or derive from this field. +} + +type ecsFieldMapping struct { + Target string // ECS field target. + Transform func(targetField string, value interface{}, event *beat.Event) // Optional transform to modify the value. If omitted the value is copied. +} + +var nameToFieldMap map[string]vpcFlowField + +func init() { + nameToFieldMap = make(map[string]vpcFlowField, len(vpcFlowFields)) + for _, field := range vpcFlowFields { + nameToFieldMap[field.Name] = field + } +} + +var vpcFlowFields = [...]vpcFlowField{ + { + Name: "version", + Type: integerType, + }, + { + Name: "account_id", + Type: stringType, + ECSMappings: []ecsFieldMapping{ + {Target: "cloud.account.id"}, + }, + }, + { + Name: "interface_id", + Type: stringType, + }, + { + Name: "srcaddr", + Type: ipType, + ECSMappings: []ecsFieldMapping{ + {Target: "source.address"}, + {Target: "source.ip"}, + { + Target: "network.type", + Transform: func(targetField string, value interface{}, event *beat.Event) { + if ip := value.(string); strings.Contains(ip, ".") { + event.PutValue(targetField, "ipv4") //nolint:errcheck // This can only fail if 'network' is not an object. + } else { + event.PutValue(targetField, "ipv6") //nolint:errcheck // This can only fail if 'network' is not an object. 
+ } + }, + }, + }, + }, + { + Name: "dstaddr", + Type: ipType, + ECSMappings: []ecsFieldMapping{ + {Target: "destination.address"}, + {Target: "destination.ip"}, + }, + }, + { + Name: "srcport", + Type: integerType, + ECSMappings: []ecsFieldMapping{ + {Target: "source.port"}, + }, + }, + { + Name: "dstport", + Type: integerType, + ECSMappings: []ecsFieldMapping{ + {Target: "destination.port"}, + }, + }, + { + Name: "protocol", + Type: integerType, + ECSMappings: []ecsFieldMapping{ + { + Target: "network.iana_number", + Transform: func(targetField string, value interface{}, event *beat.Event) { + protocol := value.(int32) + event.PutValue(targetField, strconv.Itoa(int(protocol))) //nolint:errcheck // This can only fail if 'network' is not an object. + }, + }, + { + Target: "network.transport", + Transform: func(targetField string, value interface{}, event *beat.Event) { + var name string + switch protocol := value.(int32); protocol { + case 0: + name = "hopopt" + case 1: + name = "icmp" + case 2: + name = "igmp" + case 6: + name = "tcp" + case 8: + name = "egp" + case 17: + name = "udp" + case 47: + name = "gre" + case 50: + name = "esp" + case 58: + name = "ipv6-icmp" + case 112: + name = "vrrp" + case 132: + name = "sctp" + } + + if name != "" { + event.PutValue(targetField, name) //nolint:errcheck // This can only fail if 'network' is not an object. 
+ } + }, + }, + }, + }, + { + Name: "packets", + Type: longType, + ECSMappings: []ecsFieldMapping{ + {Target: "source.packets"}, + {Target: "network.packets"}, + }, + }, + { + Name: "bytes", + Type: longType, + ECSMappings: []ecsFieldMapping{ + {Target: "source.bytes"}, + {Target: "network.bytes"}, + }, + }, + { + Name: "start", + Type: timestampType, + ECSMappings: []ecsFieldMapping{ + {Target: "event.start"}, + }, + }, + { + Name: "end", + Type: timestampType, + ECSMappings: []ecsFieldMapping{ + {Target: "event.end"}, + {Target: "@timestamp"}, + }, + }, + { + Name: "action", + Type: stringType, + ECSMappings: []ecsFieldMapping{ + { + Target: "event.outcome", + Transform: func(targetField string, value interface{}, event *beat.Event) { + var outcome string + + switch s := value.(string); s { + case "ACCEPT": + outcome = "success" + case "REJECT": + outcome = "failure" + } + + if outcome != "" { + event.PutValue(targetField, outcome) //nolint:errcheck // This can only fail if 'event' is not an object. + } + }, + }, + { + Target: "event.action", + Transform: func(targetField string, value interface{}, event *beat.Event) { + event.PutValue(targetField, strings.ToLower(value.(string))) //nolint:errcheck // This can only fail if 'event' is not an object. + }, + }, + { + Target: "event.type", + Transform: func(targetField string, value interface{}, event *beat.Event) { + var eventType string + + switch s := value.(string); s { + case "ACCEPT": + eventType = "allowed" + case "REJECT": + eventType = "denied" + } + + if len(eventType) > 0 { + // The processor always adds event.type: [connection] in ECS mode. + v, _ := event.GetValue(targetField) + if eventTypes, ok := v.([]string); ok { + event.PutValue(targetField, append(eventTypes, eventType)) //nolint:errcheck // This can only fail if 'event' is not an object. + return + } + + event.PutValue(targetField, []string{eventType}) //nolint:errcheck // This can only fail if 'event' is not an object. 
+ } + }, + }, + }, + }, + {Name: "log_status", Type: stringType}, + {Name: "vpc_id", Type: stringType}, + {Name: "subnet_id", Type: stringType}, + { + Name: "instance_id", + Type: stringType, + ECSMappings: []ecsFieldMapping{ + {Target: "cloud.instance.id"}, + }, + }, + { + Name: "tcp_flags", + Type: integerType, + Enrich: func(originalFields mapstr.M, value interface{}) { + flag := value.(int32) + flags := make([]string, 0, bits.OnesCount8(uint8(flag))) + if flag&0x01 != 0 { + flags = append(flags, "fin") + } + if flag&0x02 != 0 { + flags = append(flags, "syn") + } + if flag&0x04 != 0 { + flags = append(flags, "rst") + } + if flag&0x08 != 0 { + flags = append(flags, "psh") + } + if flag&0x10 != 0 { + flags = append(flags, "ack") + } + if flag&0x20 != 0 { + flags = append(flags, "urg") + } + + if len(flags) > 0 { + originalFields["tcp_flags_array"] = flags + } + }, + }, + {Name: "type", Type: stringType}, + // TODO: Could these be used in some way to set source.nat.* and destination.nat.*. 
+ {Name: "pkt_srcaddr", Type: ipType}, + {Name: "pkt_dstaddr", Type: ipType}, + { + Name: "region", + Type: stringType, + ECSMappings: []ecsFieldMapping{ + {Target: "cloud.region"}, + }, + }, + { + Name: "az_id", + Type: stringType, + ECSMappings: []ecsFieldMapping{ + {Target: "cloud.availability_zone"}, + }, + }, + {Name: "sublocation_type", Type: stringType}, + {Name: "sublocation_id", Type: stringType}, + {Name: "pkt_src_aws_service", Type: stringType}, + {Name: "pkt_dst_aws_service", Type: stringType}, + { + Name: "flow_direction", + Type: stringType, + ECSMappings: []ecsFieldMapping{ + {Target: "network.direction"}, + }, + }, + {Name: "traffic_path", Type: integerType}, +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log.go b/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log.go new file mode 100644 index 000000000000..dafbe748dcde --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log.go @@ -0,0 +1,245 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package aws_vpcflow + +import ( + "encoding/json" + "errors" + "fmt" + + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors" + jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" + "github.com/elastic/beats/v7/x-pack/filebeat/processors/aws_vpcflow/internal/strings" +) + +const ( + procName = "parse_aws_vpc_flow_log" + logName = "processor." 
+ procName +) + +func init() { + processors.RegisterPlugin(procName, New) + jsprocessor.RegisterPlugin("ParseAWSVPCFlowLog", New) +} + +var ( + errValueNotString = errors.New("field must be a string") + errInvalidFormat = errors.New("log did not match the specified format") +) + +type processor struct { + config + formats []formatProcessor +} + +// New constructs a new processor built from ucfg config. +func New(cfg *conf.C) (processors.Processor, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, fmt.Errorf("fail to unpack the "+procName+" processor configuration: %w", err) + } + + return newParseAWSVPCFlowLog(c) +} + +func newParseAWSVPCFlowLog(c config) (*processor, error) { + log := logp.NewLogger(logName) + if c.ID != "" { + log = log.With("instance_id", c.ID) + } + + // Validate configs that did not pass through go-ucfg. + if err := c.Validate(); err != nil { + return nil, err + } + + var formatProcessors []formatProcessor + for _, f := range c.Format { + p, err := newFormatProcessor(c, log, f) + if err != nil { + return nil, err + } + formatProcessors = append(formatProcessors, *p) + } + + return &processor{ + config: c, + formats: formatProcessors, + }, nil +} + +func (p *processor) String() string { + // JSON encoding of the config struct should never cause an error. + json, _ := json.Marshal(p.config) + return procName + "=" + string(json) +} + +func (p *processor) Run(event *beat.Event) (*beat.Event, error) { + err := p.run(event) + if err == nil || p.IgnoreFailure || (p.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound)) { + return event, nil + } + + return event, err +} + +func (p *processor) run(event *beat.Event) error { + v, err := event.GetValue(p.Field) + if err != nil { + return err + } + + strValue, ok := v.(string) + if !ok { + return errValueNotString + } + + // Split the string at whitespace without allocating. 
+ var dst [len(vpcFlowFields)]string + n, err := strings.Fields(dst[:], strValue) + if err != nil { + return errInvalidFormat + } + substrings := dst[:n] + + // Find the matching format based on substring count. + for _, format := range p.formats { + if len(format.fields) == n { + return format.process(substrings, event) + } + } + return errInvalidFormat +} + +// formatProcessor processes an event using a single VPC flow log format. +type formatProcessor struct { + config + log *logp.Logger + fields []vpcFlowField + originalFieldCount int + expectedIPCount int +} + +func newFormatProcessor(c config, log *logp.Logger, format string) (*formatProcessor, error) { + fields, err := parseFormat(format) + if err != nil { + return nil, fmt.Errorf("failed to parse vpc flow log format: %w", err) + } + + originalFieldCount := len(fields) + if c.Mode == ecsMode { + for _, f := range fields { + // If an ECS mapping exists then ECS mode will not include the + // original field. + if len(f.ECSMappings) > 0 { + originalFieldCount-- + } + } + } + + var ipCount int + for _, f := range fields { + if f.Type == ipType { + ipCount++ + } + } + + return &formatProcessor{ + config: c, + log: log, + fields: fields, + originalFieldCount: originalFieldCount, + expectedIPCount: ipCount, + }, nil +} + +func (p *formatProcessor) process(substrings []string, event *beat.Event) error { + originalFields := make(mapstr.M, p.originalFieldCount) + + var relatedIPs []string + if p.Mode > originalMode { + // Allocate space for the expected number of IPs assuming all are unique. + relatedIPs = make([]string, 0, p.expectedIPCount) + + // Preallocate event.type with extra capacity for "allowed" or "denied". + eventTypes := make([]string, 1, 2) + eventTypes[0] = "connection" + if _, err := event.PutValue("event.type", eventTypes); err != nil { + return err + } + } + + // Iterate over the substrings in the source string and apply type + // conversion and then ECS mappings. 
+ for i, word := range substrings { + if word == "-" { + continue + } + field := p.fields[i] + + // Convert the string to the expected type. + v, err := toType(word, field.Type) + if err != nil { + return fmt.Errorf("failed to parse %q: %w", field.Name, err) + } + + // Add to the 'original' fields when we are in original mode + // or ecs_and_original mode. Or if there are no ECS mappings then + // retain the original field. + if p.Mode != ecsMode || len(field.ECSMappings) == 0 { + originalFields[field.Name] = v + + if field.Enrich != nil { + field.Enrich(originalFields, v) + } + } + + // Apply ECS transforms when in ecs or ecs_and_original modes. + if p.Mode > originalMode { + for _, mapping := range field.ECSMappings { + if mapping.Transform == nil { + if _, err = event.PutValue(mapping.Target, v); err != nil { + return err + } + } else { + mapping.Transform(mapping.Target, v, event) + } + } + + if field.Type == ipType { + relatedIPs = appendUnique(relatedIPs, v.(string)) + } + } + } + + if _, err := event.PutValue(p.TargetField, originalFields); err != nil { + return err + } + + if len(relatedIPs) > 0 { + if _, err := event.PutValue("related.ip", relatedIPs); err != nil { + return err + } + } + + return nil +} + +// appendUnique appends a value to the slice if the given value does not already +// exist in the slice. It determines if item is in the slice by iterating over +// all elements in the slice and checking equality. +func appendUnique(s []string, item string) []string { + for _, existing := range s { + if item == existing { + return s + } + } + return append(s, item) +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log_test.go b/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log_test.go new file mode 100644 index 000000000000..2744256a1bab --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log_test.go @@ -0,0 +1,291 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package aws_vpcflow + +import ( + "encoding/json" + "flag" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/beats/v7/libbeat/beat" +) + +var updateGolden = flag.Bool("update", false, "Update golden test data.") + +const ( + formatV5 = `version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status vpc-id subnet-id instance-id tcp-flags type pkt-srcaddr pkt-dstaddr region az-id sublocation-type sublocation-id pkt-src-aws-service pkt-dst-aws-service flow-direction traffic-path` + formatV5Sample = `5 64111117617 eni-069xxxxxb7a490 89.160.20.156 10.200.0.0 50041 33004 17 52 1 1616729292 1616729349 REJECT OK vpc-09676f97xxxxxb8a7 subnet-02d645xxxxxxxdbc0 i-0axxxxxx1ad77 1 IPv4 89.160.20.156 10.200.0.80 us-east-1 use1-az5 wavelength fake-id AMAZON CLOUDFRONT ingress 1` +) + +func TestProcessorRun(t *testing.T) { + t.Run("ecs_and_original-mode-v5-message", func(t *testing.T) { + c := defaultConfig() + c.Format = []string{ + "version account-id", // Not a match. 
+ formatV5, + } + c.Mode = ecsAndOriginalMode + + p, err := newParseAWSVPCFlowLog(c) + require.NoError(t, err) + + assert.Contains(t, p.String(), procName+"=") + assert.Contains(t, p.String(), formatV5) + + evt := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: map[string]interface{}{ + "message": formatV5Sample, + }, + } + + out, err := p.Run(&evt) + require.NoError(t, err) + + start := time.Unix(1616729292, 0).UTC() + end := time.Unix(1616729349, 0).UTC() + expected := mapstr.M{ + "aws": mapstr.M{ + "vpcflow": mapstr.M{ + "account_id": "64111117617", + "action": "REJECT", + "az_id": "use1-az5", + "bytes": int64(1), + "dstaddr": "10.200.0.0", + "dstport": int32(33004), + "end": end, + "flow_direction": "ingress", + "instance_id": "i-0axxxxxx1ad77", + "interface_id": "eni-069xxxxxb7a490", + "log_status": "OK", + "packets": int64(52), + "pkt_dst_aws_service": "CLOUDFRONT", + "pkt_dstaddr": "10.200.0.80", + "pkt_src_aws_service": "AMAZON", + "pkt_srcaddr": "89.160.20.156", + "protocol": int32(17), + "region": "us-east-1", + "srcaddr": "89.160.20.156", + "srcport": int32(50041), + "start": start, + "sublocation_id": "fake-id", + "sublocation_type": "wavelength", + "subnet_id": "subnet-02d645xxxxxxxdbc0", + "tcp_flags": int32(1), + "tcp_flags_array": []string{ + "fin", + }, + "traffic_path": int32(1), + "type": "IPv4", + "version": int32(5), + "vpc_id": "vpc-09676f97xxxxxb8a7", + }, + }, + "cloud": mapstr.M{ + "account": mapstr.M{ + "id": "64111117617", + }, + "availability_zone": "use1-az5", + "instance": mapstr.M{ + "id": "i-0axxxxxx1ad77", + }, + "region": "us-east-1", + }, + "destination": mapstr.M{ + "address": "10.200.0.0", + "ip": "10.200.0.0", + "port": int32(33004), + }, + "event": mapstr.M{ + "action": "reject", + "end": end, + "outcome": "failure", + "start": start, + "type": []string{"connection", "denied"}, + }, + "message": formatV5Sample, + "network": mapstr.M{ + "bytes": int64(1), + "direction": "ingress", + "iana_number": "17", + "packets": 
int64(52), + "transport": "udp", + "type": "ipv4", + }, + "related": mapstr.M{ + "ip": []string{"89.160.20.156", "10.200.0.0", "10.200.0.80"}, + }, + "source": mapstr.M{ + "address": "89.160.20.156", + "bytes": int64(1), + "ip": "89.160.20.156", + "packets": int64(52), + "port": int32(50041), + }, + } + + assert.Equal(t, end, out.Timestamp) + if diff := cmp.Diff(expected, out.Fields); diff != "" { + t.Fatal(diff) + } + }) +} + +func TestGoldenFile(t *testing.T) { + testCases := readGoldenTestCase(t) + + if *updateGolden { + // Delete existing golden files. + goldens, _ := filepath.Glob("testdata/*.golden.*") + for _, golden := range goldens { + os.Remove(golden) + } + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.Name, func(t *testing.T) { + c := defaultConfig() + c.Format = []string{tc.Format} + if tc.Mode != nil { + c.Mode = *tc.Mode + } + + p, err := newParseAWSVPCFlowLog(c) + require.NoError(t, err) + + observed := make([]mapstr.M, 0, len(tc.Samples)) + for _, sample := range tc.Samples { + evt := &beat.Event{Fields: mapstr.M{"message": sample}} + out, err := p.Run(evt) + require.NoError(t, err) + + if !out.Timestamp.IsZero() { + out.Fields["@timestamp"] = out.Timestamp + } + observed = append(observed, out.Fields) + } + + goldenFile := filepath.Join("testdata", tc.Name+".golden.json") + if *updateGolden { + writeGolden(t, goldenFile, observed) + } else { + expectedJSON := readGolden(t, goldenFile) + + observedJSON, err := json.Marshal(observed) + require.NoError(t, err) + + assert.JSONEq(t, expectedJSON, string(observedJSON)) + } + }) + } +} + +type goldenTestCase struct { + Name string `yaml:"-"` // Name of test. + Mode *mode // Processing mode (what fields to generate). + Format string // Flow log format. + Samples []string // List of sample logs to parse. 
+} + +func readGoldenTestCase(t *testing.T) []goldenTestCase { + t.Helper() + + f, err := os.Open("testdata/aws-vpc-flow-logs.yml") + if err != nil { + t.Fatal(err) + } + + dec := yaml.NewDecoder(f) + + var testCases map[string]goldenTestCase + if err = dec.Decode(&testCases); err != nil { + t.Fatal(err) + } + + testCasesList := make([]goldenTestCase, 0, len(testCases)) + for k, v := range testCases { + v.Name = k + testCasesList = append(testCasesList, v) + } + + return testCasesList +} + +func writeGolden(t *testing.T, path string, events []mapstr.M) { + t.Helper() + + f, err := os.Create(path) + require.NoError(t, err) + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + enc.SetEscapeHTML(false) + if err = enc.Encode(events); err != nil { + t.Fatal() + } +} + +func readGolden(t *testing.T, path string) string { + t.Helper() + + data, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + + return string(data) +} + +func BenchmarkProcessorRun(b *testing.B) { + benchmarks := []struct { + name string + mode mode + format string + message string + }{ + {"original-mode-v5-message", originalMode, formatV5, formatV5Sample}, + {"ecs-mode-v5-message", ecsMode, formatV5, formatV5Sample}, + {"ecs_and_original-mode-v5-message", ecsAndOriginalMode, formatV5, formatV5Sample}, + } + + for _, benchmark := range benchmarks { + benchmark := benchmark + b.Run(benchmark.name, func(b *testing.B) { + c := defaultConfig() + c.Format = []string{benchmark.format} + c.Mode = benchmark.mode + + p, err := newParseAWSVPCFlowLog(c) + require.NoError(b, err) + + evt := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: map[string]interface{}{ + "message": benchmark.message, + }, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err = p.Run(&evt); err != nil { + b.Fatal(err) + } + } + }) + } +} diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/aws-vpc-flow-logs.yml b/x-pack/filebeat/processors/aws_vpcflow/testdata/aws-vpc-flow-logs.yml new 
file mode 100644 index 000000000000..d9d21d98c04b --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/aws-vpc-flow-logs.yml @@ -0,0 +1,75 @@ +--- + +default-v2-format: + format: version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status + samples: + # Accepted and rejected traffic + # https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-accepted-rejected + - 2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK + - 2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK + # No data and skipped records + # https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-no-data + - 2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA + - 2 123456789010 eni-11111111aaaaaaaaa - - - - - - - 1431280876 1431280934 - SKIPDATA + # Security group and network ACL rules + # https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-security-groups + - 2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK + - 2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK + # IPV6 Traffic + # https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-ipv6 + - 2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK + +# https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-tcp-flag +tcp-flag-sequence: + format: version vpc-id subnet-id instance-id interface-id account-id type srcaddr dstaddr srcport dstport pkt-srcaddr pkt-dstaddr protocol bytes packets start 
end action tcp-flags log-status + samples: + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 100701 70 1566848875 1566848933 ACCEPT 2 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 632 12 1566848875 1566848933 ACCEPT 18 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 63388 1219 1566848933 1566849113 ACCEPT 1 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 23294588 15774 1566848933 1566849113 ACCEPT 1 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43638 5001 52.213.180.42 10.0.0.62 6 1260 17 1566933133 1566933193 ACCEPT 3 OK + - 3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43638 10.0.0.62 52.213.180.42 6 967 14 1566933133 1566933193 ACCEPT 19 OK + +# https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-nat +custom-nat-gateway: + format: instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr + samples: + - 
'- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5' + - '- eni-1235b8ca123456789 10.0.0.220 203.0.113.5 10.0.0.220 203.0.113.5' + - '- eni-1235b8ca123456789 203.0.113.5 10.0.0.220 203.0.113.5 10.0.0.220' + - '- eni-1235b8ca123456789 10.0.0.220 10.0.1.5 203.0.113.5 10.0.1.5' + - i-01234567890123456 eni-1111aaaa2222bbbb3 10.0.1.5 203.0.113.5 10.0.1.5 203.0.113.5 #Traffic from the source instance to host on the internet + - i-01234567890123456 eni-1111aaaa2222bbbb3 203.0.113.5 10.0.1.5 203.0.113.5 10.0.1.5 #Response traffic from host on the internet to the source instance + +# https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-tgw +transit-gateway: + format: version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status + samples: + - 3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.20.33.164 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK + - 3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.40.2.236 10.20.33.164 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK + - 3 eni-11111111111111111 123456789010 vpc-abcdefab012345678 subnet-11111111aaaaaaaaa - 10.40.1.175 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK + - 3 eni-22222222222222222 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb - 10.40.2.236 10.40.2.31 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK + +# https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-traffic-path +service-name-path-direction: + format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service 
pkt-dst-aws-service traffic-path flow-direction log-status + samples: + - 5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK + - 5 10.0.0.71 52.95.128.179 34210 80 6 1616729292 1616729349 IPv4 7 471 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 3 10.0.0.71 52.95.128.179 - S3 8 egress OK + +# Full list of all fields in v5 in order. +v5-fields-original: &v5-fields + mode: original + format: version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status vpc-id subnet-id instance-id tcp-flags type pkt-srcaddr pkt-dstaddr region az-id sublocation-type sublocation-id pkt-src-aws-service pkt-dst-aws-service flow-direction traffic-path + samples: + - 5 64111117617 eni-069xxxxxb7a490 89.160.20.156 10.200.0.0 50041 33004 17 52 1 1616729292 1616729349 REJECT OK vpc-09676f97xxxxxb8a7 subnet-02d645xxxxxxxdbc0 i-0axxxxxx1ad77 1 IPv4 89.160.20.156 10.200.0.80 us-east-1 use1-az5 wavelength fake-id AMAZON CLOUDFRONT ingress 1 + +v5-fields-ecs: + <<: *v5-fields + mode: ecs + +v5-fields-ecs_and_original: + <<: *v5-fields + mode: ecs_and_original diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/custom-nat-gateway.golden.json b/x-pack/filebeat/processors/aws_vpcflow/testdata/custom-nat-gateway.golden.json new file mode 100644 index 000000000000..ca511ed5c8e1 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/custom-nat-gateway.golden.json @@ -0,0 +1,206 @@ +[ + { + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "pkt_dstaddr": "203.0.113.5", + "pkt_srcaddr": "10.0.1.5" + } + }, + "destination": { + "address": "10.0.0.220", + "ip": "10.0.0.220" + }, + "event": { + "type": [ + "connection" + ] + 
}, + "message": "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5", + "network": { + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.1.5", + "10.0.0.220", + "203.0.113.5" + ] + }, + "source": { + "address": "10.0.1.5", + "ip": "10.0.1.5" + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "pkt_dstaddr": "203.0.113.5", + "pkt_srcaddr": "10.0.0.220" + } + }, + "destination": { + "address": "203.0.113.5", + "ip": "203.0.113.5" + }, + "event": { + "type": [ + "connection" + ] + }, + "message": "- eni-1235b8ca123456789 10.0.0.220 203.0.113.5 10.0.0.220 203.0.113.5", + "network": { + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.220", + "203.0.113.5" + ] + }, + "source": { + "address": "10.0.0.220", + "ip": "10.0.0.220" + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "pkt_dstaddr": "10.0.0.220", + "pkt_srcaddr": "203.0.113.5" + } + }, + "destination": { + "address": "10.0.0.220", + "ip": "10.0.0.220" + }, + "event": { + "type": [ + "connection" + ] + }, + "message": "- eni-1235b8ca123456789 203.0.113.5 10.0.0.220 203.0.113.5 10.0.0.220", + "network": { + "type": "ipv4" + }, + "related": { + "ip": [ + "203.0.113.5", + "10.0.0.220" + ] + }, + "source": { + "address": "203.0.113.5", + "ip": "203.0.113.5" + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "pkt_dstaddr": "10.0.1.5", + "pkt_srcaddr": "203.0.113.5" + } + }, + "destination": { + "address": "10.0.1.5", + "ip": "10.0.1.5" + }, + "event": { + "type": [ + "connection" + ] + }, + "message": "- eni-1235b8ca123456789 10.0.0.220 10.0.1.5 203.0.113.5 10.0.1.5", + "network": { + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.220", + "10.0.1.5", + "203.0.113.5" + ] + }, + "source": { + "address": "10.0.0.220", + "ip": "10.0.0.220" + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-1111aaaa2222bbbb3", + "pkt_dstaddr": "203.0.113.5", + "pkt_srcaddr": "10.0.1.5" + } + }, + 
"cloud": { + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "203.0.113.5", + "ip": "203.0.113.5" + }, + "event": { + "type": [ + "connection" + ] + }, + "message": "i-01234567890123456 eni-1111aaaa2222bbbb3 10.0.1.5 203.0.113.5 10.0.1.5 203.0.113.5", + "network": { + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.1.5", + "203.0.113.5" + ] + }, + "source": { + "address": "10.0.1.5", + "ip": "10.0.1.5" + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-1111aaaa2222bbbb3", + "pkt_dstaddr": "10.0.1.5", + "pkt_srcaddr": "203.0.113.5" + } + }, + "cloud": { + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "10.0.1.5", + "ip": "10.0.1.5" + }, + "event": { + "type": [ + "connection" + ] + }, + "message": "i-01234567890123456 eni-1111aaaa2222bbbb3 203.0.113.5 10.0.1.5 203.0.113.5 10.0.1.5", + "network": { + "type": "ipv4" + }, + "related": { + "ip": [ + "203.0.113.5", + "10.0.1.5" + ] + }, + "source": { + "address": "203.0.113.5", + "ip": "203.0.113.5" + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/default-v2-format.golden.json b/x-pack/filebeat/processors/aws_vpcflow/testdata/default-v2-format.golden.json new file mode 100644 index 000000000000..72a0bac1fd00 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/default-v2-format.golden.json @@ -0,0 +1,303 @@ +[ + { + "@timestamp": "2014-12-14T04:07:50Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "172.31.16.21", + "ip": "172.31.16.21", + "port": 22 + }, + "event": { + "action": "accept", + "end": "2014-12-14T04:07:50Z", + "outcome": "success", + "start": "2014-12-14T04:06:50Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 
1418530070 ACCEPT OK", + "network": { + "bytes": 4249, + "iana_number": "6", + "packets": 20, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "172.31.16.139", + "172.31.16.21" + ] + }, + "source": { + "address": "172.31.16.139", + "bytes": 4249, + "ip": "172.31.16.139", + "packets": 20, + "port": 20641 + } + }, + { + "@timestamp": "2014-12-14T04:07:50Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "172.31.9.12", + "ip": "172.31.9.12", + "port": 3389 + }, + "event": { + "action": "reject", + "end": "2014-12-14T04:07:50Z", + "outcome": "failure", + "start": "2014-12-14T04:06:50Z", + "type": [ + "connection", + "denied" + ] + }, + "message": "2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK", + "network": { + "bytes": 4249, + "iana_number": "6", + "packets": 20, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "172.31.9.69", + "172.31.9.12" + ] + }, + "source": { + "address": "172.31.9.69", + "bytes": 4249, + "ip": "172.31.9.69", + "packets": 20, + "port": 49761 + } + }, + { + "@timestamp": "2015-05-10T18:02:14Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "NODATA", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "event": { + "end": "2015-05-10T18:02:14Z", + "start": "2015-05-10T18:01:16Z", + "type": [ + "connection" + ] + }, + "message": "2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA" + }, + { + "@timestamp": "2015-05-10T18:02:14Z", + "aws": { + "vpcflow": { + "interface_id": "eni-11111111aaaaaaaaa", + "log_status": "SKIPDATA", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "event": { + "end": "2015-05-10T18:02:14Z", + "start": 
"2015-05-10T18:01:16Z", + "type": [ + "connection" + ] + }, + "message": "2 123456789010 eni-11111111aaaaaaaaa - - - - - - - 1431280876 1431280934 - SKIPDATA" + }, + { + "@timestamp": "2015-05-29T16:32:22Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "172.31.16.139", + "ip": "172.31.16.139", + "port": 0 + }, + "event": { + "action": "accept", + "end": "2015-05-29T16:32:22Z", + "outcome": "success", + "start": "2015-05-29T16:30:27Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK", + "network": { + "bytes": 336, + "iana_number": "1", + "packets": 4, + "transport": "icmp", + "type": "ipv4" + }, + "related": { + "ip": [ + "203.0.113.12", + "172.31.16.139" + ] + }, + "source": { + "address": "203.0.113.12", + "bytes": 336, + "ip": "203.0.113.12", + "packets": 4, + "port": 0 + } + }, + { + "@timestamp": "2015-05-29T16:32:22Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "203.0.113.12", + "ip": "203.0.113.12", + "port": 0 + }, + "event": { + "action": "reject", + "end": "2015-05-29T16:32:22Z", + "outcome": "failure", + "start": "2015-05-29T16:31:34Z", + "type": [ + "connection", + "denied" + ] + }, + "message": "2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK", + "network": { + "bytes": 336, + "iana_number": "1", + "packets": 4, + "transport": "icmp", + "type": "ipv4" + }, + "related": { + "ip": [ + "172.31.16.139", + "203.0.113.12" + ] + }, + "source": { + "address": "172.31.16.139", + "bytes": 336, + "ip": "172.31.16.139", + "packets": 4, + "port": 0 + } + }, + { + 
"@timestamp": "2016-10-31T11:37:00Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "version": 2 + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "2001:db8:1234:a102:3304:8879:34cf:4071", + "ip": "2001:db8:1234:a102:3304:8879:34cf:4071", + "port": 22 + }, + "event": { + "action": "accept", + "end": "2016-10-31T11:37:00Z", + "outcome": "success", + "start": "2016-10-31T11:35:08Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK", + "network": { + "bytes": 8855, + "iana_number": "6", + "packets": 54, + "transport": "tcp", + "type": "ipv6" + }, + "related": { + "ip": [ + "2001:db8:1234:a100:8d6e:3477:df66:f105", + "2001:db8:1234:a102:3304:8879:34cf:4071" + ] + }, + "source": { + "address": "2001:db8:1234:a100:8d6e:3477:df66:f105", + "bytes": 8855, + "ip": "2001:db8:1234:a100:8d6e:3477:df66:f105", + "packets": 54, + "port": 34892 + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/service-name-path-direction.golden.json b/x-pack/filebeat/processors/aws_vpcflow/testdata/service-name-path-direction.golden.json new file mode 100644 index 000000000000..ddf0a807ff50 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/service-name-path-direction.golden.json @@ -0,0 +1,140 @@ +[ + { + "@timestamp": "2021-03-26T03:29:09Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "10.0.0.71", + "pkt_src_aws_service": "S3", + "pkt_srcaddr": "52.95.128.179", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 19, + "tcp_flags_array": [ + "fin", + "syn", + "ack" + ], + "type": "IPv4", + "version": 5, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789012" + }, + 
"availability_zone": "apse2-az3", + "instance": { + "id": "i-0c50d5961bcb2d47b" + }, + "region": "ap-southeast-2" + }, + "destination": { + "address": "10.0.0.71", + "ip": "10.0.0.71", + "port": 34210 + }, + "event": { + "action": "accept", + "end": "2021-03-26T03:29:09Z", + "outcome": "success", + "start": "2021-03-26T03:28:12Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK", + "network": { + "bytes": 15044, + "direction": "ingress", + "iana_number": "6", + "packets": 14, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "52.95.128.179", + "10.0.0.71" + ] + }, + "source": { + "address": "52.95.128.179", + "bytes": 15044, + "ip": "52.95.128.179", + "packets": 14, + "port": 80 + } + }, + { + "@timestamp": "2021-03-26T03:29:09Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dst_aws_service": "S3", + "pkt_dstaddr": "52.95.128.179", + "pkt_srcaddr": "10.0.0.71", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 3, + "tcp_flags_array": [ + "fin", + "syn" + ], + "traffic_path": 8, + "type": "IPv4", + "version": 5, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789012" + }, + "availability_zone": "apse2-az3", + "instance": { + "id": "i-0c50d5961bcb2d47b" + }, + "region": "ap-southeast-2" + }, + "destination": { + "address": "52.95.128.179", + "ip": "52.95.128.179", + "port": 80 + }, + "event": { + "action": "accept", + "end": "2021-03-26T03:29:09Z", + "outcome": "success", + "start": "2021-03-26T03:28:12Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "5 10.0.0.71 52.95.128.179 34210 80 6 1616729292 1616729349 IPv4 7 471 123456789012 vpc-abcdefab012345678 
subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 3 10.0.0.71 52.95.128.179 - S3 8 egress OK", + "network": { + "bytes": 471, + "direction": "egress", + "iana_number": "6", + "packets": 7, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.71", + "52.95.128.179" + ] + }, + "source": { + "address": "10.0.0.71", + "bytes": 471, + "ip": "10.0.0.71", + "packets": 7, + "port": 34210 + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/tcp-flag-sequence.golden.json b/x-pack/filebeat/processors/aws_vpcflow/testdata/tcp-flag-sequence.golden.json new file mode 100644 index 000000000000..b879422eeca4 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/tcp-flag-sequence.golden.json @@ -0,0 +1,511 @@ +[ + { + "@timestamp": "2019-08-26T19:48:53Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "10.0.0.62", + "pkt_srcaddr": "52.213.180.42", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 2, + "tcp_flags_array": [ + "syn" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "10.0.0.62", + "ip": "10.0.0.62", + "port": 5001 + }, + "event": { + "action": "accept", + "end": "2019-08-26T19:48:53Z", + "outcome": "success", + "start": "2019-08-26T19:47:55Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK", + "network": { + "bytes": 568, + "iana_number": "6", + "packets": 8, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "52.213.180.42", + "10.0.0.62" + ] + }, + "source": { + "address": 
"52.213.180.42", + "bytes": 568, + "ip": "52.213.180.42", + "packets": 8, + "port": 43416 + } + }, + { + "@timestamp": "2019-08-26T19:48:53Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "52.213.180.42", + "pkt_srcaddr": "10.0.0.62", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 18, + "tcp_flags_array": [ + "syn", + "ack" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "52.213.180.42", + "ip": "52.213.180.42", + "port": 43416 + }, + "event": { + "action": "accept", + "end": "2019-08-26T19:48:53Z", + "outcome": "success", + "start": "2019-08-26T19:47:55Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK", + "network": { + "bytes": 376, + "iana_number": "6", + "packets": 7, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.62", + "52.213.180.42" + ] + }, + "source": { + "address": "10.0.0.62", + "bytes": 376, + "ip": "10.0.0.62", + "packets": 7, + "port": 5001 + } + }, + { + "@timestamp": "2019-08-26T19:48:53Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "10.0.0.62", + "pkt_srcaddr": "52.213.180.42", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 2, + "tcp_flags_array": [ + "syn" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "10.0.0.62", + "ip": "10.0.0.62", + "port": 5001 + }, + "event": { + "action": "accept", + 
"end": "2019-08-26T19:48:53Z", + "outcome": "success", + "start": "2019-08-26T19:47:55Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 100701 70 1566848875 1566848933 ACCEPT 2 OK", + "network": { + "bytes": 100701, + "iana_number": "6", + "packets": 70, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "52.213.180.42", + "10.0.0.62" + ] + }, + "source": { + "address": "52.213.180.42", + "bytes": 100701, + "ip": "52.213.180.42", + "packets": 70, + "port": 43418 + } + }, + { + "@timestamp": "2019-08-26T19:48:53Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "52.213.180.42", + "pkt_srcaddr": "10.0.0.62", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 18, + "tcp_flags_array": [ + "syn", + "ack" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "52.213.180.42", + "ip": "52.213.180.42", + "port": 43418 + }, + "event": { + "action": "accept", + "end": "2019-08-26T19:48:53Z", + "outcome": "success", + "start": "2019-08-26T19:47:55Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 632 12 1566848875 1566848933 ACCEPT 18 OK", + "network": { + "bytes": 632, + "iana_number": "6", + "packets": 12, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.62", + "52.213.180.42" + ] + }, + "source": { + "address": "10.0.0.62", + "bytes": 632, + "ip": "10.0.0.62", + "packets": 12, + "port": 5001 + } + }, + { + "@timestamp": 
"2019-08-26T19:51:53Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "52.213.180.42", + "pkt_srcaddr": "10.0.0.62", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 1, + "tcp_flags_array": [ + "fin" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "52.213.180.42", + "ip": "52.213.180.42", + "port": 43418 + }, + "event": { + "action": "accept", + "end": "2019-08-26T19:51:53Z", + "outcome": "success", + "start": "2019-08-26T19:48:53Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 63388 1219 1566848933 1566849113 ACCEPT 1 OK", + "network": { + "bytes": 63388, + "iana_number": "6", + "packets": 1219, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.62", + "52.213.180.42" + ] + }, + "source": { + "address": "10.0.0.62", + "bytes": 63388, + "ip": "10.0.0.62", + "packets": 1219, + "port": 5001 + } + }, + { + "@timestamp": "2019-08-26T19:51:53Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "10.0.0.62", + "pkt_srcaddr": "52.213.180.42", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 1, + "tcp_flags_array": [ + "fin" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "10.0.0.62", + "ip": "10.0.0.62", + "port": 5001 + }, + "event": { + "action": "accept", + "end": "2019-08-26T19:51:53Z", + "outcome": "success", + "start": "2019-08-26T19:48:53Z", + "type": [ + 
"connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 23294588 15774 1566848933 1566849113 ACCEPT 1 OK", + "network": { + "bytes": 23294588, + "iana_number": "6", + "packets": 15774, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "52.213.180.42", + "10.0.0.62" + ] + }, + "source": { + "address": "52.213.180.42", + "bytes": 23294588, + "ip": "52.213.180.42", + "packets": 15774, + "port": 43418 + } + }, + { + "@timestamp": "2019-08-27T19:13:13Z", + "aws": { + "vpcflow": { + "interface_id": "eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "10.0.0.62", + "pkt_srcaddr": "52.213.180.42", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 3, + "tcp_flags_array": [ + "fin", + "syn" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "10.0.0.62", + "ip": "10.0.0.62", + "port": 5001 + }, + "event": { + "action": "accept", + "end": "2019-08-27T19:13:13Z", + "outcome": "success", + "start": "2019-08-27T19:12:13Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43638 5001 52.213.180.42 10.0.0.62 6 1260 17 1566933133 1566933193 ACCEPT 3 OK", + "network": { + "bytes": 1260, + "iana_number": "6", + "packets": 17, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "52.213.180.42", + "10.0.0.62" + ] + }, + "source": { + "address": "52.213.180.42", + "bytes": 1260, + "ip": "52.213.180.42", + "packets": 17, + "port": 43638 + } + }, + { + "@timestamp": "2019-08-27T19:13:13Z", + "aws": { + "vpcflow": { + "interface_id": 
"eni-1235b8ca123456789", + "log_status": "OK", + "pkt_dstaddr": "52.213.180.42", + "pkt_srcaddr": "10.0.0.62", + "subnet_id": "subnet-aaaaaaaa012345678", + "tcp_flags": 19, + "tcp_flags_array": [ + "fin", + "syn", + "ack" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "52.213.180.42", + "ip": "52.213.180.42", + "port": 43638 + }, + "event": { + "action": "accept", + "end": "2019-08-27T19:13:13Z", + "outcome": "success", + "start": "2019-08-27T19:12:13Z", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43638 10.0.0.62 52.213.180.42 6 967 14 1566933133 1566933193 ACCEPT 19 OK", + "network": { + "bytes": 967, + "iana_number": "6", + "packets": 14, + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.0.0.62", + "52.213.180.42" + ] + }, + "source": { + "address": "10.0.0.62", + "bytes": 967, + "ip": "10.0.0.62", + "packets": 14, + "port": 5001 + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/transit-gateway.golden.json b/x-pack/filebeat/processors/aws_vpcflow/testdata/transit-gateway.golden.json new file mode 100644 index 000000000000..57ca61c35520 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/transit-gateway.golden.json @@ -0,0 +1,228 @@ +[ + { + "aws": { + "vpcflow": { + "interface_id": "eni-33333333333333333", + "log_status": "OK", + "pkt_dstaddr": "10.40.2.236", + "pkt_srcaddr": "10.20.33.164", + "subnet_id": "subnet-22222222bbbbbbbbb", + "tcp_flags": 3, + "tcp_flags_array": [ + "fin", + "syn" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": 
"i-01234567890123456" + } + }, + "destination": { + "address": "10.40.2.236", + "ip": "10.40.2.236", + "port": 80 + }, + "event": { + "action": "accept", + "outcome": "success", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.20.33.164 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK", + "network": { + "iana_number": "6", + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.20.33.164", + "10.40.2.236" + ] + }, + "source": { + "address": "10.20.33.164", + "ip": "10.20.33.164", + "port": 39812 + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-33333333333333333", + "log_status": "OK", + "pkt_dstaddr": "10.20.33.164", + "pkt_srcaddr": "10.40.2.236", + "subnet_id": "subnet-22222222bbbbbbbbb", + "tcp_flags": 19, + "tcp_flags_array": [ + "fin", + "syn", + "ack" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + }, + "instance": { + "id": "i-01234567890123456" + } + }, + "destination": { + "address": "10.20.33.164", + "ip": "10.20.33.164", + "port": 39812 + }, + "event": { + "action": "accept", + "outcome": "success", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.40.2.236 10.20.33.164 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK", + "network": { + "iana_number": "6", + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.40.2.236", + "10.20.33.164" + ] + }, + "source": { + "address": "10.40.2.236", + "ip": "10.40.2.236", + "port": 80 + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-11111111111111111", + "log_status": "OK", + "pkt_dstaddr": "10.40.2.236", + "pkt_srcaddr": "10.20.33.164", + "subnet_id": "subnet-11111111aaaaaaaaa", + "tcp_flags": 3, + 
"tcp_flags_array": [ + "fin", + "syn" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "10.40.2.236", + "ip": "10.40.2.236", + "port": 80 + }, + "event": { + "action": "accept", + "outcome": "success", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 eni-11111111111111111 123456789010 vpc-abcdefab012345678 subnet-11111111aaaaaaaaa - 10.40.1.175 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK", + "network": { + "iana_number": "6", + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.40.1.175", + "10.40.2.236", + "10.20.33.164" + ] + }, + "source": { + "address": "10.40.1.175", + "ip": "10.40.1.175", + "port": 39812 + } + }, + { + "aws": { + "vpcflow": { + "interface_id": "eni-22222222222222222", + "log_status": "OK", + "pkt_dstaddr": "10.20.33.164", + "pkt_srcaddr": "10.40.2.236", + "subnet_id": "subnet-22222222bbbbbbbbb", + "tcp_flags": 19, + "tcp_flags_array": [ + "fin", + "syn", + "ack" + ], + "type": "IPv4", + "version": 3, + "vpc_id": "vpc-abcdefab012345678" + } + }, + "cloud": { + "account": { + "id": "123456789010" + } + }, + "destination": { + "address": "10.40.2.31", + "ip": "10.40.2.31", + "port": 39812 + }, + "event": { + "action": "accept", + "outcome": "success", + "type": [ + "connection", + "allowed" + ] + }, + "message": "3 eni-22222222222222222 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb - 10.40.2.236 10.40.2.31 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK", + "network": { + "iana_number": "6", + "transport": "tcp", + "type": "ipv4" + }, + "related": { + "ip": [ + "10.40.2.236", + "10.40.2.31", + "10.20.33.164" + ] + }, + "source": { + "address": "10.40.2.236", + "ip": "10.40.2.236", + "port": 80 + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs.golden.json 
b/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs.golden.json new file mode 100644 index 000000000000..09cdb9c44756 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs.golden.json @@ -0,0 +1,74 @@ +[ + { + "@timestamp": "2021-03-26T03:29:09Z", + "aws": { + "vpcflow": { + "interface_id": "eni-069xxxxxb7a490", + "log_status": "OK", + "pkt_dst_aws_service": "CLOUDFRONT", + "pkt_dstaddr": "10.200.0.80", + "pkt_src_aws_service": "AMAZON", + "pkt_srcaddr": "89.160.20.156", + "sublocation_id": "fake-id", + "sublocation_type": "wavelength", + "subnet_id": "subnet-02d645xxxxxxxdbc0", + "tcp_flags": 1, + "tcp_flags_array": [ + "fin" + ], + "traffic_path": 1, + "type": "IPv4", + "version": 5, + "vpc_id": "vpc-09676f97xxxxxb8a7" + } + }, + "cloud": { + "account": { + "id": "64111117617" + }, + "availability_zone": "use1-az5", + "instance": { + "id": "i-0axxxxxx1ad77" + }, + "region": "us-east-1" + }, + "destination": { + "address": "10.200.0.0", + "ip": "10.200.0.0", + "port": 33004 + }, + "event": { + "action": "reject", + "end": "2021-03-26T03:29:09Z", + "outcome": "failure", + "start": "2021-03-26T03:28:12Z", + "type": [ + "connection", + "denied" + ] + }, + "message": "5 64111117617 eni-069xxxxxb7a490 89.160.20.156 10.200.0.0 50041 33004 17 52 1 1616729292 1616729349 REJECT OK vpc-09676f97xxxxxb8a7 subnet-02d645xxxxxxxdbc0 i-0axxxxxx1ad77 1 IPv4 89.160.20.156 10.200.0.80 us-east-1 use1-az5 wavelength fake-id AMAZON CLOUDFRONT ingress 1", + "network": { + "bytes": 1, + "direction": "ingress", + "iana_number": "17", + "packets": 52, + "transport": "udp", + "type": "ipv4" + }, + "related": { + "ip": [ + "89.160.20.156", + "10.200.0.0", + "10.200.0.80" + ] + }, + "source": { + "address": "89.160.20.156", + "bytes": 1, + "ip": "89.160.20.156", + "packets": 52, + "port": 50041 + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs_and_original.golden.json 
b/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs_and_original.golden.json new file mode 100644 index 000000000000..8ad491c45ccf --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-ecs_and_original.golden.json @@ -0,0 +1,89 @@ +[ + { + "@timestamp": "2021-03-26T03:29:09Z", + "aws": { + "vpcflow": { + "account_id": "64111117617", + "action": "REJECT", + "az_id": "use1-az5", + "bytes": 1, + "dstaddr": "10.200.0.0", + "dstport": 33004, + "end": "2021-03-26T03:29:09Z", + "flow_direction": "ingress", + "instance_id": "i-0axxxxxx1ad77", + "interface_id": "eni-069xxxxxb7a490", + "log_status": "OK", + "packets": 52, + "pkt_dst_aws_service": "CLOUDFRONT", + "pkt_dstaddr": "10.200.0.80", + "pkt_src_aws_service": "AMAZON", + "pkt_srcaddr": "89.160.20.156", + "protocol": 17, + "region": "us-east-1", + "srcaddr": "89.160.20.156", + "srcport": 50041, + "start": "2021-03-26T03:28:12Z", + "sublocation_id": "fake-id", + "sublocation_type": "wavelength", + "subnet_id": "subnet-02d645xxxxxxxdbc0", + "tcp_flags": 1, + "tcp_flags_array": [ + "fin" + ], + "traffic_path": 1, + "type": "IPv4", + "version": 5, + "vpc_id": "vpc-09676f97xxxxxb8a7" + } + }, + "cloud": { + "account": { + "id": "64111117617" + }, + "availability_zone": "use1-az5", + "instance": { + "id": "i-0axxxxxx1ad77" + }, + "region": "us-east-1" + }, + "destination": { + "address": "10.200.0.0", + "ip": "10.200.0.0", + "port": 33004 + }, + "event": { + "action": "reject", + "end": "2021-03-26T03:29:09Z", + "outcome": "failure", + "start": "2021-03-26T03:28:12Z", + "type": [ + "connection", + "denied" + ] + }, + "message": "5 64111117617 eni-069xxxxxb7a490 89.160.20.156 10.200.0.0 50041 33004 17 52 1 1616729292 1616729349 REJECT OK vpc-09676f97xxxxxb8a7 subnet-02d645xxxxxxxdbc0 i-0axxxxxx1ad77 1 IPv4 89.160.20.156 10.200.0.80 us-east-1 use1-az5 wavelength fake-id AMAZON CLOUDFRONT ingress 1", + "network": { + "bytes": 1, + "direction": "ingress", + "iana_number": "17", + "packets": 
52, + "transport": "udp", + "type": "ipv4" + }, + "related": { + "ip": [ + "89.160.20.156", + "10.200.0.0", + "10.200.0.80" + ] + }, + "source": { + "address": "89.160.20.156", + "bytes": 1, + "ip": "89.160.20.156", + "packets": 52, + "port": 50041 + } + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-original.golden.json b/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-original.golden.json new file mode 100644 index 000000000000..3493c7e256f6 --- /dev/null +++ b/x-pack/filebeat/processors/aws_vpcflow/testdata/v5-fields-original.golden.json @@ -0,0 +1,41 @@ +[ + { + "aws": { + "vpcflow": { + "account_id": "64111117617", + "action": "REJECT", + "az_id": "use1-az5", + "bytes": 1, + "dstaddr": "10.200.0.0", + "dstport": 33004, + "end": "2021-03-26T03:29:09Z", + "flow_direction": "ingress", + "instance_id": "i-0axxxxxx1ad77", + "interface_id": "eni-069xxxxxb7a490", + "log_status": "OK", + "packets": 52, + "pkt_dst_aws_service": "CLOUDFRONT", + "pkt_dstaddr": "10.200.0.80", + "pkt_src_aws_service": "AMAZON", + "pkt_srcaddr": "89.160.20.156", + "protocol": 17, + "region": "us-east-1", + "srcaddr": "89.160.20.156", + "srcport": 50041, + "start": "2021-03-26T03:28:12Z", + "sublocation_id": "fake-id", + "sublocation_type": "wavelength", + "subnet_id": "subnet-02d645xxxxxxxdbc0", + "tcp_flags": 1, + "tcp_flags_array": [ + "fin" + ], + "traffic_path": 1, + "type": "IPv4", + "version": 5, + "vpc_id": "vpc-09676f97xxxxxb8a7" + } + }, + "message": "5 64111117617 eni-069xxxxxb7a490 89.160.20.156 10.200.0.0 50041 33004 17 52 1 1616729292 1616729349 REJECT OK vpc-09676f97xxxxxb8a7 subnet-02d645xxxxxxxdbc0 i-0axxxxxx1ad77 1 IPv4 89.160.20.156 10.200.0.80 us-east-1 use1-az5 wavelength fake-id AMAZON CLOUDFRONT ingress 1" + } +] diff --git a/x-pack/filebeat/processors/aws_vpcflow/types.go b/x-pack/filebeat/processors/aws_vpcflow/types.go new file mode 100644 index 000000000000..6c733c331fb1 --- /dev/null +++ 
b/x-pack/filebeat/processors/aws_vpcflow/types.go @@ -0,0 +1,83 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package aws_vpcflow + +import ( + "errors" + "fmt" + "net" + "strconv" + "time" +) + +// dataType specifies one of AWS VPC flow field data types. +type dataType uint8 + +// List of DataTypes. +const ( + integerType dataType = iota + 1 + longType + stringType + ipType + timestampType +) + +var dataTypeNames = map[dataType]string{ + integerType: "integer", + longType: "long", + stringType: "string", + ipType: "ip", + timestampType: "timestamp", +} + +func (dt dataType) String() string { + if dt < integerType || timestampType < dt { + return fmt.Sprintf("invalid(%d)", dt) + } + return dataTypeNames[dt] +} + +// toType converts the given string value to the specified data type. +func toType(value string, typ dataType) (interface{}, error) { + switch typ { + case stringType: + return value, nil + case longType: + return toLong(value) + case integerType: + return toInteger(value) + case ipType: + return toIP(value) + case timestampType: + return toTimestamp(value) + default: + return nil, fmt.Errorf("invalid data type: %v", typ) + } +} + +func toLong(v string) (int64, error) { + return strconv.ParseInt(v, 0, 64) +} + +func toInteger(v string) (int32, error) { + i, err := strconv.ParseInt(v, 0, 32) + return int32(i), err +} + +func toIP(v string) (string, error) { + // This is validating that the value is an IP. 
+ if net.ParseIP(v) != nil { + return v, nil + } + return "", errors.New("value is not a valid IP address") +} + +func toTimestamp(v string) (time.Time, error) { + sec, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return time.Time{}, err + } + return time.Unix(sec, 0).UTC(), nil +} diff --git a/x-pack/functionbeat/docs/index.asciidoc b/x-pack/functionbeat/docs/index.asciidoc index 2fc5cdc98538..0ecfb31faee5 100644 --- a/x-pack/functionbeat/docs/index.asciidoc +++ b/x-pack/functionbeat/docs/index.asciidoc @@ -29,6 +29,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[] :no_repos: :no_decode_cef_processor: :no_decode_csv_fields_processor: +:no_parse_aws_vpc_flow_log_processor: :no_script_processor: :no_timestamp_processor: :no_keystore: