Skip to content

Commit

Permalink
Fix input buffer on encoding problem (elastic#2661) (elastic#2669)
Browse files Browse the repository at this point in the history
Based on elastic#2416
  • Loading branch information
ruflin authored and tsg committed Oct 4, 2016
1 parent 565a14b commit 864643d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di
*Topbeat*

*Filebeat*
- Fix input buffer on encoding problem

*Winlogbeat*

Expand Down
23 changes: 13 additions & 10 deletions filebeat/harvester/reader/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (l *Line) advance() error {
sz, err := l.decode(idx + len(l.nl))
if err != nil {
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = idx + len(l.nl)
}

// consume transformed bytes from input buffer
Expand All @@ -157,19 +159,20 @@ func (l *Line) decode(end int) (int, error) {
var nDst, nSrc int

nDst, nSrc, err = l.decoder.Transform(buffer, inBytes[start:end], false)

start += nSrc

l.outBuffer.Write(buffer[:nDst])

if err != nil {
if err == transform.ErrShortDst { // continue transforming
// Reset error as decoding continues
err = nil
continue
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
l.outBuffer.Write(inBytes[0:end])
start = end
break
}
break

// Reset error as decoding continues
err = nil
}

start += nSrc
l.outBuffer.Write(buffer[:nDst])
}

l.byteCount += start
Expand Down
47 changes: 47 additions & 0 deletions filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# coding=utf-8

from filebeat import BaseTest
import os
import codecs
Expand Down Expand Up @@ -756,3 +758,48 @@ def test_truncate(self):
# Check that only 1 registry entry as original was only truncated
data = self.get_registry()
assert len(data) == 1


def test_decode_error(self):
"""
Tests that a decoding error is handled gracefully.

The prospector is configured with the GBK encoding, but the log file
contains a UTF-8 line with non-ASCII characters between two plain ASCII
lines. The test expects three events in the output, the decoding error
to be logged, and the line following the undecodable one
("hello world2") to be published intact.
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
encoding="GBK", # Deliberately wrong encoding: the entry below is actually utf-8
)

os.mkdir(self.working_dir + "/log/")

logfile = self.working_dir + "/log/test.log"

# Write one log file with three lines; only the middle one contains
# bytes outside of ASCII.
with open(logfile, 'w') as file:
file.write("hello world1" + "\n")

file.write('<meta content="瞭解「Google 商業解決方案」提供的各類服務軟件如何助您分析資料、刊登廣告、提升網站成效等。" name="description">' + '\n')
file.write("hello world2" + "\n")

filebeat = self.start_beat()

# Make sure all three lines were read
self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)

# Wait until the decoding error shows up in the filebeat log
self.wait_until(
lambda: self.log_contains("Error decoding line: simplifiedchinese: invalid GBK encoding"),
max_timeout=5)

filebeat.check_kill_and_wait()

# Check that the registry contains exactly one entry, as only one log
# file was written
data = self.get_registry()
assert len(data) == 1

# The line after the undecodable one must be published unchanged
output = self.read_output_json()
assert output[2]["message"] == "hello world2"




0 comments on commit 864643d

Please sign in to comment.