Skip to content

Commit

Permalink
Add tests for encoding settings of filestream input (#24426) (#24629)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1db96a3)
  • Loading branch information
kvch committed Mar 18, 2021
1 parent d91c70b commit b571b6f
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 26 deletions.
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
Reader readerConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
Expand Down Expand Up @@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
Expand Down
37 changes: 13 additions & 24 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,8 @@ type fileMeta struct {
// are actively written by other applications.
type filestream struct {
readerConfig readerConfig
bufferSize int
tailFile bool // TODO
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
lineTerminator readfile.LineTerminator
excludeLines []match.Matcher
includeLines []match.Matcher
maxBytes int
closerConfig closerConfig
}

Expand Down Expand Up @@ -97,9 +91,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("error while creating file identifier: %v", err)
}

encodingFactory, ok := encoding.FindEncoding(config.Encoding)
encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding)
if !ok || encodingFactory == nil {
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

prospector := &fileProspector{
Expand All @@ -111,13 +105,8 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
}

filestream := &filestream{
readerConfig: config.readerConfig,
bufferSize: config.BufferSize,
readerConfig: config.Reader,
encodingFactory: encodingFactory,
lineTerminator: config.LineTerminator,
excludeLines: config.ExcludeLines,
includeLines: config.IncludeLines,
maxBytes: config.MaxBytes,
closerConfig: config.Close,
}

Expand Down Expand Up @@ -191,7 +180,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
return nil, err
}

log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes)
log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
Expand All @@ -211,22 +200,22 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := inp.maxBytes * 4
encReaderMaxBytes := inp.readerConfig.MaxBytes * 4

var r reader.Reader
r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
Codec: inp.encoding,
BufferSize: inp.bufferSize,
Terminator: inp.lineTerminator,
BufferSize: inp.readerConfig.BufferSize,
Terminator: inp.readerConfig.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
f.Close()
return nil, err
}

r = readfile.NewStripNewline(r, inp.lineTerminator)
r = readfile.NewLimitReader(r, inp.maxBytes)
r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

return r, nil
}
Expand Down Expand Up @@ -335,14 +324,14 @@ func (inp *filestream) readFromSource(
// isDroppedLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool {
if len(inp.includeLines) > 0 {
if !matchAny(inp.includeLines, line) {
if len(inp.readerConfig.IncludeLines) > 0 {
if !matchAny(inp.readerConfig.IncludeLines, line) {
log.Debug("Drop line as it does not match any of the include patterns %s", line)
return true
}
}
if len(inp.excludeLines) > 0 {
if matchAny(inp.excludeLines, line) {
if len(inp.readerConfig.ExcludeLines) > 0 {
if matchAny(inp.readerConfig.ExcludeLines, line) {
log.Debug("Drop line as it does match one of the exclude patterns%s", line)
return true
}
Expand Down
137 changes: 137 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
package filestream

import (
"bytes"
"context"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
)

// test_close_renamed from test_harvester.py
Expand Down Expand Up @@ -100,3 +106,134 @@ func TestFilestreamCloseEOF(t *testing.T) {

env.requireOffsetInRegistry(testlogName, expectedOffset)
}

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\nnext is an empty line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, len(testlines))

moreTestlines := []byte("\nafter an empty line\n")
env.mustAppendLinesToFile(testlogName, moreTestlines)

env.waitUntilEventCount(3)
env.requireEventsReceived([]string{
"first log line",
"next is an empty line",
"after an empty line",
})

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreTestlines))
}

// test_empty_lines_only from test_harvester.py
// This test differs from the original because in filestream
// input offset is no longer persisted when the line is empty.
func TestFilestreamEmptyLinesOnly(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("\n\n\n")
env.mustWriteLinesToFile(testlogName, testlines)

cancelInput()
env.waitUntilInputStops()

env.requireNoEntryInRegistry(testlogName)
}

// test_bom_utf8 from test_harvester.py
func TestFilestreamBOMUTF8(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// BOM: 0xEF,0xBB,0xBF
lines := append([]byte{0xEF, 0xBB, 0xBF}, []byte(`#Software: Microsoft Exchange Server
#Version: 14.0.0.0
#Log-type: Message Tracking Log
#Date: 2016-04-05T00:00:02.052Z
#Fields: date-time,client-ip,client-hostname,server-ip,server-hostname,source-context,connector-id,source,event-id,internal-message-id,message-id,recipient-address,recipient-status,total-bytes,recipient-count,related-recipient-address,reference,message-subject,sender-address,return-path,message-info,directionality,tenant-id,original-client-ip,original-server-ip,custom-data
2016-04-05T00:00:02.052Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:279f077c-216f-4323-a9ee-48e50ffd3cad, Event:269492708, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.022Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-37-DB-F9-F9-B5-F2-42-4F-86-62-E6-5D-FC-0C-A1-41-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-00-1F-D4-B5-0E-00-00-2E-EF-F2-59-0E-E8-2D-46-BC-31-02-85-0D-67-98-43-00-00-37-4A-A3-B3-00-00
2016-04-05T00:00:02.145Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:49cb09c6-5b76-415d-a085-da0ad9079682, Event:269492711, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.038Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-97-8F-07-43-51-44-61-4A-AD-BD-29-D4-97-4E-20-A0-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-8E-8F-BD-EB-57-00-00-3D-FB-CE-26-A4-8D-46-4C-A4-35-0F-A7-9B-FA-D7-B9-00-00-37-44-2F-CA-00-00
`)...)
env.mustWriteLinesToFile(testlogName, lines)

env.waitUntilEventCount(7)

cancelInput()
env.waitUntilInputStops()

messages := env.getOutputMessages()
require.Equal(t, messages[0], "#Software: Microsoft Exchange Server")
}

// test_boms from test_harvester.py
func TestFilestreamUTF16BOMs(t *testing.T) {
encodings := map[string]encoding.Encoding{
"utf-16be-bom": unicode.UTF16(unicode.BigEndian, unicode.UseBOM),
"utf-16le-bom": unicode.UTF16(unicode.LittleEndian, unicode.UseBOM),
}

for name, enc := range encodings {
name := name
encoder := enc.NewEncoder()
t.Run(name, func(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"encoding": name,
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

line := []byte("first line\n")
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
writer.Write(line)
writer.Close()

env.mustWriteLinesToFile(testlogName, buf.Bytes())

env.waitUntilEventCount(1)

env.requireEventsReceived([]string{"first line"})

cancelInput()
env.waitUntilInputStops()
})
}
}

0 comments on commit b571b6f

Please sign in to comment.