Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handling of bare Async and Sync blocks when a scheduler is defined. #340

Merged
merged 5 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ def self.yield
Fiber.scheduler.transfer
end

# Run the given block of code in a task, asynchronously, in the given scheduler.
def self.run(scheduler, *arguments, **options, &block)
self.new(scheduler, **options, &block).tap do |task|
task.run(*arguments)
end
end

# Create a new task.
# @parameter reactor [Reactor] the reactor this task will run within.
# @parameter parent [Task] the parent task.
Expand Down
2 changes: 2 additions & 0 deletions lib/kernel/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ module Kernel
def Async(...)
if current = ::Async::Task.current?
return current.async(...)
elsif scheduler = Fiber.scheduler
::Async::Task.run(scheduler, ...)
else
# This calls Fiber.set_scheduler(self):
reactor = ::Async::Reactor.new
Expand Down
2 changes: 2 additions & 0 deletions lib/kernel/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ module Kernel
def Sync(&block)
if task = ::Async::Task.current?
yield task
elsif scheduler = Fiber.scheduler
::Async::Task.run(scheduler, &block).wait
else
# This calls Fiber.set_scheduler(self):
reactor = Async::Reactor.new
Expand Down
4 changes: 4 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ Please see the [project documentation](https://socketry.github.io/async/) for mo

Please see the [project releases](https://socketry.github.io/async/releases/index) for all releases.

### Next

- [Better Handling of Async and Sync in Nested Fibers](https://socketry.github.io/async/releases/index#better-handling-of-async-and-sync-in-nested-fibers)

## See Also

- [async-http](https://github.com/socketry/async-http) — Asynchronous HTTP client/server.
Expand Down
24 changes: 24 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Changes

## Next

### Better Handling of Async and Sync in Nested Fibers

Interleaving bare fibers within `Async` and `Sync` blocks should not cause problems, but it presents a number of issues in the current implementation. Tracking the parent-child relationship between tasks, when they are interleaved with bare fibers, is difficult. The current implementation assumes that if there is no parent task, then it should create a new reactor. This is not always the case, as the parent task might not be visible due to nested Fibers. As a result, `Async` will create a new reactor, trying to stop the existing one, causing major internal consistency issues.

I encountered this issue when trying to use `Async` within a streaming response in Rails. The `protocol-rack` [uses a normal fiber to wrap streaming responses](https://github.com/socketry/protocol-rack/blob/cb1ca44e9deadb9369bdb2ea03416556aa927c5c/lib/protocol/rack/body/streaming.rb#L24-L28), and if you try to use `Async` within it, it will create a new reactor, causing the server to lock up.

Ideally, `Async` and `Sync` helpers should work when any `Fiber.scheduler` is defined. Right now, it's unrealistic to expect `Async::Task` to work in any scheduler, but at the very least, the following should work:

```ruby
reactor = Async::Reactor.new # internally calls Fiber.set_scheduler

# This should run in the above reactor, rather than creating a new one.
Async do
puts "Hello World"
end
```

In order to do this, bare `Async` and `Sync` blocks should use `Fiber.scheduler` as a parent if possible.

See <https://github.com/socketry/async/pull/340> for more details.
85 changes: 56 additions & 29 deletions test/async/reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

describe Async::Reactor do
let(:reactor) {subject.new}

after do
Fiber.set_scheduler(nil)
end

with '#run' do
it "can run tasks on different fibers" do
Expand Down Expand Up @@ -218,24 +222,6 @@
expect(result).to be_a(Async::Task)
end

with '#async' do
include Sus::Fixtures::Async::ReactorContext

it "can pass in arguments" do
reactor.async(:arg) do |task, arg|
expect(arg).to be == :arg
end.wait
end

it "passes in the correct number of arguments" do
reactor.async(:arg1, :arg2, :arg3) do |task, arg1, arg2, arg3|
expect(arg1).to be == :arg1
expect(arg2).to be == :arg2
expect(arg3).to be == :arg3
end.wait
end
end

with '#with_timeout' do
let(:duration) {1}

Expand Down Expand Up @@ -282,16 +268,57 @@
end
end

it "validates scheduler assignment" do
# Assign the scheduler:
reactor = self.reactor

# Close the previous scheduler:
Async {}

expect do
# The reactor is closed:
reactor.async {}
end.to raise_exception(Async::Scheduler::ClosedError)
with 'Kernel.Async' do
it "reuses existing scheduler" do
# Assign the scheduler:
reactor = self.reactor

# Re-use the previous scheduler:
state = nil
Async do
state = :started
end

reactor.run

expect(state).to be == :started
end
end

with 'Kernel.Sync' do
it "reuses existing scheduler" do
# Assign the scheduler:
reactor = self.reactor

# Re-use the previous scheduler:
state = nil
Sync do |task|
state = :started
end

reactor.run

expect(state).to be == :started
end
end
end

describe Async::Reactor do
include Sus::Fixtures::Async::ReactorContext

with '#async' do
it "can pass in arguments" do
reactor.async(:arg) do |task, arg|
expect(arg).to be == :arg
end.wait
end

it "passes in the correct number of arguments" do
reactor.async(:arg1, :arg2, :arg3) do |task, arg1, arg2, arg3|
expect(arg1).to be == :arg1
expect(arg2).to be == :arg2
expect(arg3).to be == :arg3
end.wait
end
end
end
2 changes: 1 addition & 1 deletion test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
let(:reactor) {Async::Reactor.new}

after do
reactor.close
Fiber.set_scheduler(nil)
end

with '#annotate' do
Expand Down
21 changes: 21 additions & 0 deletions test/fiber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@

expect(error).to be_a(Async::Stop)
end

it "can nest child tasks within a resumed fiber" do
skip_unless_minimum_ruby_version("3.3.4")

variable = Async::Variable.new
error = nil

Sync do |task|
child_task = task.async do
Fiber.new do
Async do
variable.value
end.wait
end.resume
end

expect(child_task).to be(:running?)

variable.value = true
end
end
end

with '.schedule' do
Expand Down
Loading