diff --git a/tests/ruby/helpers/pg_socket.rb b/tests/ruby/helpers/pg_socket.rb new file mode 100644 index 00000000..42234491 --- /dev/null +++ b/tests/ruby/helpers/pg_socket.rb @@ -0,0 +1,259 @@ +require 'socket' +require 'digest/md5' + +BACKEND_MESSAGE_CODES = { + 'Z' => "ReadyForQuery", + 'C' => "CommandComplete", + 'T' => "RowDescription", + 'D' => "DataRow", + '1' => "ParseComplete", + '2' => "BindComplete", + 'E' => "ErrorResponse", + 's' => "PortalSuspended", +} + +class PostgresSocket + def initialize(host, port) + @port = port + @host = host + @socket = TCPSocket.new @host, @port + @parameters = {} + @verbose = true + end + + def send_md5_password_message(username, password, salt) + m = Digest::MD5.hexdigest(password + username) + m = Digest::MD5.hexdigest(m + salt.map(&:chr).join("")) + m = 'md5' + m + bytes = (m.split("").map(&:ord) + [0]).flatten + message_size = bytes.count + 4 + + message = [] + + message << 'p'.ord + message << [message_size].pack('l>').unpack('CCCC') # 4 + message << bytes + message.flatten! + + + @socket.write(message.pack('C*')) + end + + def send_startup_message(username, database, password) + message = [] + + message << [196608].pack('l>').unpack('CCCC') # 4 + message << "user".split('').map(&:ord) # 4, 8 + message << 0 # 1, 9 + message << username.split('').map(&:ord) # 2, 11 + message << 0 # 1, 12 + message << "database".split('').map(&:ord) # 8, 20 + message << 0 # 1, 21 + message << database.split('').map(&:ord) # 2, 23 + message << 0 # 1, 24 + message << 0 # 1, 25 + message.flatten! + + total_message_size = message.size + 4 + + message_len = [total_message_size].pack('l>').unpack('CCCC') + + @socket.write([message_len + message].flatten.pack('C*')) + + sleep 0.1 + + read_startup_response(username, password) + end + + def read_startup_response(username, password) + message_code, message_len = @socket.recv(5).unpack("al>") + while message_code == 'R' + auth_code = @socket.recv(4).unpack('l>').pop + case auth_code + when 5 # md5 + salt = @socket.recv(4).unpack('CCCC') + send_md5_password_message(username, password, salt) + message_code, message_len = @socket.recv(5).unpack("al>") + when 0 # trust + break + end + end + loop do + message_code, message_len = @socket.recv(5).unpack("al>") + if message_code == 'Z' + @socket.recv(1).unpack("a") # most likely I + break # We are good to go + end + if message_code == 'S' + actual_message = @socket.recv(message_len - 4).unpack("C*") + k,v = actual_message.pack('U*').split(/\x00/) + @parameters[k] = v + end + if message_code == 'K' + process_id, secret_key = @socket.recv(message_len - 4).unpack("l>l>") + @parameters["process_id"] = process_id + @parameters["secret_key"] = secret_key + end + end + return @parameters + end + + def cancel_query + socket = TCPSocket.new @host, @port + process_key = @parameters["process_id"] + secret_key = @parameters["secret_key"] + message = [] + message << [16].pack('l>').unpack('CCCC') # 4 + message << [80877102].pack('l>').unpack('CCCC') # 4 + message << [process_key.to_i].pack('l>').unpack('CCCC') # 4 + message << [secret_key.to_i].pack('l>').unpack('CCCC') # 4 + message.flatten! + socket.write(message.flatten.pack('C*')) + socket.close + log "[F] Sent CancelRequest message" + end + + def send_query_message(query) + query_size = query.length + message_size = 1 + 4 + query_size + message = [] + message << "Q".ord + message << [message_size].pack('l>').unpack('CCCC') # 4 + message << query.split('').map(&:ord) # 2, 11 + message << 0 # 1, 12 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent Q message (#{query})" + end + + def send_parse_message(query) + query_size = query.length + message_size = 2 + 2 + 4 + query_size + message = [] + message << "P".ord + message << [message_size].pack('l>').unpack('CCCC') # 4 + message << 0 # unnamed statement + message << query.split('').map(&:ord) # 2, 11 + message << 0 # 1, 12 + message << [0, 0] + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent P message (#{query})" + end + + def send_bind_message + message = [] + message << "B".ord + message << [12].pack('l>').unpack('CCCC') # 4 + message << 0 # unnamed statement + message << 0 # unnamed statement + message << [0, 0] # 2 + message << [0, 0] # 2 + message << [0, 0] # 2 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent B message" + end + + def send_describe_message(mode) + message = [] + message << "D".ord + message << [6].pack('l>').unpack('CCCC') # 4 + message << mode.ord + message << 0 # unnamed statement + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent D message" + end + + def send_execute_message(limit=0) + message = [] + message << "E".ord + message << [9].pack('l>').unpack('CCCC') # 4 + message << 0 # unnamed statement + message << [limit].pack('l>').unpack('CCCC') # 4 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent E message" + end + + def send_sync_message + message = [] + message << "S".ord + message << [4].pack('l>').unpack('CCCC') # 4 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent S message" + end + + def send_copydone_message + message = [] + message << "c".ord + message << [4].pack('l>').unpack('CCCC') # 4 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent c message" + end + + def send_copyfail_message + message = [] + message << "f".ord + message << [5].pack('l>').unpack('CCCC') # 4 + message << 0 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent f message" + end + + def send_flush_message + message = [] + message << "H".ord + message << [4].pack('l>').unpack('CCCC') # 4 + message.flatten! + @socket.write(message.flatten.pack('C*')) + log "[F] Sent H message" + end + + def read_from_server() + output_messages = [] + retry_count = 0 + message_code = nil + message_len = 0 + loop do + begin + message_code, message_len = @socket.recv_nonblock(5).unpack("al>") + rescue IO::WaitReadable + return output_messages if retry_count > 50 + + retry_count += 1 + sleep(0.01) + next + end + message = { + code: message_code, + len: message_len, + bytes: [] + } + log "[B] #{BACKEND_MESSAGE_CODES[message_code] || ('UnknownMessage(' + message_code + ')')}" + + actual_message_length = message_len - 4 + if actual_message_length > 0 + message[:bytes] = @socket.recv(message_len - 4).unpack("C*") + log "\t#{message[:bytes].join(",")}" + log "\t#{message[:bytes].map(&:chr).join(" ")}" + end + output_messages << message + return output_messages if message_code == 'Z' + end + end + + def log(msg) + return unless @verbose + + puts msg + end + + def close + @socket.close + end +end diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb index 13dc6686..ad4c32a4 100644 --- a/tests/ruby/helpers/pgcat_helper.rb +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -2,6 +2,7 @@ require 'ostruct' require_relative 'pgcat_process' require_relative 'pg_instance' +require_relative 'pg_socket' class ::Hash def deep_merge(second) diff --git a/tests/ruby/protocol_spec.rb b/tests/ruby/protocol_spec.rb new file mode 100644 index 00000000..9737650b --- /dev/null +++ b/tests/ruby/protocol_spec.rb @@ -0,0 +1,155 @@ +# frozen_string_literal: true +require_relative 'spec_helper' + + +describe "Portocol handling" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 1, "session") } + let(:sequence) { [] } + let(:pgcat_socket) { PostgresSocket.new('localhost', processes.pgcat.port) } + let(:pgdb_socket) { PostgresSocket.new('localhost', processes.all_databases.first.port) } + + after do + pgdb_socket.close + pgcat_socket.close + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + def run_comparison(sequence, socket_a, socket_b) + sequence.each do |msg, *args| + socket_a.send(msg, *args) + socket_b.send(msg, *args) + + compare_messages( + socket_a.read_from_server, + socket_b.read_from_server + ) + end + end + + def compare_messages(msg_arr0, msg_arr1) + if msg_arr0.count != msg_arr1.count + error_output = [] + + error_output << "#{msg_arr0.count} : #{msg_arr1.count}" + error_output << "PgCat Messages" + error_output += msg_arr0.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" } + error_output << "PgServer Messages" + error_output += msg_arr1.map { |message| "\t#{message[:code]} - #{message[:bytes].map(&:chr).join(" ")}" } + error_desc = error_output.join("\n") + raise StandardError, "Message count mismatch #{error_desc}" + end + + (0..msg_arr0.count - 1).all? do |i| + msg0 = msg_arr0[i] + msg1 = msg_arr1[i] + + result = [ + msg0[:code] == msg1[:code], + msg0[:len] == msg1[:len], + msg0[:bytes] == msg1[:bytes], + ].all? + + next result if result + + if result == false + error_string = [] + if msg0[:code] != msg1[:code] + error_string << "code #{msg0[:code]} != #{msg1[:code]}" + end + if msg0[:len] != msg1[:len] + error_string << "len #{msg0[:len]} != #{msg1[:len]}" + end + if msg0[:bytes] != msg1[:bytes] + error_string << "bytes #{msg0[:bytes]} != #{msg1[:bytes]}" + end + err = error_string.join("\n") + + raise StandardError, "Message mismatch #{err}" + end + end + end + + RSpec.shared_examples "at parity with database" do + before do + pgcat_socket.send_startup_message("sharding_user", "sharded_db", "sharding_user") + pgdb_socket.send_startup_message("sharding_user", "shard0", "sharding_user") + end + + it "works" do + run_comparison(sequence, pgcat_socket, pgdb_socket) + end + end + + context "Cancel Query" do + let(:sequence) { + [ + [:send_query_message, "SELECT pg_sleep(5)"], + [:cancel_query] + ] + } + + it_behaves_like "at parity with database" + end + + xcontext "Simple query after parse" do + let(:sequence) { + [ + [:send_parse_message, "SELECT 5"], + [:send_query_message, "SELECT 1"], + [:send_bind_message], + [:send_describe_message, "P"], + [:send_execute_message], + [:send_sync_message], + ] + } + + # Known to fail due to PgCat not supporting flush + it_behaves_like "at parity with database" + end + + xcontext "Flush message" do + let(:sequence) { + [ + [:send_parse_message, "SELECT 1"], + [:send_flush_message] + ] + } + + # Known to fail due to PgCat not supporting flush + it_behaves_like "at parity with database" + end + + xcontext "Bind without parse" do + let(:sequence) { + [ + [:send_bind_message] + ] + } + # This is known to fail. + # Server responds immediately, Proxy buffers the message + it_behaves_like "at parity with database" + end + + context "Simple message" do + let(:sequence) { + [[:send_query_message, "SELECT 1"]] + } + + it_behaves_like "at parity with database" + end + + context "Extended protocol" do + let(:sequence) { + [ + [:send_parse_message, "SELECT 1"], + [:send_bind_message], + [:send_describe_message, "P"], + [:send_execute_message], + [:send_sync_message], + ] + } + + it_behaves_like "at parity with database" + end +end