Skip to content

Commit

Permalink
x-pack/filebeat/input/netflow: replace invalid field value (elastic#3…
Browse files Browse the repository at this point in the history
…1295)

* x-pack/filebeat/input/netflow: reduce lint and silence linter
* x-pack/filebeat/input/netflow: replace invalid field value
  • Loading branch information
efd6 authored and kush-elastic committed May 2, 2022
1 parent ba6fd25 commit 5440a8d
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Do not emit error log when filestream reader reaches EOF and `close.reader.on_eof` is enabled. {pull}31109[31109]
- m365_defender: Fix processing when alerts.entities is an empty list. {issue}31223[31223] {pull}31227[31227]
- Prevent filestream from rereading whole files if they are rotated using rename. {pull}31268[31268]
- Netflow: replace invalid field value. {pull}31295[31295]

*Heartbeat*

Expand Down
60 changes: 35 additions & 25 deletions x-pack/filebeat/input/netflow/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ func toBeatEvent(flow record.Record, internalNetworks []string) (event beat.Even
}
}

func toBeatEventCommon(flow record.Record) (event beat.Event) {
func toBeatEventCommon(flow record.Record) beat.Event {
const (
flowType = "netflow_flow"
optionsType = "netflow_options"
unknownType = "netflow_unknown"
)

// replace net.HardwareAddress with its String() representation
fixMacAddresses(flow.Fields)
// Nest Exporter into netflow fields
Expand All @@ -43,11 +49,11 @@ func toBeatEventCommon(flow record.Record) (event beat.Event) {
// Nest Type into netflow fields
switch flow.Type {
case record.Flow:
flow.Fields["type"] = "netflow_flow"
flow.Fields["type"] = flowType
case record.Options:
flow.Fields["type"] = "netflow_options"
flow.Fields["type"] = optionsType
default:
flow.Fields["type"] = "netflow_unknown"
flow.Fields["type"] = unknownType
}

// ECS Fields -- event
Expand All @@ -57,7 +63,7 @@ func toBeatEventCommon(flow record.Record) (event beat.Event) {
"category": []string{"network_traffic", "network"},
"action": flow.Fields["type"],
}
if ecsEvent["action"] == "netflow_flow" {
if ecsEvent["action"] == flowType {
ecsEvent["type"] = []string{"connection"}
}
// ECS Fields -- device
Expand All @@ -66,13 +72,14 @@ func toBeatEventCommon(flow record.Record) (event beat.Event) {
ecsDevice["ip"] = extractIPFromIPPort(exporter)
}

event.Timestamp = flow.Timestamp
event.Fields = common.MapStr{
"netflow": fieldNameConverter.ToSnakeCase(flow.Fields),
"event": ecsEvent,
"observer": ecsDevice,
return beat.Event{
Timestamp: flow.Timestamp,
Fields: common.MapStr{
"netflow": fieldNameConverter.ToSnakeCase(flow.Fields),
"event": ecsEvent,
"observer": ecsDevice,
},
}
return
}

func extractIPFromIPPort(address string) string {
Expand Down Expand Up @@ -101,8 +108,8 @@ func optionsToBeatEvent(flow record.Record) beat.Event {
return toBeatEventCommon(flow)
}

func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) {
event = toBeatEventCommon(flow)
func flowToBeatEvent(flow record.Record, internalNetworks []string) beat.Event {
event := toBeatEventCommon(flow)

ecsEvent, ok := event.Fields["event"].(common.MapStr)
if !ok {
Expand All @@ -122,10 +129,10 @@ func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.
hasStartUptime = hasStartUptime && startUptime <= sysUptime
hasEndUptime = hasEndUptime && endUptime <= sysUptime
if hasStartUptime {
ecsEvent["start"] = flow.Timestamp.Add((time.Duration(startUptime) - time.Duration(sysUptime)) * time.Millisecond)
ecsEvent["start"] = flow.Timestamp.Add(time.Duration(startUptime-sysUptime) * time.Millisecond)
}
if hasEndUptime {
ecsEvent["end"] = flow.Timestamp.Add((time.Duration(endUptime) - time.Duration(sysUptime)) * time.Millisecond)
ecsEvent["end"] = flow.Timestamp.Add(time.Duration(endUptime-sysUptime) * time.Millisecond)
}
if hasStartUptime && hasEndUptime {
ecsEvent["duration"] = ecsEvent["end"].(time.Time).Sub(ecsEvent["start"].(time.Time)).Nanoseconds()
Expand Down Expand Up @@ -280,7 +287,7 @@ func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.
if biflowDir == 2 {
ecsDest, ecsSource = ecsSource, ecsDest
}
ecsEvent["category"] = "network_session"
ecsEvent["category"] = []string{"network", "session"}

// Assume source is the client in biflows.
event.Fields["client"] = ecsSource
Expand Down Expand Up @@ -318,7 +325,7 @@ func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.
if len(relatedIP) > 0 {
event.Fields["related"] = common.MapStr{"ip": uniqueIPs(relatedIP)}
}
return
return event
}

// unique returns ips lexically sorted and with repeated elements
Expand All @@ -344,40 +351,40 @@ func uniqueIPs(ips []net.IP) []net.IP {
func getKeyUint64(dict record.Map, key string) (value uint64, found bool) {
iface, found := dict[key]
if !found {
return
return value, found
}
value, found = iface.(uint64)
return
return value, found
}

func getKeyUint64Alternatives(dict record.Map, keys ...string) (value uint64, found bool) {
var iface interface{}
for _, key := range keys {
if iface, found = dict[key]; found {
if value, found = iface.(uint64); found {
return
return value, found
}
}
}
return
return value, found
}

func getKeyString(dict record.Map, key string) (value string, found bool) {
iface, found := dict[key]
if !found {
return
return value, found
}
value, found = iface.(string)
return
return value, found
}

func getKeyIP(dict record.Map, key string) (value net.IP, found bool) {
iface, found := dict[key]
if !found {
return
return value, found
}
value, found = iface.(net.IP)
return
return value, found
}

// Replaces each net.HardwareAddr in the dictionary with its string representation
Expand Down Expand Up @@ -437,6 +444,7 @@ func getIPLocality(internalNetworks []string, ips ...net.IP) Locality {
return LocalityInternal
}

//nolint:godox // Bad linter!
// TODO: create table from https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
// They have a CSV file available for conversion.

Expand Down Expand Up @@ -467,6 +475,7 @@ func (p IPProtocol) String() string {
func flowID(srcIP, dstIP net.IP, srcPort, dstPort uint16, proto uint8) string {
h := xxhash.New()
// Both flows will have the same ID.
//nolint:errcheck // Hash writes never fail.
if srcPort >= dstPort {
h.Write(srcIP)
binary.Write(h, binary.BigEndian, srcPort)
Expand All @@ -478,6 +487,7 @@ func flowID(srcIP, dstIP net.IP, srcPort, dstPort uint16, proto uint8) string {
h.Write(srcIP)
binary.Write(h, binary.BigEndian, srcPort)
}
//nolint:errcheck // Hash writes never fail.
binary.Write(h, binary.BigEndian, proto)

return base64.RawURLEncoding.EncodeToString(h.Sum(nil))
Expand Down
Loading

0 comments on commit 5440a8d

Please sign in to comment.