From aec44079a59960dfb6d06a6ee0da88484be873fb Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 5 Feb 2024 23:18:46 -0500 Subject: [PATCH 01/34] add parse CSV function and tests --- pkg/ottl/ottlfuncs/func_parse_csv.go | 212 ++++++++++ pkg/ottl/ottlfuncs/func_parse_csv_test.go | 447 ++++++++++++++++++++++ pkg/ottl/ottlfuncs/functions.go | 1 + 3 files changed, 660 insertions(+) create mode 100644 pkg/ottl/ottlfuncs/func_parse_csv.go create mode 100644 pkg/ottl/ottlfuncs/func_parse_csv_test.go diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go new file mode 100644 index 000000000000..901dc0d593ad --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -0,0 +1,212 @@ +package ottlfuncs + +import ( + "context" + "encoding/csv" + "errors" + "fmt" + "io" + "strings" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +const ( + parseCSVModeStrict = "strict" + parseCSVModeLazyQuotes = "lazyQuotes" + parseCSVModeIgnoreQuotes = "ignoreQuotes" +) + +const ( + parseCSVDefaultDelimiter = ',' + parseCSVDefaultMode = parseCSVModeStrict +) + +type ParseCSVArguments[K any] struct { + Target ottl.StringGetter[K] + Header ottl.StringGetter[K] + Delimiter ottl.Optional[string] + HeaderDelimiter ottl.Optional[string] + Mode ottl.Optional[string] +} + +func (p ParseCSVArguments[K]) validate() error { + if !p.Delimiter.IsEmpty() { + if len([]rune(p.Delimiter.Get())) != 1 { + return errors.New("delimiter must be a single character") + } + } + + if !p.HeaderDelimiter.IsEmpty() { + if len([]rune(p.HeaderDelimiter.Get())) != 1 { + return errors.New("header_delimiter must be a single character") + } + } + + return nil +} + +func NewParseCSVFactory[K any]() ottl.Factory[K] { + return ottl.NewFactory("ParseCSV", &ParseCSVArguments[K]{}, createParseCSVFunction[K]) +} + +func createParseCSVFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) { + args, ok := oArgs.(*ParseCSVArguments[K]) + if !ok { + return nil, fmt.Errorf("ParseCSVFactory args must be of type *ParseCSVArguments[K]") + } + + if err := args.validate(); err != nil { + return nil, fmt.Errorf("invalid arguments: %w", err) + } + + delimiter := parseCSVDefaultDelimiter + if !args.Delimiter.IsEmpty() { + delimiter = []rune(args.Delimiter.Get())[0] + } + + // headerDelimiter defaults to the chosen delimter, + // since in most cases headerDelimiter == delmiter. 
+ headerDelimiter := delimiter + if !args.HeaderDelimiter.IsEmpty() { + headerDelimiter = []rune(args.HeaderDelimiter.Get())[0] + } + + mode := parseCSVDefaultMode + if !args.Mode.IsEmpty() { + mode = args.Mode.Get() + } + + switch mode { + case parseCSVModeStrict: + return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, false), nil + case parseCSVModeLazyQuotes: + return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, true), nil + case parseCSVModeIgnoreQuotes: + return parseCSVIgnoreQuotes(args.Target, args.Header, delimiter, headerDelimiter), nil + } + + return nil, fmt.Errorf("unknown mode: %s", mode) +} + +func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelimiter rune, lazyQuotes bool) ottl.ExprFunc[K] { + headerDelimiterString := string([]rune{headerDelimiter}) + + return func(ctx context.Context, tCtx K) (any, error) { + targetStr, err := target.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("get target: %w", err) + } + + headerStr, err := header.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("get header: %w", err) + } + + headers := strings.Split(headerStr, headerDelimiterString) + + csvReader := csv.NewReader(strings.NewReader(targetStr)) + csvReader.Comma = delimiter + csvReader.FieldsPerRecord = len(headers) + csvReader.LazyQuotes = lazyQuotes + + fields, err := csvReadLine(csvReader) + if err != nil { + return nil, fmt.Errorf("read csv line: %w", err) + } + + return csvHeadersMap(headers, fields) + } +} + +func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter, headerDelimiter rune) ottl.ExprFunc[K] { + headerDelimiterString := string([]rune{headerDelimiter}) + delimiterString := string([]rune{delimiter}) + + return func(ctx context.Context, tCtx K) (any, error) { + targetStr, err := target.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("get target: %w", err) + } + + headerStr, err := header.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("get header: %w", err) + } + + headers := strings.Split(headerStr, headerDelimiterString) + + // Ignoring quotes makes CSV parseable with just string.Split + fields := strings.Split(targetStr, delimiterString) + return csvHeadersMap(headers, fields) + } +} + +// csvHeadersMap creates a map of headers[i] -> fields[i]. +func csvHeadersMap(headers []string, fields []string) (pcommon.Map, error) { + pMap := pcommon.NewMap() + parsedValues := make(map[string]any) + + if len(fields) != len(headers) { + return pMap, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) + } + + for i, val := range fields { + parsedValues[headers[i]] = val + } + + err := pMap.FromRaw(parsedValues) + if err != nil { + return pMap, fmt.Errorf("create pcommon.Map: %w", err) + } + + return pMap, nil +} + +// csvReadLine reads a CSV line from the csv reader, returning the fields parsed from the line. +// We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields. +// However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated), +// so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls. 
+func csvReadLine(csvReader *csv.Reader) ([]string, error) { + lines := make([][]string, 0, 1) + for { + line, err := csvReader.Read() + if errors.Is(err, io.EOF) { + break + } + + if err != nil && len(line) == 0 { + return nil, fmt.Errorf("read csv line: %w", err) + } + + lines = append(lines, line) + } + + /* + This parser is parsing a single value, which came from a single log entry. + Therefore, if there are multiple lines here, it should be assumed that each + subsequent line contains a continuation of the last field in the previous line. + + Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee", + expect reader.Read() to return bodies: + - ["aa","b"] + - ["b","cc","d"] + - ["d","ee"] + */ + + joinedLine := lines[0] + for i := 1; i < len(lines); i++ { + nextLine := lines[i] + + // The first element of the next line is a continuation of the previous line's last element + joinedLine[len(joinedLine)-1] += "\n" + nextLine[0] + + // The remainder are separate elements + for n := 1; n < len(nextLine); n++ { + joinedLine = append(joinedLine, nextLine[n]) + } + } + + return joinedLine, nil +} diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go new file mode 100644 index 000000000000..bb456d5c15db --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -0,0 +1,447 @@ +package ottlfuncs + +import ( + "context" + "errors" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func Test_ParseCSV(t *testing.T) { + tests := []struct { + name string + oArgs ottl.Arguments + want map[string]any + createError string + parseError string + }{ + /* Test default mode */ + { + name: "Parse comma separated values", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with newline in first field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\nnewline,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1\nnewline", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with newline in middle field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2\nnewline,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2\nnewline", + "col3": "val3", + }, + }, + { + name: "Parse with newline in last field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3\nnewline", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, 
tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3\nnewline", + }, + }, + { + name: "Parse with newline in multiple fields", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\nnewline1,val2\nnewline2,val3\nnewline3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1\nnewline1", + "col2": "val2\nnewline2", + "col3": "val3\nnewline3", + }, + }, + { + name: "Parse comma separated values with explicit mode", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Mode: ottl.NewTestingOptional("strict"), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse tab separated values", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\tval2\tval3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1\tcol2\tcol3", nil + }, + }, + Delimiter: ottl.NewTestingOptional("\t"), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Header delimiter is different from row delimiter", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\tval2\tval3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1 col2 col3", nil + }, + }, + Delimiter: ottl.NewTestingOptional("\t"), + HeaderDelimiter: ottl.NewTestingOptional(" "), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Invalid target (strict mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + }, + parseError: "get target: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Invalid header (strict mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + }, + parseError: "get header: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Invalid args", + oArgs: nil, + createError: "ParseCSVFactory args must be of type *ParseCSVArguments[K]", + }, + { + name: "Parse fails due to header/row column mismatch", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx 
context.Context, tCtx any) (any, error) { + return `val1,val2,val3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + }, + parseError: "wrong number of fields: expected 2, found 3", + }, + { + name: "Parse fails due to header/row column mismatch", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,val3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + }, + parseError: "wrong number of fields: expected 2, found 3", + }, + { + name: "Parse fails for row with bare quotes", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,v"al3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + parseError: "wrong number of fields: expected 3, found 2", + }, + + /* Test parsing with lazy quotes */ + { + name: "Parse lazyQuotes with quote in row", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,v"al3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Mode: ottl.NewTestingOptional("lazyQuotes"), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": `v"al3`, + }, + }, + { + name: "Parse lazyQuotes invalid csv", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,"val2,"val3,val4"`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3,col4", nil + }, + }, + Mode: ottl.NewTestingOptional("lazyQuotes"), + }, + parseError: "wrong number of fields: expected 4, found 2", + }, + /* Test parsing ignoring quotes */ + { + name: "Parse quotes invalid csv", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,"val2,"val3,val4"`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3,col4", nil + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + want: map[string]any{ + "col1": "val1", + "col2": `"val2`, + "col3": `"val3`, + "col4": `val4"`, + }, + }, + { + name: "Invalid target (ignoreQuotes mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + parseError: "get target: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Invalid header (ignoreQuotes mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return 
`val1,val2`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + parseError: "get header: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + /* Validation tests */ + { + name: "Delimiter is greater than one character", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Delimiter: ottl.NewTestingOptional("bad_delim"), + }, + createError: "invalid arguments: delimiter must be a single character", + }, + { + name: "HeaderDelimiter is greater than one character", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + HeaderDelimiter: ottl.NewTestingOptional("bad_delim"), + }, + createError: "invalid arguments: header_delimiter must be a single character", + }, + { + name: "Bad mode", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Mode: ottl.NewTestingOptional("fake-mode"), + }, + createError: "unknown mode: fake-mode", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + exprFunc, err := createParseCSVFunction[any](ottl.FunctionContext{}, tt.oArgs) + if tt.createError != "" { + require.ErrorContains(t, err, tt.createError) + return + } + + require.NoError(t, err) + + result, err := exprFunc(context.Background(), nil) + if tt.parseError != "" { + require.ErrorContains(t, err, tt.parseError) + return + } + + assert.NoError(t, err) + + resultMap, ok := result.(pcommon.Map) + require.True(t, ok) + + require.Equal(t, tt.want, resultMap.AsRaw()) + }) + } +} diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index 657b88280367..786e270bae36 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -56,6 +56,7 @@ func converters[K any]() []ottl.Factory[K] { NewMinutesFactory[K](), NewNanosecondsFactory[K](), NewNowFactory[K](), + NewParseCSVFactory[K](), NewParseJSONFactory[K](), NewParseKeyValueFactory[K](), NewSecondsFactory[K](), From bda815f98586652757d54fd42f6e8c52a0d7b5a6 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 5 Feb 2024 23:19:45 -0500 Subject: [PATCH 02/34] make gci --- pkg/ottl/ottlfuncs/func_parse_csv.go | 3 ++- pkg/ottl/ottlfuncs/func_parse_csv_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 901dc0d593ad..b10556e576f5 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -8,8 +8,9 @@ import ( "io" "strings" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "go.opentelemetry.io/collector/pdata/pcommon" + + 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) const ( diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index bb456d5c15db..c18ee6dab6e1 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -5,10 +5,11 @@ import ( "errors" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) func Test_ParseCSV(t *testing.T) { From 7fadbe59114a50f8937834d552fc9b233c66123c Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 5 Feb 2024 23:27:27 -0500 Subject: [PATCH 03/34] Add e2e test --- pkg/ottl/e2e/e2e_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/ottl/e2e/e2e_test.go b/pkg/ottl/e2e/e2e_test.go index ceab9dcc306a..ed4fc1ca86d3 100644 --- a/pkg/ottl/e2e/e2e_test.go +++ b/pkg/ottl/e2e/e2e_test.go @@ -430,6 +430,15 @@ func Test_e2e_converters(t *testing.T) { tCtx.GetLogRecord().Attributes().PutStr("test", "pass") }, }, + { + statement: `set(attributes["test"], ParseCSV("val1;val2;val3","header1|header2|header3",";","|","strict"))`, + want: func(tCtx ottllog.TransformContext) { + m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test") + m.PutStr("header1", "val1") + m.PutStr("header2", "val2") + m.PutStr("header3", "val3") + }, + }, { statement: `set(attributes["test"], ParseJSON("{\"id\":1}"))`, want: func(tCtx ottllog.TransformContext) { From 617cfba357b27dd6a19fab0fc71b67bef8548ca5 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 5 Feb 2024 23:43:01 -0500 Subject: [PATCH 04/34] Add docs --- pkg/ottl/ottlfuncs/README.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index 32a2d801ba4d..c1f9325da877 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -799,6 +799,35 @@ Examples: - `UnixSeconds(Now())` - `set(start_time, Now())` +### ParseCSV + +`ParseCSV(target, headers, Optional[delimiter], Optional[headerDelimiter], Optional[mode])` + +The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the target string as CSV. The resultant map is structured such that it is a mapping of field name -> field value. + +`target` is a Getter that returns a string. This string should be a CSV row. if `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. + +`headers` is a Getter that returns a string. This string should be a CSV header, specifying the names of the CSV fields. + +`delimiter` is an optional string parameter that specifies the delimiter used to split `target` into fields. By default, it is set to `,`. + +`headerDelimiter` is an optional string parameter that specified the delimiter used to split `headers` into fields. By default, it is set to the value of `delimiter`. + +`mode` is an optional string paramater that specifies the parsing mode. Valid values are `strict`, `lazyQuotes`, and `ignoreQuotes`. By default, it is set to `strict`. 
+ +Examples: + +- `ParseCSV("999-999-9999,Joe Smith,joe.smith@example.com", "phone,name,email")` + + +- `ParseCSV(body, "phone|name|email", delimiter="|")` + + +- `ParseCSV(attributes["csv_line"], attributes["csv_headers"], delimiter="|", headerDelimiter=",", mode="lazyQuotes")` + + +- `ParseCSV(body, "phone|name|email", mode="ignoreQuotes")` + ### ParseJSON `ParseJSON(target)` From 650e79fddb363fa01c37a05c857307a90d774d65 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 5 Feb 2024 23:43:42 -0500 Subject: [PATCH 05/34] add license --- pkg/ottl/ottlfuncs/func_parse_csv.go | 3 +++ pkg/ottl/ottlfuncs/func_parse_csv_test.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index b10556e576f5..31236d02314a 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package ottlfuncs import ( diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index c18ee6dab6e1..9b1e1b4545a7 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package ottlfuncs import ( From d881bf79853560dca0ea488b3d409368562bf9b2 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 11:45:42 -0500 Subject: [PATCH 06/34] tweak readme --- pkg/ottl/ottlfuncs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index c1f9325da877..afa62f37c06c 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -803,7 +803,7 @@ Examples: `ParseCSV(target, headers, Optional[delimiter], Optional[headerDelimiter], Optional[mode])` -The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the target string as CSV. The resultant map is structured such that it is a mapping of field name -> field value. +The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the `target` string as CSV. The resultant map is structured such that it is a mapping of field name -> field value. `target` is a Getter that returns a string. This string should be a CSV row. if `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. 
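The header-to-field mapping and the field-count check described above can be reproduced outside the collector with nothing but the standard library. The sketch below is a standalone illustration of that behaviour; `mapRow`, `main`, and the sample values are illustrative names for this example only, not part of the patch:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// mapRow pairs each header with the field in the same position and
// fails when the counts differ, the same condition that makes the
// converter return a "wrong number of fields" error.
func mapRow(headers, fields []string) (map[string]any, error) {
	if len(fields) != len(headers) {
		return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields))
	}
	out := make(map[string]any, len(headers))
	for i, f := range fields {
		out[headers[i]] = f
	}
	return out, nil
}

func main() {
	headers := strings.Split("phone,name,email", ",")

	r := csv.NewReader(strings.NewReader("999-999-9999,Joe Smith,joe.smith@example.com"))
	r.FieldsPerRecord = len(headers)

	fields, err := r.Read()
	if err != nil {
		panic(err)
	}

	m, err := mapRow(headers, fields)
	if err != nil {
		panic(err)
	}
	fmt.Println(m) // map[email:joe.smith@example.com name:Joe Smith phone:999-999-9999]
}
```

Feeding a row with more or fewer fields than there are headers makes `mapRow` return the same style of error that the converter's unit tests assert.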
From 879b1edafbebf75eb1e34dcf50ab662556892369 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 12:34:50 -0500 Subject: [PATCH 07/34] add parsecsv to table of contents --- pkg/ottl/ottlfuncs/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index afa62f37c06c..72861c568258 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -401,6 +401,7 @@ Available Converters: - [Minutes](#minutes) - [Nanoseconds](#nanoseconds) - [Now](#now) +- [ParseCSV](#parsecsv) - [ParseJSON](#parsejson) - [ParseKeyValue](#parsekeyvalue) - [Seconds](#seconds) From 5e1c3650ebab97cbaa39b876f984da1d2c243128 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 12:46:29 -0500 Subject: [PATCH 08/34] explain modes --- pkg/ottl/ottlfuncs/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index 72861c568258..8b0f45ae241e 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -815,6 +815,9 @@ The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result `headerDelimiter` is an optional string parameter that specified the delimiter used to split `headers` into fields. By default, it is set to the value of `delimiter`. `mode` is an optional string paramater that specifies the parsing mode. Valid values are `strict`, `lazyQuotes`, and `ignoreQuotes`. By default, it is set to `strict`. +- The `strict` mode provides typical CSV parsing. +- The `lazyQotes` mode provides a relaxed version of CSV parsing where a quote may appear in the middle of a unquoted field. +- The `ignoreQuotes` completely ignores any quoting rules for CSV and just splits the row on the delimiter. 
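The difference between the three modes maps directly onto how the row is read: `strict` relies on Go's `encoding/csv` reader, `lazyQuotes` additionally sets the reader's `LazyQuotes` flag, and `ignoreQuotes` skips the reader entirely and splits the row on the delimiter. A rough standalone sketch of that distinction; the `parse` helper and the sample rows are illustrative only:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// parse reads a single CSV record; the LazyQuotes flag is the only
// difference between the strict and lazyQuotes behaviours shown here.
func parse(row string, lazy bool) ([]string, error) {
	r := csv.NewReader(strings.NewReader(row))
	r.LazyQuotes = lazy
	return r.Read()
}

func main() {
	row := `val1,val2,v"al3` // bare quote inside the last field

	if _, err := parse(row, false); err != nil {
		fmt.Println("strict:", err) // the default reader rejects the bare quote
	}

	lazy, _ := parse(row, true)
	fmt.Println("lazyQuotes:", lazy) // [val1 val2 v"al3]

	// ignoreQuotes does no quote handling at all: a plain split on the
	// delimiter, so a quoted field containing a comma is itself split.
	fmt.Println("ignoreQuotes:", strings.Split(`"555-555-5556,Joe Smith",joe.smith@example.com`, ","))
}
```

Splitting on the delimiter means quotes survive as literal characters in the output fields, which is the trade-off `ignoreQuotes` makes in exchange for never failing on malformed quoting.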
Examples: From a4c2e60e7fb17e84833cdd7b8f962d4ccda2c5ee Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 13:08:50 -0500 Subject: [PATCH 09/34] add e2e test for specifying optionals --- pkg/ottl/e2e/e2e_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/ottl/e2e/e2e_test.go b/pkg/ottl/e2e/e2e_test.go index ed4fc1ca86d3..26dbaf85be18 100644 --- a/pkg/ottl/e2e/e2e_test.go +++ b/pkg/ottl/e2e/e2e_test.go @@ -439,6 +439,15 @@ func Test_e2e_converters(t *testing.T) { m.PutStr("header3", "val3") }, }, + { + statement: `set(attributes["test"], ParseCSV("val1,val2,val3","header1|header2|header3",headerDelimiter="|",mode="strict"))`, + want: func(tCtx ottllog.TransformContext) { + m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test") + m.PutStr("header1", "val1") + m.PutStr("header2", "val2") + m.PutStr("header3", "val3") + }, + }, { statement: `set(attributes["test"], ParseJSON("{\"id\":1}"))`, want: func(tCtx ottllog.TransformContext) { From 60ef3259ffe433b9536d0e754a1c097061d76815 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 13:25:31 -0500 Subject: [PATCH 10/34] edit ignore quotes example to show why you'd use it --- pkg/ottl/ottlfuncs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index 8b0f45ae241e..c5082be23d96 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -830,7 +830,7 @@ Examples: - `ParseCSV(attributes["csv_line"], attributes["csv_headers"], delimiter="|", headerDelimiter=",", mode="lazyQuotes")` -- `ParseCSV(body, "phone|name|email", mode="ignoreQuotes")` +- `ParseCSV("\"555-555-5556,Joe Smith\",joe.smith@example.com", "phone,name,email", mode="ignoreQuotes")` ### ParseJSON From 52545537cb50ed9e629c99ea59c20f77a190f769 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 13:29:06 -0500 Subject: [PATCH 11/34] add changelog --- .chloggen/feat_ottl_csv-parse-function.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100755 .chloggen/feat_ottl_csv-parse-function.yaml diff --git a/.chloggen/feat_ottl_csv-parse-function.yaml b/.chloggen/feat_ottl_csv-parse-function.yaml new file mode 100755 index 000000000000..0cf88fb8455a --- /dev/null +++ b/.chloggen/feat_ottl_csv-parse-function.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds a new ParseCSV converter that can be used to parse CSV strings. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [30921] From 91dbe5e5f41dbf4626b1f9db566a49d32ffecc7b Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 13:32:28 -0500 Subject: [PATCH 12/34] make goporto --- pkg/ottl/ottlfuncs/func_parse_csv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 31236d02314a..4d4cad282e73 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package ottlfuncs +package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" import ( "context" From 074f32ac84d073dfd23ba10108948aa2a721eba0 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 6 Feb 2024 13:35:17 -0500 Subject: [PATCH 13/34] add missing word. --- pkg/ottl/ottlfuncs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index c5082be23d96..cd376c88d801 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -817,7 +817,7 @@ The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result `mode` is an optional string paramater that specifies the parsing mode. Valid values are `strict`, `lazyQuotes`, and `ignoreQuotes`. By default, it is set to `strict`. - The `strict` mode provides typical CSV parsing. - The `lazyQotes` mode provides a relaxed version of CSV parsing where a quote may appear in the middle of a unquoted field. -- The `ignoreQuotes` completely ignores any quoting rules for CSV and just splits the row on the delimiter. +- The `ignoreQuotes` mode completely ignores any quoting rules for CSV and just splits the row on the delimiter. Examples: From 14f06ec2857aa6eaec40882c8f2ff6dd27982dd9 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 9 Feb 2024 14:11:40 -0500 Subject: [PATCH 14/34] remove double wrapping of error --- pkg/ottl/ottlfuncs/func_parse_csv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 4d4cad282e73..faffbfbda58b 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -117,7 +117,7 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim fields, err := csvReadLine(csvReader) if err != nil { - return nil, fmt.Errorf("read csv line: %w", err) + return nil, err } return csvHeadersMap(headers, fields) From 571ee1a084454eff9edcaf4f3aa669f5a55a1b55 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 9 Feb 2024 14:21:38 -0500 Subject: [PATCH 15/34] test empty row and empty header, tweak to fix bugs with that --- pkg/ottl/ottlfuncs/func_parse_csv.go | 5 ++++ pkg/ottl/ottlfuncs/func_parse_csv_test.go | 34 +++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index faffbfbda58b..d57fdbaf643c 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -187,6 +187,11 @@ func csvReadLine(csvReader *csv.Reader) ([]string, error) { lines = append(lines, line) } + // If the input is empty, we might not get any lines + if len(lines) == 0 { + return nil, errors.New("no csv lines found") + } + /* This parser is parsing a single value, which came from a single log entry. 
Therefore, if there are multiple lines here, it should be assumed that each diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index 9b1e1b4545a7..f53f57722c60 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -257,6 +257,40 @@ func Test_ParseCSV(t *testing.T) { }, parseError: "wrong number of fields: expected 2, found 3", }, + { + name: "Empty header string", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "", nil + }, + }, + }, + want: map[string]any{ + "": "val1", + }, + }, + { + name: "Parse fails due to empty row", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + parseError: "no csv lines found", + }, { name: "Parse fails for row with bare quotes", oArgs: &ParseCSVArguments[any]{ From 19fd5638efe1dbadbcf7851bf4e4acdc06b709b4 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 9 Feb 2024 14:25:07 -0500 Subject: [PATCH 16/34] clarify newline behaviour in README --- pkg/ottl/ottlfuncs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index cd376c88d801..2d2392b6d100 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -806,7 +806,7 @@ Examples: The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the `target` string as CSV. The resultant map is structured such that it is a mapping of field name -> field value. -`target` is a Getter that returns a string. This string should be a CSV row. if `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. +`target` is a Getter that returns a string. This string should be a CSV row. if `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. Newlines in target are not treated as row delimiters during parsing, and will be treated as though they are part of the field that are placed in. `headers` is a Getter that returns a string. This string should be a CSV header, specifying the names of the CSV fields. From 6a376ce480244cb22a1d583b7daada97241a8d69 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 9 Feb 2024 14:40:13 -0500 Subject: [PATCH 17/34] return error if header string is empty. 
--- pkg/ottl/ottlfuncs/func_parse_csv.go | 8 ++++++++ pkg/ottl/ottlfuncs/func_parse_csv_test.go | 23 +++++++++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index d57fdbaf643c..6be4fbd67004 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -108,6 +108,10 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim return nil, fmt.Errorf("get header: %w", err) } + if headerStr == "" { + return nil, errors.New("headers must not be an empty string") + } + headers := strings.Split(headerStr, headerDelimiterString) csvReader := csv.NewReader(strings.NewReader(targetStr)) @@ -139,6 +143,10 @@ func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter, return nil, fmt.Errorf("get header: %w", err) } + if headerStr == "" { + return nil, errors.New("headers must not be an empty string") + } + headers := strings.Split(headerStr, headerDelimiterString) // Ignoring quotes makes CSV parseable with just string.Split diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index f53f57722c60..3177a6a3bbe0 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -258,7 +258,7 @@ func Test_ParseCSV(t *testing.T) { parseError: "wrong number of fields: expected 2, found 3", }, { - name: "Empty header string", + name: "Empty header string (strict)", oArgs: &ParseCSVArguments[any]{ Target: ottl.StandardStringGetter[any]{ Getter: func(ctx context.Context, tCtx any) (any, error) { @@ -271,9 +271,7 @@ func Test_ParseCSV(t *testing.T) { }, }, }, - want: map[string]any{ - "": "val1", - }, + parseError: "headers must not be an empty string", }, { name: "Parse fails due to empty row", @@ -404,6 +402,23 @@ func Test_ParseCSV(t *testing.T) { }, parseError: "get header: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, + { + name: "Empty header string (ignoreQuotes)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "", nil + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + parseError: "headers must not be an empty string", + }, /* Validation tests */ { name: "Delimiter is greater than one character", From 48a54157f203232172815c499b7bf35d826fd1d4 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Feb 2024 10:46:21 -0500 Subject: [PATCH 18/34] add some more newline tests --- pkg/ottl/ottlfuncs/func_parse_csv_test.go | 60 +++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index 3177a6a3bbe0..52d714a29a8e 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -124,6 +124,66 @@ func Test_ParseCSV(t *testing.T) { "col3": "val3\nnewline3", }, }, + { + name: "Parse with leading newline", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "\nval1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, 
+ want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with trailing newline", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3\n", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with newline at end of field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2\n,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2\n", + "col3": "val3", + }, + }, { name: "Parse comma separated values with explicit mode", oArgs: &ParseCSVArguments[any]{ From 00aed9c174984f0d62d3fcd359be78e0ac983917 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Feb 2024 10:50:00 -0500 Subject: [PATCH 19/34] Update readme newline behavior --- pkg/ottl/ottlfuncs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index 2d2392b6d100..8a4ffafd3050 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -806,7 +806,7 @@ Examples: The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the `target` string as CSV. The resultant map is structured such that it is a mapping of field name -> field value. -`target` is a Getter that returns a string. This string should be a CSV row. if `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. Newlines in target are not treated as row delimiters during parsing, and will be treated as though they are part of the field that are placed in. +`target` is a Getter that returns a string. This string should be a CSV row. if `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. Leading and trailing newlines in `target` will be stripped. Newlines elswhere in `target` are not treated as row delimiters during parsing, and will be treated as though they are part of the field that are placed in. `headers` is a Getter that returns a string. This string should be a CSV header, specifying the names of the CSV fields. 
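The newline semantics documented in that README change follow from how the row is read: Go's `csv.Reader` treats unquoted newlines as record separators and skips blank lines, so a single logical row containing newlines comes back as several consecutive reads, and the parser glues the first field of each extra read onto the last field of the previous one with a `\n`. A condensed standalone sketch of that stitching; `readRow` is an illustrative name, not the helper used in the patch:

```go
package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"strings"
)

// readRow treats the whole input as one logical CSV row: every extra
// line returned by the reader is folded back into the previous one by
// joining its first field to the previous line's last field with "\n".
func readRow(row string) ([]string, error) {
	r := csv.NewReader(strings.NewReader(row))
	r.FieldsPerRecord = -1 // pieces of the same logical row may have different lengths

	var lines [][]string
	for {
		line, err := r.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, err
		}
		lines = append(lines, line)
	}
	if len(lines) == 0 {
		return nil, errors.New("no csv lines found")
	}

	joined := lines[0]
	for _, next := range lines[1:] {
		joined[len(joined)-1] += "\n" + next[0]
		joined = append(joined, next[1:]...)
	}
	return joined, nil
}

func main() {
	fields, _ := readRow("val1,val2\nnewline,val3\n")
	fmt.Printf("%q\n", fields) // ["val1" "val2\nnewline" "val3"]
}
```

Because blank lines are skipped by the reader, leading and trailing newlines simply vanish, which is what the "Parse with leading newline" and "Parse with trailing newline" tests added in this patch series expect.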
From a4896fca98a8a29b7ab2dcb4dfe9675f8dada849 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 12 Feb 2024 10:54:09 -0500 Subject: [PATCH 20/34] minor tweak to add newline at end of field test --- pkg/ottl/ottlfuncs/func_parse_csv_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index 52d714a29a8e..a09421c6ee7b 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -169,7 +169,7 @@ func Test_ParseCSV(t *testing.T) { oArgs: &ParseCSVArguments[any]{ Target: ottl.StandardStringGetter[any]{ Getter: func(ctx context.Context, tCtx any) (any, error) { - return "val1,val2\n,val3", nil + return "val1\n,val2,val3", nil }, }, Header: ottl.StandardStringGetter[any]{ @@ -179,8 +179,8 @@ func Test_ParseCSV(t *testing.T) { }, }, want: map[string]any{ - "col1": "val1", - "col2": "val2\n", + "col1": "val1\n", + "col2": "val2", "col3": "val3", }, }, From ca71cb8a32100575b13a99e7fca5971aaf03417c Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 12:37:13 -0500 Subject: [PATCH 21/34] pull common logic to coreinternal --- internal/coreinternal/parseutils/csv.go | 75 +++++++++++++++++++ pkg/ottl/ottlfuncs/func_parse_csv.go | 97 +++++++------------------ pkg/stanza/operator/parser/csv/csv.go | 61 ++-------------- 3 files changed, 108 insertions(+), 125 deletions(-) create mode 100644 internal/coreinternal/parseutils/csv.go diff --git a/internal/coreinternal/parseutils/csv.go b/internal/coreinternal/parseutils/csv.go new file mode 100644 index 000000000000..93f3f4d9194d --- /dev/null +++ b/internal/coreinternal/parseutils/csv.go @@ -0,0 +1,75 @@ +package parseutils + +import ( + "encoding/csv" + "errors" + "fmt" + "io" +) + +// ReadCSVRow reads a CSV row from the csv reader, returning the fields parsed from the line. +// We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields. +// However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated), +// so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls. +func ReadCSVRow(csvReader *csv.Reader) ([]string, error) { + lines := make([][]string, 0, 1) + for { + line, err := csvReader.Read() + if errors.Is(err, io.EOF) { + break + } + + if err != nil && len(line) == 0 { + return nil, fmt.Errorf("read csv line: %w", err) + } + + lines = append(lines, line) + } + + // If the input is empty, we might not get any lines + if len(lines) == 0 { + return nil, errors.New("no csv lines found") + } + + /* + This parser is parsing a single value, which came from a single log entry. + Therefore, if there are multiple lines here, it should be assumed that each + subsequent line contains a continuation of the last field in the previous line. 
+ + Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee", + expect reader.Read() to return bodies: + - ["aa","b"] + - ["b","cc","d"] + - ["d","ee"] + */ + + joinedLine := lines[0] + for i := 1; i < len(lines); i++ { + nextLine := lines[i] + + // The first element of the next line is a continuation of the previous line's last element + joinedLine[len(joinedLine)-1] += "\n" + nextLine[0] + + // The remainder are separate elements + for n := 1; n < len(nextLine); n++ { + joinedLine = append(joinedLine, nextLine[n]) + } + } + + return joinedLine, nil +} + +// MapCSVHeaders creates a map of headers[i] -> fields[i]. +func MapCSVHeaders(headers []string, fields []string) (map[string]any, error) { + parsedValues := make(map[string]any) + + if len(fields) != len(headers) { + return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) + } + + for i, val := range fields { + parsedValues[headers[i]] = val + } + + return parsedValues, nil +} diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 6be4fbd67004..d64d2eee1afb 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -8,11 +8,11 @@ import ( "encoding/csv" "errors" "fmt" - "io" "strings" "go.opentelemetry.io/collector/pdata/pcommon" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) @@ -119,12 +119,23 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim csvReader.FieldsPerRecord = len(headers) csvReader.LazyQuotes = lazyQuotes - fields, err := csvReadLine(csvReader) + fields, err := parseutils.ReadCSVRow(csvReader) if err != nil { return nil, err } - return csvHeadersMap(headers, fields) + headersToFields, err := parseutils.MapCSVHeaders(headers, fields) + if err != nil { + return nil, fmt.Errorf("map csv headers: %w", err) + } + + pMap := pcommon.NewMap() + err = pMap.FromRaw(headersToFields) + if err != nil { + return nil, fmt.Errorf("create pcommon.Map: %w", err) + } + + return pMap, nil } } @@ -151,79 +162,25 @@ func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter, // Ignoring quotes makes CSV parseable with just string.Split fields := strings.Split(targetStr, delimiterString) - return csvHeadersMap(headers, fields) - } -} - -// csvHeadersMap creates a map of headers[i] -> fields[i]. -func csvHeadersMap(headers []string, fields []string) (pcommon.Map, error) { - pMap := pcommon.NewMap() - parsedValues := make(map[string]any) - - if len(fields) != len(headers) { - return pMap, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) - } - - for i, val := range fields { - parsedValues[headers[i]] = val - } - - err := pMap.FromRaw(parsedValues) - if err != nil { - return pMap, fmt.Errorf("create pcommon.Map: %w", err) - } - return pMap, nil -} - -// csvReadLine reads a CSV line from the csv reader, returning the fields parsed from the line. -// We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields. -// However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated), -// so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls. 
-func csvReadLine(csvReader *csv.Reader) ([]string, error) { - lines := make([][]string, 0, 1) - for { - line, err := csvReader.Read() - if errors.Is(err, io.EOF) { - break + headersToFields, err := parseutils.MapCSVHeaders(headers, fields) + if err != nil { + return nil, fmt.Errorf("map csv headers: %w", err) } - if err != nil && len(line) == 0 { - return nil, fmt.Errorf("read csv line: %w", err) + pMap := pcommon.NewMap() + err = pMap.FromRaw(headersToFields) + if err != nil { + return nil, fmt.Errorf("create pcommon.Map: %w", err) } - lines = append(lines, line) + return pMap, nil } +} - // If the input is empty, we might not get any lines - if len(lines) == 0 { - return nil, errors.New("no csv lines found") - } - - /* - This parser is parsing a single value, which came from a single log entry. - Therefore, if there are multiple lines here, it should be assumed that each - subsequent line contains a continuation of the last field in the previous line. - - Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee", - expect reader.Read() to return bodies: - - ["aa","b"] - - ["b","cc","d"] - - ["d","ee"] - */ - - joinedLine := lines[0] - for i := 1; i < len(lines); i++ { - nextLine := lines[i] - - // The first element of the next line is a continuation of the previous line's last element - joinedLine[len(joinedLine)-1] += "\n" + nextLine[0] - - // The remainder are separate elements - for n := 1; n < len(nextLine); n++ { - joinedLine = append(joinedLine, nextLine[n]) - } - } +// csvHeadersMap creates a map of headers[i] -> fields[i]. +func csvHeadersMap(headers []string, fields []string) (pcommon.Map, error) { + pMap := pcommon.NewMap() - return joinedLine, nil + return pMap, nil } diff --git a/pkg/stanza/operator/parser/csv/csv.go b/pkg/stanza/operator/parser/csv/csv.go index 72348498b8d6..a93e0463ffbe 100644 --- a/pkg/stanza/operator/parser/csv/csv.go +++ b/pkg/stanza/operator/parser/csv/csv.go @@ -7,11 +7,11 @@ import ( csvparser "encoding/csv" "errors" "fmt" - "io" "strings" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -163,47 +163,12 @@ func generateCSVParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool reader.FieldsPerRecord = len(headers) reader.LazyQuotes = lazyQuotes - // Typically only need one - lines := make([][]string, 0, 1) - for { - line, err := reader.Read() - if errors.Is(err, io.EOF) { - break - } - - if err != nil && len(line) == 0 { - return nil, errors.New("failed to parse entry") - } - - lines = append(lines, line) - } - - /* - This parser is parsing a single value, which came from a single log entry. - Therefore, if there are multiple lines here, it should be assumed that each - subsequent line contains a continuation of the last field in the previous line. 
- - Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee", - expect reader.Read() to return bodies: - - ["aa","b"] - - ["b","cc","d"] - - ["d","ee"] - */ - - joinedLine := lines[0] - for i := 1; i < len(lines); i++ { - nextLine := lines[i] - - // The first element of the next line is a continuation of the previous line's last element - joinedLine[len(joinedLine)-1] += "\n" + nextLine[0] - - // The remainder are separate elements - for n := 1; n < len(nextLine); n++ { - joinedLine = append(joinedLine, nextLine[n]) - } + joinedLine, err := parseutils.ReadCSVRow(reader) + if err != nil { + return nil, err } - return headersMap(headers, joinedLine) + return parseutils.MapCSVHeaders(headers, joinedLine) } } @@ -217,7 +182,7 @@ func generateSplitParseFunc(headers []string, fieldDelimiter rune) parseFunc { // This parse function does not do any special quote handling; Splitting on the delimiter is sufficient. fields := strings.Split(csvLine, string(fieldDelimiter)) - return headersMap(headers, fields) + return parseutils.MapCSVHeaders(headers, fields) } } @@ -235,17 +200,3 @@ func valueAsString(value any) (string, error) { return s, nil } - -// headersMap creates a map of headers[i] -> fields[i]. -func headersMap(headers []string, fields []string) (map[string]any, error) { - parsedValues := make(map[string]any) - - if len(fields) != len(headers) { - return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) - } - - for i, val := range fields { - parsedValues[headers[i]] = val - } - return parsedValues, nil -} From 0e973f5d1cde5dd5bd1babdf9e7405c76fa349e2 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 12:45:51 -0500 Subject: [PATCH 22/34] move CSV reader creation into common as well --- internal/coreinternal/parseutils/csv.go | 10 ++++++++-- internal/coreinternal/parseutils/csv_test.go | 11 +++++++++++ pkg/ottl/ottlfuncs/func_parse_csv.go | 16 +--------------- pkg/stanza/operator/parser/csv/csv.go | 8 +------- 4 files changed, 21 insertions(+), 24 deletions(-) create mode 100644 internal/coreinternal/parseutils/csv_test.go diff --git a/internal/coreinternal/parseutils/csv.go b/internal/coreinternal/parseutils/csv.go index 93f3f4d9194d..603039e76766 100644 --- a/internal/coreinternal/parseutils/csv.go +++ b/internal/coreinternal/parseutils/csv.go @@ -5,16 +5,22 @@ import ( "errors" "fmt" "io" + "strings" ) // ReadCSVRow reads a CSV row from the csv reader, returning the fields parsed from the line. // We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields. // However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated), // so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls. 
-func ReadCSVRow(csvReader *csv.Reader) ([]string, error) { +func ReadCSVRow(row string, delimiter rune, headers []string, lazyQuotes bool) ([]string, error) { + reader := csv.NewReader(strings.NewReader(row)) + reader.Comma = delimiter + reader.FieldsPerRecord = len(headers) + reader.LazyQuotes = lazyQuotes + lines := make([][]string, 0, 1) for { - line, err := csvReader.Read() + line, err := reader.Read() if errors.Is(err, io.EOF) { break } diff --git a/internal/coreinternal/parseutils/csv_test.go b/internal/coreinternal/parseutils/csv_test.go new file mode 100644 index 000000000000..5ad59beb5b9e --- /dev/null +++ b/internal/coreinternal/parseutils/csv_test.go @@ -0,0 +1,11 @@ +package parseutils + +import "testing" + +func Test_ParseCSV(t *testing.T) { + +} + +func Test_MapCSVHeaders(t *testing.T) { + +} diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index d64d2eee1afb..5f40aa1582bf 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -5,7 +5,6 @@ package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-c import ( "context" - "encoding/csv" "errors" "fmt" "strings" @@ -113,13 +112,7 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim } headers := strings.Split(headerStr, headerDelimiterString) - - csvReader := csv.NewReader(strings.NewReader(targetStr)) - csvReader.Comma = delimiter - csvReader.FieldsPerRecord = len(headers) - csvReader.LazyQuotes = lazyQuotes - - fields, err := parseutils.ReadCSVRow(csvReader) + fields, err := parseutils.ReadCSVRow(targetStr, delimiter, headers, lazyQuotes) if err != nil { return nil, err } @@ -177,10 +170,3 @@ func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter, return pMap, nil } } - -// csvHeadersMap creates a map of headers[i] -> fields[i]. 
-func csvHeadersMap(headers []string, fields []string) (pcommon.Map, error) { - pMap := pcommon.NewMap() - - return pMap, nil -} diff --git a/pkg/stanza/operator/parser/csv/csv.go b/pkg/stanza/operator/parser/csv/csv.go index a93e0463ffbe..47a962e2c77a 100644 --- a/pkg/stanza/operator/parser/csv/csv.go +++ b/pkg/stanza/operator/parser/csv/csv.go @@ -4,7 +4,6 @@ package csv // import "github.com/open-telemetry/opentelemetry-collector-contrib import ( "context" - csvparser "encoding/csv" "errors" "fmt" "strings" @@ -158,12 +157,7 @@ func generateCSVParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool return nil, err } - reader := csvparser.NewReader(strings.NewReader(csvLine)) - reader.Comma = fieldDelimiter - reader.FieldsPerRecord = len(headers) - reader.LazyQuotes = lazyQuotes - - joinedLine, err := parseutils.ReadCSVRow(reader) + joinedLine, err := parseutils.ReadCSVRow(csvLine, fieldDelimiter, headers, lazyQuotes) if err != nil { return nil, err } From c1915ca2d1e9621af8d424915b13b95a0e15e80f Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 13:17:31 -0500 Subject: [PATCH 23/34] add tests for shared parsing, minor tweaks --- internal/coreinternal/parseutils/csv.go | 9 +- internal/coreinternal/parseutils/csv_test.go | 163 ++++++++++++++++++- pkg/ottl/ottlfuncs/func_parse_csv.go | 2 +- pkg/stanza/operator/parser/csv/csv.go | 2 +- 4 files changed, 169 insertions(+), 7 deletions(-) diff --git a/internal/coreinternal/parseutils/csv.go b/internal/coreinternal/parseutils/csv.go index 603039e76766..a2311ac63652 100644 --- a/internal/coreinternal/parseutils/csv.go +++ b/internal/coreinternal/parseutils/csv.go @@ -12,10 +12,11 @@ import ( // We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields. // However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated), // so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls. -func ReadCSVRow(row string, delimiter rune, headers []string, lazyQuotes bool) ([]string, error) { +func ReadCSVRow(row string, delimiter rune, lazyQuotes bool) ([]string, error) { reader := csv.NewReader(strings.NewReader(row)) reader.Comma = delimiter - reader.FieldsPerRecord = len(headers) + // -1 indicates a variable length of fields + reader.FieldsPerRecord = -1 reader.LazyQuotes = lazyQuotes lines := make([][]string, 0, 1) @@ -67,12 +68,12 @@ func ReadCSVRow(row string, delimiter rune, headers []string, lazyQuotes bool) ( // MapCSVHeaders creates a map of headers[i] -> fields[i]. 
func MapCSVHeaders(headers []string, fields []string) (map[string]any, error) { - parsedValues := make(map[string]any) - if len(fields) != len(headers) { return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) } + parsedValues := make(map[string]any, len(headers)) + for i, val := range fields { parsedValues[headers[i]] = val } diff --git a/internal/coreinternal/parseutils/csv_test.go b/internal/coreinternal/parseutils/csv_test.go index 5ad59beb5b9e..462f63e0636c 100644 --- a/internal/coreinternal/parseutils/csv_test.go +++ b/internal/coreinternal/parseutils/csv_test.go @@ -1,11 +1,172 @@ package parseutils -import "testing" +import ( + "testing" + + "github.com/stretchr/testify/require" +) func Test_ParseCSV(t *testing.T) { + testCases := []struct { + name string + row string + delimiter rune + lazyQuotes bool + expectedRow []string + expectedErr string + }{ + { + name: "Typical CSV row", + row: "field1,field2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2", "field3"}, + }, + { + name: "Quoted CSV row", + row: `field1,"field2,contains delimiter",field3`, + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2,contains delimiter", "field3"}, + }, + { + name: "Bare quote in field (strict)", + row: `field1,field"2,field3`, + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1"}, + }, + { + name: "Bare quote in field (lazy quotes)", + row: `field1,field"2,field3`, + delimiter: ',', + lazyQuotes: true, + expectedRow: []string{"field1", `field"2`, "field3"}, + }, + { + name: "Empty row", + row: "", + delimiter: ',', + lazyQuotes: false, + expectedErr: "no csv lines found", + }, + { + name: "Newlines in field", + row: "field1,fie\nld2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "fie\nld2", "field3"}, + }, + { + name: "Newlines prefix field", + row: "field1,\nfield2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "\nfield2", "field3"}, + }, + { + name: "Newlines suffix field", + row: "field1,field2\n,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2\n", "field3"}, + }, + { + name: "Newlines prefix row", + row: "\nfield1,field2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2", "field3"}, + }, + { + name: "Newlines suffix row", + row: "field1,field2,field3\n", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2", "field3"}, + }, + { + name: "Newlines in first row", + row: "fiel\nd1,field2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"fiel\nd1", "field2", "field3"}, + }, + { + name: "Newlines in all rows", + row: "\nfiel\nd1,fie\nld2,fie\nld3\n", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"fiel\nd1", "fie\nld2", "fie\nld3"}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + s, err := ReadCSVRow(tc.row, tc.delimiter, tc.lazyQuotes) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.Equal(t, tc.expectedRow, s) + } + }) + } } func Test_MapCSVHeaders(t *testing.T) { + testCases := []struct { + name string + headers []string + fields []string + expectedMap map[string]any + expectedErr string + }{ + { + name: "Map headers to fields", + headers: []string{"Col1", "Col2", "Col3"}, + fields: []string{"Val1", "Val2", "Val3"}, + expectedMap: map[string]any{ + "Col1": "Val1", + "Col2": 
"Val2", + "Col3": "Val3", + }, + }, + { + name: "Missing field", + headers: []string{"Col1", "Col2", "Col3"}, + fields: []string{"Val1", "Val2"}, + expectedErr: "wrong number of fields: expected 3, found 2", + }, + { + name: "Too many fields", + headers: []string{"Col1", "Col2", "Col3"}, + fields: []string{"Val1", "Val2", "Val3", "Val4"}, + expectedErr: "wrong number of fields: expected 3, found 4", + }, + { + name: "Single field", + headers: []string{"Col1"}, + fields: []string{"Val1"}, + expectedMap: map[string]any{ + "Col1": "Val1", + }, + }, + { + name: "No fields", + headers: []string{}, + fields: []string{}, + expectedMap: map[string]any{}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m, err := MapCSVHeaders(tc.headers, tc.fields) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.Equal(t, tc.expectedMap, m) + } + }) + } } diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 5f40aa1582bf..dd5213b8c96c 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -112,7 +112,7 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim } headers := strings.Split(headerStr, headerDelimiterString) - fields, err := parseutils.ReadCSVRow(targetStr, delimiter, headers, lazyQuotes) + fields, err := parseutils.ReadCSVRow(targetStr, delimiter, lazyQuotes) if err != nil { return nil, err } diff --git a/pkg/stanza/operator/parser/csv/csv.go b/pkg/stanza/operator/parser/csv/csv.go index 47a962e2c77a..f9546fa79d41 100644 --- a/pkg/stanza/operator/parser/csv/csv.go +++ b/pkg/stanza/operator/parser/csv/csv.go @@ -157,7 +157,7 @@ func generateCSVParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool return nil, err } - joinedLine, err := parseutils.ReadCSVRow(csvLine, fieldDelimiter, headers, lazyQuotes) + joinedLine, err := parseutils.ReadCSVRow(csvLine, fieldDelimiter, lazyQuotes) if err != nil { return nil, err } From d286d1cf799ad8390d94ee14d3a010cfc881808b Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 13:18:47 -0500 Subject: [PATCH 24/34] add license --- internal/coreinternal/parseutils/csv.go | 3 +++ internal/coreinternal/parseutils/csv_test.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/internal/coreinternal/parseutils/csv.go b/internal/coreinternal/parseutils/csv.go index a2311ac63652..77eeae427586 100644 --- a/internal/coreinternal/parseutils/csv.go +++ b/internal/coreinternal/parseutils/csv.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package parseutils import ( diff --git a/internal/coreinternal/parseutils/csv_test.go b/internal/coreinternal/parseutils/csv_test.go index 462f63e0636c..1d93b89bcf3e 100644 --- a/internal/coreinternal/parseutils/csv_test.go +++ b/internal/coreinternal/parseutils/csv_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package parseutils import ( From 1037fb024a4cbe534f5d0e82f7accf4e0fbdc180 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 13:25:56 -0500 Subject: [PATCH 25/34] make goporto --- internal/coreinternal/parseutils/csv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/coreinternal/parseutils/csv.go b/internal/coreinternal/parseutils/csv.go index 77eeae427586..5354213f2dde 100644 --- a/internal/coreinternal/parseutils/csv.go +++ b/internal/coreinternal/parseutils/csv.go @@ 
-1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package parseutils +package parseutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils" import ( "encoding/csv" From aa9c9246002adb929415d31b68af42833555a5e9 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 13:44:51 -0500 Subject: [PATCH 26/34] add @djaglowski as codeowner for parseutils --- .github/CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3172505aa2c8..88fd2df30e55 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -116,6 +116,7 @@ extension/sumologicextension/ @open-telemetry/collect internal/aws/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @mxiamxia internal/collectd/ @open-telemetry/collector-contrib-approvers @atoulme internal/coreinternal/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers +internal/coreinternal/parseutils/ @open-telemetry/collector-contrib-approvers @djaglowski internal/datadog/ @open-telemetry/collector-contrib-approvers @mx-psi @dineshg13 internal/docker/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick @jamesmoessis internal/filter/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers From 2c692b7db93cb83407472831edd59ed27c03888d Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 14 Feb 2024 15:00:03 -0500 Subject: [PATCH 27/34] remove parseutils from codeowners --- .github/CODEOWNERS | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 88fd2df30e55..3172505aa2c8 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -116,7 +116,6 @@ extension/sumologicextension/ @open-telemetry/collect internal/aws/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @mxiamxia internal/collectd/ @open-telemetry/collector-contrib-approvers @atoulme internal/coreinternal/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers -internal/coreinternal/parseutils/ @open-telemetry/collector-contrib-approvers @djaglowski internal/datadog/ @open-telemetry/collector-contrib-approvers @mx-psi @dineshg13 internal/docker/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick @jamesmoessis internal/filter/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers From 415cb0c20ac9f2e097623252365664d464c75e02 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 09:20:37 -0500 Subject: [PATCH 28/34] checkout csv.go from main --- pkg/stanza/operator/parser/csv/csv.go | 67 ++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 6 deletions(-) diff --git a/pkg/stanza/operator/parser/csv/csv.go b/pkg/stanza/operator/parser/csv/csv.go index f9546fa79d41..72348498b8d6 100644 --- a/pkg/stanza/operator/parser/csv/csv.go +++ b/pkg/stanza/operator/parser/csv/csv.go @@ -4,13 +4,14 @@ package csv // import "github.com/open-telemetry/opentelemetry-collector-contrib import ( "context" + csvparser "encoding/csv" "errors" "fmt" + "io" "strings" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -157,12 +158,52 @@ func generateCSVParseFunc(headers []string, 
fieldDelimiter rune, lazyQuotes bool return nil, err } - joinedLine, err := parseutils.ReadCSVRow(csvLine, fieldDelimiter, lazyQuotes) - if err != nil { - return nil, err + reader := csvparser.NewReader(strings.NewReader(csvLine)) + reader.Comma = fieldDelimiter + reader.FieldsPerRecord = len(headers) + reader.LazyQuotes = lazyQuotes + + // Typically only need one + lines := make([][]string, 0, 1) + for { + line, err := reader.Read() + if errors.Is(err, io.EOF) { + break + } + + if err != nil && len(line) == 0 { + return nil, errors.New("failed to parse entry") + } + + lines = append(lines, line) + } + + /* + This parser is parsing a single value, which came from a single log entry. + Therefore, if there are multiple lines here, it should be assumed that each + subsequent line contains a continuation of the last field in the previous line. + + Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee", + expect reader.Read() to return bodies: + - ["aa","b"] + - ["b","cc","d"] + - ["d","ee"] + */ + + joinedLine := lines[0] + for i := 1; i < len(lines); i++ { + nextLine := lines[i] + + // The first element of the next line is a continuation of the previous line's last element + joinedLine[len(joinedLine)-1] += "\n" + nextLine[0] + + // The remainder are separate elements + for n := 1; n < len(nextLine); n++ { + joinedLine = append(joinedLine, nextLine[n]) + } } - return parseutils.MapCSVHeaders(headers, joinedLine) + return headersMap(headers, joinedLine) } } @@ -176,7 +217,7 @@ func generateSplitParseFunc(headers []string, fieldDelimiter rune) parseFunc { // This parse function does not do any special quote handling; Splitting on the delimiter is sufficient. fields := strings.Split(csvLine, string(fieldDelimiter)) - return parseutils.MapCSVHeaders(headers, fields) + return headersMap(headers, fields) } } @@ -194,3 +235,17 @@ func valueAsString(value any) (string, error) { return s, nil } + +// headersMap creates a map of headers[i] -> fields[i]. +func headersMap(headers []string, fields []string) (map[string]any, error) { + parsedValues := make(map[string]any) + + if len(fields) != len(headers) { + return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) + } + + for i, val := range fields { + parsedValues[headers[i]] = val + } + return parsedValues, nil +} From 201b53372baa1343d57ae78da5d3ca0d6c1142de Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 14:33:13 -0500 Subject: [PATCH 29/34] make header delimiter a string, not a rune --- pkg/ottl/ottlfuncs/func_parse_csv.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index dd5213b8c96c..2fe8ec5d14ca 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -71,9 +71,9 @@ func createParseCSVFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) // headerDelimiter defaults to the chosen delimter, // since in most cases headerDelimiter == delmiter. 
- headerDelimiter := delimiter + headerDelimiter := string(delimiter) if !args.HeaderDelimiter.IsEmpty() { - headerDelimiter = []rune(args.HeaderDelimiter.Get())[0] + headerDelimiter = args.HeaderDelimiter.Get() } mode := parseCSVDefaultMode @@ -93,9 +93,7 @@ func createParseCSVFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) return nil, fmt.Errorf("unknown mode: %s", mode) } -func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelimiter rune, lazyQuotes bool) ottl.ExprFunc[K] { - headerDelimiterString := string([]rune{headerDelimiter}) - +func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string, lazyQuotes bool) ottl.ExprFunc[K] { return func(ctx context.Context, tCtx K) (any, error) { targetStr, err := target.Get(ctx, tCtx) if err != nil { @@ -111,7 +109,7 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim return nil, errors.New("headers must not be an empty string") } - headers := strings.Split(headerStr, headerDelimiterString) + headers := strings.Split(headerStr, headerDelimiter) fields, err := parseutils.ReadCSVRow(targetStr, delimiter, lazyQuotes) if err != nil { return nil, err @@ -132,8 +130,7 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter, headerDelim } } -func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter, headerDelimiter rune) ottl.ExprFunc[K] { - headerDelimiterString := string([]rune{headerDelimiter}) +func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string) ottl.ExprFunc[K] { delimiterString := string([]rune{delimiter}) return func(ctx context.Context, tCtx K) (any, error) { @@ -151,7 +148,7 @@ func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter, return nil, errors.New("headers must not be an empty string") } - headers := strings.Split(headerStr, headerDelimiterString) + headers := strings.Split(headerStr, headerDelimiter) // Ignoring quotes makes CSV parseable with just string.Split fields := strings.Split(targetStr, delimiterString) From f9c5c038492dbd0b841128bb599bd7a3442ac804 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 14:56:54 -0500 Subject: [PATCH 30/34] reduce error wrapping --- pkg/ottl/ottlfuncs/func_parse_csv.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 2fe8ec5d14ca..a7a48f857cd3 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -97,12 +97,12 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, header return func(ctx context.Context, tCtx K) (any, error) { targetStr, err := target.Get(ctx, tCtx) if err != nil { - return nil, fmt.Errorf("get target: %w", err) + return nil, err } headerStr, err := header.Get(ctx, tCtx) if err != nil { - return nil, fmt.Errorf("get header: %w", err) + return nil, err } if headerStr == "" { @@ -122,11 +122,7 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, header pMap := pcommon.NewMap() err = pMap.FromRaw(headersToFields) - if err != nil { - return nil, fmt.Errorf("create pcommon.Map: %w", err) - } - - return pMap, nil + return pMap, err } } @@ -136,12 +132,12 @@ func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter return func(ctx context.Context, tCtx K) (any, error) { targetStr, err := target.Get(ctx, tCtx) if err != nil 
{ - return nil, fmt.Errorf("get target: %w", err) + return nil, err } headerStr, err := header.Get(ctx, tCtx) if err != nil { - return nil, fmt.Errorf("get header: %w", err) + return nil, err } if headerStr == "" { @@ -160,10 +156,6 @@ func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter pMap := pcommon.NewMap() err = pMap.FromRaw(headersToFields) - if err != nil { - return nil, fmt.Errorf("create pcommon.Map: %w", err) - } - - return pMap, nil + return pMap, err } } From cf250c423a968b68429c220d675de882d4fa091a Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 15:11:21 -0500 Subject: [PATCH 31/34] Collapse parseCSV and parseCSVIgnoreQuotes together --- pkg/ottl/ottlfuncs/func_parse_csv.go | 69 ++++++++--------------- pkg/ottl/ottlfuncs/func_parse_csv_test.go | 8 +-- 2 files changed, 26 insertions(+), 51 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index a7a48f857cd3..3335490e3f6e 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -47,6 +47,15 @@ func (p ParseCSVArguments[K]) validate() error { } } + if !p.Mode.IsEmpty() { + mode := p.Mode.Get() + switch mode { + case parseCSVModeStrict, parseCSVModeLazyQuotes, parseCSVModeIgnoreQuotes: //OK + default: + return fmt.Errorf("unknown mode: %s", mode) + } + } + return nil } @@ -81,19 +90,10 @@ func createParseCSVFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) mode = args.Mode.Get() } - switch mode { - case parseCSVModeStrict: - return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, false), nil - case parseCSVModeLazyQuotes: - return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, true), nil - case parseCSVModeIgnoreQuotes: - return parseCSVIgnoreQuotes(args.Target, args.Header, delimiter, headerDelimiter), nil - } - - return nil, fmt.Errorf("unknown mode: %s", mode) + return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, mode), nil } -func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string, lazyQuotes bool) ottl.ExprFunc[K] { +func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string, mode string) ottl.ExprFunc[K] { return func(ctx context.Context, tCtx K) (any, error) { targetStr, err := target.Get(ctx, tCtx) if err != nil { @@ -110,45 +110,20 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, header } headers := strings.Split(headerStr, headerDelimiter) - fields, err := parseutils.ReadCSVRow(targetStr, delimiter, lazyQuotes) - if err != nil { - return nil, err - } - - headersToFields, err := parseutils.MapCSVHeaders(headers, fields) - if err != nil { - return nil, fmt.Errorf("map csv headers: %w", err) - } - - pMap := pcommon.NewMap() - err = pMap.FromRaw(headersToFields) - return pMap, err - } -} - -func parseCSVIgnoreQuotes[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string) ottl.ExprFunc[K] { - delimiterString := string([]rune{delimiter}) - - return func(ctx context.Context, tCtx K) (any, error) { - targetStr, err := target.Get(ctx, tCtx) - if err != nil { - return nil, err - } - - headerStr, err := header.Get(ctx, tCtx) - if err != nil { - return nil, err - } - if headerStr == "" { - return nil, errors.New("headers must not be an empty string") + var fields []string + switch mode { + case parseCSVModeStrict, parseCSVModeLazyQuotes: + lazyQuotes := mode == parseCSVModeLazyQuotes + fields, err = 
parseutils.ReadCSVRow(targetStr, delimiter, lazyQuotes) + if err != nil { + return nil, err + } + case parseCSVModeIgnoreQuotes: + // Ignoring quotes makes CSV parseable with just string.Split + fields = strings.Split(targetStr, string([]rune{delimiter})) } - headers := strings.Split(headerStr, headerDelimiter) - - // Ignoring quotes makes CSV parseable with just string.Split - fields := strings.Split(targetStr, delimiterString) - headersToFields, err := parseutils.MapCSVHeaders(headers, fields) if err != nil { return nil, fmt.Errorf("map csv headers: %w", err) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index a09421c6ee7b..e597ad4c7cbd 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -262,7 +262,7 @@ func Test_ParseCSV(t *testing.T) { }, }, }, - parseError: "get target: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Invalid header (strict mode)", @@ -278,7 +278,7 @@ func Test_ParseCSV(t *testing.T) { }, }, }, - parseError: "get header: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Invalid args", @@ -443,7 +443,7 @@ func Test_ParseCSV(t *testing.T) { }, Mode: ottl.NewTestingOptional("ignoreQuotes"), }, - parseError: "get target: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Invalid header (ignoreQuotes mode)", @@ -460,7 +460,7 @@ func Test_ParseCSV(t *testing.T) { }, Mode: ottl.NewTestingOptional("ignoreQuotes"), }, - parseError: "get header: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Empty header string (ignoreQuotes)", From 2e2acf58cb5f597ee87d4d0902a382ac8f40b5f8 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 15:24:37 -0500 Subject: [PATCH 32/34] swap to using callback for parsing the row --- pkg/ottl/ottlfuncs/func_parse_csv.go | 53 ++++++++++++++++------------ 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index 3335490e3f6e..c8af977fe444 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -47,15 +47,6 @@ func (p ParseCSVArguments[K]) validate() error { } } - if !p.Mode.IsEmpty() { - mode := p.Mode.Get() - switch mode { - case parseCSVModeStrict, parseCSVModeLazyQuotes, parseCSVModeIgnoreQuotes: //OK - default: - return fmt.Errorf("unknown mode: %s", mode) - } - } - return nil } @@ -90,10 +81,24 @@ func createParseCSVFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) mode = args.Mode.Get() } - return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, mode), nil + var parseRow parseCSVRowFunc + switch mode { + case parseCSVModeStrict: + parseRow = parseCSVRow(false) + case parseCSVModeLazyQuotes: + parseRow = parseCSVRow(true) + case parseCSVModeIgnoreQuotes: + parseRow = parseCSVRowIgnoreQuotes() + default: + return nil, fmt.Errorf("unknown mode: %s", mode) + } + + return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, parseRow), nil } -func 
parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string, mode string) ottl.ExprFunc[K] { +type parseCSVRowFunc func(row string, delimiter rune) ([]string, error) + +func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string, parseRow parseCSVRowFunc) ottl.ExprFunc[K] { return func(ctx context.Context, tCtx K) (any, error) { targetStr, err := target.Get(ctx, tCtx) if err != nil { @@ -111,17 +116,9 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, header headers := strings.Split(headerStr, headerDelimiter) - var fields []string - switch mode { - case parseCSVModeStrict, parseCSVModeLazyQuotes: - lazyQuotes := mode == parseCSVModeLazyQuotes - fields, err = parseutils.ReadCSVRow(targetStr, delimiter, lazyQuotes) - if err != nil { - return nil, err - } - case parseCSVModeIgnoreQuotes: - // Ignoring quotes makes CSV parseable with just string.Split - fields = strings.Split(targetStr, string([]rune{delimiter})) + fields, err := parseRow(targetStr, delimiter) + if err != nil { + return nil, err } headersToFields, err := parseutils.MapCSVHeaders(headers, fields) @@ -134,3 +131,15 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, header return pMap, err } } + +func parseCSVRow(lazyQuotes bool) parseCSVRowFunc { + return func(row string, delimiter rune) ([]string, error) { + return parseutils.ReadCSVRow(row, delimiter, lazyQuotes) + } +} + +func parseCSVRowIgnoreQuotes() parseCSVRowFunc { + return func(row string, delimiter rune) ([]string, error) { + return strings.Split(row, string([]rune{delimiter})), nil + } +} From eed4db3e6107d8261699f5d40f365b9e7131f2dc Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 15:27:32 -0500 Subject: [PATCH 33/34] wrap get error with human-friendly error message --- pkg/ottl/ottlfuncs/func_parse_csv.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go index c8af977fe444..dd0a88dc2343 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -102,12 +102,12 @@ func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, header return func(ctx context.Context, tCtx K) (any, error) { targetStr, err := target.Get(ctx, tCtx) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting value for target in ParseCSV: %w", err) } headerStr, err := header.Get(ctx, tCtx) if err != nil { - return nil, err + return nil, fmt.Errorf("error getting value for header in ParseCSV: %w", err) } if headerStr == "" { From 481f57785521faa7a4b577ce248642f1b436e4e1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 15 Feb 2024 15:28:34 -0500 Subject: [PATCH 34/34] fix tests after modifying error message --- pkg/ottl/ottlfuncs/func_parse_csv_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go index e597ad4c7cbd..859df2d82cb0 100644 --- a/pkg/ottl/ottlfuncs/func_parse_csv_test.go +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -262,7 +262,7 @@ func Test_ParseCSV(t *testing.T) { }, }, }, - parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value for target in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Invalid header (strict mode)", @@ -278,7 +278,7 
@@ func Test_ParseCSV(t *testing.T) { }, }, }, - parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value for header in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Invalid args", @@ -443,7 +443,7 @@ func Test_ParseCSV(t *testing.T) { }, Mode: ottl.NewTestingOptional("ignoreQuotes"), }, - parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value for target in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Invalid header (ignoreQuotes mode)", @@ -460,7 +460,7 @@ func Test_ParseCSV(t *testing.T) { }, Mode: ottl.NewTestingOptional("ignoreQuotes"), }, - parseError: "error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + parseError: "error getting value for header in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", }, { name: "Empty header string (ignoreQuotes)",
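As a reference for how the extracted helpers fit together, the following is a minimal usage sketch and is not part of the patch series. It assumes the final signatures shown in the patches, ReadCSVRow(row string, delimiter rune, lazyQuotes bool) and MapCSVHeaders(headers, fields []string), and a caller that lives inside the opentelemetry-collector-contrib module, since internal/coreinternal/parseutils cannot be imported from outside it. The main function, sample row, and header string are illustrative assumptions only.

package main

import (
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"
)

func main() {
	// Headers are split by the caller; the OTTL ParseCSV function splits
	// the header string on its (optional) header delimiter.
	headers := strings.Split("timestamp,severity,message", ",")

	// ReadCSVRow parses a single CSV row. lazyQuotes=false corresponds to
	// the "strict" mode of ParseCSV; true corresponds to "lazyQuotes".
	fields, err := parseutils.ReadCSVRow("2024-02-15T15:28:34Z,info,hello", ',', false)
	if err != nil {
		panic(err)
	}

	// MapCSVHeaders pairs headers[i] with fields[i] and returns an error
	// on a length mismatch ("wrong number of fields: ...").
	row, err := parseutils.MapCSVHeaders(headers, fields)
	if err != nil {
		panic(err)
	}

	fmt.Println(row) // map[message:hello severity:info timestamp:2024-02-15T15:28:34Z]
}

In the OTTL ParseCSV function the resulting map[string]any is then loaded into a pcommon.Map via FromRaw, as the patches above show.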