From 03c4c44a790bf757c8e8e02927f7cae795e59219 Mon Sep 17 00:00:00 2001 From: Zhiyuan He <362583303@qq.com> Date: Tue, 8 Sep 2020 20:35:05 +0800 Subject: [PATCH] Fix fluentd postgresql plugin (#4884) * fix * fix * fix * fix --- .../lib/fluent/plugin/out_pgjson.rb | 96 +++++++++++++------ 1 file changed, 67 insertions(+), 29 deletions(-) diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index afadb1c841..592bd10c94 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -140,8 +140,65 @@ def format(tag, time, record) [Time.at(time).strftime(@time_format), record].to_msgpack end + def insert_framework(hex_id, time, record) + # This function try to insert the framework snapshot into framework history table. + # In some cases, the framework controller may have duplicate logs about one framework attempt, + # or there has been already successful inserted record before. + # To handle it, `insert_framework` executes a "SELECT" first. + # If the uid exists in the table, it will ignore it safely. + # Any error should be raised. + thread = Thread.current + frameworkName = record["objectSnapshot"]["metadata"]["name"] + attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"] + historyType = "retry" + snapshot = record_value(record["objectSnapshot"]) + # use frameworkName + attemptIndex + historyType to generate a uid + uid = Digest::MD5.hexdigest "#{frameworkName}+#{attemptIndex}+#{historyType}" + # select from framework_history, ensure there is no corresponding history object + selectResult = thread[:conn].exec_params('SELECT uid from framework_history where uid=$1', [uid]) + if selectResult.cmd_tuples == 0 + # if there is no existing records, try to insert a new one. + thread[:conn].exec_params("INSERT INTO framework_history (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@historyType_col}\", \"#{@snapshot_col}\") " + + "VALUES ($1, $2, $3, $4, $5, $6, $7)", [time, time, uid, frameworkName, attemptIndex, historyType, snapshot] + ) + else + # if there is an existing record, ignore it. + log.warn "[pgjson] chunk #{hex_id}: ignored framework snapshot object as it already exists, uid=#{uid}" + end + end + + def insert_pod(hex_id, time, record) + # This function try to insert the pod snapshot into pods table. + # In some cases, the framework controller may have duplicate logs about one pod, + # or there has been already successful inserted record before. + # To handle it, `insert_pod` executes a "SELECT" first. + # If the uid exists in the table, it will ignore it safely. + # Any error should be raised. + thread = Thread.current + uid = record["objectSnapshot"]["metadata"]["uid"] + frameworkName = record["objectSnapshot"]["metadata"]["name"][0..31] + attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"] + taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"] + taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"] + taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"] + snapshot = record_value(record["objectSnapshot"]) + selectResult = thread[:conn].exec_params('SELECT uid from pods where uid=$1', [uid]) + if selectResult.cmd_tuples == 0 + # if there is no existing records, try to insert a new one. + thread[:conn].exec("INSERT INTO pods (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@taskroleName_col}\", \"#{@taskroleIndex_col}\", \"#{@taskAttemptIndex_col}\", \"#{@snapshot_col}\") " + + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", [time, time, uid, frameworkName, attemptIndex, taskroleName, taskroleIndex, taskAttemptIndex, snapshot] + ) + else + # if there is an existing record, ignore it. + log.warn "[pgjson] chunk #{hex_id}: ignored pod snapshot object as it already exists, uid=#{uid}" + end + + + end + def write(chunk) - log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" + hex_id = dump_unique_id_hex chunk.unique_id + log.info "[pgjson] chunk #{hex_id} received" thread = Thread.current if ! thread.key?(:conn) init_connection @@ -150,44 +207,24 @@ def write(chunk) begin chunk.msgpack_each do |time, record| kind = record["objectSnapshot"]["kind"] - log.debug "log type: #{kind}" + log.info "[pgjson] object type #{kind} in chunk #{hex_id}" if kind == "Framework" - thread[:conn].exec("COPY framework_history (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@historyType_col}\", \"#{@snapshot_col}\") FROM STDIN WITH DELIMITER E'\\x01'") - frameworkName = record["objectSnapshot"]["metadata"]["name"] - attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"] - historyType = "retry" - snapshot = record_value(record["objectSnapshot"]) - # use frameworkName + attemptIndex + historyType to generate a uid - uid = Digest::MD5.hexdigest "#{frameworkName}+#{attemptIndex}+#{historyType}" - thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{snapshot}\n" + insert_framework hex_id, time, record elsif kind == "Pod" - thread[:conn].exec("COPY pods (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@taskroleName_col}\", \"#{@taskroleIndex_col}\", \"#{@taskAttemptIndex_col}\", \"#{@snapshot_col}\") FROM STDIN WITH DELIMITER E'\\x01'") - uid = record["objectSnapshot"]["metadata"]["uid"] - frameworkName = record["objectSnapshot"]["metadata"]["name"][0..31] - attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"] - taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"] - taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"] - taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"] - snapshot = record_value(record["objectSnapshot"]) - thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{taskAttemptIndex}\x01#{snapshot}\n" + insert_pod hex_id, time, record end end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error reset_connection # try to reset broken connection, and wait for next retry - log.debug "%s while copy data: %s" % [ err.class.name, err.message ] + log.warn "[pgjson] connection error happens for chunk #{hex_id}. message: #{err.message}" retry rescue PG::Error => err - log.debug "[pgjson] [write] Error while writing, error is #{err.class}" - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - thread[:conn].put_copy_end( errmsg ) - thread[:conn].get_result + log.warn "[pgjson] PG::Error happens for chunk #{hex_id}. message: #{err.message}" + errmsg = "error class %s happens, message: %s" % [ err.class.name, err.message ] raise errmsg else - thread[:conn].put_copy_end - res = thread[:conn].get_result - raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK - log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" + log.info "[pgjson] chunk #{hex_id} writes successfully" end else raise "Cannot connect to db host." @@ -197,10 +234,11 @@ def write(chunk) def record_value(record) thread = Thread.current if @msgpack + # dump record as msgpack "\\#{thread[:conn].escape_bytea(record.to_msgpack)}" else + # dump record as string json = @encoder.dump(record) - json.gsub!(/\\/){ '\\\\' } json end end