Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: Transition from single record to stream #4620

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 27 additions & 51 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,32 @@ def configure(conf)
@parser = parser_create
end

FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
REPLACE_CHAR = '?'.freeze

def filter_with_time(tag, time, record)
raw_value = @accessor.call(record)
if raw_value.nil?
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
return FAILED_RESULT
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each do |time, record|
begin
raw_value = @accessor.call(record)
if raw_value.nil?
new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
raise ArgumentError, "#{@key_name} does not exist"
else
filter_one_record(tag, time, record, raw_value) do |result_time, result_record|
new_es.add(result_time, result_record)
end
end
rescue => e
router.emit_error_event(tag, time, record, e) if @emit_invalid_record_to_error
end
end
begin
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
result_time = nil
result_record = nil
new_es
end

private

def filter_one_record(tag, time, record, raw_value)
begin
@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
Expand All @@ -85,55 +88,28 @@ def filter_with_time(tag, time, record)
t.nil? ? time : t
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)

if result_record.nil?
result_time = t
result_record = r
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
))
end
end
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
end

router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) if @emit_invalid_record_to_error
next unless @reserve_data
next unless result_record.nil?

result_time = time
result_record = handle_parsed(tag, record, time, {})
t = time
values = {}
end
yield(t, handle_parsed(tag, record, t, values))
end

return result_time, result_record
rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end
raise e
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0
Athishpranav2003 marked this conversation as resolved.
Show resolved Hide resolved

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
else
return FAILED_RESULT
end
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
end
end

private

def handle_parsed(tag, record, t, values)
if values && @inject_key_prefix
values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }]
Expand Down
19 changes: 18 additions & 1 deletion test/plugin/test_filter_parser.rb
Athishpranav2003 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,23 @@ def test_filter

end

def test_filter_with_multiple_records
d1 = create_driver(%[
key_name data
<parse>
@type json
</parse>
])
time = Fluent::EventTime.from_time(@default_time)
d1.run(default_tag: @tag) do
d1.feed(time, {'data' => '[{"xxx_1":"first","yyy":"second"}, {"xxx_2":"first", "yyy_2":"second"}]'})
end
filtered = d1.filtered
assert_equal 2, filtered.length
assert_equal ({"xxx_1"=>"first", "yyy"=>"second"}), filtered[0][1]
assert_equal ({"xxx_2"=>"first", "yyy_2"=>"second"}), filtered[1][1]
end

data(:keep_key_name => false,
:remove_key_name => true)
def test_filter_with_reserved_data(remove_key_name)
Expand Down Expand Up @@ -633,7 +650,7 @@ def test_filter_invalid_byte
def test_filter_key_not_exist
d = create_driver(CONFIG_NOT_IGNORE)
flexmock(d.instance.router).should_receive(:emit_error_event).
with(String, Integer, Hash, ArgumentError.new("data does not exist")).once
with(String, Integer, Hash, ArgumentError).once
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'})
Expand Down
Loading