Skip to content

Commit

Permalink
Update encoding noop (#262)
Browse files Browse the repository at this point in the history
* Add new Nop split func and testing for it

* Add test to file input

* WIP

* Add Tests & Modify Split func

* Add test for Error

* Update 'emit' func to dump bytes on nop

* Add MaxLogSize tests

* Change where to decode

* Add Large Log Const
  • Loading branch information
Mrod1598 authored Sep 15, 2021
1 parent 08df24c commit bef2800
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 21 deletions.
2 changes: 1 addition & 1 deletion operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
}

// Ensure that multiline is buildable
_, err = c.Splitter.Build(encoding.Encoding, false)
_, err = c.Splitter.Build(encoding.Encoding, false, int(c.MaxLogSize))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,5 +415,5 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {

// getMultiline builds the helper.Splitter for this operator, returning an error if it cannot be built
func (f *InputOperator) getMultiline() (*helper.Splitter, error) {
return f.Splitter.Build(f.encoding.Encoding, false)
return f.Splitter.Build(f.encoding.Encoding, false, f.MaxLogSize)
}
142 changes: 142 additions & 0 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,148 @@ func TestReadExistingLogs(t *testing.T) {
waitForMessage(t, logReceived, "testlog2")
}

// TestReadUsingNopEncoding tests when nop encoding is set, that the splitfunction returns all bytes unchanged.
func TestReadUsingNopEncoding(t *testing.T) {
tcs := []struct {
testName string
input []byte
test func(*testing.T, chan *entry.Entry)
}{
{
"simple",
[]byte("testlog1"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
},
},
{
"longer than maxlogsize",
[]byte("testlog1testlog2testlog3"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
waitForByteMessage(t, c, []byte("testlog2"))
waitForByteMessage(t, c, []byte("testlog3"))
},
},
{
"doesn't hit max log size before eof",
[]byte("testlog1testlog2test"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
waitForByteMessage(t, c, []byte("testlog2"))
waitForByteMessage(t, c, []byte("test"))
},
},
{
"special characters",
[]byte("testlog1\n\ttestlog2\n\t"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
waitForByteMessage(t, c, []byte("\n\ttestlo"))
waitForByteMessage(t, c, []byte("g2\n\t"))
},
},
}

t.Parallel()

for _, tc := range tcs {
t.Run(tc.testName, func(t *testing.T) {
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.MaxLogSize = 8
cfg.Encoding.Encoding = "nop"
}, nil)
// Create a file, then start
temp := openTemp(t, tempDir)
bytesWritten, err := temp.Write(tc.input)
require.Greater(t, bytesWritten, 0)
require.NoError(t, err)
require.NoError(t, operator.Start(testutil.NewMockPersister("test")))
defer operator.Stop()

tc.test(t, logReceived)
})
}
}

func TestNopEncodingDifferentLogSizes(t *testing.T) {
tcs := []struct {
testName string
input []byte
test func(*testing.T, chan *entry.Entry)
maxLogSize helper.ByteSize
}{
{
"same size",
[]byte("testlog1"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
},
8,
},
{
"massive log size",
[]byte("testlog1"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
},
8000000,
},
{
"slightly larger log size",
[]byte("testlog1"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog1"))
},
9,
},
{
"slightly smaller log size",
[]byte("testlog1"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("testlog"))
waitForByteMessage(t, c, []byte("1"))
},
7,
},
{
"tiny log size",
[]byte("testlog1"),
func(t *testing.T, c chan *entry.Entry) {
waitForByteMessage(t, c, []byte("t"))
waitForByteMessage(t, c, []byte("e"))
waitForByteMessage(t, c, []byte("s"))
waitForByteMessage(t, c, []byte("t"))
waitForByteMessage(t, c, []byte("l"))
waitForByteMessage(t, c, []byte("o"))
waitForByteMessage(t, c, []byte("g"))
waitForByteMessage(t, c, []byte("1"))
},
1,
},
}

t.Parallel()

for _, tc := range tcs {
t.Run(tc.testName, func(t *testing.T) {
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.MaxLogSize = tc.maxLogSize
cfg.Encoding.Encoding = "nop"
}, nil)
// Create a file, then start
temp := openTemp(t, tempDir)
bytesWritten, err := temp.Write(tc.input)
require.Greater(t, bytesWritten, 0)
require.NoError(t, err)
require.NoError(t, operator.Start(testutil.NewMockPersister("test")))
defer operator.Stop()

tc.test(t, logReceived)
})
}
}

// ReadNewLogs tests that, after starting, if a new file is created
// all the entries in that file are read from the beginning
func TestReadNewLogs(t *testing.T) {
Expand Down
26 changes: 17 additions & 9 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/text/encoding"
"golang.org/x/text/transform"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/errors"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)
Expand Down Expand Up @@ -166,15 +167,22 @@ func (r *Reader) emit(ctx context.Context, msgBuf []byte) error {
if len(msgBuf) == 0 {
return nil
}

msg, err := r.decode(msgBuf)
if err != nil {
return fmt.Errorf("decode: %s", err)
}

e, err := r.fileInput.NewEntry(msg)
if err != nil {
return fmt.Errorf("create entry: %s", err)
var e *entry.Entry
var err error
if r.fileInput.encoding.Encoding == encoding.Nop {
e, err = r.fileInput.NewEntry(msgBuf)
if err != nil {
return fmt.Errorf("create entry: %s", err)
}
} else {
msg, err := r.decode(msgBuf)
if err != nil {
return fmt.Errorf("decode: %s", err)
}
e, err = r.fileInput.NewEntry(msg)
if err != nil {
return fmt.Errorf("create entry: %s", err)
}
}

if err := e.Set(r.fileInput.FilePathField, r.fileAttributes.Path); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions operator/builtin/input/file/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) {
}
}

// waitForByteMessage waits up to three seconds for the next entry on c and
// requires its body to be a []byte equal to expected. The test fails
// immediately if no entry arrives in time or if the body is not a []byte.
func waitForByteMessage(t *testing.T, c chan *entry.Entry, expected []byte) {
	select {
	case e := <-c:
		// Comma-ok assertion: an unexpected body type fails this test with a
		// readable message instead of panicking the whole test binary.
		body, ok := e.Body.([]byte)
		require.True(t, ok, "expected entry body of type []byte, got %T", e.Body)
		require.Equal(t, expected, body)
	case <-time.After(3 * time.Second):
		// %q prints the expected bytes as a quoted string rather than as a
		// list of numeric byte values.
		require.FailNow(t, "Timed out waiting for message", "expected %q", expected)
	}
}

func waitForMessages(t *testing.T, c chan *entry.Entry, expected []string) {
receivedMessages := make([]string, 0, len(expected))
LOOP:
Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
}

// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil)
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil, int(c.MaxLogSize))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
}

// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil)
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil, MaxUDPSize)
if err != nil {
return nil, err
}
Expand Down
34 changes: 28 additions & 6 deletions operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,24 @@ type MultilineConfig struct {
}

// Build returns the bufio.SplitFunc described by this MultilineConfig.
func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) {
return c.getSplitFunc(encoding, flushAtEOF, force)
func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) {
return c.getSplitFunc(encoding, flushAtEOF, force, maxLogSize)
}

// getSplitFunc returns a split function for bufio.Scanner based on the configured pattern
func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) {
func (c MultilineConfig) getSplitFunc(encodingVar encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) {
endPattern := c.LineEndPattern
startPattern := c.LineStartPattern

switch {
case endPattern != "" && startPattern != "":
return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
case encodingVar == encoding.Nop && (endPattern != "" || startPattern != ""):
return nil, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")
case encodingVar == encoding.Nop:
return SplitNone(maxLogSize), nil
case endPattern == "" && startPattern == "":
return NewNewlineSplitFunc(encoding, flushAtEOF, force)
return NewNewlineSplitFunc(encodingVar, flushAtEOF, force)
case endPattern != "":
re, err := regexp.Compile("(?m)" + c.LineEndPattern)
if err != nil {
Expand Down Expand Up @@ -204,6 +208,24 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) b
}
}

// SplitNone returns a bufio.SplitFunc for use with the nop encoding. It does
// not search for any delimiter: bytes are passed through untouched and are
// only cut by size.
//
// While at least maxLogSize bytes are buffered, each call emits a token of
// exactly maxLogSize bytes. A shorter remainder is held back until the input
// reaches EOF and is then emitted as one final token. Empty input produces no
// token.
//
// A maxLogSize of zero or less disables the size cut, so the whole input is
// emitted as a single token at EOF. This avoids emitting empty tokens without
// advancing (maxLogSize == 0) or slicing out of range (maxLogSize < 0).
func SplitNone(maxLogSize int) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		// A full chunk is buffered: hand back exactly maxLogSize bytes.
		if maxLogSize > 0 && len(data) >= maxLogSize {
			return maxLogSize, data[:maxLogSize], nil
		}

		// Less than a full chunk: request more data unless the input has
		// ended, and never emit an empty token.
		if !atEOF || len(data) == 0 {
			return 0, nil, nil
		}

		// Input ended with a partial chunk: flush whatever is left.
		return len(data), data, nil
	}
}

// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
Expand Down Expand Up @@ -313,9 +335,9 @@ func NewSplitterConfig() SplitterConfig {
}

// Build builds Splitter struct
func (c *SplitterConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Splitter, error) {
func (c *SplitterConfig) Build(encoding encoding.Encoding, flushAtEOF bool, maxLogSize int) (*Splitter, error) {
flusher := c.Flusher.Build()
splitFunc, err := c.Multiline.Build(encoding, flushAtEOF, flusher)
splitFunc, err := c.Multiline.Build(encoding, flushAtEOF, flusher, maxLogSize)

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit bef2800

Please sign in to comment.