Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory allocations in decode_cef processor #16587

Merged
merged 5 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve ECS field mappings in aws module. {issue}16154[16154] {pull}16307[16307]
- Improve ECS categorization field mappings in googlecloud module. {issue}16030[16030] {pull}16500[16500]
- Improve ECS field mappings in haproxy module. {issue}16162[16162] {pull}16529[16529]
- Improve the decode_cef processor by reducing the number of memory allocations. {pull}16587[16587]

*Heartbeat*

Expand Down
55 changes: 35 additions & 20 deletions x-pack/filebeat/processors/decode_cef/cef/cef.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package cef

import (
"bytes"
"strings"

"github.com/pkg/errors"
"go.uber.org/multierr"
Expand Down Expand Up @@ -59,7 +59,7 @@ type Event struct {
Extensions map[string]*Field `json:"extensions,omitempty"`
}

func (e *Event) init() {
func (e *Event) init(data string) {
e.Version = -1
e.DeviceVendor = ""
e.DeviceProduct = ""
Expand All @@ -68,13 +68,25 @@ func (e *Event) init() {
e.Name = ""
e.Severity = ""
e.Extensions = nil

// Estimate the number of extensions by counting equals signs, but cap the
// allocation because the count is derived from user input. The estimate
// does not account for escaped equals signs.
if n := strings.Count(data, "="); n > 0 {
const maxLen = 50
if n <= maxLen {
e.Extensions = make(map[string]*Field, n)
} else {
e.Extensions = make(map[string]*Field, maxLen)
}
}
}

func (e *Event) pushExtension(key []byte, value []byte) {
func (e *Event) pushExtension(key, value string) {
if e.Extensions == nil {
e.Extensions = map[string]*Field{}
}
e.Extensions[string(key)] = &Field{String: string(value)}
field := &Field{String: value}
e.Extensions[key] = field
}

// Unpack unpacks a common event format (CEF) message. The data is expected to
Expand All @@ -99,7 +111,7 @@ func (e *Event) pushExtension(key []byte, value []byte) {
// and may contain alphanumeric, underscore (_), period (.), comma (,), and
// brackets ([) (]). This is less strict than the CEF specification, but aligns
// with the key names used in practice.
func (e *Event) Unpack(data []byte, opts ...Option) error {
func (e *Event) Unpack(data string, opts ...Option) error {
var settings Settings
for _, opt := range opts {
opt.Apply(&settings)
Expand Down Expand Up @@ -137,29 +149,32 @@ func (e *Event) Unpack(data []byte, opts ...Option) error {
return multierr.Combine(errs...)
}

var (
backslash = []byte(`\`)
escapedBackslash = []byte(`\\`)
const (
backslash = `\`
escapedBackslash = `\\`

pipe = []byte(`|`)
escapedPipe = []byte(`\|`)
pipe = `|`
escapedPipe = `\|`

equalsSign = []byte(`=`)
escapedEqualsSign = []byte(`\=`)
equalsSign = `=`
escapedEqualsSign = `\=`
)

var (
headerEscapes = strings.NewReplacer(escapedBackslash, backslash, escapedPipe, pipe)
extensionEscapes = strings.NewReplacer(escapedBackslash, backslash, escapedEqualsSign, equalsSign)
)

func replaceHeaderEscapes(b []byte) []byte {
if bytes.IndexByte(b, '\\') != -1 {
b = bytes.ReplaceAll(b, escapedBackslash, backslash)
b = bytes.ReplaceAll(b, escapedPipe, pipe)
func replaceHeaderEscapes(b string) string {
if strings.Index(b, escapedBackslash) != -1 || strings.Index(b, escapedPipe) != -1 {
return headerEscapes.Replace(b)
}
return b
}

func replaceExtensionEscapes(b []byte) []byte {
if bytes.IndexByte(b, '\\') != -1 {
b = bytes.ReplaceAll(b, escapedBackslash, backslash)
b = bytes.ReplaceAll(b, escapedEqualsSign, equalsSign)
func replaceExtensionEscapes(b string) string {
if strings.Index(b, escapedBackslash) != -1 || strings.Index(b, escapedEqualsSign) != -1 {
return extensionEscapes.Replace(b)
}
return b
}
26 changes: 13 additions & 13 deletions x-pack/filebeat/processors/decode_cef/cef/cef.rl
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
}%%

// unpack unpacks a CEF message.
func (e *Event) unpack(data []byte) error {
func (e *Event) unpack(data string) error {
cs, p, pe, eof := 0, 0, len(data), len(data)
mark := 0

// Extension key.
var extKey []byte
var extKey string

// Extension value start and end indices.
extValueStart, extValueEnd := 0, 0
Expand All @@ -30,39 +30,39 @@ func (e *Event) unpack(data []byte) error {
// recover from (though the parsing might not be "correct").
var recoveredErrs []error

e.init()
e.init(data)

%%{
# Actions to execute while executing state machine.
action mark {
mark = p
}
action version {
e.Version, _ = strconv.Atoi(string(data[mark:p]))
e.Version, _ = strconv.Atoi(data[mark:p])
}
action device_vendor {
e.DeviceVendor = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceVendor = replaceHeaderEscapes(data[mark:p])
}
action device_product {
e.DeviceProduct = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceProduct = replaceHeaderEscapes(data[mark:p])
}
action device_version {
e.DeviceVersion = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceVersion = replaceHeaderEscapes(data[mark:p])
}
action device_event_class_id {
e.DeviceEventClassID = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceEventClassID = replaceHeaderEscapes(data[mark:p])
}
action name {
e.Name = string(replaceHeaderEscapes(data[mark:p]))
e.Name = replaceHeaderEscapes(data[mark:p])
}
action severity {
e.Severity = string(data[mark:p])
e.Severity = data[mark:p]
}
action extension_key {
// A new extension key marks the end of the previous extension value.
if len(extKey) > 0 && extValueStart <= mark - 1 {
e.pushExtension(extKey, replaceExtensionEscapes(data[extValueStart:mark-1]))
extKey, extValueStart, extValueEnd = nil, 0, 0
extKey, extValueStart, extValueEnd = "", 0, 0
}
extKey = data[mark:p]
}
Expand All @@ -77,15 +77,15 @@ func (e *Event) unpack(data []byte) error {
// Reaching the EOF marks the end of the final extension value.
if len(extKey) > 0 && extValueStart <= extValueEnd {
e.pushExtension(extKey, replaceExtensionEscapes(data[extValueStart:extValueEnd]))
extKey, extValueStart, extValueEnd = nil, 0, 0
extKey, extValueStart, extValueEnd = "", 0, 0
}
}
action extension_err {
recoveredErrs = append(recoveredErrs, fmt.Errorf("malformed value for %s at pos %d", extKey, p+1))
fhold; fgoto gobble_extension;
}
action recover_next_extension {
extKey, extValueStart, extValueEnd = nil, 0, 0
extKey, extValueStart, extValueEnd = "", 0, 0
// Resume processing at p, the start of the next extension key.
p = mark;
fgoto extensions;
Expand Down
34 changes: 17 additions & 17 deletions x-pack/filebeat/processors/decode_cef/cef/cef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestGenerateFuzzCorpus(t *testing.T) {
func TestEventUnpack(t *testing.T) {
t.Run("standardMessage", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(standardMessage))
err := e.Unpack(standardMessage)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -98,7 +98,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("headerOnly", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(headerOnly))
err := e.Unpack(headerOnly)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -112,7 +112,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("escapedPipeInHeader", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(escapedPipeInHeader))
err := e.Unpack(escapedPipeInHeader)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -130,7 +130,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("equalsSignInHeader", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(equalsSignInHeader))
err := e.Unpack(equalsSignInHeader)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -148,7 +148,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("emptyExtensionValue", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(emptyExtensionValue))
err := e.Unpack(emptyExtensionValue)
assert.Error(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -165,7 +165,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("emptyDeviceFields", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(emptyDeviceFields))
err := e.Unpack(emptyDeviceFields)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "", e.DeviceVendor)
Expand All @@ -183,7 +183,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorEscapedPipeInExtension", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(escapedPipeInExtension))
err := e.Unpack(escapedPipeInExtension)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
assert.Equal(t, "threatmanager", e.DeviceProduct)
Expand All @@ -199,7 +199,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("leadingWhitespace", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(leadingWhitespace))
err := e.Unpack(leadingWhitespace)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -217,7 +217,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("pipeInMessage", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(pipeInMessage))
err := e.Unpack(pipeInMessage)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -233,7 +233,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorEqualsInMessage", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(equalsInMessage))
err := e.Unpack(equalsInMessage)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
assert.Equal(t, "threatmanager", e.DeviceProduct)
Expand All @@ -249,7 +249,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("escapesInExtension", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(escapesInExtension))
err := e.Unpack(escapesInExtension)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -266,7 +266,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorMalformedExtensionEscape", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(malformedExtensionEscape))
err := e.Unpack(malformedExtensionEscape)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "FooBar", e.DeviceVendor)
assert.Equal(t, "Web Gateway", e.DeviceProduct)
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorMultipleMalformedExtensionValues", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(multipleMalformedExtensionValues))
err := e.Unpack(multipleMalformedExtensionValues)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "vendor", e.DeviceVendor)
assert.Equal(t, "product", e.DeviceProduct)
Expand All @@ -319,14 +319,14 @@ func TestEventUnpack(t *testing.T) {

t.Run("empty", func(t *testing.T) {
var e Event
err := e.Unpack([]byte("CEF:0|||||||a="))
err := e.Unpack("CEF:0|||||||a=")
assert.NoError(t, err)
})
}

func TestEventUnpackWithFullExtensionNames(t *testing.T) {
var e Event
err := e.Unpack([]byte(standardMessage), WithFullExtensionNames())
err := e.Unpack(standardMessage, WithFullExtensionNames())
assert.NoError(t, err)
assert.Equal(t, map[string]*Field{
"sourceAddress": IPField("10.0.0.192"),
Expand All @@ -337,9 +337,9 @@ func TestEventUnpackWithFullExtensionNames(t *testing.T) {
}

func BenchmarkEventUnpack(b *testing.B) {
var messages [][]byte
var messages []string
for _, m := range testMessages {
messages = append(messages, []byte(m))
messages = append(messages, m)
}
b.ResetTimer()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
line = line[begin:]

var e cef.Event
if err := e.Unpack(line, opts...); err != nil {
if err := e.Unpack(string(line), opts...); err != nil {
log.Println("ERROR:", err, "in:", string(line))
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/processors/decode_cef/cef/fuzz/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// Fuzz is the entry point that go-fuzz uses to fuzz the parser.
func Fuzz(data []byte) int {
var e cef2.Event
if err := e.Unpack(data); err != nil {
if err := e.Unpack(string(data)); err != nil {
return 1
}
return 0
Expand Down
Loading