Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add support for parsing multiline csv records #425

Merged
merged 5 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions operator/parser/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,56 @@ func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) p
reader.Comma = fieldDelimiter
reader.FieldsPerRecord = len(headers)
reader.LazyQuotes = lazyQuotes
parsedValues := make(map[string]interface{})

// Typically only need one
lines := make([][]string, 0, 1)
for {
body, err := reader.Read()
line, err := reader.Read()
if err == io.EOF {
break
}

if err != nil {
return nil, err
if err != nil && len(line) == 0 {
return nil, errors.New("failed to parse entry")
}

for i, key := range headers {
parsedValues[key] = body[i]
lines = append(lines, line)
}

/*
This parser is parsing a single value, which came from a single log entry.
Therefore, if there are multiple lines here, it should be assumed that each
subsequent line contains a continuation of the last field in the previous line.

Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
expect reader.Read() to return bodies:
- ["aa","b"]
- ["b","cc","d"]
- ["d","ee"]
*/

joinedLine := lines[0]
for i := 1; i < len(lines); i++ {
nextLine := lines[i]

// The first element of the next line is a continuation of the previous line's last element
joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]

// The remainder are separate elements
for n := 1; n < len(nextLine); n++ {
joinedLine = append(joinedLine, nextLine[n])
}
}

parsedValues := make(map[string]interface{})

if len(joinedLine) != len(headers) {
return nil, errors.New("wrong number of fields")
}

for i, val := range joinedLine {
parsedValues[headers[i]] = val
}
return parsedValues, nil
}
}
247 changes: 226 additions & 21 deletions operator/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func TestCSVParserByteFailure(t *testing.T) {
parser := newTestParser(t)
_, err := parser.parse([]byte("invalid"))
require.Error(t, err)
require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
require.Contains(t, err.Error(), "wrong number of fields")
}

func TestCSVParserStringFailure(t *testing.T) {
parser := newTestParser(t)
_, err := parser.parse("invalid")
require.Error(t, err)
require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
require.Contains(t, err.Error(), "wrong number of fields")
}

func TestCSVParserInvalidType(t *testing.T) {
Expand Down Expand Up @@ -572,29 +572,234 @@ func TestParserCSV(t *testing.T) {
}
}

func TestParserCSVMultipleBodies(t *testing.T) {
t.Run("basic", func(t *testing.T) {
cfg := NewCSVParserConfig("test")
cfg.OutputIDs = []string{"fake"}
cfg.Header = testHeader
func TestParserCSVMultiline(t *testing.T) {
cases := []struct {
name string
input string
expected map[string]interface{}
}{
{
"no_newlines",
"aaaa,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"first_field",
"aa\naa,bbbb,cccc,dddd,eeee",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test with no newlines as well, and several lines? I would like to make sure existing parsing still works

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a basic case to this set.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test with no newlines as well, and several lines? I would like to make sure existing parsing still works

Copy link

@atoulme atoulme Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test with escaped double quotes? For example:

```
"foo","bar","""foobar
is the new foo-bar""","another entry"
```

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some more test cases mixing quotes and returns.

map[string]interface{}{
"A": "aa\naa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"middle_field",
"aaaa,bbbb,cc\ncc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cc\ncc",
"D": "dddd",
"E": "eeee",
},
},
{
"last_field",
"aaaa,bbbb,cccc,dddd,e\neee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "e\neee",
},
},
{
"multiple_fields",
"aaaa,bb\nbb,ccc\nc,dddd,e\neee",
map[string]interface{}{
"A": "aaaa",
"B": "bb\nbb",
"C": "ccc\nc",
"D": "dddd",
"E": "e\neee",
},
},
{
"multiple_first_field",
"a\na\na\na,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "a\na\na\na",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"multiple_middle_field",
"aaaa,bbbb,c\nc\nc\nc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "c\nc\nc\nc",
"D": "dddd",
"E": "eeee",
},
},
{
"multiple_last_field",
"aaaa,bbbb,cccc,dddd,e\ne\ne\ne",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "e\ne\ne\ne",
},
},
{
"leading_newline",
"\naaaa,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"trailing_newline",
"aaaa,bbbb,cccc,dddd,eeee\n",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"leading_newline_field",
"aaaa,\nbbbb,\ncccc,\ndddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "\nbbbb",
"C": "\ncccc",
"D": "\ndddd",
"E": "eeee",
},
},
{
"trailing_newline_field",
"aaaa,bbbb\n,cccc\n,dddd\n,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb\n",
"C": "cccc\n",
"D": "dddd\n",
"E": "eeee",
},
},
{
"empty_lines_unquoted",
"aa\n\naa,bbbb,c\n\nccc,dddd,eee\n\ne",
map[string]interface{}{
"A": "aa\naa",
"B": "bbbb",
"C": "c\nccc",
"D": "dddd",
"E": "eee\ne",
},
},
{
"empty_lines_quoted",
"\"aa\n\naa\",bbbb,\"c\n\nccc\",dddd,\"eee\n\ne\"",
map[string]interface{}{
"A": "aa\n\naa",
"B": "bbbb",
"C": "c\n\nccc",
"D": "dddd",
"E": "eee\n\ne",
},
},
{
"everything",
"\n\na\na\n\naa,\n\nbb\nbb\n\n,\"cc\ncc\n\n\",\ndddd\n,eeee\n\n",
map[string]interface{}{
"A": "a\na\naa",
"B": "\nbb\nbb\n",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this value end with two \n?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

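Actually, not in this case: the empty line here is outside of quotes, and Go's encoding/csv reader ignores blank lines, so it is dropped. I did add a case where the empty line is preserved by quoting the field. This is covered by RFC 4180, section 2: "Fields containing line breaks (CRLF), double quotes, and commas should be enclosed in double-quotes."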

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this value end with two \n?

"C": "cc\ncc\n\n",
"D": "\ndddd\n",
"E": "eeee",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same, should have two \n at the end of the value

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same, should have two \n at the end of the value

},
},
{
"literal_return",
`aaaa,bb
bb,cccc,dd
dd,eeee`,
map[string]interface{}{
"A": "aaaa",
"B": "bb\nbb",
"C": "cccc",
"D": "dd\ndd",
"E": "eeee",
},
},
{
"return_in_quotes",
"aaaa,\"bbbb\",\"cc\ncc\",dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cc\ncc",
"D": "dddd",
"E": "eeee",
},
},
{
"return_in_double_quotes",
`aaaa,"""bbbb""","""cc
cc""",dddd,eeee`,
map[string]interface{}{
"A": "aaaa",
"B": "\"bbbb\"",
"C": "\"cc\ncc\"",
"D": "dddd",
"E": "eeee",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfg := NewCSVParserConfig("test")
cfg.OutputIDs = []string{"fake"}
cfg.Header = "A,B,C,D,E"

op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

fake := testutil.NewFakeOutput(t)
op.SetOutputs([]operator.Operator{fake})
fake := testutil.NewFakeOutput(t)
op.SetOutputs([]operator.Operator{fake})

entry := entry.New()
entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
err = op.Process(context.Background(), entry)
require.NoError(t, err)
fake.ExpectBody(t, map[string]interface{}{
"name": "stanza",
"sev": "DEBUG",
"msg": "started agent",
entry := entry.New()
entry.Body = tc.input
err = op.Process(context.Background(), entry)
require.NoError(t, err)
fake.ExpectBody(t, tc.expected)
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
}
}

func TestParserCSVInvalidJSONInput(t *testing.T) {
Expand Down