Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Fix fluentd postgresql plugin (#4884)
Browse files Browse the repository at this point in the history
* fix

* fix

* fix

* fix
  • Loading branch information
hzy46 authored Sep 8, 2020
1 parent a0ee647 commit 03c4c44
Showing 1 changed file with 67 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,65 @@ def format(tag, time, record)
[Time.at(time).strftime(@time_format), record].to_msgpack
end

def insert_framework(hex_id, time, record)
# This function try to insert the framework snapshot into framework history table.
# In some cases, the framework controller may have duplicate logs about one framework attempt,
# or there has been already successful inserted record before.
# To handle it, `insert_framework` executes a "SELECT" first.
# If the uid exists in the table, it will ignore it safely.
# Any error should be raised.
thread = Thread.current
frameworkName = record["objectSnapshot"]["metadata"]["name"]
attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"]
historyType = "retry"
snapshot = record_value(record["objectSnapshot"])
# use frameworkName + attemptIndex + historyType to generate a uid
uid = Digest::MD5.hexdigest "#{frameworkName}+#{attemptIndex}+#{historyType}"
# select from framework_history, ensure there is no corresponding history object
selectResult = thread[:conn].exec_params('SELECT uid from framework_history where uid=$1', [uid])
if selectResult.cmd_tuples == 0
# if there is no existing records, try to insert a new one.
thread[:conn].exec_params("INSERT INTO framework_history (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@historyType_col}\", \"#{@snapshot_col}\") " +
"VALUES ($1, $2, $3, $4, $5, $6, $7)", [time, time, uid, frameworkName, attemptIndex, historyType, snapshot]
)
else
# if there is an existing record, ignore it.
log.warn "[pgjson] chunk #{hex_id}: ignored framework snapshot object as it already exists, uid=#{uid}"
end
end

def insert_pod(hex_id, time, record)
# This function try to insert the pod snapshot into pods table.
# In some cases, the framework controller may have duplicate logs about one pod,
# or there has been already successful inserted record before.
# To handle it, `insert_pod` executes a "SELECT" first.
# If the uid exists in the table, it will ignore it safely.
# Any error should be raised.
thread = Thread.current
uid = record["objectSnapshot"]["metadata"]["uid"]
frameworkName = record["objectSnapshot"]["metadata"]["name"][0..31]
attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"]
taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"]
taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"]
taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"]
snapshot = record_value(record["objectSnapshot"])
selectResult = thread[:conn].exec_params('SELECT uid from pods where uid=$1', [uid])
if selectResult.cmd_tuples == 0
# if there is no existing records, try to insert a new one.
thread[:conn].exec("INSERT INTO pods (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@taskroleName_col}\", \"#{@taskroleIndex_col}\", \"#{@taskAttemptIndex_col}\", \"#{@snapshot_col}\") " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", [time, time, uid, frameworkName, attemptIndex, taskroleName, taskroleIndex, taskAttemptIndex, snapshot]
)
else
# if there is an existing record, ignore it.
log.warn "[pgjson] chunk #{hex_id}: ignored pod snapshot object as it already exists, uid=#{uid}"
end


end

def write(chunk)
log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}"
hex_id = dump_unique_id_hex chunk.unique_id
log.info "[pgjson] chunk #{hex_id} received"
thread = Thread.current
if ! thread.key?(:conn)
init_connection
Expand All @@ -150,44 +207,24 @@ def write(chunk)
begin
chunk.msgpack_each do |time, record|
kind = record["objectSnapshot"]["kind"]
log.debug "log type: #{kind}"
log.info "[pgjson] object type #{kind} in chunk #{hex_id}"
if kind == "Framework"
thread[:conn].exec("COPY framework_history (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@historyType_col}\", \"#{@snapshot_col}\") FROM STDIN WITH DELIMITER E'\\x01'")
frameworkName = record["objectSnapshot"]["metadata"]["name"]
attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"]
historyType = "retry"
snapshot = record_value(record["objectSnapshot"])
# use frameworkName + attemptIndex + historyType to generate a uid
uid = Digest::MD5.hexdigest "#{frameworkName}+#{attemptIndex}+#{historyType}"
thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{snapshot}\n"
insert_framework hex_id, time, record
elsif kind == "Pod"
thread[:conn].exec("COPY pods (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@taskroleName_col}\", \"#{@taskroleIndex_col}\", \"#{@taskAttemptIndex_col}\", \"#{@snapshot_col}\") FROM STDIN WITH DELIMITER E'\\x01'")
uid = record["objectSnapshot"]["metadata"]["uid"]
frameworkName = record["objectSnapshot"]["metadata"]["name"][0..31]
attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"]
taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"]
taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"]
taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"]
snapshot = record_value(record["objectSnapshot"])
thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{taskAttemptIndex}\x01#{snapshot}\n"
insert_pod hex_id, time, record
end
end
rescue PG::ConnectionBad, PG::UnableToSend => err
# connection error
reset_connection # try to reset broken connection, and wait for next retry
log.debug "%s while copy data: %s" % [ err.class.name, err.message ]
log.warn "[pgjson] connection error happens for chunk #{hex_id}. message: #{err.message}"
retry
rescue PG::Error => err
log.debug "[pgjson] [write] Error while writing, error is #{err.class}"
errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
thread[:conn].put_copy_end( errmsg )
thread[:conn].get_result
log.warn "[pgjson] PG::Error happens for chunk #{hex_id}. message: #{err.message}"
errmsg = "error class %s happens, message: %s" % [ err.class.name, err.message ]
raise errmsg
else
thread[:conn].put_copy_end
res = thread[:conn].get_result
raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK
log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}"
log.info "[pgjson] chunk #{hex_id} writes successfully"
end
else
raise "Cannot connect to db host."
Expand All @@ -197,10 +234,11 @@ def write(chunk)
def record_value(record)
thread = Thread.current
if @msgpack
# dump record as msgpack
"\\#{thread[:conn].escape_bytea(record.to_msgpack)}"
else
# dump record as string
json = @encoder.dump(record)
json.gsub!(/\\/){ '\\\\' }
json
end
end
Expand Down

0 comments on commit 03c4c44

Please sign in to comment.