Skip to content

Commit

Permalink
move multiline handling from file input to helper package (#319)
Browse files Browse the repository at this point in the history
* move multiline handling from file input to helper. this will allow multiline to be used by other operators such as tcp and udp input

* Replace open-telemetry/opentelemetry-log-collection with observiq/stanza. Remove map structure tests as they are not implemented yet.

* make encoding inline and re-enable encoding lower|upper tests
  • Loading branch information
Joseph Sirianni authored Jun 1, 2021
1 parent 3114da3 commit 02ea8fd
Show file tree
Hide file tree
Showing 63 changed files with 948 additions and 106 deletions.
90 changes: 12 additions & 78 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package file

import (
"bufio"
"fmt"
"regexp"
"strings"
"time"

"github.com/bmatcuk/doublestar/v2"
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/ianaindex"
"golang.org/x/text/encoding/unicode"
)

func init() {
Expand All @@ -35,7 +29,7 @@ func NewInputConfig(operatorID string) *InputConfig {
StartAt: "end",
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: "nop",
Encoding: helper.NewEncodingConfig(),
}
}

Expand All @@ -46,21 +40,15 @@ type InputConfig struct {
Include []string `json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"`
}

// MultilineConfig is the configuration a multiline operation
type MultilineConfig struct {
LineStartPattern string `json:"line_start_pattern" yaml:"line_start_pattern"`
LineEndPattern string `json:"line_end_pattern" yaml:"line_end_pattern"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -104,12 +92,12 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", minFingerprintSize)
}

encoding, err := lookupEncoding(c.Encoding)
encoding, err := c.Encoding.Build(context)
if err != nil {
return nil, err
}

splitFunc, err := c.getSplitFunc(encoding)
splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,57 +145,3 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,

return []operator.Operator{op}, nil
}

var encodingOverrides = map[string]encoding.Encoding{
"utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf8": unicode.UTF8,
"ascii": unicode.UTF8,
"us-ascii": unicode.UTF8,
"nop": encoding.Nop,
"": encoding.Nop,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return encoding, nil
}
encoding, err := ianaindex.IANA.Encoding(enc)
if err != nil {
return nil, fmt.Errorf("unsupported encoding '%s'", enc)
}
if encoding == nil {
return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc)
}
return encoding, nil
}

// getSplitFunc will return the split function associated the configured mode.
func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
if c.Multiline == nil {
return NewNewlineSplitFunc(encoding)
}
endPattern := c.Multiline.LineEndPattern
startPattern := c.Multiline.LineStartPattern

switch {
case endPattern != "" && startPattern != "":
return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
case endPattern == "" && startPattern == "":
return nil, fmt.Errorf("one of line_start_pattern or line_end_pattern must be set")
case endPattern != "":
re, err := regexp.Compile("(?m)" + c.Multiline.LineEndPattern)
if err != nil {
return nil, fmt.Errorf("compile line end regex: %s", err)
}
return NewLineEndSplitFunc(re), nil
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.Multiline.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %s", err)
}
return NewLineStartSplitFunc(re), nil
default:
return nil, fmt.Errorf("unreachable")
}
}
Loading

0 comments on commit 02ea8fd

Please sign in to comment.