From 349fde7decbbdc543fcce0653b48bb69bdb5f4e9 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 4 May 2021 13:08:31 +0200 Subject: [PATCH 01/11] extract multiline from file input and move to helper Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 60 ++++------------- operator/builtin/input/file/config_test.go | 10 +-- operator/builtin/input/file/file_test.go | 18 ++--- .../line_splitter.go => helper/multiline.go} | 65 ++++++++++++++++++- .../multiline_test.go} | 8 +-- 5 files changed, 92 insertions(+), 69 deletions(-) rename operator/{builtin/input/file/line_splitter.go => helper/multiline.go} (65%) rename operator/{builtin/input/file/line_splitter_test.go => helper/multiline_test.go} (98%) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index fbac620d..e2ea4f98 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -15,9 +15,7 @@ package file import ( - "bufio" "fmt" - "regexp" "strings" "time" @@ -61,21 +59,15 @@ type InputConfig struct { Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline *MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" 
json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` - Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"` -} - -// MultilineConfig is the configuration a multiline operation -type MultilineConfig struct { - LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"` - LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"` + PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` + Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"` } // Build will build a file input operator from the 
supplied configuration @@ -124,7 +116,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - splitFunc, err := c.getSplitFunc(encoding) + multiline, err := c.Multiline.Build(context, encoding) if err != nil { return nil, err } @@ -153,7 +145,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, InputOperator: inputOperator, Include: c.Include, Exclude: c.Exclude, - SplitFunc: splitFunc, + SplitFunc: multiline.SplitFunc, PollInterval: c.PollInterval.Raw(), FilePathField: filePathField, FileNameField: fileNameField, @@ -195,33 +187,3 @@ func lookupEncoding(enc string) (encoding.Encoding, error) { } return encoding, nil } - -// getSplitFunc will return the split function associated the configured mode. -func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { - if c.Multiline == nil { - return NewNewlineSplitFunc(encoding) - } - endPattern := c.Multiline.LineEndPattern - startPattern := c.Multiline.LineStartPattern - - switch { - case endPattern != "" && startPattern != "": - return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") - case endPattern == "" && startPattern == "": - return nil, fmt.Errorf("one of line_start_pattern or line_end_pattern must be set") - case endPattern != "": - re, err := regexp.Compile("(?m)" + c.Multiline.LineEndPattern) - if err != nil { - return nil, fmt.Errorf("compile line end regex: %s", err) - } - return NewLineEndSplitFunc(re), nil - case startPattern != "": - re, err := regexp.Compile("(?m)" + c.Multiline.LineStartPattern) - if err != nil { - return nil, fmt.Errorf("compile line start regex: %s", err) - } - return NewLineStartSplitFunc(re), nil - default: - return nil, fmt.Errorf("unreachable") - } -} diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index d98ba12b..4587d39e 100644 --- a/operator/builtin/input/file/config_test.go +++ 
b/operator/builtin/input/file/config_test.go @@ -387,7 +387,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := new(MultilineConfig) + newMulti := helper.MultilineConfig{} newMulti.LineStartPattern = "Start" cfg.Multiline = newMulti return cfg @@ -398,7 +398,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := new(MultilineConfig) + newMulti := helper.MultilineConfig{} newMulti.LineStartPattern = "%" cfg.Multiline = newMulti return cfg @@ -409,7 +409,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := new(MultilineConfig) + newMulti := helper.MultilineConfig{} newMulti.LineEndPattern = "Start" cfg.Multiline = newMulti return cfg @@ -420,7 +420,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := new(MultilineConfig) + newMulti := helper.MultilineConfig{} newMulti.LineEndPattern = "%" cfg.Multiline = newMulti return cfg @@ -521,7 +521,7 @@ func NewTestInputConfig() *InputConfig { cfg.WriteTo = entry.NewBodyField([]string{}...) 
cfg.Include = []string{"i1", "i2"} cfg.Exclude = []string{"e1", "e2"} - cfg.Multiline = &MultilineConfig{"start", "end"} + cfg.Multiline = helper.MultilineConfig{"start", "end"} cfg.FingerprintSize = 1024 cfg.Encoding = "utf16" return cfg diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 36302d89..d2b1b14f 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -164,7 +164,7 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartAndEndPatterns", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -175,7 +175,7 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -185,7 +185,7 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineEndPattern: "END.*", } }, @@ -203,7 +203,7 @@ func TestBuild(t *testing.T) { { "LineStartAndEnd", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -214,15 +214,15 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *InputConfig) { - f.Multiline = &MultilineConfig{} + f.Multiline = helper.MultilineConfig{} }, - require.Error, - nil, + require.NoError, + func(t *testing.T, f *InputOperator) {}, }, { "InvalidLineStartRegex", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineStartPattern: "(", } }, @@ -232,7 +232,7 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *InputConfig) { - f.Multiline = &MultilineConfig{ + f.Multiline = helper.MultilineConfig{ LineEndPattern: "(", } }, diff --git 
a/operator/builtin/input/file/line_splitter.go b/operator/helper/multiline.go similarity index 65% rename from operator/builtin/input/file/line_splitter.go rename to operator/helper/multiline.go index 63b3a07c..5fb4d879 100644 --- a/operator/builtin/input/file/line_splitter.go +++ b/operator/helper/multiline.go @@ -12,16 +12,79 @@ // See the License for the specific language governing permissions and // limitations under the License. -package file +package helper import ( "bufio" "bytes" + "fmt" "regexp" + "github.com/open-telemetry/opentelemetry-log-collection/operator" "golang.org/x/text/encoding" ) +// NewBasicConfig creates a new Multiline config +func NewMultilineConfig() MultilineConfig { + return MultilineConfig{ + LineStartPattern: "", + LineEndPattern: "", + } +} + +// MultilineConfig is the configuration of a multiline helper +type MultilineConfig struct { + LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"` + LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"` +} + +// Build will build a Multiline operator. 
+func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding) (MultilineOperator, error) { + + splitFunc, err := c.getSplitFunc(encoding) + if err != nil { + return MultilineOperator{}, err + } + + operator := MultilineOperator{ + SplitFunc: splitFunc, + } + + return operator, nil +} + +// MultilineOperator is an operator which handle SplitFunc for bufio.Scanner +type MultilineOperator struct { + SplitFunc bufio.SplitFunc +} + +// getSplitFunc returns split function for bufio.Scanner basing on configured pattern +func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { + endPattern := c.LineEndPattern + startPattern := c.LineStartPattern + + switch { + case endPattern != "" && startPattern != "": + return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") + case endPattern == "" && startPattern == "": + return NewNewlineSplitFunc(encoding) + case endPattern != "": + re, err := regexp.Compile("(?m)" + c.LineEndPattern) + if err != nil { + return nil, fmt.Errorf("compile line end regex: %s", err) + } + return NewLineEndSplitFunc(re), nil + case startPattern != "": + re, err := regexp.Compile("(?m)" + c.LineStartPattern) + if err != nil { + return nil, fmt.Errorf("compile line start regex: %s", err) + } + return NewLineStartSplitFunc(re), nil + default: + return nil, fmt.Errorf("unreachable") + } +} + // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { diff --git a/operator/builtin/input/file/line_splitter_test.go b/operator/helper/multiline_test.go similarity index 98% rename from operator/builtin/input/file/line_splitter_test.go rename to operator/helper/multiline_test.go index 1031b5c8..249ff756 100644 --- a/operator/builtin/input/file/line_splitter_test.go +++ b/operator/helper/multiline_test.go @@ -12,7 +12,7 
@@ // See the License for the specific language governing permissions and // limitations under the License. -package file +package helper import ( "bufio" @@ -137,8 +137,7 @@ func TestLineStartSplitFunc(t *testing.T) { } for _, tc := range testCases { - cfg := NewInputConfig("") - cfg.Multiline = &MultilineConfig{ + cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } splitFunc, err := cfg.getSplitFunc(unicode.UTF8) @@ -246,8 +245,7 @@ func TestLineEndSplitFunc(t *testing.T) { } for _, tc := range testCases { - cfg := NewInputConfig("") - cfg.Multiline = &MultilineConfig{ + cfg := &MultilineConfig{ LineEndPattern: tc.Pattern, } splitFunc, err := cfg.getSplitFunc(unicode.UTF8) From c52428a4a74850706f8cd14cad05874fd401959a Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 4 May 2021 14:06:29 +0200 Subject: [PATCH 02/11] extract encoding from file input and move to helper Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 34 +---------- operator/builtin/input/file/config_test.go | 6 +- operator/builtin/input/file/file_test.go | 4 +- operator/helper/encoding.go | 66 ++++++++++++++++++++++ 4 files changed, 74 insertions(+), 36 deletions(-) create mode 100644 operator/helper/encoding.go diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index e2ea4f98..30ae9557 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -16,13 +16,9 @@ package file import ( "fmt" - "strings" "time" "github.com/bmatcuk/doublestar/v3" - "golang.org/x/text/encoding" - "golang.org/x/text/encoding/ianaindex" - "golang.org/x/text/encoding/unicode" "github.com/open-telemetry/opentelemetry-log-collection/entry" "github.com/open-telemetry/opentelemetry-log-collection/operator" @@ -48,7 +44,7 @@ func NewInputConfig(operatorID string) *InputConfig { StartAt: "end", MaxLogSize: defaultMaxLogSize, MaxConcurrentFiles: defaultMaxConcurrentFiles, - Encoding: "nop", + Encoding: 
helper.NewEncodingConfig(), } } @@ -67,7 +63,7 @@ type InputConfig struct { FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` - Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"` + Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -111,7 +107,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", minFingerprintSize) } - encoding, err := lookupEncoding(c.Encoding) + encoding, err := c.Encoding.Build(context) if err != nil { return nil, err } @@ -163,27 +159,3 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return []operator.Operator{op}, nil } - -var encodingOverrides = map[string]encoding.Encoding{ - "utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), - "utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), - "utf8": unicode.UTF8, - "ascii": unicode.UTF8, - "us-ascii": unicode.UTF8, - "nop": encoding.Nop, - "": encoding.Nop, -} - -func lookupEncoding(enc string) (encoding.Encoding, error) { - if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok { - return encoding, nil - } - encoding, err := ianaindex.IANA.Encoding(enc) - if err != nil { - return nil, fmt.Errorf("unsupported encoding '%s'", enc) - } - if encoding == nil { - return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc) - } - return encoding, nil -} diff --git 
a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index 4587d39e..49740f44 100644 --- a/operator/builtin/input/file/config_test.go +++ b/operator/builtin/input/file/config_test.go @@ -490,7 +490,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - cfg.Encoding = "utf-16le" + cfg.Encoding = helper.EncodingConfig{"utf-16le"} return cfg }(), }, @@ -499,7 +499,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - cfg.Encoding = "UTF-16lE" + cfg.Encoding = helper.EncodingConfig{"UTF-16lE"} return cfg }(), }, @@ -523,7 +523,7 @@ func NewTestInputConfig() *InputConfig { cfg.Exclude = []string{"e1", "e2"} cfg.Multiline = helper.MultilineConfig{"start", "end"} cfg.FingerprintSize = 1024 - cfg.Encoding = "utf16" + cfg.Encoding = helper.EncodingConfig{"utf16"} return cfg } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index d2b1b14f..810f1cb0 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -195,7 +195,7 @@ func TestBuild(t *testing.T) { { "InvalidEncoding", func(f *InputConfig) { - f.Encoding = "UTF-3233" + f.Encoding = helper.EncodingConfig{"UTF-3233"} }, require.Error, nil, @@ -1288,7 +1288,7 @@ func TestEncodings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() operator, receivedEntries, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { - cfg.Encoding = tc.encoding + cfg.Encoding = helper.EncodingConfig{tc.encoding} }, nil) // Popualte the file diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go new file mode 100644 index 00000000..193a2906 --- /dev/null +++ b/operator/helper/encoding.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the 
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "fmt" + "strings" + + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/ianaindex" + "golang.org/x/text/encoding/unicode" +) + +// NewBasicConfig creates a new Encoding config +func NewEncodingConfig() EncodingConfig { + return EncodingConfig{ + Encoding: "nop", + } +} + +// EncodingConfig is the configuration of a Encoding helper +type EncodingConfig struct { + Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"` +} + +// Build will build a Multiline operator. 
+func (c EncodingConfig) Build(context operator.BuildContext) (encoding.Encoding, error) { + return lookupEncoding(c.Encoding) +} + +var encodingOverrides = map[string]encoding.Encoding{ + "utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), + "utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), + "utf8": unicode.UTF8, + "ascii": unicode.UTF8, + "us-ascii": unicode.UTF8, + "nop": encoding.Nop, + "": encoding.Nop, +} + +func lookupEncoding(enc string) (encoding.Encoding, error) { + if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok { + return encoding, nil + } + encoding, err := ianaindex.IANA.Encoding(enc) + if err != nil { + return nil, fmt.Errorf("unsupported encoding '%s'", enc) + } + if encoding == nil { + return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc) + } + return encoding, nil +} From 6616f8c4af3d66c78e20c0847c8590cf6b5a6b28 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 4 May 2021 14:44:13 +0200 Subject: [PATCH 03/11] tcp: add multiline Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config_test.go | 11 +++++++---- operator/builtin/input/file/file_test.go | 4 ++-- operator/builtin/input/tcp/tcp.go | 23 ++++++++++++++++++++++ 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index 49740f44..172d6e49 100644 --- a/operator/builtin/input/file/config_test.go +++ b/operator/builtin/input/file/config_test.go @@ -490,7 +490,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - cfg.Encoding = helper.EncodingConfig{"utf-16le"} + cfg.Encoding = helper.EncodingConfig{Encoding: "utf-16le"} return cfg }(), }, @@ -499,7 +499,7 @@ func TestConfig(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - cfg.Encoding = helper.EncodingConfig{"UTF-16lE"} + cfg.Encoding = helper.EncodingConfig{Encoding: "UTF-16lE"} 
return cfg }(), }, @@ -521,9 +521,12 @@ func NewTestInputConfig() *InputConfig { cfg.WriteTo = entry.NewBodyField([]string{}...) cfg.Include = []string{"i1", "i2"} cfg.Exclude = []string{"e1", "e2"} - cfg.Multiline = helper.MultilineConfig{"start", "end"} + cfg.Multiline = helper.MultilineConfig{ + LineStartPattern: "start", + LineEndPattern: "end", + } cfg.FingerprintSize = 1024 - cfg.Encoding = helper.EncodingConfig{"utf16"} + cfg.Encoding = helper.EncodingConfig{Encoding: "utf16"} return cfg } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 810f1cb0..46583f2b 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -195,7 +195,7 @@ func TestBuild(t *testing.T) { { "InvalidEncoding", func(f *InputConfig) { - f.Encoding = helper.EncodingConfig{"UTF-3233"} + f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"} }, require.Error, nil, @@ -1288,7 +1288,7 @@ func TestEncodings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() operator, receivedEntries, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { - cfg.Encoding = helper.EncodingConfig{tc.encoding} + cfg.Encoding = helper.EncodingConfig{Encoding: tc.encoding} }, nil) // Popualte the file diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 4b4b2437..9b72faa6 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -26,6 +26,7 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-log-collection/operator" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" @@ -49,6 +50,8 @@ func init() { func NewTCPInputConfig(operatorID string) *TCPInputConfig { return &TCPInputConfig{ InputConfig: helper.NewInputConfig(operatorID, "tcp_input"), + Multiline: helper.NewMultilineConfig(), + Encoding: helper.NewEncodingConfig(), } } @@ -60,6 +63,8 @@ type 
TCPInputConfig struct { ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` TLS *helper.TLSServerConfig `json:"tls,omitempty" yaml:"tls,omitempty"` AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` + Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` } // Build will build a tcp input operator. @@ -88,11 +93,23 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, fmt.Errorf("failed to resolve listen_address: %s", err) } + encoding, err := c.Encoding.Build(context) + if err != nil { + return nil, err + } + + multiline, err := c.Multiline.Build(context, encoding) + if err != nil { + return nil, err + } + tcpInput := &TCPInput{ InputOperator: inputOperator, address: c.ListenAddress, maxBufferSize: int(c.MaxBufferSize), addAttributes: c.AddAttributes, + encoding: encoding, + splitFunc: multiline.SplitFunc, } if c.TLS != nil { @@ -116,6 +133,9 @@ type TCPInput struct { cancel context.CancelFunc wg sync.WaitGroup tls *tls.Config + + encoding encoding.Encoding + splitFunc bufio.SplitFunc } // Start will start listening for log entries over tcp. 
@@ -204,6 +224,9 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c buf := make([]byte, 0, 64*1024) scanner := bufio.NewScanner(conn) scanner.Buffer(buf, t.maxBufferSize*1024) + + scanner.Split(t.splitFunc) + for scanner.Scan() { entry, err := t.NewEntry(scanner.Text()) if err != nil { From e2623afc925fe8f8f3a7355af022126c751b1100 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Wed, 5 May 2021 09:04:02 +0200 Subject: [PATCH 04/11] Rename max_buffer_size to max_log_size --- operator/builtin/input/tcp/tcp.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 9b72faa6..480e0ef7 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -59,10 +59,10 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig { type TCPInputConfig struct { helper.InputConfig `yaml:",inline"` - MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"` - ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` - TLS *helper.TLSServerConfig `json:"tls,omitempty" yaml:"tls,omitempty"` - AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` + MaxBufferSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty" json:"listen_address,omitempty" yaml:"listen_address,omitempty"` + TLS *helper.TLSServerConfig `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"` + AddAttributes bool `mapstructure:"add_attributes,omitempty" json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" 
json:"multiline,omitempty" yaml:"multiline,omitempty"` } @@ -81,7 +81,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato } if c.MaxBufferSize < minBufferSize { - return nil, fmt.Errorf("invalid value for parameter 'max_buffer_size', must be equal to or greater than %d bytes", minBufferSize) + return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minBufferSize) } if c.ListenAddress == "" { From e92cd221e02e8092ff6322c701a0d2441092e276 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Wed, 5 May 2021 09:18:07 +0200 Subject: [PATCH 05/11] Update tcp operator documentation --- docs/operators/tcp_input.md | 61 +++++++++++++++++++++++++----------- operator/helper/encoding.go | 3 +- operator/helper/multiline.go | 4 +-- 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/docs/operators/tcp_input.md b/docs/operators/tcp_input.md index 493fb61a..d5dce211 100644 --- a/docs/operators/tcp_input.md +++ b/docs/operators/tcp_input.md @@ -4,43 +4,67 @@ The `tcp_input` operator listens for logs on one or more TCP connections. 
The op ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `tcp_input` | A unique identifier for the operator | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | -| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input | -| `listen_address` | required | A listen address of the form `:` | -| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) | -| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | - +| Field | Default | Description | +| --- | --- | --- | +| `id` | `tcp_input` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory | +| `listen_address` | required | A listen address of the form `:` | +| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) | +| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | +| `multiline` | | A `multiline` configuration block. See below for details | +| `encoding` | `nop` | The encoding of the file being read. 
See the list of supported encodings below for available options | #### TLS Configuration The `tcp_input` operator supports TLS, disabled by default. config more detail [opentelemetry-collector#configtls](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/configtls#tls-configuration-settings). -| Field | Default | Description | -| --- | --- | --- | -| `cert_file` | | Path to the TLS cert to use for TLS required connections. | -| `key_file` | | Path to the TLS key to use for TLS required connections. | -| `ca_file` | | Path to the CA cert. For a client this verifies the server certificate. For a server this verifies client certificates. If empty uses system root CA. | -| `client_ca_file` | | Path to the TLS cert to use by the server to verify a client certificate. (optional) | +| Field | Default | Description | +| --- | --- | --- | +| `cert_file` | | Path to the TLS cert to use for TLS required connections. | +| `key_file` | | Path to the TLS key to use for TLS required connections. | +| `ca_file` | | Path to the CA cert. For a client this verifies the server certificate. For a server this verifies client certificates. If empty uses system root CA. | +| `client_ca_file` | | Path to the TLS cert to use by the server to verify a client certificate. (optional) | + +#### `multiline` configuration + +If set, the `multiline` configuration block instructs the `tcp_input` operator to split log entries on a pattern other than newlines. + +The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that +match either the beginning of a new log entry, or the end of a log entry. + +#### Supported encodings +| Key | Description +| --- | --- | +| `nop` | No encoding validation. 
Treats the file as a stream of raw bytes | +| `utf-8` | UTF-8 encoding | +| `utf-16le` | UTF-16 encoding with little-endian byte order | +| `utf-16be` | UTF-16 encoding with big-endian byte order | +| `ascii` | ASCII encoding | +| `big5` | The Big5 Chinese character encoding | + +Other less common encodings are supported on a best-effort basis. +See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) +for other encodings available. ### Example Configurations #### Simple Configuration: + ```yaml - type: tcp_input listen_address: "0.0.0.0:54525" ``` Send a log: + ```bash $ nc localhost 54525 < message1 @@ -49,6 +73,7 @@ heredoc> EOF ``` Generated entries: + ```json { "timestamp": "2020-04-30T12:10:17.656726-04:00", diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go index 193a2906..c1a0f1d6 100644 --- a/operator/helper/encoding.go +++ b/operator/helper/encoding.go @@ -18,10 +18,11 @@ import ( "fmt" "strings" - "github.com/open-telemetry/opentelemetry-log-collection/operator" "golang.org/x/text/encoding" "golang.org/x/text/encoding/ianaindex" "golang.org/x/text/encoding/unicode" + + "github.com/open-telemetry/opentelemetry-log-collection/operator" ) // NewBasicConfig creates a new Encoding config diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 5fb4d879..289cef78 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -20,8 +20,9 @@ import ( "fmt" "regexp" - "github.com/open-telemetry/opentelemetry-log-collection/operator" "golang.org/x/text/encoding" + + "github.com/open-telemetry/opentelemetry-log-collection/operator" ) // NewBasicConfig creates a new Multiline config @@ -40,7 +41,6 @@ type MultilineConfig struct { // Build will build a Multiline operator. 
func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding) (MultilineOperator, error) { - splitFunc, err := c.getSplitFunc(encoding) if err != nil { return MultilineOperator{}, err From 532da4b8328da4a53cad67bdef75ee70eb642b29 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Wed, 5 May 2021 10:03:22 +0200 Subject: [PATCH 06/11] Fix handling multiline for EOF --- operator/helper/multiline.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 289cef78..ce49303b 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -167,6 +167,10 @@ func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil } + if atEOF { + return len(data), data, nil + } + // Request more data. return 0, nil, nil }, nil From 3ad8f57fc1d2afce64a83b54ed48371b6bdfb0cb Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Wed, 5 May 2021 13:50:14 +0200 Subject: [PATCH 07/11] Optionally flush multilines for closed connections --- operator/builtin/input/file/config.go | 2 +- operator/builtin/input/tcp/tcp.go | 2 +- operator/helper/encoding.go | 2 +- operator/helper/multiline.go | 30 ++++++++++++++++++--------- operator/helper/multiline_test.go | 10 ++++----- 5 files changed, 28 insertions(+), 18 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 30ae9557..a37ed377 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -112,7 +112,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - multiline, err := c.Multiline.Build(context, encoding) + multiline, err := c.Multiline.Build(context, encoding, false) if err != nil { return nil, err } diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 480e0ef7..eb0ef96d 
100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -98,7 +98,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - multiline, err := c.Multiline.Build(context, encoding) + multiline, err := c.Multiline.Build(context, encoding, true) if err != nil { return nil, err } diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go index c1a0f1d6..64ec3eb1 100644 --- a/operator/helper/encoding.go +++ b/operator/helper/encoding.go @@ -37,7 +37,7 @@ type EncodingConfig struct { Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"` } -// Build will build a Multiline operator. +// Build will build an Encoding operator. func (c EncodingConfig) Build(context operator.BuildContext) (encoding.Encoding, error) { return lookupEncoding(c.Encoding) } diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index ce49303b..4e8c6844 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -40,8 +40,8 @@ type MultilineConfig struct { } // Build will build a Multiline operator. 
-func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding) (MultilineOperator, error) { - splitFunc, err := c.getSplitFunc(encoding) +func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (MultilineOperator, error) { + splitFunc, err := c.getSplitFunc(encoding, flushAtEOF) if err != nil { return MultilineOperator{}, err } @@ -59,7 +59,7 @@ type MultilineOperator struct { } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { +func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -67,19 +67,19 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFu case endPattern != "" && startPattern != "": return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") case endPattern == "" && startPattern == "": - return NewNewlineSplitFunc(encoding) + return NewNewlineSplitFunc(encoding, flushAtEOF) case endPattern != "": re, err := regexp.Compile("(?m)" + c.LineEndPattern) if err != nil { return nil, fmt.Errorf("compile line end regex: %s", err) } - return NewLineEndSplitFunc(re), nil + return NewLineEndSplitFunc(re, flushAtEOF), nil case startPattern != "": re, err := regexp.Compile("(?m)" + c.LineStartPattern) if err != nil { return nil, fmt.Errorf("compile line start regex: %s", err) } - return NewLineStartSplitFunc(re), nil + return NewLineStartSplitFunc(re, flushAtEOF), nil default: return nil, fmt.Errorf("unreachable") } @@ -87,7 +87,7 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFu // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func 
NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { +func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { @@ -108,6 +108,11 @@ func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { return 0, nil, nil } + // Flush if no more data is expected + if atEOF && flushAtEOF { + return len(data), data, nil + } + secondLocOffset := firstMatchEnd + 1 secondLoc := re.FindIndex(data[secondLocOffset:]) if secondLoc == nil { @@ -124,10 +129,14 @@ func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc { // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc { +func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { loc := re.FindIndex(data) if loc == nil { + // Flush if no more data is expected + if len(data) != 0 && atEOF && flushAtEOF { + return len(data), data, nil + } return 0, nil, nil // read more data and try again } @@ -146,7 +155,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc { // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { +func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { newline, err := encodedNewline(encoding) if err != nil { return nil, err @@ -167,7 +176,8 @@ func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil } - if atEOF { + // Flush if no more data is expected + if atEOF && flushAtEOF { return len(data), 
data, nil } diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index 249ff756..637deb86 100644 --- a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -140,13 +140,13 @@ func TestLineStartSplitFunc(t *testing.T) { cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART")) + splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -248,7 +248,7 @@ func TestLineEndSplitFunc(t *testing.T) { cfg := &MultilineConfig{ LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -326,7 +326,7 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8) + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -379,7 +379,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewNewlineSplitFunc(tc.encoding) + splitFunc, err := NewNewlineSplitFunc(tc.encoding, false) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) From 37ff79d3605ca6d1181b878775184d1bfc90da9a Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 7 May 2021 11:33:18 +0200 Subject: [PATCH 08/11] Return splitFunc by multiline helper --- operator/builtin/input/file/config.go | 4 ++-- operator/builtin/input/tcp/tcp.go | 4 ++-- operator/helper/multiline.go | 
18 ++---------------- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index a37ed377..e3393c49 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -112,7 +112,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - multiline, err := c.Multiline.Build(context, encoding, false) + splitFunc, err := c.Multiline.Build(context, encoding, false) if err != nil { return nil, err } @@ -141,7 +141,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, InputOperator: inputOperator, Include: c.Include, Exclude: c.Exclude, - SplitFunc: multiline.SplitFunc, + SplitFunc: splitFunc, PollInterval: c.PollInterval.Raw(), FilePathField: filePathField, FileNameField: fileNameField, diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index eb0ef96d..7e396025 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -98,7 +98,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - multiline, err := c.Multiline.Build(context, encoding, true) + splitFunc, err := c.Multiline.Build(context, encoding, true) if err != nil { return nil, err } @@ -109,7 +109,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato maxBufferSize: int(c.MaxBufferSize), addAttributes: c.AddAttributes, encoding: encoding, - splitFunc: multiline.SplitFunc, + splitFunc: splitFunc, } if c.TLS != nil { diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 4e8c6844..7c786e1b 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -40,22 +40,8 @@ type MultilineConfig struct { } // Build will build a Multiline operator. 
-func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (MultilineOperator, error) { - splitFunc, err := c.getSplitFunc(encoding, flushAtEOF) - if err != nil { - return MultilineOperator{}, err - } - - operator := MultilineOperator{ - SplitFunc: splitFunc, - } - - return operator, nil -} - -// MultilineOperator is an operator which handle SplitFunc for bufio.Scanner -type MultilineOperator struct { - SplitFunc bufio.SplitFunc +func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { + return c.getSplitFunc(encoding, flushAtEOF) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern From 7dd11bdc9f7dd87a740f4943223a4621c988c117 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 7 May 2021 11:40:24 +0200 Subject: [PATCH 09/11] tcp: rename MaxBufferSize to MaxLogSize Signed-off-by: Dominik Rosiek --- operator/builtin/input/tcp/tcp.go | 22 +++++++++++----------- operator/builtin/input/tcp/tcp_test.go | 10 +++++----- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 7e396025..1ed3d6b1 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -37,9 +37,9 @@ const ( // TCP input minBufferSize = 64 * 1024 - // DefaultMaxBufferSize is the max buffer sized used - // if MaxBufferSize is not set - DefaultMaxBufferSize = 1024 * 1024 + // DefaultMaxLogSize is the max buffer sized used + // if MaxLogSize is not set + DefaultMaxLogSize = 1024 * 1024 ) func init() { @@ -59,7 +59,7 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig { type TCPInputConfig struct { helper.InputConfig `yaml:",inline"` - MaxBufferSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + MaxLogSize helper.ByteSize 
`mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` ListenAddress string `mapstructure:"listen_address,omitempty" json:"listen_address,omitempty" yaml:"listen_address,omitempty"` TLS *helper.TLSServerConfig `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"` AddAttributes bool `mapstructure:"add_attributes,omitempty" json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` @@ -74,13 +74,13 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - // If MaxBufferSize not set, set sane default in order to remain + // If MaxLogSize not set, set sane default in order to remain // backwards compatible with existing plugins and configurations - if c.MaxBufferSize == 0 { - c.MaxBufferSize = DefaultMaxBufferSize + if c.MaxLogSize == 0 { + c.MaxLogSize = DefaultMaxLogSize } - if c.MaxBufferSize < minBufferSize { + if c.MaxLogSize < minBufferSize { return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minBufferSize) } @@ -106,7 +106,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato tcpInput := &TCPInput{ InputOperator: inputOperator, address: c.ListenAddress, - maxBufferSize: int(c.MaxBufferSize), + MaxLogSize: int(c.MaxLogSize), addAttributes: c.AddAttributes, encoding: encoding, splitFunc: splitFunc, @@ -126,7 +126,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato type TCPInput struct { helper.InputOperator address string - maxBufferSize int + MaxLogSize int addAttributes bool listener net.Listener @@ -223,7 +223,7 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c // Initial buffer size is 64k buf := make([]byte, 0, 64*1024) scanner := bufio.NewScanner(conn) - scanner.Buffer(buf, t.maxBufferSize*1024) + scanner.Buffer(buf, t.MaxLogSize*1024) scanner.Split(t.splitFunc) diff --git 
a/operator/builtin/input/tcp/tcp_test.go b/operator/builtin/input/tcp/tcp_test.go index 0d0c2730..9bbec3ed 100644 --- a/operator/builtin/input/tcp/tcp_test.go +++ b/operator/builtin/input/tcp/tcp_test.go @@ -290,7 +290,7 @@ func TestBuild(t *testing.T) { { "buffer-size-valid-default", TCPInputConfig{ - MaxBufferSize: 0, + MaxLogSize: 0, ListenAddress: "10.0.0.1:9000", }, false, @@ -298,7 +298,7 @@ func TestBuild(t *testing.T) { { "buffer-size-valid-min", TCPInputConfig{ - MaxBufferSize: 65536, + MaxLogSize: 65536, ListenAddress: "10.0.0.1:9000", }, false, @@ -306,7 +306,7 @@ func TestBuild(t *testing.T) { { "buffer-size-negative", TCPInputConfig{ - MaxBufferSize: -1, + MaxLogSize: -1, ListenAddress: "10.0.0.1:9000", }, true, @@ -314,7 +314,7 @@ func TestBuild(t *testing.T) { { "tls-enabled-with-no-such-file-error", TCPInputConfig{ - MaxBufferSize: 65536, + MaxLogSize: 65536, ListenAddress: "10.0.0.1:9000", TLS: createTlsConfig("/tmp/cert/missing", "/tmp/key/missing"), }, @@ -326,7 +326,7 @@ func TestBuild(t *testing.T) { t.Run(tc.name, func(t *testing.T) { cfg := NewTCPInputConfig("test_id") cfg.ListenAddress = tc.inputBody.ListenAddress - cfg.MaxBufferSize = tc.inputBody.MaxBufferSize + cfg.MaxLogSize = tc.inputBody.MaxLogSize cfg.TLS = tc.inputBody.TLS _, err := cfg.Build(testutil.NewBuildContext(t)) if tc.expectErr { From 9e50770bf64102323eaaedadbf123ba368f774d1 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 7 May 2021 15:50:06 +0200 Subject: [PATCH 10/11] Add decode method to encoding helper Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 2 +- operator/builtin/input/file/file.go | 3 +-- operator/builtin/input/file/reader.go | 2 +- operator/builtin/input/tcp/tcp.go | 13 ++++++---- operator/helper/encoding.go | 34 +++++++++++++++++++++++++-- 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index e3393c49..b806b3ac 100644 --- 
a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -112,7 +112,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding, false) + splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false) if err != nil { return nil, err } diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 60bf2fa8..ad1fea14 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -27,7 +27,6 @@ import ( "github.com/bmatcuk/doublestar/v3" "go.uber.org/zap" - "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-log-collection/entry" "github.com/open-telemetry/opentelemetry-log-collection/operator" @@ -57,7 +56,7 @@ type InputOperator struct { fingerprintSize int - encoding encoding.Encoding + encoding helper.Encoding wg sync.WaitGroup firstCheck bool diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index f7239781..12dbf6bd 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -53,7 +53,7 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) ( Path: path, fileInput: f, SugaredLogger: f.SugaredLogger.With("path", path), - decoder: f.encoding.NewDecoder(), + decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), } return r, nil diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 1ed3d6b1..077795ca 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -26,7 +26,6 @@ import ( "time" "go.uber.org/zap" - "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-log-collection/operator" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" @@ -98,7 +97,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) 
([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding, true) + splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true) if err != nil { return nil, err } @@ -134,7 +133,7 @@ type TCPInput struct { wg sync.WaitGroup tls *tls.Config - encoding encoding.Encoding + encoding helper.Encoding splitFunc bufio.SplitFunc } @@ -228,7 +227,13 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c scanner.Split(t.splitFunc) for scanner.Scan() { - entry, err := t.NewEntry(scanner.Text()) + decoded, err := t.encoding.Decode(scanner.Bytes()) + if err != nil { + t.Errorw("Failed to decode data", zap.Error(err)) + continue + } + + entry, err := t.NewEntry(decoded) if err != nil { t.Errorw("Failed to create entry", zap.Error(err)) continue diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go index 64ec3eb1..7837eba6 100644 --- a/operator/helper/encoding.go +++ b/operator/helper/encoding.go @@ -21,6 +21,7 @@ import ( "golang.org/x/text/encoding" "golang.org/x/text/encoding/ianaindex" "golang.org/x/text/encoding/unicode" + "golang.org/x/text/transform" "github.com/open-telemetry/opentelemetry-log-collection/operator" ) @@ -38,8 +39,37 @@ type EncodingConfig struct { } // Build will build an Encoding operator. 
-func (c EncodingConfig) Build(context operator.BuildContext) (encoding.Encoding, error) { - return lookupEncoding(c.Encoding) +func (c EncodingConfig) Build(context operator.BuildContext) (Encoding, error) { + enc, err := lookupEncoding(c.Encoding) + if err != nil { + return Encoding{}, err + } + + return Encoding{ + Encoding: enc, + }, nil +} + +type Encoding struct { + Encoding encoding.Encoding +} + +// Decode converts the bytes in msgBuf to utf-8 from the configured encoding +func (e *Encoding) Decode(msgBuf []byte) (string, error) { + decodeBuffer := make([]byte, 1<<12) + decoder := e.Encoding.NewDecoder() + + for { + decoder.Reset() + nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true) + if err != nil && err == transform.ErrShortDst { + decodeBuffer = make([]byte, len(decodeBuffer)*2) + continue + } else if err != nil { + return "", fmt.Errorf("transform encoding: %s", err) + } + return string(decodeBuffer[:nDst]), nil + } } var encodingOverrides = map[string]encoding.Encoding{ From d581287f08e914dc9c7dfeef91c2539a77665ad4 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 7 May 2021 16:50:49 +0200 Subject: [PATCH 11/11] Refactor encoding helper Signed-off-by: Dominik Rosiek --- operator/helper/encoding.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go index 7837eba6..875921ff 100644 --- a/operator/helper/encoding.go +++ b/operator/helper/encoding.go @@ -62,13 +62,14 @@ func (e *Encoding) Decode(msgBuf []byte) (string, error) { for { decoder.Reset() nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true) - if err != nil && err == transform.ErrShortDst { + if err == nil { + return string(decodeBuffer[:nDst]), nil + } + if err == transform.ErrShortDst { decodeBuffer = make([]byte, len(decodeBuffer)*2) continue - } else if err != nil { - return "", fmt.Errorf("transform encoding: %s", err) } - return string(decodeBuffer[:nDst]), nil + return "", 
fmt.Errorf("transform encoding: %s", err) } }