diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index b179cdb8199a..b2884caf4a9b 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -140,12 +140,12 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s var hs *headerSettings if c.Header != nil { - enc, err := c.Splitter.EncodingConfig.Build() + enc, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding) if err != nil { return nil, fmt.Errorf("failed to create encoding: %w", err) } - hs, err = c.Header.buildHeaderSettings(enc.Encoding) + hs, err = c.Header.buildHeaderSettings(enc) if err != nil { return nil, fmt.Errorf("failed to build header config: %w", err) } @@ -222,7 +222,7 @@ func (c Config) validate() error { return errors.New("`max_batches` must not be negative") } - _, err := c.Splitter.EncodingConfig.Build() + _, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding) if err != nil { return err } diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index b8cf7f67c133..8a820ad1e365 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -130,11 +130,11 @@ func (b *readerBuilder) build() (r *Reader, err error) { r.splitFunc = r.lineSplitFunc } - enc, err := b.encodingConfig.Build() + enc, err := helper.LookupEncoding(b.encodingConfig.Encoding) if err != nil { return } - r.encoding = enc + r.encoding = helper.NewEncoding(enc) if b.file != nil { r.file = b.file diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index f78533abfe46..a5a782cae945 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -182,12 +182,13 @@ func TestHeaderFingerprintIncluded(t *testing.T) { }, } - enc, err := helper.EncodingConfig{ + cfg := helper.EncodingConfig{ Encoding: "utf-8", - }.Build() + } + enc, err := helper.LookupEncoding(cfg.Encoding) require.NoError(t, err) - h, err := headerConf.buildHeaderSettings(enc.Encoding) + h, err := headerConf.buildHeaderSettings(enc) require.NoError(t, err) f.headerSettings = h diff --git a/pkg/stanza/fileconsumer/splitter_factory.go b/pkg/stanza/fileconsumer/splitter_factory.go index ada06263abd1..a34ebb900453 100644 --- a/pkg/stanza/fileconsumer/splitter_factory.go +++ b/pkg/stanza/fileconsumer/splitter_factory.go @@ -38,12 +38,12 @@ func newMultilineSplitterFactory(splitter helper.SplitterConfig) *multilineSplit // Build builds Multiline Splitter struct func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, error) { - enc, err := factory.EncodingConfig.Build() + enc, err := helper.LookupEncoding(factory.EncodingConfig.Encoding) if err != nil { return nil, err } flusher := factory.Flusher.Build() - splitter, err := factory.Multiline.Build(enc.Encoding, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize) + splitter, err := factory.Multiline.Build(enc, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize) if err != nil { return nil, err } diff --git a/pkg/stanza/operator/helper/encoding.go b/pkg/stanza/operator/helper/encoding.go index 4bac27c9454d..864daf59374c 100644 --- a/pkg/stanza/operator/helper/encoding.go +++ b/pkg/stanza/operator/helper/encoding.go @@ -25,7 +25,7 @@ import ( "golang.org/x/text/transform" ) -// NewBasicConfig creates a new Encoding config +// NewEncodingConfig creates a new Encoding config func NewEncodingConfig() EncodingConfig { return EncodingConfig{ Encoding: "utf-8", @@ -37,26 +37,20 @@ type EncodingConfig struct { Encoding string `mapstructure:"encoding,omitempty"` } -// Build will build an Encoding operator. -func (c EncodingConfig) Build() (Encoding, error) { - enc, err := lookupEncoding(c.Encoding) - if err != nil { - return Encoding{}, err - } - - return Encoding{ - Encoding: enc, - decodeBuffer: make([]byte, 1<<12), - decoder: enc.NewDecoder(), - }, nil -} - type Encoding struct { Encoding encoding.Encoding decoder *encoding.Decoder decodeBuffer []byte } +func NewEncoding(enc encoding.Encoding) Encoding { + return Encoding{ + Encoding: enc, + decoder: enc.NewDecoder(), + decodeBuffer: make([]byte, 1<<12), + } +} + // Decode converts the bytes in msgBuf to utf-8 from the configured encoding func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) { for { @@ -84,7 +78,7 @@ var encodingOverrides = map[string]encoding.Encoding{ "": unicode.UTF8, } -func lookupEncoding(enc string) (encoding.Encoding, error) { +func LookupEncoding(enc string) (encoding.Encoding, error) { if e, ok := encodingOverrides[strings.ToLower(enc)]; ok { return e, nil } @@ -99,7 +93,7 @@ func lookupEncoding(enc string) (encoding.Encoding, error) { } func IsNop(enc string) bool { - e, err := lookupEncoding(enc) + e, err := LookupEncoding(enc) if err != nil { return false } diff --git a/pkg/stanza/operator/helper/splitter.go b/pkg/stanza/operator/helper/splitter.go index 1394a416b817..234978f27bf3 100644 --- a/pkg/stanza/operator/helper/splitter.go +++ b/pkg/stanza/operator/helper/splitter.go @@ -36,19 +36,19 @@ func NewSplitterConfig() SplitterConfig { // Build builds Splitter struct func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) { - enc, err := c.EncodingConfig.Build() + enc, err := LookupEncoding(c.EncodingConfig.Encoding) if err != nil { return nil, err } flusher := c.Flusher.Build() - splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize) + splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize) if err != nil { return nil, err } return &Splitter{ - Encoding: enc, + Encoding: NewEncoding(enc), Flusher: flusher, SplitFunc: splitFunc, }, nil diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 76f5af11fcb0..a2979dd391ab 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -28,6 +28,7 @@ import ( "github.com/jpillora/backoff" "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" + "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -108,13 +109,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, fmt.Errorf("failed to resolve listen_address: %w", err) } - encoding, err := c.Encoding.Build() + enc, err := helper.LookupEncoding(c.Encoding.Encoding) if err != nil { return nil, err } // Build multiline - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize)) + splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize)) if err != nil { return nil, err } @@ -129,7 +130,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { address: c.ListenAddress, MaxLogSize: int(c.MaxLogSize), addAttributes: c.AddAttributes, - encoding: encoding, + encoding: enc, splitFunc: splitFunc, backoff: backoff.Backoff{ Max: 3 * time.Second, @@ -160,7 +161,7 @@ type Input struct { tls *tls.Config backoff backoff.Backoff - encoding helper.Encoding + encoding encoding.Encoding splitFunc bufio.SplitFunc resolver *helper.IPResolver } @@ -253,11 +254,11 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont buf := make([]byte, 0, t.MaxLogSize) scanner := bufio.NewScanner(conn) scanner.Buffer(buf, t.MaxLogSize) - scanner.Split(t.splitFunc) + decoder := helper.NewEncoding(t.encoding) for scanner.Scan() { - decoded, err := t.encoding.Decode(scanner.Bytes()) + decoded, err := decoder.Decode(scanner.Bytes()) if err != nil { t.Errorw("Failed to decode data", zap.Error(err)) continue diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index 43a37b3458a5..a7baf8078a12 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -24,6 +24,7 @@ import ( "sync" "go.uber.org/zap" + "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -91,13 +92,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, fmt.Errorf("failed to resolve listen_address: %w", err) } - encoding, err := c.Encoding.Build() + enc, err := helper.LookupEncoding(c.Encoding.Encoding) if err != nil { return nil, err } // Build multiline - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, MaxUDPSize) + splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, MaxUDPSize) if err != nil { return nil, err } @@ -112,7 +113,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { address: address, buffer: make([]byte, MaxUDPSize), addAttributes: c.AddAttributes, - encoding: encoding, + encoding: enc, splitFunc: splitFunc, resolver: resolver, } @@ -130,7 +131,7 @@ type Input struct { cancel context.CancelFunc wg sync.WaitGroup - encoding helper.Encoding + encoding encoding.Encoding splitFunc bufio.SplitFunc resolver *helper.IPResolver } @@ -158,6 +159,7 @@ func (u *Input) goHandleMessages(ctx context.Context) { defer u.wg.Done() buf := make([]byte, 0, MaxUDPSize) + decoder := helper.NewEncoding(u.encoding) for { message, remoteAddr, err := u.readMessage() if err != nil { @@ -172,11 +174,10 @@ func (u *Input) goHandleMessages(ctx context.Context) { scanner := bufio.NewScanner(bytes.NewReader(message)) scanner.Buffer(buf, MaxUDPSize) - scanner.Split(u.splitFunc) for scanner.Scan() { - decoded, err := u.encoding.Decode(scanner.Bytes()) + decoded, err := decoder.Decode(scanner.Bytes()) if err != nil { u.Errorw("Failed to decode data", zap.Error(err)) continue