Skip to content

Commit

Permalink
Add async_connect and async_send methods and add specific specs for F…
Browse files Browse the repository at this point in the history
…iber.scheduler

Fixes ged#342
  • Loading branch information
larskanis committed Aug 23, 2021
1 parent 9d3bf8f commit f99ec5d
Show file tree
Hide file tree
Showing 6 changed files with 696 additions and 16 deletions.
201 changes: 187 additions & 14 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,193 @@ def ssl_attributes
end
end

REDIRECT_METHODS = {
:exec => [:async_exec, :sync_exec],
:query => [:async_exec, :sync_exec],
:exec_params => [:async_exec_params, :sync_exec_params],
:prepare => [:async_prepare, :sync_prepare],
:exec_prepared => [:async_exec_prepared, :sync_exec_prepared],
:describe_portal => [:async_describe_portal, :sync_describe_portal],
:describe_prepared => [:async_describe_prepared, :sync_describe_prepared],
}

def self.async_api=(enable)
REDIRECT_METHODS.each do |ali, (async, sync)|
remove_method(ali) if method_defined?(ali)
alias_method( ali, enable ? async : sync )
private def async_send_command
# From https://www.postgresql.org/docs/13/libpq-async.html
# After sending any command or data on a nonblocking connection, call PQflush. If it returns 1, wait for the socket to become read- or write-ready. If it becomes write-ready, call PQflush again. If it becomes read-ready, call PQconsumeInput , then call PQflush again. Repeat until PQflush returns 0. (It is necessary to check for read-ready and drain the input with PQconsumeInput , because the server can block trying to send us data, e.g., NOTICE messages, and won't read our data until we read its.) Once PQflush returns 0, wait for the socket to be read-ready and then read the response as described above.

sent = false
loop do
unless sent
begin
yield
sent = true
rescue PG::UnableToSend => err
end
end

unless flush()
# wait for the socket to become read- or write-ready

if Fiber.respond_to?(:scheduler) && Fiber.scheduler
# If a scheduler is set use it directly.
# This is necessary since IO.select isn't passed to the scheduler.
events = Fiber.scheduler.io_wait(socket_io, IO::READABLE | IO::WRITABLE, nil)
if (events & IO::READABLE) > 0
consume_input
end
else
readable, writable = IO.select([socket_io], [socket_io])
if readable.any?
consume_input
end
end
else
raise err if err
break
end
end
end

def async_exec(*args)
discard_results
async_send_query(*args)

block()
res = get_last_result

if block_given?
begin
return yield(res)
ensure
res.clear
end
end
res
end

def async_exec_params(*args)
discard_results

if args[1].nil?
# TODO: pg_deprecated(3, ("forwarding async_exec_params to async_exec is deprecated"));
async_send_query(*args)
else
async_send_query_params(*args)
end

block()
res = get_last_result

if block_given?
begin
return yield(res)
ensure
res.clear
end
end
res
end

alias sync_send_query send_query
def async_send_query(*args, &block)
async_send_command do
sync_send_query(*args)
end
end

alias sync_send_query_params send_query_params
def async_send_query_params(*args, &block)
async_send_command do
sync_send_query_params(*args)
end
end

# In async_api=false mode all send calls run directly on libpq.
# Blocking vs. nonblocking state can be changed in libpq.
alias sync_setnonblocking setnonblocking

# In async_api=true mode (default) all send calls run nonblocking.
# The difference is that setnonblocking(true) disables automatic handling of would-block cases.
def async_setnonblocking(enabled)
singleton_class.async_send_api = !enabled
sync_setnonblocking(true)
end

# sync/async isnonblocking methods are switched by async_setnonblocking()
alias sync_isnonblocking isnonblocking
def async_isnonblocking
false
end


class << self
alias sync_connect new

def async_connect(*args, **kwargs)
conn = PG::Connection.connect_start( *args, **kwargs ) or
raise(PG::Error, "Unable to create a new connection")
raise(PG::ConnectionBad, conn.error_message) if conn.status == PG::CONNECTION_BAD

# Now grab a reference to the underlying socket so we know when the connection is established
socket = conn.socket_io

# Track the progress of the connection, waiting for the socket to become readable/writable before polling it
poll_status = PG::PGRES_POLLING_WRITING
until poll_status == PG::PGRES_POLLING_OK ||
poll_status == PG::PGRES_POLLING_FAILED

# If the socket needs to read, wait 'til it becomes readable to poll again
case poll_status
when PG::PGRES_POLLING_READING
socket.wait_readable

# ...and the same for when the socket needs to write
when PG::PGRES_POLLING_WRITING
socket.wait_writable
end

# Check to see if it's finished or failed yet
poll_status = conn.connect_poll
end

raise(PG::ConnectionBad, conn.error_message) unless conn.status == PG::CONNECTION_OK

# Set connection to nonblocking to handle all blocking states in ruby.
# That way a fiber scheduler is able to handle IO requests.
conn.sync_setnonblocking(true)
conn.set_default_encoding

conn
end

REDIRECT_CLASS_METHODS = {
:new => [:async_connect, :sync_connect],
}

REDIRECT_SEND_METHODS = {
:send_query => [:async_send_query, :sync_send_query],
:send_query_params => [:async_send_query_params, :sync_send_query_params],
:isnonblocking => [:async_isnonblocking, :sync_isnonblocking],
:nonblocking? => [:async_isnonblocking, :sync_isnonblocking],
}
REDIRECT_METHODS = {
:exec => [:async_exec, :sync_exec],
:query => [:async_exec, :sync_exec],
:exec_params => [:async_exec_params, :sync_exec_params],
:prepare => [:async_prepare, :sync_prepare],
:exec_prepared => [:async_exec_prepared, :sync_exec_prepared],
:describe_portal => [:async_describe_portal, :sync_describe_portal],
:describe_prepared => [:async_describe_prepared, :sync_describe_prepared],
:setnonblocking => [:async_setnonblocking, :sync_setnonblocking],
}

def async_send_api=(enable)
REDIRECT_SEND_METHODS.each do |ali, (async, sync)|
undef_method(ali) if method_defined?(ali)
alias_method( ali, enable ? async : sync )
end
end

def async_api=(enable)
self.async_send_api = enable
REDIRECT_METHODS.each do |ali, (async, sync)|
remove_method(ali) if method_defined?(ali)
alias_method( ali, enable ? async : sync )
end
REDIRECT_CLASS_METHODS.each do |ali, (async, sync)|
singleton_class.remove_method(ali) if method_defined?(ali)
singleton_class.alias_method( ali, enable ? async : sync )
end
end
end

Expand Down
2 changes: 2 additions & 0 deletions spec/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require 'rspec'
require 'shellwords'
require 'pg'
require_relative 'helpers/scheduler.rb'
require_relative 'helpers/tcp_gate_scheduler.rb'

DEFAULT_TEST_DIR_STR = File.join(Dir.pwd, "tmp_test_specs")
TEST_DIR_STR = ENV['RUBY_PG_TEST_DIR'] || DEFAULT_TEST_DIR_STR
Expand Down
Loading

0 comments on commit f99ec5d

Please sign in to comment.