Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for encoding setting of filestream input #24426

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
Reader readerConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
Expand Down Expand Up @@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
Expand Down
37 changes: 13 additions & 24 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,8 @@ type fileMeta struct {
// are actively written by other applications.
type filestream struct {
readerConfig readerConfig
bufferSize int
tailFile bool // TODO
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
lineTerminator readfile.LineTerminator
excludeLines []match.Matcher
includeLines []match.Matcher
maxBytes int
closerConfig closerConfig
}

Expand Down Expand Up @@ -97,9 +91,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("error while creating file identifier: %v", err)
}

encodingFactory, ok := encoding.FindEncoding(config.Encoding)
encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding)
if !ok || encodingFactory == nil {
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

prospector := &fileProspector{
Expand All @@ -111,13 +105,8 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
}

filestream := &filestream{
readerConfig: config.readerConfig,
bufferSize: config.BufferSize,
readerConfig: config.Reader,
encodingFactory: encodingFactory,
lineTerminator: config.LineTerminator,
excludeLines: config.ExcludeLines,
includeLines: config.IncludeLines,
maxBytes: config.MaxBytes,
closerConfig: config.Close,
}

Expand Down Expand Up @@ -191,7 +180,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
return nil, err
}

log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes)
log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
Expand All @@ -211,22 +200,22 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := inp.maxBytes * 4
encReaderMaxBytes := inp.readerConfig.MaxBytes * 4

var r reader.Reader
r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
Codec: inp.encoding,
BufferSize: inp.bufferSize,
Terminator: inp.lineTerminator,
BufferSize: inp.readerConfig.BufferSize,
Terminator: inp.readerConfig.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
f.Close()
return nil, err
}

r = readfile.NewStripNewline(r, inp.lineTerminator)
r = readfile.NewLimitReader(r, inp.maxBytes)
r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

return r, nil
}
Expand Down Expand Up @@ -335,14 +324,14 @@ func (inp *filestream) readFromSource(
// isDroppedLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool {
if len(inp.includeLines) > 0 {
if !matchAny(inp.includeLines, line) {
if len(inp.readerConfig.IncludeLines) > 0 {
if !matchAny(inp.readerConfig.IncludeLines, line) {
log.Debug("Drop line as it does not match any of the include patterns %s", line)
return true
}
}
if len(inp.excludeLines) > 0 {
if matchAny(inp.excludeLines, line) {
if len(inp.readerConfig.ExcludeLines) > 0 {
if matchAny(inp.readerConfig.ExcludeLines, line) {
log.Debug("Drop line as it does match one of the exclude patterns%s", line)
return true
}
Expand Down
78 changes: 78 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
package filestream

import (
"bytes"
"context"
"os"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
)

Expand Down Expand Up @@ -201,3 +207,75 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) {

env.requireNoEntryInRegistry(testlogName)
}

// test_bom_utf8 from test_harvester.py
func TestFilestreamBOMUTF8(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// BOM: 0xEF,0xBB,0xBF
lines := append([]byte{0xEF, 0xBB, 0xBF}, []byte(`#Software: Microsoft Exchange Server
#Version: 14.0.0.0
#Log-type: Message Tracking Log
#Date: 2016-04-05T00:00:02.052Z
#Fields: date-time,client-ip,client-hostname,server-ip,server-hostname,source-context,connector-id,source,event-id,internal-message-id,message-id,recipient-address,recipient-status,total-bytes,recipient-count,related-recipient-address,reference,message-subject,sender-address,return-path,message-info,directionality,tenant-id,original-client-ip,original-server-ip,custom-data
2016-04-05T00:00:02.052Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:279f077c-216f-4323-a9ee-48e50ffd3cad, Event:269492708, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.022Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-37-DB-F9-F9-B5-F2-42-4F-86-62-E6-5D-FC-0C-A1-41-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-00-1F-D4-B5-0E-00-00-2E-EF-F2-59-0E-E8-2D-46-BC-31-02-85-0D-67-98-43-00-00-37-4A-A3-B3-00-00
2016-04-05T00:00:02.145Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:49cb09c6-5b76-415d-a085-da0ad9079682, Event:269492711, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.038Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-97-8F-07-43-51-44-61-4A-AD-BD-29-D4-97-4E-20-A0-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-8E-8F-BD-EB-57-00-00-3D-FB-CE-26-A4-8D-46-4C-A4-35-0F-A7-9B-FA-D7-B9-00-00-37-44-2F-CA-00-00
`)...)
env.mustWriteLinesToFile(testlogName, lines)

env.waitUntilEventCount(7)

cancelInput()
env.waitUntilInputStops()

messages := env.getOutputMessages()
require.Equal(t, messages[0], "#Software: Microsoft Exchange Server")
}

// test_boms from test_harvester.py
func TestFilestreamUTF16BOMs(t *testing.T) {
encodings := map[string]encoding.Encoding{
"utf-16be-bom": unicode.UTF16(unicode.BigEndian, unicode.UseBOM),
"utf-16le-bom": unicode.UTF16(unicode.LittleEndian, unicode.UseBOM),
}

for name, enc := range encodings {
name := name
encoder := enc.NewEncoder()
t.Run(name, func(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"encoding": name,
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

line := []byte("first line\n")
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
writer.Write(line)
writer.Close()

env.mustWriteLinesToFile(testlogName, buf.Bytes())

env.waitUntilEventCount(1)

env.requireEventsReceived([]string{"first line"})

cancelInput()
env.waitUntilInputStops()
})
}
}