diff --git a/operator/parser/csv/csv.go b/operator/parser/csv/csv.go
index c4915758..de0e3498 100644
--- a/operator/parser/csv/csv.go
+++ b/operator/parser/csv/csv.go
@@ -145,23 +145,62 @@ func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) p
 		reader.Comma = fieldDelimiter
 		reader.FieldsPerRecord = len(headers)
 		reader.LazyQuotes = lazyQuotes
-		parsedValues := make(map[string]interface{})
 
+		// Typically only one line is needed
+		lines := make([][]string, 0, 1)
 		for {
-			body, err := reader.Read()
+			line, err := reader.Read()
 			if err == io.EOF {
 				break
 			}
 
-			if err != nil {
-				return nil, err
+			if err != nil && len(line) == 0 {
+				return nil, errors.New("failed to parse entry")
 			}
 
-			for i, key := range headers {
-				parsedValues[key] = body[i]
+			lines = append(lines, line)
+		}
+
+		/*
+			This parser is parsing a single value, which came from a single log entry.
+			Therefore, if there are multiple lines here, it should be assumed that each
+			subsequent line contains a continuation of the last field in the previous line.
+
+			Given a file with headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
+			expect reader.Read() to return bodies:
+			- ["aa","b"]
+			- ["b","cc","d"]
+			- ["d","ee"]
+		*/
+
+		// An empty body (or one containing only newlines) yields no records at
+		// all; return an error here instead of panicking on lines[0] below.
+		if len(lines) == 0 {
+			return nil, errors.New("no csv lines found")
+		}
+
+		joinedLine := lines[0]
+		for i := 1; i < len(lines); i++ {
+			nextLine := lines[i]
+
+			// The first element of the next line is a continuation of the previous line's last element
+			joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]
+
+			// The remainder are separate elements
+			for n := 1; n < len(nextLine); n++ {
+				joinedLine = append(joinedLine, nextLine[n])
 			}
 		}
 
+		parsedValues := make(map[string]interface{})
+
+		if len(joinedLine) != len(headers) {
+			return nil, errors.New("wrong number of fields")
+		}
+
+		for i, val := range joinedLine {
+			parsedValues[headers[i]] = val
+		}
 		return parsedValues, nil
 	}
 }
diff --git a/operator/parser/csv/csv_test.go b/operator/parser/csv/csv_test.go
index 48fcd364..d08502e0 100644
--- a/operator/parser/csv/csv_test.go
+++ b/operator/parser/csv/csv_test.go
@@ -65,14 +65,14 @@ func TestCSVParserByteFailure(t *testing.T) {
 	parser := newTestParser(t)
 	_, err := parser.parse([]byte("invalid"))
 	require.Error(t, err)
-	require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
+	require.Contains(t, err.Error(), "wrong number of fields")
 }
 
 func TestCSVParserStringFailure(t *testing.T) {
 	parser := newTestParser(t)
 	_, err := parser.parse("invalid")
 	require.Error(t, err)
-	require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
+	require.Contains(t, err.Error(), "wrong number of fields")
 }
 
 func TestCSVParserInvalidType(t *testing.T) {
@@ -572,29 +572,234 @@ func TestParserCSV(t *testing.T) {
 	}
 }
 
-func TestParserCSVMultipleBodies(t *testing.T) {
-	t.Run("basic", func(t *testing.T) {
-		cfg := NewCSVParserConfig("test")
-		cfg.OutputIDs = []string{"fake"}
-		cfg.Header = testHeader
+func TestParserCSVMultiline(t *testing.T) {
+	cases := []struct {
+		name     string
+		input    string
+		expected map[string]interface{}
+	}{
+		{
+			"no_newlines",
+			"aaaa,bbbb,cccc,dddd,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"first_field",
+			"aa\naa,bbbb,cccc,dddd,eeee",
+			map[string]interface{}{
+				"A": "aa\naa",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"middle_field",
+			"aaaa,bbbb,cc\ncc,dddd,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cc\ncc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"last_field",
+			"aaaa,bbbb,cccc,dddd,e\neee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "e\neee",
+			},
+		},
+		{
+			"multiple_fields",
+			"aaaa,bb\nbb,ccc\nc,dddd,e\neee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bb\nbb",
+				"C": "ccc\nc",
+				"D": "dddd",
+				"E": "e\neee",
+			},
+		},
+		{
+			"multiple_first_field",
+			"a\na\na\na,bbbb,cccc,dddd,eeee",
+			map[string]interface{}{
+				"A": "a\na\na\na",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"multiple_middle_field",
+			"aaaa,bbbb,c\nc\nc\nc,dddd,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "c\nc\nc\nc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"multiple_last_field",
+			"aaaa,bbbb,cccc,dddd,e\ne\ne\ne",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "e\ne\ne\ne",
+			},
+		},
+		{
+			"leading_newline",
+			"\naaaa,bbbb,cccc,dddd,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"trailing_newline",
+			"aaaa,bbbb,cccc,dddd,eeee\n",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cccc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"leading_newline_field",
+			"aaaa,\nbbbb,\ncccc,\ndddd,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "\nbbbb",
+				"C": "\ncccc",
+				"D": "\ndddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"trailing_newline_field",
+			"aaaa,bbbb\n,cccc\n,dddd\n,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb\n",
+				"C": "cccc\n",
+				"D": "dddd\n",
+				"E": "eeee",
+			},
+		},
+		{
+			"empty_lines_unquoted",
+			"aa\n\naa,bbbb,c\n\nccc,dddd,eee\n\ne",
+			map[string]interface{}{
+				"A": "aa\naa",
+				"B": "bbbb",
+				"C": "c\nccc",
+				"D": "dddd",
+				"E": "eee\ne",
+			},
+		},
+		{
+			"empty_lines_quoted",
+			"\"aa\n\naa\",bbbb,\"c\n\nccc\",dddd,\"eee\n\ne\"",
+			map[string]interface{}{
+				"A": "aa\n\naa",
+				"B": "bbbb",
+				"C": "c\n\nccc",
+				"D": "dddd",
+				"E": "eee\n\ne",
+			},
+		},
+		{
+			"everything",
+			"\n\na\na\n\naa,\n\nbb\nbb\n\n,\"cc\ncc\n\n\",\ndddd\n,eeee\n\n",
+			map[string]interface{}{
+				"A": "a\na\naa",
+				"B": "\nbb\nbb\n",
+				"C": "cc\ncc\n\n",
+				"D": "\ndddd\n",
+				"E": "eeee",
+			},
+		},
+		{
+			"literal_return",
+			`aaaa,bb
+bb,cccc,dd
+dd,eeee`,
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bb\nbb",
+				"C": "cccc",
+				"D": "dd\ndd",
+				"E": "eeee",
+			},
+		},
+		{
+			"return_in_quotes",
+			"aaaa,\"bbbb\",\"cc\ncc\",dddd,eeee",
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "bbbb",
+				"C": "cc\ncc",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+		{
+			"return_in_double_quotes",
+			`aaaa,"""bbbb""","""cc
+cc""",dddd,eeee`,
+			map[string]interface{}{
+				"A": "aaaa",
+				"B": "\"bbbb\"",
+				"C": "\"cc\ncc\"",
+				"D": "dddd",
+				"E": "eeee",
+			},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			cfg := NewCSVParserConfig("test")
+			cfg.OutputIDs = []string{"fake"}
+			cfg.Header = "A,B,C,D,E"
 
-		op, err := cfg.Build(testutil.Logger(t))
-		require.NoError(t, err)
+			op, err := cfg.Build(testutil.Logger(t))
+			require.NoError(t, err)
 
-		fake := testutil.NewFakeOutput(t)
-		op.SetOutputs([]operator.Operator{fake})
+			fake := testutil.NewFakeOutput(t)
+			op.SetOutputs([]operator.Operator{fake})
 
-		entry := entry.New()
-		entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
-		err = op.Process(context.Background(), entry)
-		require.NoError(t, err)
-		fake.ExpectBody(t, map[string]interface{}{
-			"name": "stanza",
-			"sev":  "DEBUG",
-			"msg":  "started agent",
+			entry := entry.New()
+			entry.Body = tc.input
+			err = op.Process(context.Background(), entry)
+			require.NoError(t, err)
+			fake.ExpectBody(t, tc.expected)
+			fake.ExpectNoEntry(t, 100*time.Millisecond)
 		})
-		fake.ExpectNoEntry(t, 100*time.Millisecond)
-	})
+	}
 }
 
 func TestParserCSVInvalidJSONInput(t *testing.T) {