Skip to content

Commit

Permalink
Change connection setup to respect connect_timeout parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
larskanis committed May 18, 2022
1 parent a8dca16 commit c008f3c
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 125 deletions.
240 changes: 135 additions & 105 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,6 @@ def self.connect_hash_to_string( hash )
hash.map { |k,v| "#{k}=#{quote_connstr(v)}" }.join( ' ' )
end

# Decode a connection string to Hash options
#
# Value are properly unquoted and unescaped.
def self.connect_string_to_hash( str )
options = {}
key = nil
value = String.new
str.scan(/\G\s*(?>([^\s\\\']+)\s*=\s*|([^\s\\\']+)|'((?:[^\'\\]|\\.)*)'|(\\.?)|(\S))(\s|\z)?/m) do
|k, word, sq, esc, garbage, sep|
raise ArgumentError, "unterminated quoted string in connection info string: #{str.inspect}" if garbage
if k
key = k
else
value << (word || (sq || esc).gsub(/\\(.)/, '\\1'))
end
if sep
raise ArgumentError, "missing = after #{value.inspect}" unless key
options[key.to_sym] = value
key = nil
value = String.new
end
end
options
end

# URI defined in RFC3986
# This regexp is modified to allow host to specify multiple comma separated components captured as <hostports> and to disallow comma in hostnames.
# Taken from: https://github.com/ruby/ruby/blob/be04006c7d2f9aeb7e9d8d09d945b3a9c7850202/lib/uri/rfc3986_parser.rb#L6
HOST_AND_PORT = /(?<hostport>(?<host>(?<IP-literal>\[(?:(?<IPv6address>(?:\h{1,4}:){6}(?<ls32>\h{1,4}:\h{1,4}|(?<IPv4address>(?<dec-octet>[1-9]\d|1\d{2}|2[0-4]\d|25[0-5]|\d)\.\g<dec-octet>\.\g<dec-octet>\.\g<dec-octet>))|::(?:\h{1,4}:){5}\g<ls32>|\h{1,4}?::(?:\h{1,4}:){4}\g<ls32>|(?:(?:\h{1,4}:)?\h{1,4})?::(?:\h{1,4}:){3}\g<ls32>|(?:(?:\h{1,4}:){,2}\h{1,4})?::(?:\h{1,4}:){2}\g<ls32>|(?:(?:\h{1,4}:){,3}\h{1,4})?::\h{1,4}:\g<ls32>|(?:(?:\h{1,4}:){,4}\h{1,4})?::\g<ls32>|(?:(?:\h{1,4}:){,5}\h{1,4})?::\h{1,4}|(?:(?:\h{1,4}:){,6}\h{1,4})?::)|(?<IPvFuture>v\h+\.[!$&-.0-;=A-Z_a-z~]+))\])|\g<IPv4address>|(?<reg-name>(?:%\h\h|[-\.!$&-+0-9;=A-Z_a-z~])+))?(?::(?<port>\d*))?)/
POSTGRESQL_URI = /\A(?<URI>(?<scheme>[A-Za-z][+\-.0-9A-Za-z]*):(?<hier-part>\/\/(?<authority>(?:(?<userinfo>(?:%\h\h|[!$&-.0-;=A-Z_a-z~])*)@)?(?<hostports>#{HOST_AND_PORT}(?:,\g<hostport>)*))(?<path-abempty>(?:\/(?<segment>(?:%\h\h|[!$&-.0-;=@-Z_a-z~])*))*)|(?<path-absolute>\/(?:(?<segment-nz>(?:%\h\h|[!$&-.0-;=@-Z_a-z~])+)(?:\/\g<segment>)*)?)|(?<path-rootless>\g<segment-nz>(?:\/\g<segment>)*)|(?<path-empty>))(?:\?(?<query>[^#]*))?(?:\#(?<fragment>(?:%\h\h|[!$&-.0-;=@-Z_a-z~\/?])*))?)\z/

# Parse the connection +args+ into a connection-parameter string.
# See PG::Connection.new for valid arguments.
#
Expand All @@ -90,88 +59,41 @@ def self.connect_string_to_hash( str )
# The method adds the option "hostaddr" and "fallback_application_name" if they aren't already set.
# The URI and the options string is passed through and "hostaddr" as well as "fallback_application_name"
# are added to the end.
def self::parse_connect_args( *args )
def self.parse_connect_args( *args )
hash_arg = args.last.is_a?( Hash ) ? args.pop.transform_keys(&:to_sym) : {}
option_string = ""
iopts = {}

if args.length == 1
case args.first
when URI, POSTGRESQL_URI
uri = args.first.to_s
uri_match = POSTGRESQL_URI.match(uri)
if uri_match['query']
iopts = URI.decode_www_form(uri_match['query']).to_h.transform_keys(&:to_sym)
end
# extract "host1,host2" from "host1:5432,host2:5432"
iopts[:host] = uri_match['hostports'].split(',', -1).map do |hostport|
hostmatch = /\A#{HOST_AND_PORT}\z/.match(hostport)
hostmatch['IPv6address'] || hostmatch['IPv4address'] || hostmatch['reg-name']&.gsub(/%(\h\h)/){ $1.hex.chr }
end.join(',')
oopts = {}
when /=/
# Option string style
option_string = args.first.to_s
iopts = connect_string_to_hash(option_string)
oopts = {}
when URI, /=/, /:\/\//
# Option or URL string style
conn_string = args.first.to_s
iopts = PG::Connection.conninfo_parse(conn_string).each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }
else
# Positional parameters (only host given)
iopts[CONNECT_ARGUMENT_ORDER.first.to_sym] = args.first
oopts = iopts.dup
end
else
# Positional parameters
# Positional parameters with host and more
max = CONNECT_ARGUMENT_ORDER.length
raise ArgumentError,
"Extra positional parameter %d: %p" % [ max + 1, args[max] ] if args.length > max
"Extra positional parameter %d: %p" % [ max + 1, args[max] ] if args.length > max

CONNECT_ARGUMENT_ORDER.zip( args ) do |(k,v)|
iopts[ k.to_sym ] = v if v
end
iopts.delete(:tty) # ignore obsolete tty parameter
oopts = iopts.dup
end

iopts.merge!( hash_arg )
oopts.merge!( hash_arg )

# Resolve DNS in Ruby to avoid blocking state while connecting, when it ...
if (host=iopts[:host]) && !iopts[:hostaddr]
hostaddrs = host.split(",", -1).map do |mhost|
if !mhost.empty? && !mhost.start_with?("/") && # isn't UnixSocket
# isn't a path on Windows
(RUBY_PLATFORM !~ /mingw|mswin/ || mhost !~ /\A\w:[\/\\]/)

if Fiber.respond_to?(:scheduler) &&
Fiber.scheduler &&
RUBY_VERSION < '3.1.'

# Use a second thread to avoid blocking of the scheduler.
# `IPSocket.getaddress` isn't fiber aware before ruby-3.1.
Thread.new{ IPSocket.getaddress(mhost) rescue '' }.value
else
IPSocket.getaddress(mhost) rescue ''
end
end
end
oopts[:hostaddr] = hostaddrs.join(",") if hostaddrs.any?
end

if !iopts[:fallback_application_name]
oopts[:fallback_application_name] = $0.sub( /^(.{30}).{4,}(.{30})$/ ){ $1+"..."+$2 }
iopts[:fallback_application_name] = $0.sub( /^(.{30}).{4,}(.{30})$/ ){ $1+"..."+$2 }
end

if uri
uri += uri_match['query'] ? "&" : "?"
uri += URI.encode_www_form( oopts )
return uri
else
option_string += ' ' unless option_string.empty? && oopts.empty?
return option_string + connect_hash_to_string(oopts)
end
return connect_hash_to_string(iopts)
end


# call-seq:
# conn.copy_data( sql [, coder] ) {|sql_result| ... } -> PG::Result
#
Expand Down Expand Up @@ -618,22 +540,46 @@ def cancel

private def async_connect_or_reset(poll_meth)
# Track the progress of the connection, waiting for the socket to become readable/writable before polling it

if (timeo = conninfo_hash[:connect_timeout].to_i) && timeo > 0
# Lowest timeout is 2 seconds - like in libpq
timeo = [timeo, 2].max
stop_time = timeo + Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

poll_status = PG::PGRES_POLLING_WRITING
until poll_status == PG::PGRES_POLLING_OK ||
poll_status == PG::PGRES_POLLING_FAILED

# If the socket needs to read, wait 'til it becomes readable to poll again
case poll_status
when PG::PGRES_POLLING_READING
socket_io.wait_readable
timeout = stop_time&.-(Process.clock_gettime(Process::CLOCK_MONOTONIC))
event = if !timeout || timeout >= 0
# If the socket needs to read, wait 'til it becomes readable to poll again
case poll_status
when PG::PGRES_POLLING_READING
socket_io.wait_readable(timeout)

# ...and the same for when the socket needs to write
when PG::PGRES_POLLING_WRITING
socket_io.wait_writable
# ...and the same for when the socket needs to write
when PG::PGRES_POLLING_WRITING
socket_io.wait_writable(timeout)
end
end
# connection to server at "localhost" (127.0.0.1), port 5433 failed: timeout expired (PG::ConnectionBad)
# connection to server on socket "/var/run/postgresql/.s.PGSQL.5433" failed: No such file or directory
unless event
if self.class.send(:host_is_named_pipe?, host)
connhost = "on socket \"#{host}\""
elsif respond_to?(:hostaddr)
connhost = "at \"#{host}\" (#{hostaddr}), port #{port}"
else
connhost = "at \"#{host}\", port #{port}"
end
raise PG::ConnectionBad, "connection to server #{connhost} failed: timeout expired"
end

# Check to see if it's finished or failed yet
poll_status = send( poll_meth )
p status, PG::Constants.constants.grep(/CONNECTION_/).find{|c| PG::Constants.const_get(c) == status}
@last_status = status unless [PG::CONNECTION_BAD, PG::CONNECTION_OK].include?(status)
end

unless status == PG::CONNECTION_OK
Expand Down Expand Up @@ -701,13 +647,95 @@ class << self
# connection will have its +client_encoding+ set accordingly.
#
# Raises a PG::Error if the connection fails.
def new(*args, **kwargs)
conn = self.connect_start(*args, **kwargs ) or
raise(PG::Error, "Unable to create a new connection")

raise(PG::ConnectionBad, conn.error_message) if conn.status == PG::CONNECTION_BAD
def new(*args, **kwargs, &block)
option_string = parse_connect_args(*args, **kwargs)
iopts = PG::Connection.conninfo_parse(option_string).each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }
iopts = PG::Connection.conndefaults.each_with_object({}){|h, o| o[h[:keyword].to_sym] = h[:val] if h[:val] }.merge(iopts)

errors = []
if iopts[:hostaddr]
# hostaddr is provided -> no need to resolve hostnames
ihostaddrs = iopts[:hostaddr].split(",", -1)
ihosts = iopts[:host].split(",", -1) if iopts[:host]
raise PG::ConnectionBad, "could not match #{ihosts.size} host names to #{ihostaddrs.size} hostaddr values" if ihosts && ihosts.size != ihostaddrs.size

iports = iopts[:port].split(",", -1)
iports = iports * ihostaddrs.size if iports.size == 1
raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihostaddrs.size} hosts" if iports.size != ihostaddrs.size

# Try to connect to each hostaddr with separate timeout
ihostaddrs.each_with_index do |ihostaddr, idx|
oopts = iopts.merge(hostaddr: ihostaddr, port: iports[idx])
oopts[:host] = ihosts[idx] if ihosts
c = connect_internal(oopts, errors, &block)
return c if c
end
elsif iopts[:host]
# Resolve DNS in Ruby to avoid blocking state while connecting, when it ...
ihosts = iopts[:host].split(",", -1)

iports = iopts[:port].split(",", -1)
iports = iports * ihosts.size if iports.size == 1
raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihosts.size} hosts" if iports.size != ihosts.size

ihosts.each_with_index do |mhost, idx|
unless host_is_named_pipe?(mhost)
addrs = if Fiber.respond_to?(:scheduler) &&
Fiber.scheduler &&
RUBY_VERSION < '3.1.'

# Use a second thread to avoid blocking of the scheduler.
# `TCPSocket.gethostbyname` isn't fiber aware before ruby-3.1.
Thread.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value
else
Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue ['']
end

# Try to connect to each host with separate timeout
addrs.each do |addr|
oopts = iopts.merge(hostaddr: addr, host: mhost, port: iports[idx])
c = connect_internal(oopts, errors, &block)
return c if c
end
else
# No hostname to resolve (UnixSocket)
oopts = iopts.merge(host: mhost, port: iports[idx])
c = connect_internal(oopts, errors, &block)
return c if c
end
end
else
# No host given
return connect_internal(iopts, &block)
end
raise PG::ConnectionBad, errors.join("\n")
end
alias async_connect new
alias connect new
alias open new
alias setdb new
alias setdblogin new

conn.send(:async_connect_or_reset, :connect_poll)
private def connect_internal(opts, errors=nil)
begin
conn = self.connect_start(opts) or
raise(PG::Error, "Unable to create a new connection")

raise(PG::ConnectionBad, conn.error_message) if conn.status == PG::CONNECTION_BAD

conn.send(:async_connect_or_reset, :connect_poll)
rescue PG::ConnectionBad => err
if errors && ![PG::CONNECTION_AWAITING_RESPONSE].include?(conn.instance_variable_get(:@last_status))
# Seems to be no authentication error -> try next host
errors << err
#p rescue: err, status_repeat: conn.instance_variable_get(:@last_status)
return nil
else
# Probably an authentication error
#p rescue: err, status_abort: conn.instance_variable_get(:@last_status)
raise
end
end

if block_given?
begin
Expand All @@ -718,11 +746,13 @@ def new(*args, **kwargs)
end
conn
end
alias async_connect new
alias connect new
alias open new
alias setdb new
alias setdblogin new

private def host_is_named_pipe?(host_string)
host_string.empty? || host_string.start_with?("/") || # it's UnixSocket?
host_string.start_with?("@") || # it's UnixSocket in the abstract namespace?
# it's a path on Windows?
(RUBY_PLATFORM =~ /mingw|mswin/ && host_string =~ /\A([\/\\]|\w:[\/\\])/)
end

# call-seq:
# PG::Connection.ping(connection_hash) -> Integer
Expand Down
Loading

0 comments on commit c008f3c

Please sign in to comment.