Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add possibility to force flush logs after certain period of time #216

Merged
merged 15 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ If set, the `multiline` configuration block instructs the `file_input` operator
The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
When using multiline, the last log entry can sometimes remain unflushed because the operator is waiting for more content.
To forcefully flush the last buffered log after a certain period of time,
set the `force_flush_period` option to a [duration string](https://golang.org/pkg/time/#ParseDuration),
e.g. `5s` or `1m`. The default is `0s`, which means no force flushing will be performed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, I think we should use helper.Duration instead of time.Duration. We can then link to https://github.com/open-telemetry/opentelemetry-log-collection/blob/main/docs/types/duration.md, which explains formatting sufficiently.

This is basically the same thing, but with a default unit of seconds, since nanoseconds is basically never reasonable in this context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated documentation


Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.

### File rotation

Expand Down
30 changes: 17 additions & 13 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewInputConfig(operatorID string) *InputConfig {
IncludeFilePath: false,
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
ForceFlush: helper.NewForceFlushConfig(),
StartAt: "end",
FingerprintSize: defaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
Expand All @@ -58,17 +59,18 @@ type InputConfig struct {
Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
ForceFlush helper.ForceFlushConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -117,7 +119,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
// Ensure that multiline is buildable
_, err = c.Multiline.Build(encoding.Encoding, false, nil)
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,13 +159,14 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
Multiline: c.Multiline,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this is just a naming issue, but I'll have a difficult time accepting that the splitfunc is being delegated to a Multiline struct. Put another way, we need a split func regardless of whether or not we are using multiline splitting.

I suspect that we can accomplish the same functionality as you are implementing, but by composing things in a better way.

Would it make sense to create a new struct which would pair the concept of a split func with the concept of a flush? Maybe something like:

type Splitter struct {
    splitFunc // can be simple like '\n' or multiline config
    flusher     // forces a flush based on timing
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this further, I'm not convinced we even need to do what I've suggested here. Please see my related comment on reader.go. I'll leave this comment in place as a possible alternative, but I think we need to start by justifying the need to alter splitFunc behavior in any way.

PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
FilePathResolvedField: filePathResolvedField,
FileNameResolvedField: fileNameResolvedField,
startAtBeginning: startAtBeginning,
ForceFlush: c.ForceFlush,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
Expand Down
23 changes: 20 additions & 3 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type InputOperator struct {
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
Multiline helper.MultilineConfig
ForceFlush helper.ForceFlushConfig
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
Expand Down Expand Up @@ -323,7 +324,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
}

// If we don't match any previously known files, create a new reader from scratch
newReader, err := f.NewReader(file.Name(), file, fp)
splitFunc, forceFlush, err := f.getMultiline()
if err != nil {
return nil, err
}
newReader, err := f.NewReader(file.Name(), file, fp, splitFunc, forceFlush)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -393,7 +398,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {
// Decode each of the known files
f.knownFiles = make([]*Reader, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
newReader, err := f.NewReader("", nil, nil)
splitFunc, forceFlush, err := f.getMultiline()
if err != nil {
return err
}
newReader, err := f.NewReader("", nil, nil, splitFunc, forceFlush)
if err != nil {
return err
}
Expand All @@ -405,3 +414,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {

return nil
}

// getMultiline builds the multiline split function together with its
// associated ForceFlush handle. The returned *helper.ForceFlush is the
// same instance wired into the split function, so callers can use it to
// trigger a forced flush of a partially buffered log entry.
func (f *InputOperator) getMultiline() (bufio.SplitFunc, *helper.ForceFlush, error) {
	flush := f.ForceFlush.Build()
	split, buildErr := f.Multiline.Build(f.encoding.Encoding, false, flush)
	return split, flush, buildErr
}
17 changes: 13 additions & 4 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,10 @@ func TestStartAtEndNewFile(t *testing.T) {
// even if the file doesn't end in a newline
func TestNoNewline(t *testing.T) {
t.Parallel()
t.Skip()
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.ForceFlush = helper.NewForceFlushConfig()
cfg.ForceFlush.Period.Duration = time.Nanosecond
}, nil)

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\ntestlog2")
Expand Down Expand Up @@ -625,7 +627,11 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
tempCopy := openFile(t, temp.Name())
fp, err := operator.NewFingerprint(temp)
require.NoError(t, err)
reader, err := operator.NewReader(temp.Name(), tempCopy, fp)

splitFunc, forceFlush, err := operator.getMultiline()
require.NoError(t, err)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitFunc, forceFlush)
require.NoError(t, err)
defer reader.Close()

Expand Down Expand Up @@ -666,7 +672,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
splitFunc, forceFlush, err := operator.getMultiline()
require.NoError(t, err)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitFunc, forceFlush)
require.NoError(t, err)
defer reader.Close()

Expand Down
17 changes: 14 additions & 3 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"golang.org/x/text/transform"

"github.com/open-telemetry/opentelemetry-log-collection/errors"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

// File attributes contains information about file paths
Expand Down Expand Up @@ -70,11 +71,14 @@ type Reader struct {
decoder *encoding.Decoder
decodeBuffer []byte

splitFunc bufio.SplitFunc
forceFlush *helper.ForceFlush

*zap.SugaredLogger `json:"-"`
}

// NewReader creates a new file reader
func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (*Reader, error) {
func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitFunc bufio.SplitFunc, force *helper.ForceFlush) (*Reader, error) {
r := &Reader{
Fingerprint: fp,
file: file,
Expand All @@ -83,13 +87,15 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
decoder: f.encoding.Encoding.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
fileAttributes: f.resolveFileAttributes(path),
splitFunc: splitFunc,
forceFlush: force,
}
return r, nil
}

// Copy creates a deep copy of a Reader
func (r *Reader) Copy(file *os.File) (*Reader, error) {
reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy())
reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitFunc, r.forceFlush)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +122,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
return
}

scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.fileInput.SplitFunc)
scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitFunc)

// Iterate over the tokenized file, emitting entries as we go
for {
Expand All @@ -131,8 +137,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
if err := getScannerError(scanner); err != nil {
r.Errorw("Failed during scan", zap.Error(err))
}

// Force flush eventually in next iteration
r.forceFlush.CheckAndFlush()
break
}
// Update information about last flush time
r.forceFlush.Flushed()

if err := r.emit(ctx, scanner.Bytes()); err != nil {
r.Error("Failed to emit entry", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil)
if err != nil {
return nil, err
}
Expand Down
Loading