Skip to content

Commit

Permalink
Revert connecting on a host-by-host basis
Browse files Browse the repository at this point in the history
.. in favor of passing all hosts to libpq at once and instead adjust connect_timeout handling roughtly to how libpq handles it.

The problem is that libpg aborts connecting to multiple hosts, if there's a authentication failure.
But if pg imitates this behaviour, the libpq API doesn't give an exact indication, whether the connection aborted due to an authentication error or due to some other error, which continues the host iteration.
So we can not distinguish between an authentication error and other types of errors, other then by the error message.
But there's the next problem, that the error message is locale dependent and that when both client and server are running on Windows, the error message is often not correctly delivered, which is a known long standing PostgreSQL issue.

This commit therefore changes the execution back to how multiple hosts were handled similar to pg-1.3.x, but with two fixes:
1. Multiple IP addresses to one hostname are handled correctly, (fixes ged#452)
2. and connect_timeout is handled roughly like libpq. (fixes ged#450)

It's only roughly, since the timeout is not strictly per host, but per single socket event, but with a total timeout multiplied with the number-of-hosts.
Exact handling of connect_timeout like libpq is only possible if we connect host-by-host.
  • Loading branch information
larskanis committed Oct 9, 2022
1 parent 801a70f commit 71d1212
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 57 deletions.
77 changes: 23 additions & 54 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,17 @@ def cancel
if (timeo = conninfo_hash[:connect_timeout].to_i) && timeo > 0
# Lowest timeout is 2 seconds - like in libpq
timeo = [timeo, 2].max
stop_time = timeo + Process.clock_gettime(Process::CLOCK_MONOTONIC)
host_count = conninfo_hash[:host].to_s.count(",") + 1
stop_time = timeo * host_count + Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

poll_status = PG::PGRES_POLLING_WRITING
until poll_status == PG::PGRES_POLLING_OK ||
poll_status == PG::PGRES_POLLING_FAILED

timeout = stop_time&.-(Process.clock_gettime(Process::CLOCK_MONOTONIC))
# Set single timeout to parameter "connect_timeout" but
# don't exceed total connection time of number-of-hosts * connect_timeout.
timeout = [timeo, stop_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)].min if stop_time
event = if !timeout || timeout >= 0
# If the socket needs to read, wait 'til it becomes readable to poll again
case poll_status
Expand Down Expand Up @@ -693,81 +696,47 @@ def new(*args)
errors = []
if iopts[:hostaddr]
# hostaddr is provided -> no need to resolve hostnames
ihostaddrs = iopts[:hostaddr].split(",", -1)

ihosts = iopts[:host].split(",", -1) if iopts[:host]
raise PG::ConnectionBad, "could not match #{ihosts.size} host names to #{ihostaddrs.size} hostaddr values" if ihosts && ihosts.size != ihostaddrs.size

iports = iopts[:port].split(",", -1)
iports = iports * ihostaddrs.size if iports.size == 1
raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihostaddrs.size} hosts" if iports.size != ihostaddrs.size

# Try to connect to each hostaddr with separate timeout
ihostaddrs.each_with_index do |ihostaddr, idx|
oopts = iopts.merge(hostaddr: ihostaddr, port: iports[idx])
oopts[:host] = ihosts[idx] if ihosts
c = connect_internal(oopts, errors)
return c if c
end
elsif iopts[:host] && !iopts[:host].empty?
# Resolve DNS in Ruby to avoid blocking state while connecting, when it ...
elsif iopts[:host] && !iopts[:host].empty? && PG.library_version >= 100000
# Resolve DNS in Ruby to avoid blocking state while connecting.
# Multiple comma-separated values are generated, if the hostname resolves to both IPv4 and IPv6 addresses.
# This requires PostgreSQL-10+, so no DNS resolving is done on earlier versions.
ihosts = iopts[:host].split(",", -1)

iports = iopts[:port].split(",", -1)
iports = iports * ihosts.size if iports.size == 1
raise PG::ConnectionBad, "could not match #{iports.size} port numbers to #{ihosts.size} hosts" if iports.size != ihosts.size

ihosts.each_with_index do |mhost, idx|
dests = ihosts.each_with_index.flat_map do |mhost, idx|
unless host_is_named_pipe?(mhost)
addrs = if Fiber.respond_to?(:scheduler) &&
if Fiber.respond_to?(:scheduler) &&
Fiber.scheduler &&
RUBY_VERSION < '3.1.'

# Use a second thread to avoid blocking of the scheduler.
# `TCPSocket.gethostbyname` isn't fiber aware before ruby-3.1.
Thread.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value
hostaddrs = Thread.new{ Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue [''] }.value
else
Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue ['']
end

# Try to connect to each host with separate timeout
addrs.each do |addr|
oopts = iopts.merge(hostaddr: addr, host: mhost, port: iports[idx])
c = connect_internal(oopts, errors)
return c if c
hostaddrs = Addrinfo.getaddrinfo(mhost, nil, nil, :STREAM).map(&:ip_address) rescue ['']
end
else
# No hostname to resolve (UnixSocket)
oopts = iopts.merge(host: mhost, port: iports[idx])
c = connect_internal(oopts, errors)
return c if c
hostaddrs = [nil]
end
hostaddrs.map { |hostaddr| [hostaddr, mhost, iports[idx]] }
end
iopts.merge!(
hostaddr: dests.map{|d| d[0] }.join(","),
host: dests.map{|d| d[1] }.join(","),
port: dests.map{|d| d[2] }.join(","))
else
# No host given
return connect_internal(iopts)
end
raise PG::ConnectionBad, errors.join("\n")
end

private def connect_internal(opts, errors=nil)
begin
conn = self.connect_start(opts) or
raise(PG::Error, "Unable to create a new connection")
conn = self.connect_start(iopts) or
raise(PG::Error, "Unable to create a new connection")

raise PG::ConnectionBad, conn.error_message if conn.status == PG::CONNECTION_BAD
raise PG::ConnectionBad, conn.error_message if conn.status == PG::CONNECTION_BAD

conn.send(:async_connect_or_reset, :connect_poll)
rescue PG::ConnectionBad => err
if errors && /authenticat/ !~ err.message
# Seems to be no authentication error -> try next host
errors << err
return nil
else
# Probably an authentication error
raise err
end
end
conn.send(:async_connect_or_reset, :connect_poll)
conn
end

Expand Down
3 changes: 1 addition & 2 deletions spec/pg/connection_async_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
it "tries to connect to localhost with IPv6 and IPv4", :ipv6 do
uri = "postgres://localhost:#{@port+1}/test"
expect(described_class).to receive(:parse_connect_args).once.ordered.with(uri, any_args).and_call_original
expect(described_class).to receive(:parse_connect_args).once.ordered.with(hash_including(hostaddr: "::1")).and_call_original
expect(described_class).to receive(:parse_connect_args).once.ordered.with(hash_including(hostaddr: "127.0.0.1")).and_call_original
expect(described_class).to receive(:parse_connect_args).once.ordered.with(hash_including(hostaddr: "::1,127.0.0.1")).and_call_original
expect{ described_class.connect( uri ) }.to raise_error(PG::ConnectionBad)
end

Expand Down
2 changes: 1 addition & 1 deletion spec/pg/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
conninfo = @conninfo_gate.gsub(/(^| )host=\w+/, " host=scheduler-localhost")
conn = PG.connect(conninfo)
opt = conn.conninfo.find { |info| info[:keyword] == 'host' }
expect( opt[:val] ).to eq( 'scheduler-localhost' )
expect( opt[:val] ).to start_with( 'scheduler-localhost' )
conn.finish
end
end
Expand Down

0 comments on commit 71d1212

Please sign in to comment.