Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiline reader normalizing multiline character #1552

Merged
merged 1 commit into from
May 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v5.0.0alpha1...v5.0.0-alpha2[View commi
- Fix issue with JSON decoding where `@timestamp` or `type` keys with the wrong type could cause Filebeat
to crash. {issue}1378[1378]
- Fix issue with JSON decoding where values having `null` as values could crash Filebeat. {issue}1466[1466]
- Multiline reader normalizing newline to use `\n`. {pull}1552[1552]

*Winlogbeat*

Expand Down
6 changes: 2 additions & 4 deletions filebeat/harvester/linereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ func createLineReader(
p = processor.NewJSONProcessor(p, jsonConfig)
}

p = processor.NewStripNewline(p)
if mlrConfig != nil {
p, err = processor.NewMultiline(p, maxBytes, mlrConfig)
p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
if err != nil {
return nil, err
}

return processor.NewStripNewline(p), nil
}

p = processor.NewStripNewline(p)
return processor.NewLimitProcessor(p, maxBytes), nil
}
36 changes: 25 additions & 11 deletions filebeat/harvester/processor/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
// Errors will force the multiline processor to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type MultiLine struct {
reader LineProcessor
pred matcher
maxBytes int // bytes stored in content
maxLines int
reader LineProcessor
pred matcher
maxBytes int // bytes stored in content
maxLines int
separator []byte

ts time.Time
content []byte
Expand Down Expand Up @@ -58,6 +59,7 @@ var (
// line events into stream of multi-line events.
func NewMultiline(
r LineProcessor,
separator string,
maxBytes int,
config *config.MultilineConfig,
) (*MultiLine, error) {
Expand Down Expand Up @@ -102,11 +104,12 @@ func NewMultiline(
}

mlr := &MultiLine{
reader: r,
pred: matcher,
state: (*MultiLine).readFirst,
maxBytes: maxBytes,
maxLines: maxLines,
reader: r,
pred: matcher,
state: (*MultiLine).readFirst,
maxBytes: maxBytes,
maxLines: maxLines,
separator: []byte(separator),
}
return mlr, nil
}
Expand Down Expand Up @@ -239,14 +242,25 @@ func (mlr *MultiLine) addLine(l Line) {
return
}

space := mlr.maxBytes - len(mlr.content)
sz := len(mlr.content)
addSeparator := len(mlr.content) > 0 && len(mlr.separator) > 0
if addSeparator {
sz += len(mlr.separator)
}

space := mlr.maxBytes - sz
spaceLeft := (mlr.maxBytes <= 0 || space > 0) &&
(mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines)
if spaceLeft {
if space < 0 || space > len(l.Content) {
space = len(l.Content)
}
mlr.content = append(mlr.content, l.Content[:space]...)

tmp := mlr.content
if addSeparator {
tmp = append(tmp, mlr.separator...)
}
mlr.content = append(tmp, l.Content[:space]...)
mlr.numLines++
}

Expand Down
5 changes: 3 additions & 2 deletions filebeat/harvester/processor/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"errors"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -89,7 +90,7 @@ func testMultilineOK(t *testing.T, cfg config.MultilineConfig, expected ...strin
var tsZero time.Time

assert.NotEqual(t, tsZero, line.Ts)
assert.Equal(t, expected[i], string(line.Content))
assert.Equal(t, strings.TrimRight(expected[i], "\r\n "), string(line.Content))
assert.Equal(t, len(expected[i]), int(line.Bytes))
}
}
Expand All @@ -111,7 +112,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg config.Multil
t.Fatalf("Failed to initialize line reader: %v", err)
}

reader, err = NewMultiline(reader, 1<<20, &cfg)
reader, err = NewMultiline(NewStripNewline(reader), "\n", 1<<20, &cfg)
if err != nil {
t.Fatalf("failed to initializ reader: %v", err)
}
Expand Down