Skip to content

Commit

Permalink
Reduce memory allocations in decode_cef processor (#16587) (#16946)
Browse files Browse the repository at this point in the history
* Add decode_cef processor benchmarks

* Take advantage of string slicing to avoid allocations

Change the input of the cef parser to accept a string instead of []byte to avoid unnecessary copies.
Data is still copied from the input message when it contains escape sequences.

Another minor improvement is to allocate the map for the CEF extensions up front based on the
estimated number of extension fields.

Results from `go test -run none -bench . -benchtime 5s -benchmem .` before and after:

$ benchcmp before.txt after.txt
benchmark                              old ns/op     new ns/op     delta
BenchmarkProcessorRun/short_msg-12     4833          4684          -3.08%
BenchmarkProcessorRun/long_msg-12      55724         52493         -5.80%

benchmark                              old allocs     new allocs     delta
BenchmarkProcessorRun/short_msg-12     55             41             -25.45%
BenchmarkProcessorRun/long_msg-12      349            219            -37.25%

benchmark                              old bytes     new bytes     delta
BenchmarkProcessorRun/short_msg-12     3728          3424          -8.15%
BenchmarkProcessorRun/long_msg-12      26929         21173         -21.37%

(cherry picked from commit 7e7bc2e)
  • Loading branch information
andrewkroh committed Mar 11, 2020
1 parent 8516f0e commit 83b19c8
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow users to override pipeline ID in fileset input config. {issue}9531[9531] {pull}16561[16561]
- Improve ECS categorization field mappings in logstash module. {issue}16169[16169] {pull}16668[16668]
- Improve ECS categorization field mappings in iis module. {issue}16165[16165] {pull}16618[16618]
- Improve the decode_cef processor by reducing the number of memory allocations. {pull}16587[16587]
- Improve ECS categorization field mapping in kafka module. {issue}16167[16167] {pull}16645[16645]

*Heartbeat*
Expand Down
55 changes: 35 additions & 20 deletions x-pack/filebeat/processors/decode_cef/cef/cef.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package cef

import (
"bytes"
"strings"

"github.com/pkg/errors"
"go.uber.org/multierr"
Expand Down Expand Up @@ -59,7 +59,7 @@ type Event struct {
Extensions map[string]*Field `json:"extensions,omitempty"`
}

func (e *Event) init() {
func (e *Event) init(data string) {
e.Version = -1
e.DeviceVendor = ""
e.DeviceProduct = ""
Expand All @@ -68,13 +68,25 @@ func (e *Event) init() {
e.Name = ""
e.Severity = ""
e.Extensions = nil

// Estimate length of the extensions. But limit the allocation because
// it's based on user input. This doesn't account for escaped equals.
if n := strings.Count(data, "="); n > 0 {
const maxLen = 50
if n <= maxLen {
e.Extensions = make(map[string]*Field, n)
} else {
e.Extensions = make(map[string]*Field, maxLen)
}
}
}

func (e *Event) pushExtension(key []byte, value []byte) {
func (e *Event) pushExtension(key, value string) {
if e.Extensions == nil {
e.Extensions = map[string]*Field{}
}
e.Extensions[string(key)] = &Field{String: string(value)}
field := &Field{String: value}
e.Extensions[key] = field
}

// Unpack unpacks a common event format (CEF) message. The data is expected to
Expand All @@ -99,7 +111,7 @@ func (e *Event) pushExtension(key []byte, value []byte) {
// and may contain alphanumeric, underscore (_), period (.), comma (,), and
// brackets ([) (]). This is less strict than the CEF specification, but aligns
// the key names used in practice.
func (e *Event) Unpack(data []byte, opts ...Option) error {
func (e *Event) Unpack(data string, opts ...Option) error {
var settings Settings
for _, opt := range opts {
opt.Apply(&settings)
Expand Down Expand Up @@ -137,29 +149,32 @@ func (e *Event) Unpack(data []byte, opts ...Option) error {
return multierr.Combine(errs...)
}

var (
backslash = []byte(`\`)
escapedBackslash = []byte(`\\`)
const (
backslash = `\`
escapedBackslash = `\\`

pipe = []byte(`|`)
escapedPipe = []byte(`\|`)
pipe = `|`
escapedPipe = `\|`

equalsSign = []byte(`=`)
escapedEqualsSign = []byte(`\=`)
equalsSign = `=`
escapedEqualsSign = `\=`
)

var (
headerEscapes = strings.NewReplacer(escapedBackslash, backslash, escapedPipe, pipe)
extensionEscapes = strings.NewReplacer(escapedBackslash, backslash, escapedEqualsSign, equalsSign)
)

func replaceHeaderEscapes(b []byte) []byte {
if bytes.IndexByte(b, '\\') != -1 {
b = bytes.ReplaceAll(b, escapedBackslash, backslash)
b = bytes.ReplaceAll(b, escapedPipe, pipe)
func replaceHeaderEscapes(b string) string {
if strings.Index(b, escapedBackslash) != -1 || strings.Index(b, escapedPipe) != -1 {
return headerEscapes.Replace(b)
}
return b
}

func replaceExtensionEscapes(b []byte) []byte {
if bytes.IndexByte(b, '\\') != -1 {
b = bytes.ReplaceAll(b, escapedBackslash, backslash)
b = bytes.ReplaceAll(b, escapedEqualsSign, equalsSign)
func replaceExtensionEscapes(b string) string {
if strings.Index(b, escapedBackslash) != -1 || strings.Index(b, escapedEqualsSign) != -1 {
return extensionEscapes.Replace(b)
}
return b
}
26 changes: 13 additions & 13 deletions x-pack/filebeat/processors/decode_cef/cef/cef.rl
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
}%%

// unpack unpacks a CEF message.
func (e *Event) unpack(data []byte) error {
func (e *Event) unpack(data string) error {
cs, p, pe, eof := 0, 0, len(data), len(data)
mark := 0

// Extension key.
var extKey []byte
var extKey string

// Extension value start and end indices.
extValueStart, extValueEnd := 0, 0
Expand All @@ -30,39 +30,39 @@ func (e *Event) unpack(data []byte) error {
// recover from (though the parsing might not be "correct").
var recoveredErrs []error

e.init()
e.init(data)

%%{
# Actions to execute while executing state machine.
action mark {
mark = p
}
action version {
e.Version, _ = strconv.Atoi(string(data[mark:p]))
e.Version, _ = strconv.Atoi(data[mark:p])
}
action device_vendor {
e.DeviceVendor = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceVendor = replaceHeaderEscapes(data[mark:p])
}
action device_product {
e.DeviceProduct = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceProduct = replaceHeaderEscapes(data[mark:p])
}
action device_version {
e.DeviceVersion = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceVersion = replaceHeaderEscapes(data[mark:p])
}
action device_event_class_id {
e.DeviceEventClassID = string(replaceHeaderEscapes(data[mark:p]))
e.DeviceEventClassID = replaceHeaderEscapes(data[mark:p])
}
action name {
e.Name = string(replaceHeaderEscapes(data[mark:p]))
e.Name = replaceHeaderEscapes(data[mark:p])
}
action severity {
e.Severity = string(data[mark:p])
e.Severity = data[mark:p]
}
action extension_key {
// A new extension key marks the end of the last extension value.
if len(extKey) > 0 && extValueStart <= mark - 1 {
e.pushExtension(extKey, replaceExtensionEscapes(data[extValueStart:mark-1]))
extKey, extValueStart, extValueEnd = nil, 0, 0
extKey, extValueStart, extValueEnd = "", 0, 0
}
extKey = data[mark:p]
}
Expand All @@ -77,15 +77,15 @@ func (e *Event) unpack(data []byte) error {
// Reaching the EOF marks the end of the final extension value.
if len(extKey) > 0 && extValueStart <= extValueEnd {
e.pushExtension(extKey, replaceExtensionEscapes(data[extValueStart:extValueEnd]))
extKey, extValueStart, extValueEnd = nil, 0, 0
extKey, extValueStart, extValueEnd = "", 0, 0
}
}
action extension_err {
recoveredErrs = append(recoveredErrs, fmt.Errorf("malformed value for %s at pos %d", extKey, p+1))
fhold; fgoto gobble_extension;
}
action recover_next_extension {
extKey, extValueStart, extValueEnd = nil, 0, 0
extKey, extValueStart, extValueEnd = "", 0, 0
// Resume processing at p, the start of the next extension key.
p = mark;
fgoto extensions;
Expand Down
38 changes: 19 additions & 19 deletions x-pack/filebeat/processors/decode_cef/cef/cef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestGenerateFuzzCorpus(t *testing.T) {
func TestEventUnpack(t *testing.T) {
t.Run("standardMessage", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(standardMessage))
err := e.Unpack(standardMessage)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -98,7 +98,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("headerOnly", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(headerOnly))
err := e.Unpack(headerOnly)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -112,7 +112,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("escapedPipeInHeader", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(escapedPipeInHeader))
err := e.Unpack(escapedPipeInHeader)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -130,7 +130,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("equalsSignInHeader", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(equalsSignInHeader))
err := e.Unpack(equalsSignInHeader)
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -148,7 +148,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("emptyExtensionValue", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(emptyExtensionValue))
err := e.Unpack(emptyExtensionValue)
assert.Error(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -165,7 +165,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("emptyDeviceFields", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(emptyDeviceFields))
err := e.Unpack(emptyDeviceFields)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "", e.DeviceVendor)
Expand All @@ -183,23 +183,23 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorEscapedPipeInExtension", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(escapedPipeInExtension))
err := e.Unpack(escapedPipeInExtension)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
assert.Equal(t, "threatmanager", e.DeviceProduct)
assert.Equal(t, "1.0", e.DeviceVersion)
assert.Equal(t, "100", e.DeviceEventClassID)
assert.Equal(t, "trojan successfully stopped", e.Name)
assert.Equal(t, "10", e.Severity)
assert.Nil(t, e.Extensions)
assert.Empty(t, e.Extensions)

// Pipes in extensions should not be escaped.
assert.Error(t, err)
})

t.Run("leadingWhitespace", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(leadingWhitespace))
err := e.Unpack(leadingWhitespace)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -217,7 +217,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("pipeInMessage", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(pipeInMessage))
err := e.Unpack(pipeInMessage)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -233,23 +233,23 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorEqualsInMessage", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(equalsInMessage))
err := e.Unpack(equalsInMessage)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
assert.Equal(t, "threatmanager", e.DeviceProduct)
assert.Equal(t, "1.0", e.DeviceVersion)
assert.Equal(t, "100", e.DeviceEventClassID)
assert.Equal(t, "trojan successfully stopped", e.Name)
assert.Equal(t, "10", e.Severity)
assert.Nil(t, e.Extensions)
assert.Empty(t, e.Extensions)

// moo contains unescaped equals signs.
assert.Error(t, err)
})

t.Run("escapesInExtension", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(escapesInExtension))
err := e.Unpack(escapesInExtension)
assert.NoError(t, err)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
Expand All @@ -266,7 +266,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorMalformedExtensionEscape", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(malformedExtensionEscape))
err := e.Unpack(malformedExtensionEscape)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "FooBar", e.DeviceVendor)
assert.Equal(t, "Web Gateway", e.DeviceProduct)
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestEventUnpack(t *testing.T) {

t.Run("errorMultipleMalformedExtensionValues", func(t *testing.T) {
var e Event
err := e.Unpack([]byte(multipleMalformedExtensionValues))
err := e.Unpack(multipleMalformedExtensionValues)
assert.Equal(t, 0, e.Version)
assert.Equal(t, "vendor", e.DeviceVendor)
assert.Equal(t, "product", e.DeviceProduct)
Expand All @@ -319,14 +319,14 @@ func TestEventUnpack(t *testing.T) {

t.Run("empty", func(t *testing.T) {
var e Event
err := e.Unpack([]byte("CEF:0|||||||a="))
err := e.Unpack("CEF:0|||||||a=")
assert.NoError(t, err)
})
}

func TestEventUnpackWithFullExtensionNames(t *testing.T) {
var e Event
err := e.Unpack([]byte(standardMessage), WithFullExtensionNames())
err := e.Unpack(standardMessage, WithFullExtensionNames())
assert.NoError(t, err)
assert.Equal(t, map[string]*Field{
"sourceAddress": IPField("10.0.0.192"),
Expand All @@ -337,9 +337,9 @@ func TestEventUnpackWithFullExtensionNames(t *testing.T) {
}

func BenchmarkEventUnpack(b *testing.B) {
var messages [][]byte
var messages []string
for _, m := range testMessages {
messages = append(messages, []byte(m))
messages = append(messages, m)
}
b.ResetTimer()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
line = line[begin:]

var e cef.Event
if err := e.Unpack(line, opts...); err != nil {
if err := e.Unpack(string(line), opts...); err != nil {
log.Println("ERROR:", err, "in:", string(line))
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/processors/decode_cef/cef/fuzz/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// Fuzz is the entry point that go-fuzz uses to fuzz the parser.
func Fuzz(data []byte) int {
var e cef2.Event
if err := e.Unpack(data); err != nil {
if err := e.Unpack(string(data)); err != nil {
return 1
}
return 0
Expand Down
Loading

0 comments on commit 83b19c8

Please sign in to comment.