From d200d2e50fa0e086ea91fbcb76764294040754cf Mon Sep 17 00:00:00 2001
From: splunkertimn <109310003+timannguyen@users.noreply.github.com>
Date: Tue, 27 Jun 2023 16:01:07 -0400
Subject: [PATCH] [receiver/syslog] fixed syslog octet frame parsing (#23645)

Parse multiple syslog events from a single packet when octet framing is
enabled.
---
 .chloggen/issue-23577.yaml                   |  20 +
 pkg/stanza/operator/helper/multiline.go      |   1 -
 pkg/stanza/operator/helper/multiline_test.go | 795 +++++++++---------
 .../operator/input/syslog/config_test.go     |   1 +
 pkg/stanza/operator/input/syslog/syslog.go   |  46 +-
 .../operator/input/syslog/syslog_test.go     |  76 ++
 .../input/syslog/testdata/config.yaml        |   1 +
 pkg/stanza/operator/input/tcp/tcp.go         |  17 +-
 pkg/stanza/operator/internal/test_common.go  | 119 +++
 9 files changed, 679 insertions(+), 397 deletions(-)
 create mode 100644 .chloggen/issue-23577.yaml
 create mode 100644 pkg/stanza/operator/internal/test_common.go

diff --git a/.chloggen/issue-23577.yaml b/.chloggen/issue-23577.yaml
new file mode 100644
index 000000000000..120815f42032
--- /dev/null
+++ b/.chloggen/issue-23577.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: pkg/stanza
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add octet counting event breaking to the syslog parser so that multiple events in a single packet are parsed correctly.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [23577]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: diff --git a/pkg/stanza/operator/helper/multiline.go b/pkg/stanza/operator/helper/multiline.go index 0470708b8a39..224a9b2b4540 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -56,7 +56,6 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, fo return SplitNone(maxLogSize), nil case endPattern == "" && startPattern == "": splitFunc, err = NewNewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) - if err != nil { return nil, err } diff --git a/pkg/stanza/operator/helper/multiline_test.go b/pkg/stanza/operator/helper/multiline_test.go index dd756949e6b1..19abb1eebc0a 100644 --- a/pkg/stanza/operator/helper/multiline_test.go +++ b/pkg/stanza/operator/helper/multiline_test.go @@ -8,7 +8,6 @@ import ( "bytes" "errors" "fmt" - "io" "regexp" "testing" "time" @@ -17,6 +16,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/text/encoding" "golang.org/x/text/encoding/unicode" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/internal" ) const ( @@ -25,250 +26,196 @@ const ( forcePeriod time.Duration = time.Millisecond * 40 ) -// state is going to keep processing state of the reader -type state struct { - ReadFrom int - Processed int -} - -// reader is a reader which keeps state of readed and processed data -type reader struct { - State *state - Data []byte -} - -// newReader creates reader with empty state -func newReader(data []byte) reader { - return reader{ - State: &state{ - ReadFrom: 0, - Processed: 0, - }, - Data: data, - } -} - -// Read reads data from reader and remebers where reading has been finished -func (r reader) Read(p []byte) (n int, err error) { - // return eof if data has been fully readed - if len(r.Data)-r.State.ReadFrom == 0 { - return 0, io.EOF - } - - // iterate over data char by char and write into p - // until p is full or no more data left to read - i := 0 - for ; i < len(r.Data)-r.State.ReadFrom; i++ { - if i == len(p) { - break - } - p[i] = r.Data[r.State.ReadFrom+i] - } - - // update state - r.State.ReadFrom += i - return i, nil -} - -// Reset resets reader state (sets last readed position to last processed position) -func (r *reader) Reset() { - r.State.ReadFrom = r.State.Processed -} - -func (r *reader) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = splitFunc(data, atEOF) - r.State.Processed += advance - return - } -} - -type tokenizerTestCase struct { - Name string - Pattern string - Raw []byte - ExpectedTokenized []string - ExpectedError error - Flusher *Flusher - Sleep time.Duration - AdditionalIterations int - PreserveLeadingWhitespaces bool - PreserveTrailingWhitespaces bool -} - -func (tc tokenizerTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) { - reader := newReader(tc.Raw) - - return func(t *testing.T) { - var tokenized []string - for i := 0; i < 1+tc.AdditionalIterations; i++ { - // sleep before next iterations - if i > 0 { - time.Sleep(tc.Sleep) - } - reader.Reset() - scanner := bufio.NewScanner(reader) - scanner.Split(reader.SplitFunc(splitFunc)) - for { - ok := scanner.Scan() - if !ok { - assert.Equal(t, tc.ExpectedError, scanner.Err()) - break - } - tokenized = append(tokenized, scanner.Text()) - } - } - - assert.Equal(t, tc.ExpectedTokenized, tokenized) - } +type MultiLineTokenizerTestCase struct { + internal.TokenizerTestCase + Flusher 
*Flusher } func TestLineStartSplitFunc(t *testing.T) { - testCases := []tokenizerTestCase{ - { - Name: "OneLogSimple", - Pattern: `LOGSTART \d+ `, - Raw: []byte("LOGSTART 123 log1LOGSTART 123 a"), - ExpectedTokenized: []string{ - `LOGSTART 123 log1`, + testCases := []MultiLineTokenizerTestCase{ + { + internal.TokenizerTestCase{ + Name: "OneLogSimple", + Pattern: `LOGSTART \d+ `, + Raw: []byte("LOGSTART 123 log1LOGSTART 123 a"), + ExpectedTokenized: []string{ + `LOGSTART 123 log1`, + }, + }, + nil, + }, + { + internal.TokenizerTestCase{ + Name: "TwoLogsSimple", + Pattern: `LOGSTART \d+ `, + Raw: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`), + ExpectedTokenized: []string{ + `LOGSTART 123 log1`, + `LOGSTART 234 log2`, + }, }, + nil, }, { - Name: "TwoLogsSimple", - Pattern: `LOGSTART \d+ `, - Raw: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`), - ExpectedTokenized: []string{ - `LOGSTART 123 log1`, - `LOGSTART 234 log2`, + internal.TokenizerTestCase{ + Name: "TwoLogsLineStart", + Pattern: `^LOGSTART \d+ `, + Raw: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"), + ExpectedTokenized: []string{ + "LOGSTART 123 LOGSTART 345 log1", + "LOGSTART 234 log2", + }, }, + nil, }, { - Name: "TwoLogsLineStart", - Pattern: `^LOGSTART \d+ `, - Raw: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"), - ExpectedTokenized: []string{ - "LOGSTART 123 LOGSTART 345 log1", - "LOGSTART 234 log2", + internal.TokenizerTestCase{ + Name: "NoMatches", + Pattern: `LOGSTART \d+ `, + Raw: []byte(`file that has no matches in it`), }, + nil, }, { - Name: "NoMatches", - Pattern: `LOGSTART \d+ `, - Raw: []byte(`file that has no matches in it`), - }, - { - Name: "PrecedingNonMatches", - Pattern: `LOGSTART \d+ `, - Raw: []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`), - ExpectedTokenized: []string{ - `part that doesn't match`, - `LOGSTART 123 part that matches`, + internal.TokenizerTestCase{ + Name: "PrecedingNonMatches", + Pattern: `LOGSTART \d+ `, + Raw: []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`), + ExpectedTokenized: []string{ + `part that doesn't match`, + `LOGSTART 123 part that matches`, + }, }, + nil, }, { - Name: "HugeLog100", - Pattern: `LOGSTART \d+ `, - Raw: func() []byte { - newRaw := []byte(`LOGSTART 123 `) - newRaw = append(newRaw, generatedByteSliceOfLength(100)...) - newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) - return newRaw - }(), - ExpectedTokenized: []string{ - `LOGSTART 123 ` + string(generatedByteSliceOfLength(100)), + internal.TokenizerTestCase{ + Name: "HugeLog100", + Pattern: `LOGSTART \d+ `, + Raw: func() []byte { + newRaw := []byte(`LOGSTART 123 `) + newRaw = append(newRaw, internal.GeneratedByteSliceOfLength(100)...) + newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) + return newRaw + }(), + ExpectedTokenized: []string{ + `LOGSTART 123 ` + string(internal.GeneratedByteSliceOfLength(100)), + }, }, + nil, }, { - Name: "HugeLog10000", - Pattern: `LOGSTART \d+ `, - Raw: func() []byte { - newRaw := []byte(`LOGSTART 123 `) - newRaw = append(newRaw, generatedByteSliceOfLength(10000)...) - newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) 
- return newRaw - }(), - ExpectedTokenized: []string{ - `LOGSTART 123 ` + string(generatedByteSliceOfLength(10000)), + internal.TokenizerTestCase{ + Name: "HugeLog10000", + Pattern: `LOGSTART \d+ `, + Raw: func() []byte { + newRaw := []byte(`LOGSTART 123 `) + newRaw = append(newRaw, internal.GeneratedByteSliceOfLength(10000)...) + newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) + return newRaw + }(), + ExpectedTokenized: []string{ + `LOGSTART 123 ` + string(internal.GeneratedByteSliceOfLength(10000)), + }, }, + nil, }, { - Name: "ErrTooLong", - Pattern: `LOGSTART \d+ `, - Raw: func() []byte { - newRaw := []byte(`LOGSTART 123 `) - newRaw = append(newRaw, generatedByteSliceOfLength(1000000)...) - newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) - return newRaw - }(), - ExpectedError: errors.New("bufio.Scanner: token too long"), - }, - { - Name: "MultipleMultilineLogs", - Pattern: `^LOGSTART \d+`, - Raw: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"), - ExpectedTokenized: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1", - "LOGSTART 17 log2\nLOGPART log2\nanother line", + internal.TokenizerTestCase{ + Name: "ErrTooLong", + Pattern: `LOGSTART \d+ `, + Raw: func() []byte { + newRaw := []byte(`LOGSTART 123 `) + newRaw = append(newRaw, internal.GeneratedByteSliceOfLength(1000000)...) + newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) + return newRaw + }(), + ExpectedError: errors.New("bufio.Scanner: token too long"), }, + nil, }, - { - Name: "LogsWithoutFlusher", - Pattern: `^LOGSTART \d+`, - Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + { + internal.TokenizerTestCase{ + Name: "MultipleMultilineLogs", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"), + ExpectedTokenized: []string{ + "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1", + "LOGSTART 17 log2\nLOGPART log2\nanother line", + }, + }, + nil, + }, + { + internal.TokenizerTestCase{ + Name: "LogsWithoutFlusher", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + }, + nil, }, { - Name: "LogsWithFlusher", - Pattern: `^LOGSTART \d+`, - Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), - ExpectedTokenized: []string{ - "LOGPART log1\nLOGPART log1", + internal.TokenizerTestCase{ + Name: "LogsWithFlusher", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGPART log1", + }, + + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, { - Name: "LogsWithFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGSTART \d+`, - Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), - ExpectedTokenized: []string{ - "LOGPART log1", - "LOGSTART 123\nLOGPART log1", + internal.TokenizerTestCase{ + Name: "LogsWithFlusherWithMultipleLogsInBuffer", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1", + "LOGSTART 123\nLOGPART log1", + }, + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, { - Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGSTART \d+`, - Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART 
log1\t \n"), - ExpectedTokenized: []string{ - "LOGPART log1", + internal.TokenizerTestCase{ + Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1", + }, + AdditionalIterations: 1, + Sleep: forcePeriod / 4, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod * 16, }, - AdditionalIterations: 1, - Sleep: forcePeriod / 4, }, { - Name: "LogsWithFlusherWithLogStartingWithWhiteChars", - Pattern: `^LOGSTART \d+`, - Raw: []byte("\nLOGSTART 333"), - ExpectedTokenized: []string{ - "", - "LOGSTART 333", + internal.TokenizerTestCase{ + Name: "LogsWithFlusherWithLogStartingWithWhiteChars", + Pattern: `^LOGSTART \d+`, + Raw: []byte("\nLOGSTART 333"), + ExpectedTokenized: []string{ + "", + "LOGSTART 333", + }, + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, } @@ -303,147 +250,188 @@ func TestLineStartSplitFunc(t *testing.T) { } func TestLineEndSplitFunc(t *testing.T) { - testCases := []tokenizerTestCase{ + testCases := []MultiLineTokenizerTestCase{ { - Name: "OneLogSimple", - Pattern: `LOGEND \d+`, - Raw: []byte(`my log LOGEND 123`), - ExpectedTokenized: []string{ - `my log LOGEND 123`, + internal.TokenizerTestCase{ + Name: "OneLogSimple", + Pattern: `LOGEND \d+`, + Raw: []byte(`my log LOGEND 123`), + ExpectedTokenized: []string{ + `my log LOGEND 123`, + }, }, + nil, }, { - Name: "TwoLogsSimple", - Pattern: `LOGEND \d+`, - Raw: []byte(`log1 LOGEND 123log2 LOGEND 234`), - ExpectedTokenized: []string{ - `log1 LOGEND 123`, - `log2 LOGEND 234`, + internal.TokenizerTestCase{ + Name: "TwoLogsSimple", + Pattern: `LOGEND \d+`, + Raw: []byte(`log1 LOGEND 123log2 LOGEND 234`), + ExpectedTokenized: []string{ + `log1 LOGEND 123`, + `log2 LOGEND 234`, + }, }, + nil, }, { - Name: "TwoLogsLineEndSimple", - Pattern: `LOGEND$`, - Raw: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"), - ExpectedTokenized: []string{ - "log1 LOGEND LOGEND", - "log2 LOGEND", + internal.TokenizerTestCase{ + Name: "TwoLogsLineEndSimple", + Pattern: `LOGEND$`, + Raw: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"), + ExpectedTokenized: []string{ + "log1 LOGEND LOGEND", + "log2 LOGEND", + }, }, + nil, }, { - Name: "NoMatches", - Pattern: `LOGEND \d+`, - Raw: []byte(`file that has no matches in it`), + internal.TokenizerTestCase{ + Name: "NoMatches", + Pattern: `LOGEND \d+`, + Raw: []byte(`file that has no matches in it`), + }, + nil, }, { - Name: "NonMatchesAfter", - Pattern: `LOGEND \d+`, - Raw: []byte(`part that matches LOGEND 123 part that doesn't match`), - ExpectedTokenized: []string{ - `part that matches LOGEND 123`, + internal.TokenizerTestCase{ + Name: "NonMatchesAfter", + Pattern: `LOGEND \d+`, + Raw: []byte(`part that matches LOGEND 123 part that doesn't match`), + ExpectedTokenized: []string{ + `part that matches LOGEND 123`, + }, }, + nil, }, { - Name: "HugeLog100", - Pattern: `LOGEND \d`, - Raw: func() []byte { - newRaw := generatedByteSliceOfLength(100) - newRaw = append(newRaw, []byte(`LOGEND 1 `)...) - return newRaw - }(), - ExpectedTokenized: []string{ - string(generatedByteSliceOfLength(100)) + `LOGEND 1`, + internal.TokenizerTestCase{ + Name: "HugeLog100", + Pattern: `LOGEND \d`, + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(100) + newRaw = append(newRaw, []byte(`LOGEND 1 `)...) 
+ return newRaw + }(), + ExpectedTokenized: []string{ + string(internal.GeneratedByteSliceOfLength(100)) + `LOGEND 1`, + }, }, + nil, }, { - Name: "HugeLog10000", - Pattern: `LOGEND \d`, - Raw: func() []byte { - newRaw := generatedByteSliceOfLength(10000) - newRaw = append(newRaw, []byte(`LOGEND 1 `)...) - return newRaw - }(), - ExpectedTokenized: []string{ - string(generatedByteSliceOfLength(10000)) + `LOGEND 1`, + internal.TokenizerTestCase{ + Name: "HugeLog10000", + Pattern: `LOGEND \d`, + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(10000) + newRaw = append(newRaw, []byte(`LOGEND 1 `)...) + return newRaw + }(), + ExpectedTokenized: []string{ + string(internal.GeneratedByteSliceOfLength(10000)) + `LOGEND 1`, + }, }, + nil, }, { - Name: "HugeLog1000000", - Pattern: `LOGEND \d`, - Raw: func() []byte { - newRaw := generatedByteSliceOfLength(1000000) - newRaw = append(newRaw, []byte(`LOGEND 1 `)...) - return newRaw - }(), - ExpectedError: errors.New("bufio.Scanner: token too long"), + internal.TokenizerTestCase{ + Name: "HugeLog1000000", + Pattern: `LOGEND \d`, + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(1000000) + newRaw = append(newRaw, []byte(`LOGEND 1 `)...) + return newRaw + }(), + ExpectedError: errors.New("bufio.Scanner: token too long"), + }, + nil, }, { - Name: "MultipleMultilineLogs", - Pattern: `^LOGEND.*$`, - Raw: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"), - ExpectedTokenized: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1", - "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", + internal.TokenizerTestCase{ + Name: "MultipleMultilineLogs", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"), + ExpectedTokenized: []string{ + "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1", + "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", + }, }, + nil, }, { - Name: "LogsWithoutFlusher", - Pattern: `^LOGEND.*$`, - Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), - Flusher: &Flusher{}, + internal.TokenizerTestCase{ + Name: "LogsWithoutFlusher", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + }, + &Flusher{}, }, { - Name: "LogsWithFlusher", - Pattern: `^LOGEND.*$`, - Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), - ExpectedTokenized: []string{ - "LOGPART log1\nLOGPART log1", + internal.TokenizerTestCase{ + Name: "LogsWithFlusher", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGPART log1", + }, + + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, { - Name: "LogsWithFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGEND.*$`, - Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), - ExpectedTokenized: []string{ - "LOGPART log1\nLOGEND", - "LOGPART log1", + internal.TokenizerTestCase{ + Name: "LogsWithFlusherWithMultipleLogsInBuffer", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGEND", + "LOGPART log1", + }, + + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, { - Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", - Pattern: 
`^LOGEND.*$`, - Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), - ExpectedTokenized: []string{ - "LOGPART log1\nLOGEND", + internal.TokenizerTestCase{ + Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGEND", + }, + + AdditionalIterations: 1, + Sleep: forcePeriod / 4, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod * 16, }, - AdditionalIterations: 1, - Sleep: forcePeriod / 4, }, { - Name: "LogsWithFlusherWithLogStartingWithWhiteChars", - Pattern: `LOGEND \d+$`, - Raw: []byte("\nLOGEND 333"), - ExpectedTokenized: []string{ - "LOGEND 333", + internal.TokenizerTestCase{ + Name: "LogsWithFlusherWithLogStartingWithWhiteChars", + Pattern: `LOGEND \d+$`, + Raw: []byte("\nLOGEND 333"), + ExpectedTokenized: []string{ + "LOGEND 333", + }, + + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, } @@ -459,132 +447,160 @@ func TestLineEndSplitFunc(t *testing.T) { } func TestNewlineSplitFunc(t *testing.T) { - testCases := []tokenizerTestCase{ + testCases := []MultiLineTokenizerTestCase{ { - Name: "OneLogSimple", - Raw: []byte("my log\n"), - ExpectedTokenized: []string{ - `my log`, - }, + internal.TokenizerTestCase{Name: "OneLogSimple", + Raw: []byte("my log\n"), + ExpectedTokenized: []string{ + `my log`, + }, + }, nil, }, { - Name: "OneLogCarriageReturn", - Raw: []byte("my log\r\n"), - ExpectedTokenized: []string{ - `my log`, + internal.TokenizerTestCase{Name: "OneLogCarriageReturn", + Raw: []byte("my log\r\n"), + ExpectedTokenized: []string{ + `my log`, + }, }, + nil, }, { - Name: "TwoLogsSimple", - Raw: []byte("log1\nlog2\n"), - ExpectedTokenized: []string{ - `log1`, - `log2`, + internal.TokenizerTestCase{Name: "TwoLogsSimple", + Raw: []byte("log1\nlog2\n"), + ExpectedTokenized: []string{ + `log1`, + `log2`, + }, }, + nil, }, { - Name: "TwoLogsCarriageReturn", - Raw: []byte("log1\r\nlog2\r\n"), - ExpectedTokenized: []string{ - `log1`, - `log2`, + internal.TokenizerTestCase{Name: "TwoLogsCarriageReturn", + Raw: []byte("log1\r\nlog2\r\n"), + ExpectedTokenized: []string{ + `log1`, + `log2`, + }, }, + nil, }, { - Name: "NoTailingNewline", - Raw: []byte(`foo`), + internal.TokenizerTestCase{Name: "NoTailingNewline", + Raw: []byte(`foo`), + }, + nil, }, { - Name: "HugeLog100", - Raw: func() []byte { - newRaw := generatedByteSliceOfLength(100) - newRaw = append(newRaw, '\n') - return newRaw - }(), - ExpectedTokenized: []string{ - string(generatedByteSliceOfLength(100)), + internal.TokenizerTestCase{Name: "HugeLog100", + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(100) + newRaw = append(newRaw, '\n') + return newRaw + }(), + ExpectedTokenized: []string{ + string(internal.GeneratedByteSliceOfLength(100)), + }, }, + nil, }, { - Name: "HugeLog10000", - Raw: func() []byte { - newRaw := generatedByteSliceOfLength(10000) - newRaw = append(newRaw, '\n') - return newRaw - }(), - ExpectedTokenized: []string{ - string(generatedByteSliceOfLength(10000)), + internal.TokenizerTestCase{Name: "HugeLog10000", + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(10000) + newRaw = append(newRaw, '\n') + return newRaw + }(), + ExpectedTokenized: []string{ + string(internal.GeneratedByteSliceOfLength(10000)), + }, }, + nil, }, { - Name: "HugeLog1000000", - Raw: func() []byte { - newRaw := 
generatedByteSliceOfLength(1000000) - newRaw = append(newRaw, '\n') - return newRaw - }(), - ExpectedError: errors.New("bufio.Scanner: token too long"), + internal.TokenizerTestCase{Name: "HugeLog1000000", + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(1000000) + newRaw = append(newRaw, '\n') + return newRaw + }(), + ExpectedError: errors.New("bufio.Scanner: token too long"), + }, + nil, }, { - Name: "LogsWithoutFlusher", - Raw: []byte("LOGPART log1"), - Flusher: &Flusher{}, + internal.TokenizerTestCase{Name: "LogsWithoutFlusher", + Raw: []byte("LOGPART log1"), + }, + + &Flusher{}, }, { - Name: "LogsWithFlusher", - Raw: []byte("LOGPART log1"), - ExpectedTokenized: []string{ - "LOGPART log1", + internal.TokenizerTestCase{Name: "LogsWithFlusher", + Raw: []byte("LOGPART log1"), + ExpectedTokenized: []string{ + "LOGPART log1", + }, + AdditionalIterations: 1, + Sleep: sleepDuration, }, - Flusher: &Flusher{ + &Flusher{ forcePeriod: forcePeriod, }, - AdditionalIterations: 1, - Sleep: sleepDuration, }, { - Name: "DefaultFlusherSplits", - Raw: []byte("log1\nlog2\n"), - ExpectedTokenized: []string{ - "log1", - "log2", + internal.TokenizerTestCase{Name: "DefaultFlusherSplits", + Raw: []byte("log1\nlog2\n"), + ExpectedTokenized: []string{ + "log1", + "log2", + }, }, + nil, }, { - Name: "LogsWithLogStartingWithWhiteChars", - Raw: []byte("\nLOGEND 333\nAnother one"), - ExpectedTokenized: []string{ - "", - "LOGEND 333", + internal.TokenizerTestCase{Name: "LogsWithLogStartingWithWhiteChars", + Raw: []byte("\nLOGEND 333\nAnother one"), + ExpectedTokenized: []string{ + "", + "LOGEND 333", + }, }, + nil, }, { - Name: "PreserveLeadingWhitespaces", - Raw: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokenized: []string{ - "", - " LOGEND 333", + internal.TokenizerTestCase{Name: "PreserveLeadingWhitespaces", + Raw: []byte("\n LOGEND 333 \nAnother one "), + ExpectedTokenized: []string{ + "", + " LOGEND 333", + }, + PreserveLeadingWhitespaces: true, }, - PreserveLeadingWhitespaces: true, + nil, }, { - Name: "PreserveTrailingWhitespaces", - Raw: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokenized: []string{ - "", - "LOGEND 333 ", + internal.TokenizerTestCase{Name: "PreserveTrailingWhitespaces", + Raw: []byte("\n LOGEND 333 \nAnother one "), + ExpectedTokenized: []string{ + "", + "LOGEND 333 ", + }, + PreserveTrailingWhitespaces: true, }, - PreserveTrailingWhitespaces: true, + nil, }, { - Name: "PreserveBothLeadingAndTrailingWhitespaces", - Raw: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokenized: []string{ - "", - " LOGEND 333 ", + internal.TokenizerTestCase{Name: "PreserveBothLeadingAndTrailingWhitespaces", + Raw: []byte("\n LOGEND 333 \nAnother one "), + ExpectedTokenized: []string{ + "", + " LOGEND 333 ", + }, + PreserveLeadingWhitespaces: true, + PreserveTrailingWhitespaces: true, }, - PreserveLeadingWhitespaces: true, - PreserveTrailingWhitespaces: true, + nil, }, } @@ -653,16 +669,16 @@ func TestNoSplitFunc(t *testing.T) { { Name: "HugeLog100", Raw: func() []byte { - return generatedByteSliceOfLength(largeLogSize) + return internal.GeneratedByteSliceOfLength(largeLogSize) }(), ExpectedTokenized: [][]byte{ - generatedByteSliceOfLength(100), + internal.GeneratedByteSliceOfLength(100), }, }, { Name: "HugeLog300", Raw: func() []byte { - return generatedByteSliceOfLength(largeLogSize * 3) + return internal.GeneratedByteSliceOfLength(largeLogSize * 3) }(), ExpectedTokenized: [][]byte{ 
[]byte("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv"), @@ -673,7 +689,7 @@ func TestNoSplitFunc(t *testing.T) { { Name: "EOFBeforeMaxLogSize", Raw: func() []byte { - return generatedByteSliceOfLength(largeLogSize * 3.5) + return internal.GeneratedByteSliceOfLength(largeLogSize * 3.5) }(), ExpectedTokenized: [][]byte{ []byte("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv"), @@ -783,12 +799,3 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { }) } } - -func generatedByteSliceOfLength(length int) []byte { - chars := []byte(`abcdefghijklmnopqrstuvwxyz`) - newSlice := make([]byte, length) - for i := 0; i < length; i++ { - newSlice[i] = chars[i%len(chars)] - } - return newSlice -} diff --git a/pkg/stanza/operator/input/syslog/config_test.go b/pkg/stanza/operator/input/syslog/config_test.go index bbb9daa0500e..f1564481b651 100644 --- a/pkg/stanza/operator/input/syslog/config_test.go +++ b/pkg/stanza/operator/input/syslog/config_test.go @@ -32,6 +32,7 @@ func TestUnmarshal(t *testing.T) { cfg := NewConfig() cfg.Protocol = "rfc3164" cfg.Location = "foo" + cfg.EnableOctetCounting = true cfg.TCP = &tcp.NewConfig().BaseConfig cfg.TCP.MaxLogSize = 1000000 cfg.TCP.ListenAddress = "10.0.0.1:9000" diff --git a/pkg/stanza/operator/input/syslog/syslog.go b/pkg/stanza/operator/input/syslog/syslog.go index 71329af7dc9e..6c43ce0bec67 100644 --- a/pkg/stanza/operator/input/syslog/syslog.go +++ b/pkg/stanza/operator/input/syslog/syslog.go @@ -4,8 +4,11 @@ package syslog // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog" import ( + "bufio" "errors" "fmt" + "regexp" + "strconv" "go.uber.org/zap" @@ -58,8 +61,11 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { if c.TCP != nil { tcpInputCfg := tcp.NewConfigWithID(inputBase.ID() + "_internal_tcp") - tcpInputCfg.BaseConfig = *c.TCP + if syslogParserCfg.EnableOctetCounting { + tcpInputCfg.MultiLineBuilder = OctetMultiLineBuilder + } + tcpInputCfg.BaseConfig = *c.TCP tcpInput, err := tcpInputCfg.Build(logger) if err != nil { return nil, fmt.Errorf("failed to resolve tcp config: %w", err) @@ -136,3 +142,41 @@ func (t *Input) SetOutputs(operators []operator.Operator) error { t.parser.SetOutputIDs(t.GetOutputIDs()) return t.parser.SetOutputs(operators) } + +func OctetMultiLineBuilder(_ helper.Encoding) (bufio.SplitFunc, error) { + return newOctetFrameSplitFunc(true), nil +} + +func newOctetFrameSplitFunc(flushAtEOF bool) bufio.SplitFunc { + frameRegex := regexp.MustCompile(`^[1-9]\d*\s`) + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + frameLoc := frameRegex.FindIndex(data) + if frameLoc == nil { + // Flush if no more data is expected + if len(data) != 0 && atEOF && flushAtEOF { + token = data + advance = len(data) + return + } + return 0, nil, nil + } + + frameMaxIndex := frameLoc[1] + // delimit space between length and log + frameLenValue, err := strconv.Atoi(string(data[:frameMaxIndex-1])) + if err != nil { + return 0, nil, err // read more data and try again. 
+ } + + advance = frameMaxIndex + frameLenValue + // the limitation here is that we can only line split within a single buffer + // the context of buffer length cannot be pass onto the next scan + capacity := cap(data) + if advance > capacity { + return capacity, data, nil + } + token = data[:advance] + err = nil + return + } +} diff --git a/pkg/stanza/operator/input/syslog/syslog_test.go b/pkg/stanza/operator/input/syslog/syslog_test.go index ada6d051c903..1315d3dc1f10 100644 --- a/pkg/stanza/operator/input/syslog/syslog_test.go +++ b/pkg/stanza/operator/input/syslog/syslog_test.go @@ -12,8 +12,10 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -137,3 +139,77 @@ func NewConfigWithUDP(syslogCfg *syslog.BaseConfig) *Config { cfg.OutputIDs = []string{"fake"} return cfg } + +func TestOctetFramingSplitFunc(t *testing.T) { + testCases := []internal.TokenizerTestCase{ + { + Name: "OneLogSimple", + Raw: []byte(`17 my log LOGEND 123`), + ExpectedTokenized: []string{ + `17 my log LOGEND 123`, + }, + }, + { + Name: "TwoLogsSimple", + Raw: []byte(`17 my log LOGEND 12317 my log LOGEND 123`), + ExpectedTokenized: []string{ + `17 my log LOGEND 123`, + `17 my log LOGEND 123`, + }, + }, + { + Name: "NoMatches", + Raw: []byte(`no matches in it`), + ExpectedTokenized: []string{ + `no matches in it`, + }, + }, + { + Name: "NonMatchesAfter", + Raw: []byte(`17 my log LOGEND 123my log LOGEND 12317 my log LOGEND 123`), + ExpectedTokenized: []string{ + `17 my log LOGEND 123`, + `my log LOGEND 12317 my log LOGEND 123`, + }, + }, + { + Name: "HugeLog100", + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(100) + newRaw = append([]byte(`100 `), newRaw...) + return newRaw + }(), + ExpectedTokenized: []string{ + `100 ` + string(internal.GeneratedByteSliceOfLength(100)), + }, + }, + { + Name: "maxCapacity", + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(4091) + newRaw = append([]byte(`4091 `), newRaw...) + return newRaw + }(), + ExpectedTokenized: []string{ + `4091 ` + string(internal.GeneratedByteSliceOfLength(4091)), + }, + }, + { + Name: "over capacity", + Raw: func() []byte { + newRaw := internal.GeneratedByteSliceOfLength(4092) + newRaw = append([]byte(`5000 `), newRaw...) 
+ return newRaw + }(), + ExpectedTokenized: []string{ + `5000 ` + string(internal.GeneratedByteSliceOfLength(4091)), + `j`, + }, + }, + } + for _, tc := range testCases { + splitFunc, err := OctetMultiLineBuilder(helper.Encoding{}) + require.NoError(t, err) + t.Run(tc.Name, tc.RunFunc(splitFunc)) + } +} diff --git a/pkg/stanza/operator/input/syslog/testdata/config.yaml b/pkg/stanza/operator/input/syslog/testdata/config.yaml index b68a6ca4c031..59e40b5ecfd5 100644 --- a/pkg/stanza/operator/input/syslog/testdata/config.yaml +++ b/pkg/stanza/operator/input/syslog/testdata/config.yaml @@ -4,6 +4,7 @@ tcp: type: syslog_input protocol: rfc3164 location: foo + enable_octet_counting: true tcp: listen_address: 10.0.0.1:9000 max_log_size: 1MB diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 579c8ad051bc..e15786c186cd 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -70,6 +70,17 @@ type BaseConfig struct { Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"` PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` + MultiLineBuilder MultiLineBuilderFunc +} + +type MultiLineBuilderFunc func(encoding helper.Encoding) (bufio.SplitFunc, error) + +func (c Config) defaultMultilineBuilder(encoding helper.Encoding) (bufio.SplitFunc, error) { + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize)) + if err != nil { + return nil, err + } + return splitFunc, nil } // Build will build a tcp input operator. @@ -102,8 +113,12 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, err } + if c.MultiLineBuilder == nil { + c.MultiLineBuilder = c.defaultMultilineBuilder + } + // Build multiline - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize)) + splitFunc, err := c.MultiLineBuilder(encoding) if err != nil { return nil, err } diff --git a/pkg/stanza/operator/internal/test_common.go b/pkg/stanza/operator/internal/test_common.go new file mode 100644 index 000000000000..39210f43aef7 --- /dev/null +++ b/pkg/stanza/operator/internal/test_common.go @@ -0,0 +1,119 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/internal" + +import ( + "bufio" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// state is going to keep processing state of the TestReader +type state struct { + ReadFrom int + Processed int +} + +// TestReader is a TestReader which keeps state of readed and processed data +type TestReader struct { + State *state + Data []byte +} + +// NewTestReader creates TestReader with empty state +func NewTestReader(data []byte) TestReader { + return TestReader{ + State: &state{ + ReadFrom: 0, + Processed: 0, + }, + Data: data, + } +} + +// Read reads data from TestReader and remebers where reading has been finished +func (r TestReader) Read(p []byte) (n int, err error) { + // return eof if data has been fully readed + if len(r.Data)-r.State.ReadFrom == 0 { + return 0, io.EOF + } + + // iterate over data char by char and write into p + // until p is full or no more data left to read + i := 0 + for ; i < 
len(r.Data)-r.State.ReadFrom; i++ {
+        if i == len(p) {
+            break
+        }
+        p[i] = r.Data[r.State.ReadFrom+i]
+    }
+
+    // update state
+    r.State.ReadFrom += i
+    return i, nil
+}
+
+// Reset resets TestReader state (sets the last read position back to the last processed position)
+func (r *TestReader) Reset() {
+    r.State.ReadFrom = r.State.Processed
+}
+
+// SplitFunc wraps a bufio.SplitFunc and records how many bytes each call consumed
+func (r *TestReader) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+    return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+        advance, token, err = splitFunc(data, atEOF)
+        r.State.Processed += advance
+        return
+    }
+}
+
+type TokenizerTestCase struct {
+    Name                        string
+    Pattern                     string
+    Raw                         []byte
+    ExpectedTokenized           []string
+    ExpectedError               error
+    Sleep                       time.Duration
+    AdditionalIterations        int
+    PreserveLeadingWhitespaces  bool
+    PreserveTrailingWhitespaces bool
+}
+
+func (tc TokenizerTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) {
+    reader := NewTestReader(tc.Raw)
+
+    return func(t *testing.T) {
+        var tokenized []string
+        for i := 0; i < 1+tc.AdditionalIterations; i++ {
+            // sleep before the next iteration
+            if i > 0 {
+                time.Sleep(tc.Sleep)
+            }
+            reader.Reset()
+            scanner := bufio.NewScanner(reader)
+            scanner.Split(reader.SplitFunc(splitFunc))
+            for {
+                ok := scanner.Scan()
+                if !ok {
+                    assert.Equal(t, tc.ExpectedError, scanner.Err())
+                    break
+                }
+                tokenized = append(tokenized, scanner.Text())
+            }
+        }
+
+        assert.Equal(t, tc.ExpectedTokenized, tokenized)
+    }
+}
+
+// GeneratedByteSliceOfLength returns a deterministic byte slice of the requested length
+func GeneratedByteSliceOfLength(length int) []byte {
+    chars := []byte(`abcdefghijklmnopqrstuvwxyz`)
+    newSlice := make([]byte, length)
+    for i := 0; i < length; i++ {
+        newSlice[i] = chars[i%len(chars)]
+    }
+    return newSlice
+}
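
For anyone trying the change locally, here is a minimal, self-contained sketch (not part of the patch) that drives the new octet-counting split function the same way `TestOctetFramingSplitFunc` does. The `buildFrame` helper and the example messages are made up for illustration; only `OctetMultiLineBuilder` and `helper.Encoding` come from the patched packages. It frames two syslog messages RFC 6587-style (`MSG-LEN SP SYSLOG-MSG`), concatenates them into one payload, and shows that the scanner now yields one token per framed message rather than one token for the whole packet.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog"
)

// buildFrame prepends the RFC 6587 octet count ("MSG-LEN SP SYSLOG-MSG") to a message.
// Hypothetical helper for this example only.
func buildFrame(msg string) string {
	return fmt.Sprintf("%d %s", len(msg), msg)
}

func main() {
	msg1 := "<34>1 2023-06-27T16:01:07Z host app - - - first event"
	msg2 := "<34>1 2023-06-27T16:01:07Z host app - - - second event"
	// Two framed messages arriving in a single TCP read.
	payload := buildFrame(msg1) + buildFrame(msg2)

	splitFunc, err := syslog.OctetMultiLineBuilder(helper.Encoding{})
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader(payload))
	scanner.Split(splitFunc)
	for scanner.Scan() {
		// Each token keeps its length prefix; the downstream syslog parser
		// consumes it when enable_octet_counting is set.
		fmt.Printf("%q\n", scanner.Text())
	}
}
```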
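
One caveat noted in the split-function comment ("we can only line split within a single buffer") and exercised by the "over capacity" test case: when a frame's declared length exceeds the scanner's buffer capacity, the token is cut at that capacity instead of being carried over to the next scan. The hedged illustration below uses an arbitrary 16 KiB buffer and an arbitrary 8000-byte body; in the actual TCP input, the max_log_size setting appears to play the equivalent role in sizing the scanner buffer.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog"
)

func main() {
	// A frame that declares 8000 bytes of content.
	body := strings.Repeat("a", 8000)
	payload := fmt.Sprintf("%d %s", len(body), body)

	splitFunc, err := syslog.OctetMultiLineBuilder(helper.Encoding{})
	if err != nil {
		panic(err)
	}

	// With bufio.Scanner's default 4 KiB initial buffer the frame would be cut
	// at the buffer capacity; giving the scanner a buffer at least as large as
	// the biggest expected frame keeps the token intact.
	scanner := bufio.NewScanner(strings.NewReader(payload))
	scanner.Buffer(make([]byte, 0, 16*1024), 16*1024)
	scanner.Split(splitFunc)
	for scanner.Scan() {
		fmt.Println(len(scanner.Text())) // 8005: the "8000 " prefix plus the body
	}
}
```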
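
The other extension point this patch introduces is the `MultiLineBuilder` field on the TCP input's `BaseConfig`: when `enable_octet_counting` is set, the syslog input swaps in `OctetMultiLineBuilder`; otherwise the TCP input falls back to `defaultMultilineBuilder`. The sketch below is not the receiver's own wiring — the listen address and the line-based builder are stand-ins — but it shows how any function matching `MultiLineBuilderFunc` can be plugged in the same way.

```go
package main

import (
	"bufio"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
)

func main() {
	logger := zap.NewNop().Sugar()

	cfg := tcp.NewConfig()
	cfg.ListenAddress = "127.0.0.1:54526"
	// Any function with the MultiLineBuilderFunc signature can replace the
	// default newline/multiline splitting; plain line scanning is used here
	// purely as a placeholder.
	cfg.MultiLineBuilder = func(_ helper.Encoding) (bufio.SplitFunc, error) {
		return bufio.ScanLines, nil
	}

	op, err := cfg.Build(logger)
	if err != nil {
		panic(err)
	}
	_ = op // starting and stopping the operator is omitted in this sketch
}
```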