Skip to content

Commit

Permalink
[pkg/stanza] Create a new decoder for each TCP/UDP connection to prev…
Browse files Browse the repository at this point in the history
…ent concurrent write to buffer. (#25100)

Change the TCP/UDP inputs so that they parse the
encoding and get the `encoding.Encoding` during initialization, but wait
to create the `helper.Decoder` instance in each goroutine. This
prevents concurrent calls to `Decode` using the same instance.

Co-authored-by: Dan Jaglowski <jaglows3@gmail.com>
  • Loading branch information
jefchien and djaglowski authored Aug 14, 2023
1 parent e136bfd commit 6ebf905
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 52 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-concurrent-decode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Create a new decoder for each TCP/UDP connection to prevent concurrent write to buffer.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24980]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/fix-concurrent-decode2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate helper.Encoding and helper.EncodingConfig.Build

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24980]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact

var hCfg *header.Config
if c.Header != nil {
enc, err := c.Splitter.EncodingConfig.Build()
enc, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create encoding: %w", err)
}

hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding)
hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc)
if err != nil {
return nil, fmt.Errorf("failed to build header config: %w", err)
}
Expand Down Expand Up @@ -212,13 +212,13 @@ func (c Config) validate() error {
return errors.New("`max_batches` must not be negative")
}

enc, err := c.Splitter.EncodingConfig.Build()
enc, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding)
if err != nil {
return err
}

if c.Header != nil {
if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding); err != nil {
if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc); err != nil {
return fmt.Errorf("invalid config for `header`: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type reader struct {
*readerConfig
lineSplitFunc bufio.SplitFunc
splitFunc bufio.SplitFunc
encoding helper.Encoding
decoder *helper.Decoder
processFunc emit.Callback

Fingerprint *fingerprint.Fingerprint
Expand Down Expand Up @@ -87,7 +87,7 @@ func (r *reader) ReadToEnd(ctx context.Context) {
break
}

token, err := r.encoding.Decode(s.Bytes())
token, err := r.decoder.Decode(s.Bytes())
if err != nil {
r.Errorw("decode: %w", zap.Error(err))
} else if err := r.processFunc(ctx, token, r.FileAttributes); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/reader_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ func (b *readerBuilder) build() (r *reader, err error) {
}
}

r.encoding, err = b.encodingConfig.Build()
encoding, err := helper.LookupEncoding(b.encodingConfig.Encoding)
if err != nil {
return nil, err
}
r.decoder = helper.NewDecoder(encoding)

if b.headerConfig == nil || b.headerFinalized {
r.splitFunc = r.lineSplitFunc
Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ func TestHeaderFingerprintIncluded(t *testing.T) {
regexConf := regex.NewConfig()
regexConf.Regex = "^#(?P<header>.*)"

enc, err := helper.EncodingConfig{
encodingConf := helper.EncodingConfig{
Encoding: "utf-8",
}.Build()
}
enc, err := helper.LookupEncoding(encodingConf.Encoding)
require.NoError(t, err)

h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc.Encoding)
h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc)
require.NoError(t, err)
f.headerConfig = h

Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/splitter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func newMultilineSplitterFactory(splitter helper.SplitterConfig) *multilineSplit

// Build builds Multiline Splitter struct
func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, error) {
enc, err := factory.EncodingConfig.Build()
enc, err := helper.LookupEncoding(factory.EncodingConfig.Encoding)
if err != nil {
return nil, err
}
flusher := factory.Flusher.Build()
splitter, err := factory.Multiline.Build(enc.Encoding, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
splitter, err := factory.Multiline.Build(enc, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}
Expand Down
42 changes: 30 additions & 12 deletions pkg/stanza/operator/helper/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import (
"golang.org/x/text/transform"
)

// NewBasicConfig creates a new Encoding config
// NewEncodingConfig creates a new Encoding config
func NewEncodingConfig() EncodingConfig {
return EncodingConfig{
Encoding: "utf-8",
}
}

// EncodingConfig is the configuration of a Encoding helper
// EncodingConfig is the configuration of an Encoding helper
type EncodingConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
}

// Build will build an Encoding operator.
// Deprecated: [v0.83.0] Use NewDecoder instead.
func (c EncodingConfig) Build() (Encoding, error) {
enc, err := lookupEncoding(c.Encoding)
enc, err := LookupEncoding(c.Encoding)
if err != nil {
return Encoding{}, err
}
Expand All @@ -40,22 +40,39 @@ func (c EncodingConfig) Build() (Encoding, error) {
}, nil
}

// Deprecated: [v0.83.0] Use Decoder instead.
type Encoding struct {
Encoding encoding.Encoding
decoder *encoding.Decoder
decodeBuffer []byte
}

// Decode converts the bytes in msgBuf to utf-8 from the configured encoding
func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) {
// Decoder converts bytes from a configured character set encoding to UTF-8.
// It reuses a single internal buffer across calls to Decode, so it is not safe
// for concurrent use; create one Decoder per goroutine with NewDecoder.
type Decoder struct {
// encoding is the character set encoding this Decoder was built from.
encoding encoding.Encoding
// decoder is the transformer obtained from encoding; Decode resets it before each use.
decoder *encoding.Decoder
// decodeBuffer is the reusable destination buffer. Decode returns a slice of it,
// so a result is only valid until the next call to Decode on the same Decoder.
decodeBuffer []byte
}

// NewDecoder wraps a character set encoding and creates a reusable buffer to reduce allocation.
// The buffer starts at 4 KiB (1<<12 bytes).
//
// enc must not be nil: NewDecoder calls enc.NewDecoder() immediately.
//
// Decoder is not thread-safe and must not be used in multiple goroutines;
// build a separate Decoder for each goroutine that needs to decode.
func NewDecoder(enc encoding.Encoding) *Decoder {
return &Decoder{
encoding: enc,
decoder: enc.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
}
}

// Decode converts the bytes in msgBuf to UTF-8 from the configured encoding.
func (d *Decoder) Decode(msgBuf []byte) ([]byte, error) {
for {
e.decoder.Reset()
nDst, _, err := e.decoder.Transform(e.decodeBuffer, msgBuf, true)
d.decoder.Reset()
nDst, _, err := d.decoder.Transform(d.decodeBuffer, msgBuf, true)
if err == nil {
return e.decodeBuffer[:nDst], nil
return d.decodeBuffer[:nDst], nil
}
if errors.Is(err, transform.ErrShortDst) {
e.decodeBuffer = make([]byte, len(e.decodeBuffer)*2)
d.decodeBuffer = make([]byte, len(d.decodeBuffer)*2)
continue
}
return nil, fmt.Errorf("transform encoding: %w", err)
Expand All @@ -73,7 +90,8 @@ var encodingOverrides = map[string]encoding.Encoding{
"": unicode.UTF8,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
// LookupEncoding attempts to match the string name provided with a character set encoding.
func LookupEncoding(enc string) (encoding.Encoding, error) {
if e, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return e, nil
}
Expand All @@ -88,7 +106,7 @@ func lookupEncoding(enc string) (encoding.Encoding, error) {
}

func IsNop(enc string) bool {
e, err := lookupEncoding(enc)
e, err := LookupEncoding(enc)
if err != nil {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Multiline struct {
Force *Flusher
}

// NewBasicConfig creates a new Multiline config
// NewMultilineConfig creates a new Multiline config
func NewMultilineConfig() MultilineConfig {
return MultilineConfig{
LineStartPattern: "",
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/operator/helper/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ func NewSplitterConfig() SplitterConfig {

// Build builds Splitter struct
func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) {
enc, err := c.EncodingConfig.Build()
enc, err := LookupEncoding(c.EncodingConfig.Encoding)
if err != nil {
return nil, err
}

flusher := c.Flusher.Build()
splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}

return &Splitter{
Encoding: enc,
Decoder: NewDecoder(enc),
Flusher: flusher,
SplitFunc: splitFunc,
}, nil
}

// Splitter consolidates Flusher and dependent splitFunc
type Splitter struct {
Encoding Encoding
Decoder *Decoder
SplitFunc bufio.SplitFunc
Flusher *Flusher
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/operator/input/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"

"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -143,7 +144,7 @@ func (t *Input) SetOutputs(operators []operator.Operator) error {
return t.parser.SetOutputs(operators)
}

func OctetMultiLineBuilder(_ helper.Encoding) (bufio.SplitFunc, error) {
func OctetMultiLineBuilder(_ encoding.Encoding) (bufio.SplitFunc, error) {
return newOctetFrameSplitFunc(true), nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/stanza/operator/input/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/internal"
Expand Down Expand Up @@ -250,7 +249,7 @@ func TestOctetFramingSplitFunc(t *testing.T) {
},
}
for _, tc := range testCases {
splitFunc, err := OctetMultiLineBuilder(helper.Encoding{})
splitFunc, err := OctetMultiLineBuilder(nil)
require.NoError(t, err)
t.Run(tc.Name, tc.RunFunc(splitFunc))
}
Expand Down
Loading

0 comments on commit 6ebf905

Please sign in to comment.