Skip to content

Commit

Permalink
Use PackedForward instead of Message
Browse files Browse the repository at this point in the history
  • Loading branch information
okkez committed May 8, 2017
1 parent 46359ba commit 2769592
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
25 changes: 17 additions & 8 deletions lib/fluent/logger/fluent_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
require 'monitor'
require 'logger'
require 'json'
require 'base64'
require 'securerandom'

module Fluent
module Logger
Expand Down Expand Up @@ -130,17 +132,19 @@ def post_with_time(tag, map, time)
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
if @nanosecond_precision && time.is_a?(Time)
write [tag, EventTime.new(time), map]
write(tag, EventTime.new(time), map)
else
write [tag, time.to_i, map]
write(tag, time.to_i, map)
end
end

def close
@mon.synchronize {
if @pending
begin
send_data(@pending)
@pending.each do |tag, record|
send_data([tag, record].to_msgpack)
end
rescue => e
set_last_error(e)
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
Expand Down Expand Up @@ -200,20 +204,22 @@ def suppress_sec
end
end

def write(msg)
def write(tag, time, map)
begin
data = to_msgpack(msg)
record = to_msgpack([time, map])
rescue => e
set_last_error(e)
msg = [tag, time, map]
@logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
return false
end

@mon.synchronize {
if @pending
@pending << data
@pending[tag] << record
else
@pending = data
@pending = Hash.new{|h, k| h[k] = "" }
@pending[tag] = record
end

# suppress reconnection burst
Expand All @@ -224,7 +230,9 @@ def write(msg)
end

begin
send_data(@pending)
@pending.each do |tag, record|
send_data([tag, record].to_msgpack)
end
@pending = nil
true
rescue => e
Expand Down Expand Up @@ -259,6 +267,7 @@ def send_data(data)
# end
# data = data[n..-1]
#end

true
end

Expand Down
13 changes: 8 additions & 5 deletions spec/fluent_logger_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
logger_io.rewind
log = logger_io.read
expect(log).to match /Failed to connect/
expect(log).to match /Can't send logs to/
expect(log).to match /Can\'t send logs to/
}

it ('post limit over') do
Expand All @@ -206,11 +206,11 @@
expect(fluentd.queue.last).to be_nil

logger_io.rewind
expect(logger_io.read).not_to match /Can't send logs to/
expect(logger_io.read).not_to match /Can\'t send logs to/

logger.post('tag', {'a' => ('c' * 1000)})
logger_io.rewind
expect(logger_io.read).to match /Can't send logs to/
expect(logger_io.read).to match /Can\'t send logs to/
end

it ('log connect error once') do
Expand All @@ -233,8 +233,11 @@ class BufferOverflowHandler

def flush(messages)
@buffer ||= []
MessagePack::Unpacker.new.feed_each(messages) do |msg|
@buffer << msg
messages.each do |tag, message|
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
unpacker.each do |time, record|
@buffer << [tag, time, record]
end
end
end
end
Expand Down

0 comments on commit 2769592

Please sign in to comment.