From f7b063b49dfa02237178c96148f583525fcb2a7e Mon Sep 17 00:00:00 2001 From: Lars Kanis Date: Mon, 23 Aug 2021 13:17:25 +0200 Subject: [PATCH] Add async_connect and async_send methods and add specific specs for Fiber.scheduler Fixes #342 --- lib/pg/connection.rb | 202 ++++++++++++++++++++++-- spec/helpers.rb | 2 + spec/helpers/scheduler.rb | 241 +++++++++++++++++++++++++++++ spec/helpers/tcp_gate_scheduler.rb | 186 ++++++++++++++++++++++ spec/pg/connection_sync_spec.rb | 6 +- spec/pg/scheduler_spec.rb | 97 ++++++++++++ 6 files changed, 718 insertions(+), 16 deletions(-) create mode 100644 spec/helpers/scheduler.rb create mode 100644 spec/helpers/tcp_gate_scheduler.rb create mode 100644 spec/pg/scheduler_spec.rb diff --git a/lib/pg/connection.rb b/lib/pg/connection.rb index d3335d0b6..c4e228ed7 100644 --- a/lib/pg/connection.rb +++ b/lib/pg/connection.rb @@ -284,20 +284,194 @@ def ssl_attributes end end - REDIRECT_METHODS = { - :exec => [:async_exec, :sync_exec], - :query => [:async_exec, :sync_exec], - :exec_params => [:async_exec_params, :sync_exec_params], - :prepare => [:async_prepare, :sync_prepare], - :exec_prepared => [:async_exec_prepared, :sync_exec_prepared], - :describe_portal => [:async_describe_portal, :sync_describe_portal], - :describe_prepared => [:async_describe_prepared, :sync_describe_prepared], - } - - def self.async_api=(enable) - REDIRECT_METHODS.each do |ali, (async, sync)| - remove_method(ali) if method_defined?(ali) - alias_method( ali, enable ? async : sync ) + private def async_send_command + # From https://www.postgresql.org/docs/13/libpq-async.html + # After sending any command or data on a nonblocking connection, call PQflush. If it returns 1, wait for the socket to become read- or write-ready. If it becomes write-ready, call PQflush again. If it becomes read-ready, call PQconsumeInput , then call PQflush again. Repeat until PQflush returns 0. (It is necessary to check for read-ready and drain the input with PQconsumeInput , because the server can block trying to send us data, e.g., NOTICE messages, and won't read our data until we read its.) Once PQflush returns 0, wait for the socket to be read-ready and then read the response as described above. + + sent = false + loop do + unless sent + begin + yield + sent = true + rescue PG::UnableToSend => err + end + end + + unless flush() + # wait for the socket to become read- or write-ready + + if Fiber.respond_to?(:scheduler) && Fiber.scheduler + # If a scheduler is set use it directly. + # This is necessary since IO.select isn't passed to the scheduler. + events = Fiber.scheduler.io_wait(socket_io, IO::READABLE | IO::WRITABLE, nil) + if (events & IO::READABLE) > 0 + consume_input + end + else + readable, writable = IO.select([socket_io], [socket_io]) + if readable.any? + consume_input + end + end + else + raise err if err + break + end + end + end + + def async_exec(*args) + discard_results + async_send_query(*args) + + block() + res = get_last_result + + if block_given? + begin + return yield(res) + ensure + res.clear + end + end + res + end + + def async_exec_params(*args) + discard_results + + if args[1].nil? + # TODO: pg_deprecated(3, ("forwarding async_exec_params to async_exec is deprecated")); + async_send_query(*args) + else + async_send_query_params(*args) + end + + block() + res = get_last_result + + if block_given? + begin + return yield(res) + ensure + res.clear + end + end + res + end + + alias sync_send_query send_query + def async_send_query(*args, &block) + async_send_command do + sync_send_query(*args) + end + end + + alias sync_send_query_params send_query_params + def async_send_query_params(*args, &block) + async_send_command do + sync_send_query_params(*args) + end + end + + # In async_api=false mode all send calls run directly on libpq. + # Blocking vs. nonblocking state can be changed in libpq. + alias sync_setnonblocking setnonblocking + + # In async_api=true mode (default) all send calls run nonblocking. + # The difference is that setnonblocking(true) disables automatic handling of would-block cases. + def async_setnonblocking(enabled) + singleton_class.async_send_api = !enabled + sync_setnonblocking(true) + end + + # sync/async isnonblocking methods are switched by async_setnonblocking() + alias sync_isnonblocking isnonblocking + def async_isnonblocking + false + end + + + class << self + alias sync_connect new + + def async_connect(*args, **kwargs) + conn = PG::Connection.connect_start( *args, **kwargs ) or + raise(PG::Error, "Unable to create a new connection") + raise(PG::ConnectionBad, conn.error_message) if conn.status == PG::CONNECTION_BAD + + # Now grab a reference to the underlying socket so we know when the connection is established + socket = conn.socket_io + + # Track the progress of the connection, waiting for the socket to become readable/writable before polling it + poll_status = PG::PGRES_POLLING_WRITING + until poll_status == PG::PGRES_POLLING_OK || + poll_status == PG::PGRES_POLLING_FAILED + + # If the socket needs to read, wait 'til it becomes readable to poll again + case poll_status + when PG::PGRES_POLLING_READING + socket.wait_readable + + # ...and the same for when the socket needs to write + when PG::PGRES_POLLING_WRITING + socket.wait_writable + end + + # Check to see if it's finished or failed yet + poll_status = conn.connect_poll + end + + raise(PG::ConnectionBad, conn.error_message) unless conn.status == PG::CONNECTION_OK + + # Set connection to nonblocking to handle all blocking states in ruby. + # That way a fiber scheduler is able to handle IO requests. + conn.sync_setnonblocking(true) + conn.set_default_encoding + + conn + end + + REDIRECT_CLASS_METHODS = { + :new => [:async_connect, :sync_connect], + } + + REDIRECT_SEND_METHODS = { + :send_query => [:async_send_query, :sync_send_query], + :send_query_params => [:async_send_query_params, :sync_send_query_params], + :isnonblocking => [:async_isnonblocking, :sync_isnonblocking], + :nonblocking? => [:async_isnonblocking, :sync_isnonblocking], + } + REDIRECT_METHODS = { + :exec => [:async_exec, :sync_exec], + :query => [:async_exec, :sync_exec], + :exec_params => [:async_exec_params, :sync_exec_params], + :prepare => [:async_prepare, :sync_prepare], + :exec_prepared => [:async_exec_prepared, :sync_exec_prepared], + :describe_portal => [:async_describe_portal, :sync_describe_portal], + :describe_prepared => [:async_describe_prepared, :sync_describe_prepared], + :setnonblocking => [:async_setnonblocking, :sync_setnonblocking], + } + + def async_send_api=(enable) + REDIRECT_SEND_METHODS.each do |ali, (async, sync)| + undef_method(ali) if method_defined?(ali) + alias_method( ali, enable ? async : sync ) + end + end + + def async_api=(enable) + self.async_send_api = enable + REDIRECT_METHODS.each do |ali, (async, sync)| + remove_method(ali) if method_defined?(ali) + alias_method( ali, enable ? async : sync ) + end + REDIRECT_CLASS_METHODS.each do |ali, (async, sync)| + singleton_class.remove_method(ali) if method_defined?(ali) + # TODO: send is necessary for ruby < 2.5 + singleton_class.send(:alias_method, ali, enable ? async : sync ) + end end end diff --git a/spec/helpers.rb b/spec/helpers.rb index ac2e7d755..006c4ed55 100644 --- a/spec/helpers.rb +++ b/spec/helpers.rb @@ -4,6 +4,8 @@ require 'rspec' require 'shellwords' require 'pg' +require_relative 'helpers/scheduler.rb' +require_relative 'helpers/tcp_gate_scheduler.rb' DEFAULT_TEST_DIR_STR = File.join(Dir.pwd, "tmp_test_specs") TEST_DIR_STR = ENV['RUBY_PG_TEST_DIR'] || DEFAULT_TEST_DIR_STR diff --git a/spec/helpers/scheduler.rb b/spec/helpers/scheduler.rb new file mode 100644 index 000000000..a58946877 --- /dev/null +++ b/spec/helpers/scheduler.rb @@ -0,0 +1,241 @@ +# frozen_string_literal: true + +# This file is copied from https://github.com/ruby/ruby/blob/5e9598baea97c53757f12713bacc7f19f315c846/test/fiber/scheduler.rb + +# This is an example and simplified scheduler for test purposes. +# It is not efficient for a large number of file descriptors as it uses IO.select(). +# Production Fiber schedulers should use epoll/kqueue/etc. + +require 'fiber' +require 'socket' + +begin + require 'io/nonblock' +rescue LoadError + # Ignore. +end + +module Helpers +class Scheduler + def initialize + @readable = {} + @writable = {} + @waiting = {} + + @closed = false + + @lock = Thread::Mutex.new + @blocking = 0 + @ready = [] + + @urgent = IO.pipe + end + + attr :readable + attr :writable + attr :waiting + + def next_timeout + _fiber, timeout = @waiting.min_by{|key, value| value} + + if timeout + offset = timeout - current_time + + if offset < 0 + return 0 + else + return offset + end + end + end + + def run + #$stderr.puts [__method__, Fiber.current].inspect + + while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? + # Can only handle file descriptors up to 1024... + readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) + + puts "readable: #{readable}" if readable&.any? + puts "writable: #{writable}" if writable&.any? + + selected = {} + readable&.each do |io| + if fiber = @readable.delete(io) + @writable.delete(io) if @writable[io] == fiber + selected[fiber] = IO::READABLE + elsif io == @urgent.first + @urgent.first.read_nonblock(1024) + end + end + + writable&.each do |io| + if fiber = @writable.delete(io) + @readable.delete(io) if @readable[io] == fiber + selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE + end + end + + selected.each do |fiber, events| + fiber.resume(events) + end + + if @waiting.any? + time = current_time + waiting, @waiting = @waiting, {} + + waiting.each do |fiber, timeout| + if fiber.alive? + if timeout <= time + fiber.resume + else + @waiting[fiber] = timeout + end + end + end + end + + if @ready.any? + ready = nil + + @lock.synchronize do + ready, @ready = @ready, [] + end + + ready.each do |fiber| + fiber.resume + end + end + end + end + + def close + # $stderr.puts [__method__, Fiber.current].inspect + + raise "Scheduler already closed!" if @closed + + self.run + ensure + if @urgent + @urgent.each(&:close) + @urgent = nil + end + + @closed = true + + # We freeze to detect any unintended modifications after the scheduler is closed: + self.freeze + end + + def closed? + @closed + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def timeout_after(duration, klass, message, &block) + fiber = Fiber.current + + self.fiber do + sleep(duration) + + if fiber&.alive? + fiber.raise(klass, message) + end + end + + begin + yield(duration) + ensure + fiber = nil + end + end + + def process_wait(pid, flags) + # $stderr.puts [__method__, pid, flags, Fiber.current].inspect + + # This is a very simple way to implement a non-blocking wait: + Thread.new do + Process::Status.wait(pid, flags) + end.value + end + + def io_wait(io, events, duration) + #$stderr.puts [__method__, io, events, duration, Fiber.current].inspect + + unless (events & IO::READABLE).zero? + @readable[io] = Fiber.current + end + + unless (events & IO::WRITABLE).zero? + @writable[io] = Fiber.current + end + + Fiber.yield + end + + # Used for Kernel#sleep and Thread::Mutex#sleep + def kernel_sleep(duration = nil) + # $stderr.puts [__method__, duration, Fiber.current].inspect + + self.block(:sleep, duration) + + return true + end + + # Used when blocking on synchronization (Thread::Mutex#lock, + # Thread::Queue#pop, Thread::SizedQueue#push, ...) + def block(blocker, timeout = nil) + # $stderr.puts [__method__, blocker, timeout].inspect + + if timeout + @waiting[Fiber.current] = current_time + timeout + begin + Fiber.yield + ensure + # Remove from @waiting in the case #unblock was called before the timeout expired: + @waiting.delete(Fiber.current) + end + else + @blocking += 1 + begin + Fiber.yield + ensure + @blocking -= 1 + end + end + end + + # Used when synchronization wakes up a previously-blocked fiber + # (Thread::Mutex#unlock, Thread::Queue#push, ...). + # This might be called from another thread. + def unblock(blocker, fiber) + # $stderr.puts [__method__, blocker, fiber].inspect + # $stderr.puts blocker.backtrace.inspect + # $stderr.puts fiber.backtrace.inspect + + @lock.synchronize do + @ready << fiber + end + + io = @urgent.last + io.write_nonblock('.') + end + + def fiber(&block) + fiber = Fiber.new(blocking: false, &block) + + fiber.resume + + return fiber + end + + def address_resolve(hostname) + Thread.new do + Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq + end.value + end +end +end diff --git a/spec/helpers/tcp_gate_scheduler.rb b/spec/helpers/tcp_gate_scheduler.rb new file mode 100644 index 000000000..4706a980d --- /dev/null +++ b/spec/helpers/tcp_gate_scheduler.rb @@ -0,0 +1,186 @@ +# frozen_string_literal: true + +# This is an example and simplified scheduler for test purposes. +# It is not efficient for a large number of file descriptors as it uses IO.select(). +# Production Fiber schedulers should use epoll/kqueue/etc. + +module Helpers +class TcpGateScheduler < Scheduler + class Connection + attr_reader :internal_io + attr_reader :external_io + + def initialize(internal_io, external_host, external_port, debug: false) + @internal_io = internal_io + @external_host = external_host + @external_port = external_port + @external_io = nil + @pending_connect = false + @pending_read = false + @pending_write = false + @debug = debug + end + + def print_data(desc, data) + return unless @debug + if data.bytesize >= 90 + sdata = data[0..90] + puts "#{desc}: #{sdata} (... #{data.bytesize} bytes)" + else + puts "#{desc}: #{data} (#{data.bytesize} bytes)" + end + end + + def puts(*args) + return unless @debug + super + end + + def connect + # Not yet connected? + if !@external_io && !@pending_connect + @pending_connect = true + Fiber.schedule do + @external_io = TCPSocket.new(@external_host, @external_port) + @pending_connect = false + end + end + end + + def read + connect + + if @external_io && !@pending_read + @pending_read = true + Fiber.schedule do + begin + read_str = @external_io.readpartial(1000) + print_data("read fd:#{@external_io.fileno}->#{@internal_io.fileno}", read_str) + @internal_io.write(read_str) + rescue EOFError + puts "read_eof from fd:#{@external_io.fileno}" + @internal_io.close_write + end + @pending_read = false + end + end + end + + def write(amount=5) + connect + if @external_io + if @pending_write + @pending_write += amount + else + @pending_write = amount + Fiber.schedule do + # transfer up to 5*65536 bytes + # this should be enough to trigger writability on the observed connection + loop do + len = 65536 + begin + read_str = @internal_io.readpartial(len) + print_data("write fd:#{@internal_io.fileno}->#{@external_io.fileno}", read_str) + @external_io.write(read_str) + rescue EOFError + puts "write_eof from fd:#{@internal_io.fileno}" + @external_io.close_write + end + @pending_write -= 1 + break if !read_str || read_str.bytesize < len || @pending_write <= 0 + end + @pending_write = false + end + end + end + end + end + + def initialize(external_host:, external_port:, internal_host: 'localhost', internal_port: 0, debug: false) + super() + @started = false + @connections = [] + @server_io = TCPServer.new(internal_host, internal_port) + @external_host = external_host + @external_port = external_port + @finish = false + @debug = debug + end + + def finish + @finish = true + TCPSocket.new('localhost', internal_port).close + end + + def internal_port + @server_io.local_address.ip_port + end + + def puts(*args) + return unless @debug + super + end + + def io_wait(io, events, duration) + #$stderr.puts [:IO_WAIT, io, events, duration, Fiber.current].inspect + + begin + sock = TCPSocket.for_fd(io.fileno) + sock.autoclose = false + remote_address = sock.remote_address + rescue Errno::ENOTCONN + end + + unless @started + @started = true + Fiber.schedule do + # Wait for new connections to the TCP gate + while client=@server_io.accept + break if @finish + conn = Connection.new(client, @external_host, @external_port) + @connections << conn + end + end + end + + # Remove old connections + @connections.reject! do |conn| + conn.internal_io.closed? || conn.external_io&.closed? + end + + # Some IO call is waiting for data by rb_wait_for_single_fd() or so. + # Is it on our intercepted IO? + # Inspect latest connections first, since closed connections aren't removed immediately. + if cidx=@connections.rindex { |g| g.internal_io.local_address.to_s == remote_address.to_s } + conn = @connections[cidx] + puts "trigger: fd:#{io.fileno} #{{addr: remote_address, events: events}}" + # Success! Our observed client IO waits for some data to be readable or writable. + # The IO function running on the observed IO did make proper use of some ruby wait function. + # As a reward we provide some data to read or write. + # + # To the contrary: + # If the blocking IO function doesn't make use of ruby wait functions, then it won't get any data and starve as a result. + + if (events & IO::WRITABLE) > 0 + conn.write + + if (events & IO::READABLE) > 0 + conn.read + end + else + if (events & IO::READABLE) > 0 + # The write handler is called here because writes usually succeed without waiting for writablility. + # In this case the callback wait_io(IO::WRITABLE) isn't called, so that we don't get a trigger to transfer data. + # But after sending some data the caller usually waits for some answer to read. + # Therefore trigger transfer of all pending written data. + conn.write(99999) + + conn.read + end + end + end + + super + end +end +end diff --git a/spec/pg/connection_sync_spec.rb b/spec/pg/connection_sync_spec.rb index 52fd22469..1b8ed5e7b 100644 --- a/spec/pg/connection_sync_spec.rb +++ b/spec/pg/connection_sync_spec.rb @@ -4,11 +4,13 @@ require_relative '../helpers' context "running with sync_* methods" do - before :each do + before :all do + @conn.finish PG::Connection.async_api = false + @conn = connect_testing_db end - after :each do + after :all do PG::Connection.async_api = true end diff --git a/spec/pg/scheduler_spec.rb b/spec/pg/scheduler_spec.rb new file mode 100644 index 000000000..a1f73f2b1 --- /dev/null +++ b/spec/pg/scheduler_spec.rb @@ -0,0 +1,97 @@ +# -*- rspec -*- +#encoding: utf-8 + +require_relative '../helpers' + +$scheduler_timeout = false + +context "with a Fiber scheduler" do + + def setup + # Run examples with gated scheduler + sched = Helpers::TcpGateScheduler.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i) + Fiber.set_scheduler(sched) + @conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{sched.internal_port}") + + # Run examples with default scheduler + #Fiber.set_scheduler(Helpers::Scheduler.new) + #@conninfo_gate = @conninfo + + # Run examples without scheduler + #def Fiber.schedule; yield; end + #@conninfo_gate = @conninfo + end + + def teardown + Fiber.set_scheduler(nil) + end + + def stop_scheduler + if Fiber.scheduler && Fiber.scheduler.respond_to?(:finish) + Fiber.scheduler.finish + end + end + + def thread_with_timeout(timeout) + th = Thread.new do + yield + end + unless th.join(timeout) + th.kill + $scheduler_timeout = true + raise("scheduler timeout in:\n#{th.backtrace.join("\n")}") + end + end + + it "connects to a server" do + thread_with_timeout(10) do + setup + Fiber.schedule do + conn = PG.connect(@conninfo_gate) + + res = conn.exec_params("SELECT 7", []) + expect(res.values).to eq([["7"]]) + + conn.finish + stop_scheduler + end + end + end + + it "waits when sending data" do + thread_with_timeout(10) do + setup + Fiber.schedule do + conn = PG.connect(@conninfo_gate) + + data = "x" * 1000 * 1000 * 10 + res = conn.exec_params("SELECT LENGTH($1)", [data]) + expect(res.values).to eq([[data.length.to_s]]) + + conn.finish + stop_scheduler + end + end + end + + it "connects several times" do + thread_with_timeout(10) do + setup + Fiber.schedule do + 3.times do + conn = PG.connect(@conninfo_gate) + conn.finish + end + stop_scheduler + end + end + end +end + +# Do not wait for threads doing blocking calls at the process shutdown. +# Instead exit immediately after printing the rspec report, if we know there are pending IO calls, which do not react on ruby interrupts. +END{ + if $scheduler_timeout + exit!(1) + end +}