Skip to content

Commit

Permalink
Transition filter_parser from single record to stream
Browse files: browse the repository at this point in the history
Signed-off-by: Athish Pranav D <athishanna@gmail.com>
  • Loading branch information
Athishpranav2003 committed Aug 29, 2024
1 parent 4a94271 commit 904f7df
Showing 1 changed file with 59 additions and 65 deletions.
124 changes: 59 additions & 65 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,81 +57,75 @@ def configure(conf)
FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
REPLACE_CHAR = '?'.freeze

def filter_with_time(tag, time, record)
raw_value = @accessor.call(record)
if raw_value.nil?
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
return FAILED_RESULT
end
end
begin
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first record.
# This should be fixed in a future version.
result_time = nil
result_record = nil

@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
time
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each do |time, record|
begin
raw_value = @accessor.call(record)
if raw_value.nil?
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
new_es.add(time, handle_parsed(tag, record, time, {}))
end
next
end
begin
result_time = nil
result_record = nil

@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
time
else
t.nil? ? time : t
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)

if result_record.nil?
result_time = t
result_record = r
else
t.nil? ? time : t
if @emit_invalid_record_to_error
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
))
end
end
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)

if result_record.nil?
result_time = t
result_record = r
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
))

next unless @reserve_data
next unless result_record.nil?

result_time = time
result_record = handle_parsed(tag, record, time, {})
end
new_es.add(result_time, result_record)
end
else

rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
raise e
end
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

next unless @reserve_data
next unless result_record.nil?

result_time = time
result_record = handle_parsed(tag, record, time, {})
raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
end
end
end

return result_time, result_record
rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
else
return FAILED_RESULT
end
end
end

private

def handle_parsed(tag, record, t, values)
Expand Down

0 comments on commit 904f7df

Please sign in to comment.