Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Add support for parsing multiline csv records (#425)
Browse files Browse the repository at this point in the history
* Add support for parsing multiline csv records

The csv parser previously was unable to properly handle multiline values.
This change adds support. Newlines within csv records are preserved.
  • Loading branch information
djaglowski authored Mar 10, 2022
1 parent cc7421b commit a523052
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 27 deletions.
45 changes: 39 additions & 6 deletions operator/parser/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,56 @@ func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) p
reader.Comma = fieldDelimiter
reader.FieldsPerRecord = len(headers)
reader.LazyQuotes = lazyQuotes
parsedValues := make(map[string]interface{})

// Typically only need one
lines := make([][]string, 0, 1)
for {
body, err := reader.Read()
line, err := reader.Read()
if err == io.EOF {
break
}

if err != nil {
return nil, err
if err != nil && len(line) == 0 {
return nil, errors.New("failed to parse entry")
}

for i, key := range headers {
parsedValues[key] = body[i]
lines = append(lines, line)
}

/*
This parser is parsing a single value, which came from a single log entry.
Therefore, if there are multiple lines here, it should be assumed that each
subsequent line contains a continuation of the last field in the previous line.
Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
expect reader.Read() to return bodies:
- ["aa","b"]
- ["b","cc","d"]
- ["d","ee"]
*/

joinedLine := lines[0]
for i := 1; i < len(lines); i++ {
nextLine := lines[i]

// The first element of the next line is a continuation of the previous line's last element
joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]

// The remainder are separate elements
for n := 1; n < len(nextLine); n++ {
joinedLine = append(joinedLine, nextLine[n])
}
}

parsedValues := make(map[string]interface{})

if len(joinedLine) != len(headers) {
return nil, errors.New("wrong number of fields")
}

for i, val := range joinedLine {
parsedValues[headers[i]] = val
}
return parsedValues, nil
}
}
247 changes: 226 additions & 21 deletions operator/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func TestCSVParserByteFailure(t *testing.T) {
parser := newTestParser(t)
_, err := parser.parse([]byte("invalid"))
require.Error(t, err)
require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
require.Contains(t, err.Error(), "wrong number of fields")
}

func TestCSVParserStringFailure(t *testing.T) {
parser := newTestParser(t)
_, err := parser.parse("invalid")
require.Error(t, err)
require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
require.Contains(t, err.Error(), "wrong number of fields")
}

func TestCSVParserInvalidType(t *testing.T) {
Expand Down Expand Up @@ -572,29 +572,234 @@ func TestParserCSV(t *testing.T) {
}
}

func TestParserCSVMultipleBodies(t *testing.T) {
t.Run("basic", func(t *testing.T) {
cfg := NewCSVParserConfig("test")
cfg.OutputIDs = []string{"fake"}
cfg.Header = testHeader
func TestParserCSVMultiline(t *testing.T) {
cases := []struct {
name string
input string
expected map[string]interface{}
}{
{
"no_newlines",
"aaaa,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"first_field",
"aa\naa,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "aa\naa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"middle_field",
"aaaa,bbbb,cc\ncc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cc\ncc",
"D": "dddd",
"E": "eeee",
},
},
{
"last_field",
"aaaa,bbbb,cccc,dddd,e\neee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "e\neee",
},
},
{
"multiple_fields",
"aaaa,bb\nbb,ccc\nc,dddd,e\neee",
map[string]interface{}{
"A": "aaaa",
"B": "bb\nbb",
"C": "ccc\nc",
"D": "dddd",
"E": "e\neee",
},
},
{
"multiple_first_field",
"a\na\na\na,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "a\na\na\na",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"multiple_middle_field",
"aaaa,bbbb,c\nc\nc\nc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "c\nc\nc\nc",
"D": "dddd",
"E": "eeee",
},
},
{
"multiple_last_field",
"aaaa,bbbb,cccc,dddd,e\ne\ne\ne",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "e\ne\ne\ne",
},
},
{
"leading_newline",
"\naaaa,bbbb,cccc,dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"trailing_newline",
"aaaa,bbbb,cccc,dddd,eeee\n",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cccc",
"D": "dddd",
"E": "eeee",
},
},
{
"leading_newline_field",
"aaaa,\nbbbb,\ncccc,\ndddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "\nbbbb",
"C": "\ncccc",
"D": "\ndddd",
"E": "eeee",
},
},
{
"trailing_newline_field",
"aaaa,bbbb\n,cccc\n,dddd\n,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb\n",
"C": "cccc\n",
"D": "dddd\n",
"E": "eeee",
},
},
{
"empty_lines_unquoted",
"aa\n\naa,bbbb,c\n\nccc,dddd,eee\n\ne",
map[string]interface{}{
"A": "aa\naa",
"B": "bbbb",
"C": "c\nccc",
"D": "dddd",
"E": "eee\ne",
},
},
{
"empty_lines_quoted",
"\"aa\n\naa\",bbbb,\"c\n\nccc\",dddd,\"eee\n\ne\"",
map[string]interface{}{
"A": "aa\n\naa",
"B": "bbbb",
"C": "c\n\nccc",
"D": "dddd",
"E": "eee\n\ne",
},
},
{
"everything",
"\n\na\na\n\naa,\n\nbb\nbb\n\n,\"cc\ncc\n\n\",\ndddd\n,eeee\n\n",
map[string]interface{}{
"A": "a\na\naa",
"B": "\nbb\nbb\n",
"C": "cc\ncc\n\n",
"D": "\ndddd\n",
"E": "eeee",
},
},
{
"literal_return",
`aaaa,bb
bb,cccc,dd
dd,eeee`,
map[string]interface{}{
"A": "aaaa",
"B": "bb\nbb",
"C": "cccc",
"D": "dd\ndd",
"E": "eeee",
},
},
{
"return_in_quotes",
"aaaa,\"bbbb\",\"cc\ncc\",dddd,eeee",
map[string]interface{}{
"A": "aaaa",
"B": "bbbb",
"C": "cc\ncc",
"D": "dddd",
"E": "eeee",
},
},
{
"return_in_double_quotes",
`aaaa,"""bbbb""","""cc
cc""",dddd,eeee`,
map[string]interface{}{
"A": "aaaa",
"B": "\"bbbb\"",
"C": "\"cc\ncc\"",
"D": "dddd",
"E": "eeee",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfg := NewCSVParserConfig("test")
cfg.OutputIDs = []string{"fake"}
cfg.Header = "A,B,C,D,E"

op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

fake := testutil.NewFakeOutput(t)
op.SetOutputs([]operator.Operator{fake})
fake := testutil.NewFakeOutput(t)
op.SetOutputs([]operator.Operator{fake})

entry := entry.New()
entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
err = op.Process(context.Background(), entry)
require.NoError(t, err)
fake.ExpectBody(t, map[string]interface{}{
"name": "stanza",
"sev": "DEBUG",
"msg": "started agent",
entry := entry.New()
entry.Body = tc.input
err = op.Process(context.Background(), entry)
require.NoError(t, err)
fake.ExpectBody(t, tc.expected)
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
}
}

func TestParserCSVInvalidJSONInput(t *testing.T) {
Expand Down

0 comments on commit a523052

Please sign in to comment.