-
Notifications
You must be signed in to change notification settings - Fork 507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Add the ability to spawn futures #679
base: main
Are you sure you want to change the base?
Conversation
JobResult::Ok(x) => Poll::Ready(x), | ||
JobResult::Panic(p) => { | ||
drop(guard); // don't poison the lock | ||
unwind::resume_unwinding(p); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is usually how we propagate panics, but maybe the future should yield Result<T, ...>
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be best to let Rayon's panic_handler
handle the actual panic, but also panic here with something like panic!("the spawned task has panicked")
rather than resuming with the original one.
If one were to retrieve the result of a spawned task without using futures, they would probably create a channel and send the result through it. Then, if the task panics, the sender side of the channel gets dropped, thus disconnecting it. If one attempts to receive the result from the receiver side of the channel, the receiver.recv()
call panics because the channel is disconnected.
So that way spawn_future
would closely match the behavior of spawn
+ using a channel to retrieve the result.
Over the past few months I did a lot of exploration with panic propagation strategies in asynchronous contexts and talked to people about their use cases. In the end, I figured the generally best way of handling panics is to pass them to the panic handler and raise a new panic whenever the result of a failed task is polled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be OK with that too. I was thinking of Result
as an analogy from std::thread::spawn
+ JoinHandle::join
to rayon::spawn_future
+ Future::poll
.
The CI failure is that my new inherent methods interrupted access to the extension trait methods in rayon-future's tests -- meh. |
rayon-core/src/future.rs
Outdated
} | ||
} | ||
|
||
pub fn spawn_future<F, T>(func: F) -> impl Future<Output = T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I think I expected spawn_future
to take a future as argument, so that you would do something like:
let future = rayon::spawn_future(async move {
...
});
where the returned future
could then be awaited from other futures. This would be somewhat analagous to the spawn
function from async.rs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I was wondering is if async-task could be useful to us here -- I still haven't fully grokked that crate. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm no expert here, but I think the difference is whether we want rayon to be a full executor, or just a new source of asynchronous events, and I was thinking more of the latter.
My intention was that you could still use tokio, async-std, or whatever with all of their abstractions working with file/network IO and such, and Rayon would just add something like an abstract CPU-IO. Other executors are usually latency-oriented, but Rayon is throughput-oriented with its greedy task stealing.
Anyway, I just tried async-task a bit, and it looks fine for the simple case:
pub fn spawn_future<F, T>(future: F) -> impl Future<Output = Option<T>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (task, handle) = async_task::spawn(future, |task| crate::spawn(|| task.run()), ());
task.schedule();
handle
}
It's OK for ThreadPool::spawn_future
too, just tagged to a particular Registry
. But we run into trouble with Scope
, since async_task::spawn
is all 'static
. I don't think we can safely erase lifetimes here when we don't control the implementation behind it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can safely erase lifetimes here when we don't control the implementation behind it.
I think this was exactly the case where I introduced a bit of unsafety into the prior implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well yes, erasing lifetimes requires unsafe
, but I meant that I'm not sure we can do that safely even in the "I know better than the compiler" sense. In the prior implementation, we kept complete control, which we wouldn't have under async-task
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I see what you meant by this:
I don't think we can safely erase lifetimes here when we don't control the implementation behind it.
I'm not sure I totally agree, though, but definitely we would want to be very explicitly about what we are assuming and to have async-task
commit (in a semver sense) to preserving those invariants.
@alexcrichton, @fitzgen, maybe you have some perspective from the WASM side? Would either of these signatures be better or worse for integrating with wasm/js futures?
I suspect the latter might be problematic, having to wait for unknown input futures from rayon threads, since we can't really unwind the entire thread out of wasm. But is the simpler |
I believe that should work and there aren't any technical restrictions on our side (although Alex knows more about our executor in a multithreaded context). My one concern, unrelated to wasm, is using |
Definitely rayon is "throughput optimized" and may not be the best choice to use for all your futures. But where I thought the In any case, I also don't think it causes a big problem if you block on I/O events in rayon. The future would simply not be in our thread-pools until the I/O event was "wakened". It's not like it would block in the normal sense. (You might not want to combine that with |
Just that it's a smaller API surface to consider in this PR if we don't name it. But the type is simple enough that it shouldn't be a problem to return the concrete type.
Yeah, so this is a case where I'm not certain how these things would work. I guess if the I brought up the WASM case because I thought there were some big caveats about notifying WASM threads about javascript futures. But maybe if the actual task suspension still looks like it happens outside of the threadpool, and we're respawning to get back in, it may work OK? Not sure. We should probably play with some real examples before we commit to anything. |
As to the effect on wasm I think the main question here would be what extra APIs rayon needs from the host. For example in our demos we have a If |
An update: So we talked at some length to @alexcrichton and @fitzgen on Discord and came to the conclusion that "WASM interop is not a major issue here". We left the meeting with the conclusion that I would spend a bit of time looking into what it would take to implement a (Side note: I know this will shock absolutely no one, but I think this is worth moving to an RFC. There are a number of smaller details concerning the APIs worth talking over and documenting. ) The set of APIs I think we want to support are something like this:
(And probably that Anyway, I dug into what an implementation would take. I think if we were to reimplement everything ourselves, the primary thing we need to create is a
I started writing this code, but it turns out that the async-task crate basically implements all of this logic already, so it really makes sense I think to build on that. The simplest integration (at the static level, and not "peak efficiency") just looks like this: pub fn spawn_future<R>(future: impl Future<Output = R> + Send + 'static) -> impl Future<Output = R>
where
R: Send + 'static,
{
let (task, handle) = async_task::spawn(
future,
move |task| rayon_core::spawn(move || task.run()),
(),
);
task.schedule();
async move { handle.await.unwrap() }
} I saw "not peak efficiency" because each call to I discussed with @stjepang -- if
To handle the |
I think it should be in core. It doesn't have to affect the general minimum rust if we gate the new functionality, whether by a cargo feature or just |
I'm about halfway through a straightforward port of the existing The library builds and feels approximately correct but I haven't ported the tests yet, thus about halfway. I think the most interesting part of that work has been how much simpler I don't have too many opinions about what the API should be. I do think that dropping a |
Any progress on this, for someone with a mix of CPU / IO bound work what's the recommended path? |
I haven't have the time lately to work on the more advanced ideas proposed, nor do I really have any expertise in I still think the very simple idea in this PR, |
@cuviper I'm also interested in being able to ergonomically offload compute-bound work (or possibly work that just still has to be done with blocking IO) in an async context. Thanks for working on this! My feeling is that the API that @nikomatsakis proposed is probably the right one because compute bound work might want to interleave some async IO operations and a future captures that sort of computation better than a closure. That said, for now a closure-based API is good enough for me. To hold me over while you all take the time to design a nice interface, I just threw together https://github.com/ethanpailes/rayon-future. Maybe it will be useful to someone else in the same boat. |
I noticed that the new gtk-rs release includes a threadpool with pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
&self,
func: F
) -> Result<impl Future<Output = T>, Error> I still like the simplicity of not trying to await future input on our part. It's also possible for us to add that in the short term, and maybe add a separate method that maps futures in&out later. |
Revisiting this -- maybe the answer is to give a different name to this The |
Add a way to spawn tasks with a returned `Future`. The task is immediately queued for the thread pool to execute.
Gentle poke here, there's renewed interest on Bevy's side in (re-)adopting rayon for thread pool management and task execution, particularly due to its better thread sleep management than our current use of Is there still interest in supporting this use case? If so, what's needed to advance this forward? There was mention of needing an RFC, but there doesn't seem to be a PR for one in the RFCs repo, open or closed. Right now we can complete a migration to using rayon-core and just use @nikomatsakis's non-"peak efficiency" implementation mentioned above, but it'd be desirable to eliminate the extra allocation on re-scheduling the runnable. |
I think it could be built on something like this, if you want to experiment with it: impl JobRef {
pub(super) fn new_async(runnable: async_task::Runnable<()>) -> JobRef {
// TODO: call registry.increment_terminate_count()?
JobRef {
pointer: runnable.into_raw().as_ptr(),
execute_fn: |this| unsafe {
// TODO: handle panics with registry.catch_unwind(..)
// TODO: should we always use NonNull for `Job`?
// TODO: call registry.terminate()?
let this = NonNull::new_unchecked(this as *mut ());
async_task::Runnable::<()>::from_raw(this).run();
},
}
}
} (I don't think there's any value in literally implementing |
For Rust 1.36+ with
std::future::Future
, add a way to spawn tasks witha returned
Future
. The task is immediately queued for the thread poolto execute.