diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 42d99d44f..c3052e68f 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -1,19 +1,13 @@ package file import ( - "bufio" "fmt" - "regexp" - "strings" "time" "github.com/bmatcuk/doublestar/v2" "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" - "golang.org/x/text/encoding" - "golang.org/x/text/encoding/ianaindex" - "golang.org/x/text/encoding/unicode" ) func init() { @@ -35,7 +29,7 @@ func NewInputConfig(operatorID string) *InputConfig { StartAt: "end", MaxLogSize: defaultMaxLogSize, MaxConcurrentFiles: defaultMaxConcurrentFiles, - Encoding: "nop", + Encoding: helper.NewEncodingConfig(), } } @@ -46,21 +40,15 @@ type InputConfig struct { Include []string `json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` - Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` -} - -// MultilineConfig is the configuration a multiline operation -type MultilineConfig struct { - LineStartPattern string `json:"line_start_pattern" yaml:"line_start_pattern"` - LineEndPattern string `json:"line_end_pattern" yaml:"line_end_pattern"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` + IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` + Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -104,12 +92,12 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", minFingerprintSize) } - encoding, err := lookupEncoding(c.Encoding) + encoding, err := c.Encoding.Build(context) if err != nil { return nil, err } - splitFunc, err := c.getSplitFunc(encoding) + splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false) if err != nil { return nil, err } @@ -157,57 +145,3 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return []operator.Operator{op}, nil } - -var encodingOverrides = map[string]encoding.Encoding{ - "utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), - "utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), - "utf8": unicode.UTF8, - "ascii": unicode.UTF8, - "us-ascii": unicode.UTF8, - "nop": encoding.Nop, - "": encoding.Nop, -} - -func lookupEncoding(enc string) (encoding.Encoding, error) { - if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok { - return encoding, nil - } - encoding, err := ianaindex.IANA.Encoding(enc) - if err != nil { - return nil, fmt.Errorf("unsupported encoding '%s'", enc) - } - if encoding == nil { - return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc) - } - return encoding, nil -} - -// getSplitFunc will return the split function associated the configured mode. -func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { - if c.Multiline == nil { - return NewNewlineSplitFunc(encoding) - } - endPattern := c.Multiline.LineEndPattern - startPattern := c.Multiline.LineStartPattern - - switch { - case endPattern != "" && startPattern != "": - return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") - case endPattern == "" && startPattern == "": - return nil, fmt.Errorf("one of line_start_pattern or line_end_pattern must be set") - case endPattern != "": - re, err := regexp.Compile("(?m)" + c.Multiline.LineEndPattern) - if err != nil { - return nil, fmt.Errorf("compile line end regex: %s", err) - } - return NewLineEndSplitFunc(re), nil - case startPattern != "": - re, err := regexp.Compile("(?m)" + c.Multiline.LineStartPattern) - if err != nil { - return nil, fmt.Errorf("compile line start regex: %s", err) - } - return NewLineStartSplitFunc(re), nil - default: - return nil, fmt.Errorf("unreachable") - } -} diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go new file mode 100644 index 000000000..f7e919a30 --- /dev/null +++ b/operator/builtin/input/file/config_test.go @@ -0,0 +1,528 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "testing" + "time" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator/helper" + "github.com/observiq/stanza/operator/helper/operatortest" +) + +func TestConfig(t *testing.T) { + cases := []operatortest.ConfigUnmarshalTest{ + { + Name: "default", + ExpectErr: false, + Expect: defaultCfg(), + }, + { + + Name: "extra_field", + ExpectErr: false, + Expect: defaultCfg(), + }, + { + Name: "id_custom", + ExpectErr: false, + Expect: NewInputConfig("test_id"), + }, + { + Name: "include_one", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + return cfg + }(), + }, + { + Name: "include_multi", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log", "two.log", "three.log") + return cfg + }(), + }, + { + Name: "include_glob", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + return cfg + }(), + }, + { + Name: "include_glob_double_asterisk", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "**.log") + return cfg + }(), + }, + { + Name: "include_glob_double_asterisk_nested", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "directory/**/*.log") + return cfg + }(), + }, + { + Name: "include_glob_double_asterisk_prefix", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "**/directory/**/*.log") + return cfg + }(), + }, + { + Name: "include_inline", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "a.log", "b.log") + return cfg + }(), + }, + { + Name: "include_invalid", + ExpectErr: true, + Expect: nil, + }, + { + Name: "exclude_one", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "one.log") + return cfg + }(), + }, + { + Name: "exclude_multi", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "one.log", "two.log", "three.log") + return cfg + }(), + }, + { + Name: "exclude_glob", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "not*.log") + return cfg + }(), + }, + { + Name: "exclude_glob_double_asterisk", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "not**.log") + return cfg + }(), + }, + { + Name: "exclude_glob_double_asterisk_nested", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "directory/**/not*.log") + return cfg + }(), + }, + { + Name: "exclude_glob_double_asterisk_prefix", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "**/directory/**/not*.log") + return cfg + }(), + }, + { + Name: "exclude_inline", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "*.log") + cfg.Exclude = append(cfg.Exclude, "a.log", "b.log") + return cfg + }(), + }, + { + Name: "exclude_invalid", + ExpectErr: true, + Expect: nil, + }, + { + Name: "poll_interval_no_units", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.PollInterval = helper.NewDuration(time.Second) + return cfg + }(), + }, + { + Name: "poll_interval_1s", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.PollInterval = helper.NewDuration(time.Second) + return cfg + }(), + }, + { + Name: "poll_interval_1ms", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.PollInterval = helper.NewDuration(time.Millisecond) + return cfg + }(), + }, + { + Name: "poll_interval_1000ms", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.PollInterval = helper.NewDuration(time.Second) + return cfg + }(), + }, + { + Name: "fingerprint_size_no_units", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.FingerprintSize = helper.ByteSize(1000) + return cfg + }(), + }, + { + Name: "fingerprint_size_1kb_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.FingerprintSize = helper.ByteSize(1000) + return cfg + }(), + }, + { + Name: "fingerprint_size_1KB", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.FingerprintSize = helper.ByteSize(1000) + return cfg + }(), + }, + { + Name: "fingerprint_size_1kib_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.FingerprintSize = helper.ByteSize(1024) + return cfg + }(), + }, + { + Name: "fingerprint_size_1KiB", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.FingerprintSize = helper.ByteSize(1024) + return cfg + }(), + }, + { + Name: "fingerprint_size_float", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.FingerprintSize = helper.ByteSize(1100) + return cfg + }(), + }, + { + Name: "include_file_name_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFileName = true + return cfg + }(), + }, + { + Name: "include_file_name_upper", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFileName = true + return cfg + }(), + }, + { + Name: "include_file_name_on", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFileName = true + return cfg + }(), + }, + { + Name: "include_file_name_yes", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFileName = true + return cfg + }(), + }, + { + Name: "include_file_path_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFilePath = true + return cfg + }(), + }, + { + Name: "include_file_path_upper", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFilePath = true + return cfg + }(), + }, + { + Name: "include_file_path_on", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFilePath = true + return cfg + }(), + }, + { + Name: "include_file_path_yes", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFilePath = true + return cfg + }(), + }, + { + Name: "include_file_path_off", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFilePath = false + return cfg + }(), + }, + { + Name: "include_file_path_no", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Include = append(cfg.Include, "one.log") + cfg.IncludeFilePath = false + return cfg + }(), + }, + { + Name: "include_file_path_nonbool", + ExpectErr: true, + Expect: nil, + }, + { + Name: "multiline_line_start_string", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + newMulti := helper.MultilineConfig{} + newMulti.LineStartPattern = "Start" + cfg.Multiline = newMulti + return cfg + }(), + }, + { + Name: "multiline_line_start_special", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + newMulti := helper.MultilineConfig{} + newMulti.LineStartPattern = "%" + cfg.Multiline = newMulti + return cfg + }(), + }, + { + Name: "multiline_line_end_string", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + newMulti := helper.MultilineConfig{} + newMulti.LineEndPattern = "Start" + cfg.Multiline = newMulti + return cfg + }(), + }, + { + Name: "multiline_line_end_special", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + newMulti := helper.MultilineConfig{} + newMulti.LineEndPattern = "%" + cfg.Multiline = newMulti + return cfg + }(), + }, + { + Name: "multiline_random", + ExpectErr: true, + Expect: nil, + }, + { + Name: "start_at_string", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.StartAt = "beginning" + return cfg + }(), + }, + { + Name: "max_concurrent_large", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.MaxConcurrentFiles = 9223372036854775807 + return cfg + }(), + }, + { + Name: "max_log_size_mib_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.MaxLogSize = helper.ByteSize(1048576) + return cfg + }(), + }, + { + Name: "max_log_size_mib_upper", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.MaxLogSize = helper.ByteSize(1048576) + return cfg + }(), + }, + { + Name: "max_log_size_mb_upper", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.MaxLogSize = helper.ByteSize(1048576) + return cfg + }(), + }, + { + Name: "max_log_size_mb_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.MaxLogSize = helper.ByteSize(1048576) + return cfg + }(), + }, + { + Name: "encoding_lower", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Encoding = helper.EncodingConfig{Encoding: "utf-16le"} + return cfg + }(), + }, + { + Name: "encoding_upper", + ExpectErr: false, + Expect: func() *InputConfig { + cfg := defaultCfg() + cfg.Encoding = helper.EncodingConfig{Encoding: "UTF-16lE"} + return cfg + }(), + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + tc.Run(t, defaultCfg()) + }) + } +} + +func defaultCfg() *InputConfig { + return NewInputConfig("file_input") +} + +func NewTestInputConfig() *InputConfig { + cfg := NewInputConfig("config_test") + cfg.WriteTo = entry.Field{} + cfg.Include = []string{"i1", "i2"} + cfg.Exclude = []string{"e1", "e2"} + cfg.Multiline = helper.MultilineConfig{ + LineStartPattern: "start", + LineEndPattern: "end", + } + cfg.FingerprintSize = 1024 + cfg.Encoding = helper.EncodingConfig{Encoding: "utf16"} + return cfg +} diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 27e074b4f..211f7cdef 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -15,7 +15,6 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator/helper" "go.uber.org/zap" - "golang.org/x/text/encoding" ) // InputOperator is an operator that monitors files for entries @@ -41,7 +40,7 @@ type InputOperator struct { fingerprintSize int - encoding encoding.Encoding + encoding helper.Encoding wg sync.WaitGroup readerWg sync.WaitGroup diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 575d0d487..cc124578f 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -149,7 +149,7 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartAndEndPatterns", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -160,7 +160,7 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -170,7 +170,7 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineEndPattern: "END.*", } }, @@ -180,7 +180,7 @@ func TestBuild(t *testing.T) { { "InvalidEncoding", func(f *InputConfig) { - f.Encoding = "UTF-3233" + f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"} }, require.Error, nil, @@ -188,7 +188,7 @@ func TestBuild(t *testing.T) { { "LineStartAndEnd", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -199,15 +199,15 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *InputConfig) { - f.Multiline = &MultilineConfig{} + f.Multiline = helper.MultilineConfig{} }, - require.Error, - nil, + require.NoError, + func(t *testing.T, f *InputOperator) {}, }, { "InvalidLineStartRegex", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineStartPattern: "(", } }, @@ -217,7 +217,7 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineEndPattern: "(", } }, @@ -1293,7 +1293,7 @@ func TestEncodings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() operator, receivedEntries, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { - cfg.Encoding = tc.encoding + cfg.Encoding = helper.EncodingConfig{Encoding: tc.encoding} }, nil) // Popualte the file diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 4be51b46b..dfecfefc2 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -38,7 +38,7 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) ( Path: path, fileInput: f, SugaredLogger: f.SugaredLogger.With("path", path), - decoder: f.encoding.NewDecoder(), + decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), } return r, nil diff --git a/operator/builtin/input/file/testdata/default.yaml b/operator/builtin/input/file/testdata/default.yaml new file mode 100644 index 000000000..d24186c9e --- /dev/null +++ b/operator/builtin/input/file/testdata/default.yaml @@ -0,0 +1 @@ +type: file_input \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/encoding_lower.yaml b/operator/builtin/input/file/testdata/encoding_lower.yaml new file mode 100644 index 000000000..084650232 --- /dev/null +++ b/operator/builtin/input/file/testdata/encoding_lower.yaml @@ -0,0 +1,2 @@ +type: file_input +encoding: "utf-16le" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/encoding_upper.yaml b/operator/builtin/input/file/testdata/encoding_upper.yaml new file mode 100644 index 000000000..9cef31521 --- /dev/null +++ b/operator/builtin/input/file/testdata/encoding_upper.yaml @@ -0,0 +1,2 @@ +type: file_input +encoding: "UTF-16lE" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_glob.yaml b/operator/builtin/input/file/testdata/exclude_glob.yaml new file mode 100644 index 000000000..20707f95d --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_glob.yaml @@ -0,0 +1,5 @@ +type: file_input +include: + - "*.log" +exclude: + - "not*.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_glob_double_asterisk.yaml b/operator/builtin/input/file/testdata/exclude_glob_double_asterisk.yaml new file mode 100644 index 000000000..554ba10e3 --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_glob_double_asterisk.yaml @@ -0,0 +1,5 @@ +type: file_input +include: + - "*.log" +exclude: + - "not**.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_glob_double_asterisk_nested.yaml b/operator/builtin/input/file/testdata/exclude_glob_double_asterisk_nested.yaml new file mode 100644 index 000000000..534c5aba9 --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_glob_double_asterisk_nested.yaml @@ -0,0 +1,5 @@ +type: file_input +include: + - "*.log" +exclude: + - "directory/**/not*.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_glob_double_asterisk_prefix.yaml b/operator/builtin/input/file/testdata/exclude_glob_double_asterisk_prefix.yaml new file mode 100644 index 000000000..ef5b4ae8f --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_glob_double_asterisk_prefix.yaml @@ -0,0 +1,5 @@ +type: file_input +include: + - "*.log" +exclude: + - "**/directory/**/not*.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_inline.yaml b/operator/builtin/input/file/testdata/exclude_inline.yaml new file mode 100644 index 000000000..37729db83 --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_inline.yaml @@ -0,0 +1,3 @@ +type: file_input +include: [ "*.log" ] +exclude: [ "a.log", "b.log" ] \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_invalid.yaml b/operator/builtin/input/file/testdata/exclude_invalid.yaml new file mode 100644 index 000000000..6d9464306 --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_invalid.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - "*.log" +exclude: "aRandomString" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_multi.yaml b/operator/builtin/input/file/testdata/exclude_multi.yaml new file mode 100644 index 000000000..5c77b6772 --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_multi.yaml @@ -0,0 +1,7 @@ +type: file_input +include: + - "*.log" +exclude: + - one.log + - two.log + - three.log \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/exclude_one.yaml b/operator/builtin/input/file/testdata/exclude_one.yaml new file mode 100644 index 000000000..321bcde92 --- /dev/null +++ b/operator/builtin/input/file/testdata/exclude_one.yaml @@ -0,0 +1,5 @@ +type: file_input +include: + - "*.log" +exclude: + - one.log \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/extra_field.yaml b/operator/builtin/input/file/testdata/extra_field.yaml new file mode 100644 index 000000000..675324384 --- /dev/null +++ b/operator/builtin/input/file/testdata/extra_field.yaml @@ -0,0 +1,2 @@ +type: file_input +hello: world \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/fingerprint_size_1KB.yaml b/operator/builtin/input/file/testdata/fingerprint_size_1KB.yaml new file mode 100644 index 000000000..7510e37de --- /dev/null +++ b/operator/builtin/input/file/testdata/fingerprint_size_1KB.yaml @@ -0,0 +1,2 @@ +type: file_input +fingerprint_size: 1KB \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/fingerprint_size_1KiB.yaml b/operator/builtin/input/file/testdata/fingerprint_size_1KiB.yaml new file mode 100644 index 000000000..2d7d5b03d --- /dev/null +++ b/operator/builtin/input/file/testdata/fingerprint_size_1KiB.yaml @@ -0,0 +1,2 @@ +type: file_input +fingerprint_size: 1KiB \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/fingerprint_size_1kb_lower.yaml b/operator/builtin/input/file/testdata/fingerprint_size_1kb_lower.yaml new file mode 100644 index 000000000..a274a1abf --- /dev/null +++ b/operator/builtin/input/file/testdata/fingerprint_size_1kb_lower.yaml @@ -0,0 +1,2 @@ +type: file_input +fingerprint_size: 1kb \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/fingerprint_size_1kib_lower.yaml b/operator/builtin/input/file/testdata/fingerprint_size_1kib_lower.yaml new file mode 100644 index 000000000..106433bc6 --- /dev/null +++ b/operator/builtin/input/file/testdata/fingerprint_size_1kib_lower.yaml @@ -0,0 +1,2 @@ +type: file_input +fingerprint_size: 1kib \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/fingerprint_size_float.yaml b/operator/builtin/input/file/testdata/fingerprint_size_float.yaml new file mode 100644 index 000000000..18f601790 --- /dev/null +++ b/operator/builtin/input/file/testdata/fingerprint_size_float.yaml @@ -0,0 +1,2 @@ +type: file_input +fingerprint_size: 1.1kb \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/fingerprint_size_no_units.yaml b/operator/builtin/input/file/testdata/fingerprint_size_no_units.yaml new file mode 100644 index 000000000..ecc8d4cd6 --- /dev/null +++ b/operator/builtin/input/file/testdata/fingerprint_size_no_units.yaml @@ -0,0 +1,2 @@ +type: file_input +fingerprint_size: 1000 \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/id_custom.yaml b/operator/builtin/input/file/testdata/id_custom.yaml new file mode 100644 index 000000000..eaf3e7ec0 --- /dev/null +++ b/operator/builtin/input/file/testdata/id_custom.yaml @@ -0,0 +1,2 @@ +type: file_input +id: test_id \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_name_lower.yaml b/operator/builtin/input/file/testdata/include_file_name_lower.yaml new file mode 100644 index 000000000..943b25c92 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_name_lower.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_name: true \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_name_on.yaml b/operator/builtin/input/file/testdata/include_file_name_on.yaml new file mode 100644 index 000000000..737760a0c --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_name_on.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_name: on \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_name_upper.yaml b/operator/builtin/input/file/testdata/include_file_name_upper.yaml new file mode 100644 index 000000000..80c35088d --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_name_upper.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_name: TRUE \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_name_yes.yaml b/operator/builtin/input/file/testdata/include_file_name_yes.yaml new file mode 100644 index 000000000..97ee4d9e0 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_name_yes.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_name: yes \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_lower.yaml b/operator/builtin/input/file/testdata/include_file_path_lower.yaml new file mode 100644 index 000000000..300866997 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_lower.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: true \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_no.yaml b/operator/builtin/input/file/testdata/include_file_path_no.yaml new file mode 100644 index 000000000..58586568f --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_no.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: no \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_nonbool.yaml b/operator/builtin/input/file/testdata/include_file_path_nonbool.yaml new file mode 100644 index 000000000..69fead0c7 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_nonbool.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: asdf \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_off.yaml b/operator/builtin/input/file/testdata/include_file_path_off.yaml new file mode 100644 index 000000000..810d36d70 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_off.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: off \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_on.yaml b/operator/builtin/input/file/testdata/include_file_path_on.yaml new file mode 100644 index 000000000..192991b86 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_on.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: on \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_upper.yaml b/operator/builtin/input/file/testdata/include_file_path_upper.yaml new file mode 100644 index 000000000..8d18b9f36 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_upper.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: TRUE \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_file_path_yes.yaml b/operator/builtin/input/file/testdata/include_file_path_yes.yaml new file mode 100644 index 000000000..c27474575 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_file_path_yes.yaml @@ -0,0 +1,4 @@ +type: file_input +include: + - one.log +include_file_path: yes \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_glob.yaml b/operator/builtin/input/file/testdata/include_glob.yaml new file mode 100644 index 000000000..ae029c549 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_glob.yaml @@ -0,0 +1,3 @@ +type: file_input +include: + - "*.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_glob_double_asterisk.yaml b/operator/builtin/input/file/testdata/include_glob_double_asterisk.yaml new file mode 100644 index 000000000..7686a592a --- /dev/null +++ b/operator/builtin/input/file/testdata/include_glob_double_asterisk.yaml @@ -0,0 +1,3 @@ +type: file_input +include: + - "**.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_glob_double_asterisk_nested.yaml b/operator/builtin/input/file/testdata/include_glob_double_asterisk_nested.yaml new file mode 100644 index 000000000..57b10f671 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_glob_double_asterisk_nested.yaml @@ -0,0 +1,3 @@ +type: file_input +include: + - "directory/**/*.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_glob_double_asterisk_prefix.yaml b/operator/builtin/input/file/testdata/include_glob_double_asterisk_prefix.yaml new file mode 100644 index 000000000..675a7460a --- /dev/null +++ b/operator/builtin/input/file/testdata/include_glob_double_asterisk_prefix.yaml @@ -0,0 +1,3 @@ +type: file_input +include: + - "**/directory/**/*.log" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_inline.yaml b/operator/builtin/input/file/testdata/include_inline.yaml new file mode 100644 index 000000000..31dfcd393 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_inline.yaml @@ -0,0 +1,2 @@ +type: file_input +include: [ "a.log", "b.log" ] \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_invalid.yaml b/operator/builtin/input/file/testdata/include_invalid.yaml new file mode 100644 index 000000000..ccdbe8412 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_invalid.yaml @@ -0,0 +1,2 @@ +type: file_input +include: "justwords" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_multi.yaml b/operator/builtin/input/file/testdata/include_multi.yaml new file mode 100644 index 000000000..7349f4966 --- /dev/null +++ b/operator/builtin/input/file/testdata/include_multi.yaml @@ -0,0 +1,5 @@ +type: file_input +include: + - one.log + - two.log + - three.log \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/include_one.yaml b/operator/builtin/input/file/testdata/include_one.yaml new file mode 100644 index 000000000..d99b2b6cf --- /dev/null +++ b/operator/builtin/input/file/testdata/include_one.yaml @@ -0,0 +1,3 @@ +type: file_input +include: + - one.log \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/max_concurrent_large.yaml b/operator/builtin/input/file/testdata/max_concurrent_large.yaml new file mode 100644 index 000000000..09e1ae7fc --- /dev/null +++ b/operator/builtin/input/file/testdata/max_concurrent_large.yaml @@ -0,0 +1,2 @@ +type: file_input +max_concurrent_files: 9223372036854775807 \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/max_log_size_invalid_unit.yaml b/operator/builtin/input/file/testdata/max_log_size_invalid_unit.yaml new file mode 100644 index 000000000..a31e6bf0d --- /dev/null +++ b/operator/builtin/input/file/testdata/max_log_size_invalid_unit.yaml @@ -0,0 +1,2 @@ +type: file_input +max_log_size: 1TOFU \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/max_log_size_mb_lower.yaml b/operator/builtin/input/file/testdata/max_log_size_mb_lower.yaml new file mode 100644 index 000000000..63713a20e --- /dev/null +++ b/operator/builtin/input/file/testdata/max_log_size_mb_lower.yaml @@ -0,0 +1,2 @@ +type: file_input +max_log_size: 1mib \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/max_log_size_mb_upper.yaml b/operator/builtin/input/file/testdata/max_log_size_mb_upper.yaml new file mode 100644 index 000000000..f66c30cfa --- /dev/null +++ b/operator/builtin/input/file/testdata/max_log_size_mb_upper.yaml @@ -0,0 +1,2 @@ +type: file_input +max_log_size: 1MiB \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/max_log_size_mib_lower.yaml b/operator/builtin/input/file/testdata/max_log_size_mib_lower.yaml new file mode 100644 index 000000000..63713a20e --- /dev/null +++ b/operator/builtin/input/file/testdata/max_log_size_mib_lower.yaml @@ -0,0 +1,2 @@ +type: file_input +max_log_size: 1mib \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/max_log_size_mib_upper.yaml b/operator/builtin/input/file/testdata/max_log_size_mib_upper.yaml new file mode 100644 index 000000000..f66c30cfa --- /dev/null +++ b/operator/builtin/input/file/testdata/max_log_size_mib_upper.yaml @@ -0,0 +1,2 @@ +type: file_input +max_log_size: 1MiB \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/multiline_extra_field.yaml b/operator/builtin/input/file/testdata/multiline_extra_field.yaml new file mode 100644 index 000000000..0a1296889 --- /dev/null +++ b/operator/builtin/input/file/testdata/multiline_extra_field.yaml @@ -0,0 +1,3 @@ +type: file_input +multiline: + that_random_field: "this should go nowhere" \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/multiline_line_end_special.yaml b/operator/builtin/input/file/testdata/multiline_line_end_special.yaml new file mode 100644 index 000000000..5dc38b67e --- /dev/null +++ b/operator/builtin/input/file/testdata/multiline_line_end_special.yaml @@ -0,0 +1,3 @@ +type: file_input +multiline: + line_end_pattern: '%' \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/multiline_line_end_string.yaml b/operator/builtin/input/file/testdata/multiline_line_end_string.yaml new file mode 100644 index 000000000..1437a4c21 --- /dev/null +++ b/operator/builtin/input/file/testdata/multiline_line_end_string.yaml @@ -0,0 +1,3 @@ +type: file_input +multiline: + line_end_pattern: 'Start' \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/multiline_line_start_special.yaml b/operator/builtin/input/file/testdata/multiline_line_start_special.yaml new file mode 100644 index 000000000..be7dafdf9 --- /dev/null +++ b/operator/builtin/input/file/testdata/multiline_line_start_special.yaml @@ -0,0 +1,3 @@ +type: file_input +multiline: + line_start_pattern: '%' \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/multiline_line_start_string.yaml b/operator/builtin/input/file/testdata/multiline_line_start_string.yaml new file mode 100644 index 000000000..ce9744a82 --- /dev/null +++ b/operator/builtin/input/file/testdata/multiline_line_start_string.yaml @@ -0,0 +1,3 @@ +type: file_input +multiline: + line_start_pattern: 'Start' \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/poll_interval_1000ms.yaml b/operator/builtin/input/file/testdata/poll_interval_1000ms.yaml new file mode 100644 index 000000000..e66665e23 --- /dev/null +++ b/operator/builtin/input/file/testdata/poll_interval_1000ms.yaml @@ -0,0 +1,2 @@ +type: file_input +poll_interval: 1000ms \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/poll_interval_1ms.yaml b/operator/builtin/input/file/testdata/poll_interval_1ms.yaml new file mode 100644 index 000000000..4dc4a0ec0 --- /dev/null +++ b/operator/builtin/input/file/testdata/poll_interval_1ms.yaml @@ -0,0 +1,2 @@ +type: file_input +poll_interval: 1ms \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/poll_interval_1s.yaml b/operator/builtin/input/file/testdata/poll_interval_1s.yaml new file mode 100644 index 000000000..7b47fa569 --- /dev/null +++ b/operator/builtin/input/file/testdata/poll_interval_1s.yaml @@ -0,0 +1,2 @@ +type: file_input +poll_interval: 1s \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/poll_interval_no_units.yaml b/operator/builtin/input/file/testdata/poll_interval_no_units.yaml new file mode 100644 index 000000000..6eb85d0f5 --- /dev/null +++ b/operator/builtin/input/file/testdata/poll_interval_no_units.yaml @@ -0,0 +1,2 @@ +type: file_input +poll_interval: 1 \ No newline at end of file diff --git a/operator/builtin/input/file/testdata/start_at_string.yaml b/operator/builtin/input/file/testdata/start_at_string.yaml new file mode 100644 index 000000000..e11f24d2f --- /dev/null +++ b/operator/builtin/input/file/testdata/start_at_string.yaml @@ -0,0 +1,2 @@ +type: file_input +start_at: "beginning" \ No newline at end of file diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go new file mode 100644 index 000000000..0d8d0c762 --- /dev/null +++ b/operator/helper/encoding.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "fmt" + "strings" + + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/ianaindex" + "golang.org/x/text/encoding/unicode" + "golang.org/x/text/transform" + + "github.com/observiq/stanza/operator" +) + +// NewBasicConfig creates a new Encoding config +func NewEncodingConfig() EncodingConfig { + return EncodingConfig{ + Encoding: "nop", + } +} + +// EncodingConfig is the configuration of a Encoding helper +type EncodingConfig struct { + Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"` +} + +// Build will build an Encoding operator. +func (c EncodingConfig) Build(context operator.BuildContext) (Encoding, error) { + enc, err := lookupEncoding(c.Encoding) + if err != nil { + return Encoding{}, err + } + + return Encoding{ + Encoding: enc, + }, nil +} + +type Encoding struct { + Encoding encoding.Encoding +} + +// decode converts the bytes in msgBuf to utf-8 from the configured encoding +func (e *Encoding) Decode(msgBuf []byte) (string, error) { + decodeBuffer := make([]byte, 1<<12) + decoder := e.Encoding.NewDecoder() + + for { + decoder.Reset() + nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true) + if err == nil { + return string(decodeBuffer[:nDst]), nil + } + if err == transform.ErrShortDst { + decodeBuffer = make([]byte, len(decodeBuffer)*2) + continue + } + return "", fmt.Errorf("transform encoding: %s", err) + } +} + +var encodingOverrides = map[string]encoding.Encoding{ + "utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), + "utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), + "utf8": unicode.UTF8, + "ascii": unicode.UTF8, + "us-ascii": unicode.UTF8, + "nop": encoding.Nop, + "": encoding.Nop, +} + +func lookupEncoding(enc string) (encoding.Encoding, error) { + if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok { + return encoding, nil + } + encoding, err := ianaindex.IANA.Encoding(enc) + if err != nil { + return nil, fmt.Errorf("unsupported encoding '%s'", enc) + } + if encoding == nil { + return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc) + } + return encoding, nil +} diff --git a/operator/builtin/input/file/line_splitter.go b/operator/helper/multiline.go similarity index 56% rename from operator/builtin/input/file/line_splitter.go rename to operator/helper/multiline.go index 440216bc1..cd882b070 100644 --- a/operator/builtin/input/file/line_splitter.go +++ b/operator/helper/multiline.go @@ -1,16 +1,65 @@ -package file +package helper import ( "bufio" "bytes" + "fmt" "regexp" + "github.com/observiq/stanza/operator" + "golang.org/x/text/encoding" ) +// NewBasicConfig creates a new Multiline config +func NewMultilineConfig() MultilineConfig { + return MultilineConfig{ + LineStartPattern: "", + LineEndPattern: "", + } +} + +// MultilineConfig is the configuration of a multiline helper +type MultilineConfig struct { + LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"` + LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"` +} + +// Build will build a Multiline operator. +func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { + return c.getSplitFunc(encoding, flushAtEOF) +} + +// getSplitFunc returns split function for bufio.Scanner basing on configured pattern +func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { + endPattern := c.LineEndPattern + startPattern := c.LineStartPattern + + switch { + case endPattern != "" && startPattern != "": + return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") + case endPattern == "" && startPattern == "": + return NewNewlineSplitFunc(encoding, flushAtEOF) + case endPattern != "": + re, err := regexp.Compile("(?m)" + c.LineEndPattern) + if err != nil { + return nil, fmt.Errorf("compile line end regex: %s", err) + } + return NewLineEndSplitFunc(re, flushAtEOF), nil + case startPattern != "": + re, err := regexp.Compile("(?m)" + c.LineStartPattern) + if err != nil { + return nil, fmt.Errorf("compile line start regex: %s", err) + } + return NewLineStartSplitFunc(re, flushAtEOF), nil + default: + return nil, fmt.Errorf("unreachable") + } +} + // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { +func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { @@ -31,6 +80,11 @@ func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { return 0, nil, nil } + // Flush if no more data is expected + if atEOF && flushAtEOF { + return len(data), data, nil + } + secondLocOffset := firstMatchEnd + 1 secondLoc := re.FindIndex(data[secondLocOffset:]) if secondLoc == nil { @@ -47,10 +101,14 @@ func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc { +func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { loc := re.FindIndex(data) if loc == nil { + // Flush if no more data is expected + if len(data) != 0 && atEOF && flushAtEOF { + return len(data), data, nil + } return 0, nil, nil // read more data and try again } @@ -69,7 +127,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc { // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { +func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { newline, err := encodedNewline(encoding) if err != nil { return nil, err @@ -90,6 +148,11 @@ func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil } + // Flush if no more data is expected + if atEOF && flushAtEOF { + return len(data), data, nil + } + // Request more data. return 0, nil, nil }, nil diff --git a/operator/builtin/input/file/line_splitter_test.go b/operator/helper/multiline_test.go similarity index 96% rename from operator/builtin/input/file/line_splitter_test.go rename to operator/helper/multiline_test.go index c0c31a642..fd58b04b8 100644 --- a/operator/builtin/input/file/line_splitter_test.go +++ b/operator/helper/multiline_test.go @@ -1,4 +1,4 @@ -package file +package helper import ( "bufio" @@ -123,17 +123,16 @@ func TestLineStartSplitFunc(t *testing.T) { } for _, tc := range testCases { - cfg := NewInputConfig("") - cfg.Multiline = &MultilineConfig{ + cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART")) + splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -232,11 +231,10 @@ func TestLineEndSplitFunc(t *testing.T) { } for _, tc := range testCases { - cfg := NewInputConfig("") - cfg.Multiline = &MultilineConfig{ + cfg := &MultilineConfig{ LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -314,7 +312,7 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8) + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -367,7 +365,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewNewlineSplitFunc(tc.encoding) + splitFunc, err := NewNewlineSplitFunc(tc.encoding, false) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) diff --git a/operator/helper/operatortest/operatortest.go b/operator/helper/operatortest/operatortest.go new file mode 100644 index 000000000..c48ef6939 --- /dev/null +++ b/operator/helper/operatortest/operatortest.go @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package operatortest + +import ( + "fmt" + "io/ioutil" + "path" + "testing" + + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +// ConfigUnmarshalTest is used for testing golden configs +type ConfigUnmarshalTest struct { + Name string + Expect interface{} + ExpectErr bool +} + +func configFromFileViaYaml(file string, config interface{}) error { + bytes, err := ioutil.ReadFile(file) + if err != nil { + return fmt.Errorf("could not find config file: %s", err) + } + if err := yaml.Unmarshal(bytes, config); err != nil { + return fmt.Errorf("failed to read config file as yaml: %s", err) + } + + return nil +} + +// Run Unmarshalls yaml files and compares them against the expected. +func (c ConfigUnmarshalTest) Run(t *testing.T, config interface{}) { + yamlConfig := config + yamlErr := configFromFileViaYaml(path.Join(".", "testdata", fmt.Sprintf("%s.yaml", c.Name)), yamlConfig) + + if c.ExpectErr { + require.Error(t, yamlErr) + } else { + require.NoError(t, yamlErr) + require.Equal(t, c.Expect, yamlConfig) + } +}