Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add possibility to force flush logs after certain period of time #216

Merged
merged 15 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
// Ensure that multiline is buildable
_, err = c.Multiline.Build(encoding.Encoding, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,7 +157,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
Multiline: c.Multiline,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this is just a naming issue, but I'll have a difficult time accepting that the splitfunc is being delegated to a Multiline struct. Put another way, we need a split func regardless of whether or not we are using multiline splitting.

I suspect that we can accomplish the same functionality as you are implementing, but by composing things in a better way.

Would it make sense to create a new struct which would pair the concept of a split func with the concept of a flush? Maybe something like:

type Splitter struct {
    splitFunc // can be simple like '\n' or multiline config
    flusher     // forces a flush based on timing
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this further, I'm not convinced we even need to do what I've suggested here. Please see my related comment on reader.go. I'll leave this comment in place as a possible alternative, but I think we need to start by justifying the need to alter splitFunc behavior in any way.

PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
Expand Down
19 changes: 15 additions & 4 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package file

import (
"bufio"
"bytes"
"context"
"encoding/json"
Expand Down Expand Up @@ -44,7 +43,7 @@ type InputOperator struct {
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
Multiline helper.MultilineConfig
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
Expand Down Expand Up @@ -323,7 +322,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
}

// If we don't match any previously known files, create a new reader from scratch
newReader, err := f.NewReader(file.Name(), file, fp)
multiline, err := f.getMultiline()
if err != nil {
return nil, err
}
newReader, err := f.NewReader(file.Name(), file, fp, multiline)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -393,7 +396,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {
// Decode each of the known files
f.knownFiles = make([]*Reader, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
newReader, err := f.NewReader("", nil, nil)
multiline, err := f.getMultiline()
if err != nil {
return err
}
newReader, err := f.NewReader("", nil, nil, multiline)
if err != nil {
return err
}
Expand All @@ -405,3 +412,7 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {

return nil
}

// getMultiline constructs a fresh Multiline from the operator's multiline
// configuration using the operator's configured encoding. A new instance is
// built per reader so that flush state is tracked per file.
func (f *InputOperator) getMultiline() (*helper.Multiline, error) {
	multiline, err := f.Multiline.Build(f.encoding.Encoding, false)
	if err != nil {
		return nil, err
	}
	return multiline, nil
}
11 changes: 9 additions & 2 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,11 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
tempCopy := openFile(t, temp.Name())
fp, err := operator.NewFingerprint(temp)
require.NoError(t, err)
reader, err := operator.NewReader(temp.Name(), tempCopy, fp)

multiline, err := operator.getMultiline()
require.NoError(t, err)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline)
require.NoError(t, err)
defer reader.Close()

Expand Down Expand Up @@ -666,7 +670,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
multiline, err := operator.getMultiline()
require.NoError(t, err)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline)
require.NoError(t, err)
defer reader.Close()

Expand Down
15 changes: 12 additions & 3 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"golang.org/x/text/transform"

"github.com/open-telemetry/opentelemetry-log-collection/errors"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

// File attributes contains information about file paths
Expand Down Expand Up @@ -70,11 +71,13 @@ type Reader struct {
decoder *encoding.Decoder
decodeBuffer []byte

multiline *helper.Multiline

*zap.SugaredLogger `json:"-"`
}

// NewReader creates a new file reader
func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (*Reader, error) {
func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, multiline *helper.Multiline) (*Reader, error) {
r := &Reader{
Fingerprint: fp,
file: file,
Expand All @@ -83,13 +86,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
decoder: f.encoding.Encoding.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
fileAttributes: f.resolveFileAttributes(path),
multiline: multiline,
}
return r, nil
}

// Copy creates a deep copy of a Reader
func (r *Reader) Copy(file *os.File) (*Reader, error) {
reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy())
reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.multiline)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +120,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
return
}

scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.fileInput.SplitFunc)
scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.multiline.SplitFunc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is the only place we are using the split func, so I'm not sure why it needs to be part of the multiline config.

We have two concepts here:

  1. How to split lines
  2. When to force a flush

I'm not convinced these are dependent on each other, or that either is necessarily a concern of multiline behavior. (Of course a split func may implement multiline splitting, but it does not always.)

Can we leave splitFunc as it was and just add a new config that controls flushing?

Copy link
Member Author

@sumo-drosiek sumo-drosiek Jul 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some relations I was unsure how to resolve, so I put both pieces of functionality into one struct:

  • splitFunc needs to be aware of when to force a flush (so a new structure is created and passed to the splitFunc)
  • every file should be flushed separately (so a splitFunc is created for every unique reader)

Regarding line splitting: of course it is not always multiline behavior, but from the user's perspective it is intuitive to use the multiline config for it — hence the Multiline struct underneath — rather than mixing a Splitter struct with the multiline config, or using a separate splitter config.

I will extract force flush into a separate config, as most of its logic lives outside the splitFunc, but I am not sure how to resolve the other dependencies in a nice way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some changes in requested direction, but after all I think Splitter struct seems to be reasonable consensus.

You can see current implementation without Splitter. There are two dependent entities: splitFunc and ForceFlush and I feel that they should be connected together via additional struct. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think consolidating into one struct makes sense

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the latest code, please


// Iterate over the tokenized file, emitting entries as we go
for {
Expand All @@ -131,8 +135,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
if err := getScannerError(scanner); err != nil {
r.Errorw("Failed during scan", zap.Error(err))
}

// Force flush eventually in next iteration
r.multiline.CheckAndFlush()
break
}
// Update information about last flush time
r.multiline.Flushed()

if err := r.emit(ctx, scanner.Bytes()); err != nil {
r.Error("Failed to emit entry", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
multiline, err := c.Multiline.Build(encoding.Encoding, true)
if err != nil {
return nil, err
}
Expand All @@ -114,7 +114,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
splitFunc: multiline.SplitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
Expand Down
4 changes: 2 additions & 2 deletions operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
multiline, err := c.Multiline.Build(encoding.Encoding, true)
if err != nil {
return nil, err
}
Expand All @@ -97,7 +97,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
splitFunc: multiline.SplitFunc,
resolver: resolver,
}
return []operator.Operator{udpInput}, nil
Expand Down
99 changes: 88 additions & 11 deletions operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,125 @@ import (
"bytes"
"fmt"
"regexp"
"time"

"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
)

type ForceFlush struct {
Force bool
LastFlush time.Time
}

func NewForceFlush() *ForceFlush {
return &ForceFlush{
Force: false,
LastFlush: time.Now(),
}
}

type Multiline struct {
SplitFunc bufio.SplitFunc
force *ForceFlush
forcePeriod time.Duration

// lastFlush > force.LastFlush => we can force flush if no logs are incoming for forcePeriod
// lastFlush = force.LastFlush => last flush was forced, so we do cannot force, we can update lastFlush
// lastFlush < force.LastFlush =>we just forced flush, set lastFlush to force.LastFlush
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
lastFlush time.Time
}

// Flushed update lastFlush with current timestamp
func (m *Multiline) Flushed() {
if m.lastFlush.Sub(m.force.LastFlush) < 0 {
m.lastFlush = m.force.LastFlush
} else {
m.lastFlush = time.Now()
}
}

// CheckAndFlush returns true if data is going to be force flushed
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
func (m *Multiline) CheckAndFlush() {
if m.forcePeriod > 0 && time.Since(m.lastFlush) > m.forcePeriod && m.lastFlush.Sub(m.force.LastFlush) > 0 {
m.force.Force = true
}
}

// NewMultilineConfig creates a new MultilineConfig with default values:
// no start/end patterns (plain newline splitting) and force flushing disabled.
func NewMultilineConfig() MultilineConfig {
	return MultilineConfig{
		LineStartPattern: "",
		LineEndPattern:   "",
		ForceFlushPeriod: "0s",
	}
}

// MultilineConfig is the configuration of a multiline helper
type MultilineConfig struct {
	// LineStartPattern is a regex matching the beginning of a log entry.
	// Mutually exclusive with LineEndPattern.
	LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"`
	// LineEndPattern is a regex matching the end of a log entry.
	// Mutually exclusive with LineStartPattern.
	LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"`
	// ForceFlushPeriod is a duration string (e.g. "5s"); when no flush has
	// occurred for this long, buffered data is force flushed. "0s" disables it.
	ForceFlushPeriod string `mapstructure:"force_flush_period" json:"force_flush_period" yaml:"force_flush_period"`
}

// Build will build a Multiline operator.
func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) {
return c.getSplitFunc(encoding, flushAtEOF)
func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Multiline, error) {
force := NewForceFlush()
splitFunc, err := c.getSplitFunc(encoding, flushAtEOF, force)
if err != nil {
return nil, err
}

duration, err := time.ParseDuration(c.ForceFlushPeriod)
if err != nil {
return nil, err
}

return &Multiline{
SplitFunc: splitFunc,
force: force,
forcePeriod: duration,
}, nil
}

// getSplitFunc returns split function for bufio.Scanner basing on configured pattern
func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) {
func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) {
endPattern := c.LineEndPattern
startPattern := c.LineStartPattern

switch {
case endPattern != "" && startPattern != "":
return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
case endPattern == "" && startPattern == "":
return NewNewlineSplitFunc(encoding, flushAtEOF)
return NewNewlineSplitFunc(encoding, flushAtEOF, force)
case endPattern != "":
re, err := regexp.Compile("(?m)" + c.LineEndPattern)
if err != nil {
return nil, fmt.Errorf("compile line end regex: %s", err)
}
return NewLineEndSplitFunc(re, flushAtEOF), nil
return NewLineEndSplitFunc(re, flushAtEOF, force), nil
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %s", err)
}
return NewLineStartSplitFunc(re, flushAtEOF), nil
return NewLineStartSplitFunc(re, flushAtEOF, force), nil
default:
return nil, fmt.Errorf("unreachable")
}
}

// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that start with a match to the regex pattern provided
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force.Force {
force.Force = false
force.LastFlush = time.Now()
token = trimWhitespaces(data)
advance = len(data)
return
}

firstLoc := re.FindIndex(data)
if firstLoc == nil {
// Flush if no more data is expected
Expand Down Expand Up @@ -123,8 +186,15 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {

// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force.Force {
force.Force = false
force.LastFlush = time.Now()
token = trimWhitespaces(data)
advance = len(data)
return
}
loc := re.FindIndex(data)
if loc == nil {
// Flush if no more data is expected
Expand All @@ -151,7 +221,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {

// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
// never returning an token using EOF as a terminator
func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) {
func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) {
newline, err := encodedNewline(encoding)
if err != nil {
return nil, err
Expand All @@ -163,6 +233,13 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.Spl
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force.Force {
force.Force = false
force.LastFlush = time.Now()
token = trimWhitespaces(data)
advance = len(data)
return
}
if atEOF && len(data) == 0 {
return 0, nil, nil
}
Expand Down
Loading