Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move multiline handling from file input to helper package #319

Merged
merged 4 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 12 additions & 78 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package file

import (
"bufio"
"fmt"
"regexp"
"strings"
"time"

"github.com/bmatcuk/doublestar/v2"
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/ianaindex"
"golang.org/x/text/encoding/unicode"
)

func init() {
Expand All @@ -35,7 +29,7 @@ func NewInputConfig(operatorID string) *InputConfig {
StartAt: "end",
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: "nop",
Encoding: helper.NewEncodingConfig(),
}
}

Expand All @@ -46,21 +40,15 @@ type InputConfig struct {
Include []string `json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"`
}

// MultilineConfig is the configuration a multiline operation
type MultilineConfig struct {
LineStartPattern string `json:"line_start_pattern" yaml:"line_start_pattern"`
LineEndPattern string `json:"line_end_pattern" yaml:"line_end_pattern"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -104,12 +92,12 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", minFingerprintSize)
}

encoding, err := lookupEncoding(c.Encoding)
encoding, err := c.Encoding.Build(context)
if err != nil {
return nil, err
}

splitFunc, err := c.getSplitFunc(encoding)
splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,57 +145,3 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,

return []operator.Operator{op}, nil
}

var encodingOverrides = map[string]encoding.Encoding{
"utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf8": unicode.UTF8,
"ascii": unicode.UTF8,
"us-ascii": unicode.UTF8,
"nop": encoding.Nop,
"": encoding.Nop,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return encoding, nil
}
encoding, err := ianaindex.IANA.Encoding(enc)
if err != nil {
return nil, fmt.Errorf("unsupported encoding '%s'", enc)
}
if encoding == nil {
return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc)
}
return encoding, nil
}

// getSplitFunc will return the split function associated the configured mode.
func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
if c.Multiline == nil {
return NewNewlineSplitFunc(encoding)
}
endPattern := c.Multiline.LineEndPattern
startPattern := c.Multiline.LineStartPattern

switch {
case endPattern != "" && startPattern != "":
return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
case endPattern == "" && startPattern == "":
return nil, fmt.Errorf("one of line_start_pattern or line_end_pattern must be set")
case endPattern != "":
re, err := regexp.Compile("(?m)" + c.Multiline.LineEndPattern)
if err != nil {
return nil, fmt.Errorf("compile line end regex: %s", err)
}
return NewLineEndSplitFunc(re), nil
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.Multiline.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %s", err)
}
return NewLineStartSplitFunc(re), nil
default:
return nil, fmt.Errorf("unreachable")
}
}
Loading