diff --git a/lib/async/task.rb b/lib/async/task.rb index 9bb6c60a..0df511dc 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -67,6 +67,13 @@ def self.yield Fiber.scheduler.transfer end + # Run the given block of code in a task, asynchronously, in the given scheduler. + def self.run(scheduler, *arguments, **options, &block) + self.new(scheduler, **options, &block).tap do |task| + task.run(*arguments) + end + end + # Create a new task. # @parameter reactor [Reactor] the reactor this task will run within. # @parameter parent [Task] the parent task. diff --git a/lib/kernel/async.rb b/lib/kernel/async.rb index ed93af62..5b69a5fc 100644 --- a/lib/kernel/async.rb +++ b/lib/kernel/async.rb @@ -24,6 +24,8 @@ module Kernel def Async(...) if current = ::Async::Task.current? return current.async(...) + elsif scheduler = Fiber.scheduler + ::Async::Task.run(scheduler, ...) else # This calls Fiber.set_scheduler(self): reactor = ::Async::Reactor.new diff --git a/lib/kernel/sync.rb b/lib/kernel/sync.rb index bb23461b..e52ddcad 100644 --- a/lib/kernel/sync.rb +++ b/lib/kernel/sync.rb @@ -18,6 +18,8 @@ module Kernel def Sync(&block) if task = ::Async::Task.current? yield task + elsif scheduler = Fiber.scheduler + ::Async::Task.run(scheduler, &block).wait else # This calls Fiber.set_scheduler(self): reactor = Async::Reactor.new diff --git a/readme.md b/readme.md index b9926150..b99dbc11 100644 --- a/readme.md +++ b/readme.md @@ -35,6 +35,10 @@ Please see the [project documentation](https://socketry.github.io/async/) for mo Please see the [project releases](https://socketry.github.io/async/releases/index) for all releases. +### Next + + - [Better Handling of Async and Sync in Nested Fibers](https://socketry.github.io/async/releases/index#better-handling-of-async-and-sync-in-nested-fibers) + ## See Also - [async-http](https://github.com/socketry/async-http) — Asynchronous HTTP client/server. diff --git a/releases.md b/releases.md index e69de29b..3bc26f9a 100644 --- a/releases.md +++ b/releases.md @@ -0,0 +1,24 @@ +# Changes + +## Next + +### Better Handling of Async and Sync in Nested Fibers + +Interleaving bare fibers within `Async` and `Sync` blocks should not cause problems, but it presents a number of issues in the current implementation. Tracking the parent-child relationship between tasks, when they are interleaved with bare fibers, is difficult. The current implementation assumes that if there is no parent task, then it should create a new reactor. This is not always the case, as the parent task might not be visible due to nested Fibers. As a result, `Async` will create a new reactor, trying to stop the existing one, causing major internal consistency issues. + +I encountered this issue when trying to use `Async` within a streaming response in Rails. The `protocol-rack` [uses a normal fiber to wrap streaming responses](https://github.com/socketry/protocol-rack/blob/cb1ca44e9deadb9369bdb2ea03416556aa927c5c/lib/protocol/rack/body/streaming.rb#L24-L28), and if you try to use `Async` within it, it will create a new reactor, causing the server to lock up. + +Ideally, `Async` and `Sync` helpers should work when any `Fiber.scheduler` is defined. Right now, it's unrealistic to expect `Async::Task` to work in any scheduler, but at the very least, the following should work: + +```ruby +reactor = Async::Reactor.new # internally calls Fiber.set_scheduler + +# This should run in the above reactor, rather than creating a new one. +Async do + puts "Hello World" +end +``` + +In order to do this, bare `Async` and `Sync` blocks should use `Fiber.scheduler` as a parent if possible. + +See for more details. diff --git a/test/async/reactor.rb b/test/async/reactor.rb index cd0e4c8e..098674d2 100644 --- a/test/async/reactor.rb +++ b/test/async/reactor.rb @@ -10,6 +10,10 @@ describe Async::Reactor do let(:reactor) {subject.new} + + after do + Fiber.set_scheduler(nil) + end with '#run' do it "can run tasks on different fibers" do @@ -218,24 +222,6 @@ expect(result).to be_a(Async::Task) end - with '#async' do - include Sus::Fixtures::Async::ReactorContext - - it "can pass in arguments" do - reactor.async(:arg) do |task, arg| - expect(arg).to be == :arg - end.wait - end - - it "passes in the correct number of arguments" do - reactor.async(:arg1, :arg2, :arg3) do |task, arg1, arg2, arg3| - expect(arg1).to be == :arg1 - expect(arg2).to be == :arg2 - expect(arg3).to be == :arg3 - end.wait - end - end - with '#with_timeout' do let(:duration) {1} @@ -282,16 +268,57 @@ end end - it "validates scheduler assignment" do - # Assign the scheduler: - reactor = self.reactor - - # Close the previous scheduler: - Async {} - - expect do - # The reactor is closed: - reactor.async {} - end.to raise_exception(Async::Scheduler::ClosedError) + with 'Kernel.Async' do + it "reuses existing scheduler" do + # Assign the scheduler: + reactor = self.reactor + + # Re-use the previous scheduler: + state = nil + Async do + state = :started + end + + reactor.run + + expect(state).to be == :started + end + end + + with 'Kernel.Sync' do + it "reuses existing scheduler" do + # Assign the scheduler: + reactor = self.reactor + + # Re-use the previous scheduler: + state = nil + Sync do |task| + state = :started + end + + reactor.run + + expect(state).to be == :started + end + end +end + +describe Async::Reactor do + include Sus::Fixtures::Async::ReactorContext + + with '#async' do + it "can pass in arguments" do + reactor.async(:arg) do |task, arg| + expect(arg).to be == :arg + end.wait + end + + it "passes in the correct number of arguments" do + reactor.async(:arg1, :arg2, :arg3) do |task, arg1, arg2, arg3| + expect(arg1).to be == :arg1 + expect(arg2).to be == :arg2 + expect(arg3).to be == :arg3 + end.wait + end end end diff --git a/test/async/task.rb b/test/async/task.rb index 87dac949..060d2646 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -17,7 +17,7 @@ let(:reactor) {Async::Reactor.new} after do - reactor.close + Fiber.set_scheduler(nil) end with '#annotate' do diff --git a/test/fiber.rb b/test/fiber.rb index 5803f5ca..db6e3d93 100644 --- a/test/fiber.rb +++ b/test/fiber.rb @@ -32,6 +32,27 @@ expect(error).to be_a(Async::Stop) end + + it "can nest child tasks within a resumed fiber" do + skip_unless_minimum_ruby_version("3.3.4") + + variable = Async::Variable.new + error = nil + + Sync do |task| + child_task = task.async do + Fiber.new do + Async do + variable.value + end.wait + end.resume + end + + expect(child_task).to be(:running?) + + variable.value = true + end + end end with '.schedule' do