Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert connecting on a host-by-host basis and make sure only authentication errors stop host iteration #485

Merged
merged 5 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ LIBDIR = BASEDIR + 'lib'
EXTDIR = BASEDIR + 'ext'
PKGDIR = BASEDIR + 'pkg'
TMPDIR = BASEDIR + 'tmp'
TESTDIR = BASEDIR + "tmp_test_specs"
TESTDIR = BASEDIR + "tmp_test_*"

DLEXT = RbConfig::CONFIG['DLEXT']
EXT = LIBDIR + "pg_ext.#{DLEXT}"

GEMSPEC = 'pg.gemspec'

CLOBBER.include( TESTDIR.to_s )
CLEAN.include( TESTDIR.to_s )
CLEAN.include( PKGDIR.to_s, TMPDIR.to_s )
CLEAN.include "lib/*/libpq.dll"
CLEAN.include "lib/pg_ext.*"
Expand Down
78 changes: 23 additions & 55 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,17 @@ def cancel
if (timeo = conninfo_hash[:connect_timeout].to_i) && timeo > 0
# Lowest timeout is 2 seconds - like in libpq
timeo = [timeo, 2].max
stop_time = timeo + Process.clock_gettime(Process::CLOCK_MONOTONIC)
host_count = conninfo_hash[:host].to_s.count(",") + 1
stop_time = timeo * host_count + Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

poll_status = PG::PGRES_POLLING_WRITING
until poll_status == PG::PGRES_POLLING_OK ||
poll_status == PG::PGRES_POLLING_FAILED

timeout = stop_time&.-(Process.clock_gettime(Process::CLOCK_MONOTONIC))
# Cap each single wait at the "connect_timeout" parameter, but
# don't exceed a total connection time of number-of-hosts * connect_timeout.
timeout = [timeo, stop_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)].min if stop_time
event = if !timeout || timeout >= 0
# If the socket needs to read, wait 'til it becomes readable to poll again
case poll_status
Expand Down Expand Up @@ -600,7 +603,6 @@ def cancel

# Check to see if it's finished or failed yet
poll_status = send( poll_meth )
@last_status = status unless [PG::CONNECTION_BAD, PG::CONNECTION_OK].include?(status)
end

unless status == PG::CONNECTION_OK
Expand Down Expand Up @@ -694,81 +696,47 @@ def new(*args)
errors = []
if iopts[:hostaddr]
# hostaddr is provided -> no need to resolve hostnames
ihostaddrs = iopts[:hostaddr].split(",", -1)

ihosts = iopts[:host].split(",", -1) if iopts[:host]
raise PG::ConnectionBad, "could not match #{ihosts.size} host names to #{ihostaddrs.size} hostaddr values" if ihosts && ihosts.size != ihostaddrs.size

iports = iopts[:port].split(",", -1)
iports = iports * ihostaddrs.size if iports.size == 1
raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihostaddrs.size} hosts" if iports.size != ihostaddrs.size

# Try to connect to each hostaddr with separate timeout
ihostaddrs.each_with_index do |ihostaddr, idx|
oopts = iopts.merge(hostaddr: ihostaddr, port: iports[idx])
oopts[:host] = ihosts[idx] if ihosts
c = connect_internal(oopts, errors)
return c if c
end
elsif iopts[:host] && !iopts[:host].empty?
# Resolve DNS in Ruby to avoid blocking state while connecting.
elsif iopts[:host] && !iopts[:host].empty? && PG.library_version >= 100000
# Resolve DNS in Ruby to avoid blocking state while connecting.
# Multiple comma-separated values are generated if the hostname resolves to both IPv4 and IPv6 addresses.
# This requires PostgreSQL-10+, so no DNS resolving is done on earlier versions.
ihosts = iopts[:host].split(",", -1)

iports = iopts[:port].split(",", -1)
iports = iports * ihosts.size if iports.size == 1
raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihosts.size} hosts" if iports.size != ihosts.size

ihosts.each_with_index do |mhost, idx|
dests = ihosts.each_with_index.flat_map do |mhost, idx|
unless host_is_named_pipe?(mhost)
addrs = if Fiber.respond_to?(:scheduler) &&
if Fiber.respond_to?(:scheduler) &&
Fiber.scheduler &&
RUBY_VERSION < '3.1.'

# Use a second thread to avoid blocking of the scheduler.
# `Addrinfo.getaddrinfo` isn't fiber aware before ruby-3.1.
Thread.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value
hostaddrs = Thread.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value
else
Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue ['']
end

# Try to connect to each host with separate timeout
addrs.each do |addr|
oopts = iopts.merge(hostaddr: addr, host: mhost, port: iports[idx])
c = connect_internal(oopts, errors)
return c if c
hostaddrs = Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue ['']
end
else
# No hostname to resolve (UnixSocket)
oopts = iopts.merge(host: mhost, port: iports[idx])
c = connect_internal(oopts, errors)
return c if c
hostaddrs = [nil]
end
hostaddrs.map { |hostaddr| [hostaddr, mhost, iports[idx]] }
end
iopts.merge!(
hostaddr: dests.map{|d| d[0] }.join(","),
host: dests.map{|d| d[1] }.join(","),
port: dests.map{|d| d[2] }.join(","))
else
# No host given
return connect_internal(iopts)
end
raise PG::ConnectionBad, errors.join("\n")
end

private def connect_internal(opts, errors=nil)
begin
conn = self.connect_start(opts) or
raise(PG::Error, "Unable to create a new connection")
conn = self.connect_start(iopts) or
raise(PG::Error, "Unable to create a new connection")

raise PG::ConnectionBad, conn.error_message if conn.status == PG::CONNECTION_BAD
raise PG::ConnectionBad, conn.error_message if conn.status == PG::CONNECTION_BAD

conn.send(:async_connect_or_reset, :connect_poll)
rescue PG::ConnectionBad => err
if errors && !(conn && [PG::CONNECTION_AWAITING_RESPONSE].include?(conn.instance_variable_get(:@last_status)))
# Seems to be no authentication error -> try next host
errors << err
return nil
else
# Probably an authentication error
raise
end
end
conn.send(:async_connect_or_reset, :connect_poll)
conn
end

Expand Down
Loading