Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add multiline support to TCP #125

Merged
merged 11 commits into from
May 7, 2021
61 changes: 43 additions & 18 deletions docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,67 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |

| Field | Default | Description |
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |

#### TLS Configuration

The `tcp_input` operator supports TLS, disabled by default.
config more detail [opentelemetry-collector#configtls](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/configtls#tls-configuration-settings).

| Field | Default | Description |
| --- | --- | --- |
| `cert_file` | | Path to the TLS cert to use for TLS required connections. |
| `key_file` | | Path to the TLS key to use for TLS required connections. |
| `ca_file` | | Path to the CA cert. For a client this verifies the server certificate. For a server this verifies client certificates. If empty uses system root CA. |
| `client_ca_file` | | Path to the TLS cert to use by the server to verify a client certificate. (optional) |
| Field | Default | Description |
| --- | --- | --- |
| `cert_file` | | Path to the TLS cert to use for TLS required connections. |
| `key_file` | | Path to the TLS key to use for TLS required connections. |
| `ca_file` | | Path to the CA cert. For a client this verifies the server certificate. For a server this verifies client certificates. If empty uses system root CA. |
| `client_ca_file` | | Path to the TLS cert to use by the server to verify a client certificate. (optional) |

#### `multiline` configuration

If set, the `multiline` configuration block instructs the `tcp_input` operator to split log entries on a pattern other than newlines.

The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

#### Supported encodings

| Key | Description
| --- | --- |
| `nop` | No encoding validation. Treats the file as a stream of raw bytes |
| `utf-8` | UTF-8 encoding |
| `utf-16le` | UTF-16 encoding with little-endian byte order |
| `utf-16be` | UTF-16 encoding with little-endian byte order |
| `ascii` | ASCII encoding |
| `big5` | The Big5 Chinese character encoding |

Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

### Example Configurations

#### Simple

Configuration:

```yaml
- type: tcp_input
listen_address: "0.0.0.0:54525"
```

Send a log:

```bash
$ nc localhost 54525 <<EOF
heredoc> message1
Expand All @@ -49,6 +73,7 @@ heredoc> EOF
```

Generated entries:

```json
{
"timestamp": "2020-04-30T12:10:17.656726-04:00",
Expand Down
90 changes: 12 additions & 78 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,10 @@
package file

import (
"bufio"
"fmt"
"regexp"
"strings"
"time"

"github.com/bmatcuk/doublestar/v3"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/ianaindex"
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
Expand All @@ -50,7 +44,7 @@ func NewInputConfig(operatorID string) *InputConfig {
StartAt: "end",
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: "nop",
Encoding: helper.NewEncodingConfig(),
}
}

Expand All @@ -61,21 +55,15 @@ type InputConfig struct {
Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding string `mapstructure:"encoding,omitempty" json:"encoding,omitempty" yaml:"encoding,omitempty"`
}

// MultilineConfig is the configuration a multiline operation
type MultilineConfig struct {
LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"`
LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"`
PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -119,12 +107,12 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", minFingerprintSize)
}

encoding, err := lookupEncoding(c.Encoding)
encoding, err := c.Encoding.Build(context)
if err != nil {
return nil, err
}

splitFunc, err := c.getSplitFunc(encoding)
splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -171,57 +159,3 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,

return []operator.Operator{op}, nil
}

var encodingOverrides = map[string]encoding.Encoding{
"utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf8": unicode.UTF8,
"ascii": unicode.UTF8,
"us-ascii": unicode.UTF8,
"nop": encoding.Nop,
"": encoding.Nop,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return encoding, nil
}
encoding, err := ianaindex.IANA.Encoding(enc)
if err != nil {
return nil, fmt.Errorf("unsupported encoding '%s'", enc)
}
if encoding == nil {
return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc)
}
return encoding, nil
}

// getSplitFunc will return the split function associated the configured mode.
func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
if c.Multiline == nil {
return NewNewlineSplitFunc(encoding)
}
endPattern := c.Multiline.LineEndPattern
startPattern := c.Multiline.LineStartPattern

switch {
case endPattern != "" && startPattern != "":
return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
case endPattern == "" && startPattern == "":
return nil, fmt.Errorf("one of line_start_pattern or line_end_pattern must be set")
case endPattern != "":
re, err := regexp.Compile("(?m)" + c.Multiline.LineEndPattern)
if err != nil {
return nil, fmt.Errorf("compile line end regex: %s", err)
}
return NewLineEndSplitFunc(re), nil
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.Multiline.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %s", err)
}
return NewLineStartSplitFunc(re), nil
default:
return nil, fmt.Errorf("unreachable")
}
}
19 changes: 11 additions & 8 deletions operator/builtin/input/file/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestConfig(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := new(MultilineConfig)
newMulti := helper.MultilineConfig{}
newMulti.LineStartPattern = "Start"
cfg.Multiline = newMulti
return cfg
Expand All @@ -398,7 +398,7 @@ func TestConfig(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := new(MultilineConfig)
newMulti := helper.MultilineConfig{}
newMulti.LineStartPattern = "%"
cfg.Multiline = newMulti
return cfg
Expand All @@ -409,7 +409,7 @@ func TestConfig(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := new(MultilineConfig)
newMulti := helper.MultilineConfig{}
newMulti.LineEndPattern = "Start"
cfg.Multiline = newMulti
return cfg
Expand All @@ -420,7 +420,7 @@ func TestConfig(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := new(MultilineConfig)
newMulti := helper.MultilineConfig{}
newMulti.LineEndPattern = "%"
cfg.Multiline = newMulti
return cfg
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestConfig(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
cfg.Encoding = "utf-16le"
cfg.Encoding = helper.EncodingConfig{Encoding: "utf-16le"}
return cfg
}(),
},
Expand All @@ -499,7 +499,7 @@ func TestConfig(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
cfg.Encoding = "UTF-16lE"
cfg.Encoding = helper.EncodingConfig{Encoding: "UTF-16lE"}
return cfg
}(),
},
Expand All @@ -521,9 +521,12 @@ func NewTestInputConfig() *InputConfig {
cfg.WriteTo = entry.NewBodyField([]string{}...)
cfg.Include = []string{"i1", "i2"}
cfg.Exclude = []string{"e1", "e2"}
cfg.Multiline = &MultilineConfig{"start", "end"}
cfg.Multiline = helper.MultilineConfig{
LineStartPattern: "start",
LineEndPattern: "end",
}
cfg.FingerprintSize = 1024
cfg.Encoding = "utf16"
cfg.Encoding = helper.EncodingConfig{Encoding: "utf16"}
return cfg
}

Expand Down
3 changes: 1 addition & 2 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/bmatcuk/doublestar/v3"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
Expand Down Expand Up @@ -57,7 +56,7 @@ type InputOperator struct {

fingerprintSize int

encoding encoding.Encoding
encoding helper.Encoding

wg sync.WaitGroup
firstCheck bool
Expand Down
22 changes: 11 additions & 11 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestBuild(t *testing.T) {
{
"MultilineConfiguredStartAndEndPatterns",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{
f.Multiline = helper.MultilineConfig{
LineEndPattern: "Exists",
LineStartPattern: "Exists",
}
Expand All @@ -175,7 +175,7 @@ func TestBuild(t *testing.T) {
{
"MultilineConfiguredStartPattern",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{
f.Multiline = helper.MultilineConfig{
LineStartPattern: "START.*",
}
},
Expand All @@ -185,7 +185,7 @@ func TestBuild(t *testing.T) {
{
"MultilineConfiguredEndPattern",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{
f.Multiline = helper.MultilineConfig{
LineEndPattern: "END.*",
}
},
Expand All @@ -195,15 +195,15 @@ func TestBuild(t *testing.T) {
{
"InvalidEncoding",
func(f *InputConfig) {
f.Encoding = "UTF-3233"
f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"}
},
require.Error,
nil,
},
{
"LineStartAndEnd",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{
f.Multiline = helper.MultilineConfig{
LineStartPattern: ".*",
LineEndPattern: ".*",
}
Expand All @@ -214,15 +214,15 @@ func TestBuild(t *testing.T) {
{
"NoLineStartOrEnd",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{}
f.Multiline = helper.MultilineConfig{}
},
require.Error,
nil,
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"InvalidLineStartRegex",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{
f.Multiline = helper.MultilineConfig{
LineStartPattern: "(",
}
},
Expand All @@ -232,7 +232,7 @@ func TestBuild(t *testing.T) {
{
"InvalidLineEndRegex",
func(f *InputConfig) {
f.Multiline = &MultilineConfig{
f.Multiline = helper.MultilineConfig{
LineEndPattern: "(",
}
},
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestEncodings(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
operator, receivedEntries, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.Encoding = tc.encoding
cfg.Encoding = helper.EncodingConfig{Encoding: tc.encoding}
}, nil)

// Popualte the file
Expand Down
Loading