-
Notifications
You must be signed in to change notification settings - Fork 530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Block in-place if running on a virtual thread #3870
Conversation
classOf[Thread] | ||
.getDeclaredMethod("isVirtual") | ||
.invoke(Thread.currentThread()) | ||
.asInstanceOf[Boolean] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not used method handle too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is test code.
Will |
@wb14123 yes. To be more precise:
|
var error: Throwable = null | ||
val r = | ||
try { | ||
cur.thunk() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we also use scala.concurrent.blocking(cur.thunk())
here so that if thunk()
is an operation that blocks the OS thread, JVM can spawn another OS thread? If it doesn't block the OS thread, since val ec = currentCtx
is virtual thread pool, JVM will still use the same OS thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala.concurrent.blocking
doesn't do anything for virtual threads, because virtual threads do not extend scala.concurrent.BlockContext
:)
if
thunk()
is an operation that blocks the OS thread, JVM can spawn another OS thread
The entire point of using virtual threads is that the JVM will manage all of this for you. Further reading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, maybe I am wrong about that 🤔
The scheduler does not compensate for pinning by expanding its parallelism
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think it only creates a virtual thread if you submit the task to virtual thread executor. So maybe something like currentCtx.execute(cur.thunk)
(suppose currentCtx is the virtual thread executor)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I read the wrong section.
However, some blocking operations in the JDK do not unmount the virtual thread, and thus block both its carrier and the underlying OS thread. This is because of limitations either at the OS level (e.g., many filesystem operations) or at the JDK level (e.g.,
Object.wait()
). The implementation of these blocking operations will compensate for the capture of the OS thread by temporarily expanding the parallelism of the scheduler. Consequently, the number of platform threads in the scheduler'sForkJoinPool
may temporarily exceed the number of available processors.
It's very confusing. But the point is that the JVM is handling it 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that
blockingOp
andnonblockingOp
share the same virtual thread?
Yes they will share the same virtual thread. But that is not a problem: the second IO
should not run until the first IO
completes, so it does not matter if the virtual thread is blocked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry my bad... I meant to run them in parallel: IO.blocking(blockingOp) &> IO(nonblockingOp)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, an independent fiber will be created for each IO
, and those fibers will be submitted to the execution context. If you are using newVirtualThreadPerTaskExecutor
that means that each of those fibers will run on its own virtual thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, in this case it should be fine then, thanks for the patient explanation!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, the JVM is handling it. Also, some limits may be lifted in time, such as Object.wait
.
} else if (isVirtualThread(Thread.currentThread())) { | ||
var error: Throwable = null | ||
val r = | ||
try { | ||
cur.thunk() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very important to call out: there is a big assumption here that if we are running on a virtual thread, the model being used is that each fiber is running on a dedicated virtual thread. While this seems like the most sensible way to use virtual threads, it's not the only way.
For example, someone could build an execution context with a fixed number of virtual threads that poll from a shared task queue. In that case, blocking the virtual thread as we are doing here would be very problematic for them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
someone could build an execution context with a fixed number of virtual threads
I think that is looking for trouble. The documentation literally says "Never Pool Virtual Threads"...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might be cases in which limiting the number of possible virtual threads is desirable. For example, for limiting parallelism in certain instances. But I don't see the problem, because it would work as the user intended.
The only question is if we should integrate with BlockContext
. I think we can integrate, as I don't think you necessarily need to inherit Threads from it. But for now, I don't think it's necessary; mostly because I can't think of any reason for wanting to limit virtual threads while allowing more threads in case of blocking operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might be cases in which limiting the number of possible virtual threads is desirable.
To be clear: there is actually no issue with limiting the number of virtual threads. The concern is if a virtual thread may be responsible for running more than one fiber. So long as every fiber has its own virtual thread, then there is no problem.
edit: I wrote that too late at night and it doesn't make sense, sorry :)
I think we can integrate, as I don't think you necessarily need to inherit Threads from it.
You can also install the BlockContext
as a thread-local, but making this work will add some allocations and overhead.
mh = lookup.findVirtual(Thread.class, "isVirtual", mt); | ||
} catch (Throwable t) { | ||
mh = | ||
MethodHandles.dropArguments( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems wasteful to produce a method handle that always returns false
when invoked. Maybe I'm missing something, maybe this ends up as being inlined, and it doesn't matter, but I would see this method more like:
static boolean isVirtualThread(final Thread thread) {
if (THREAD_IS_VIRTUAL_HANDLE != null)
try {
return (boolean) THREAD_IS_VIRTUAL_HANDLE.invokeExact(thread);
} catch (Throwable t) {}
return false;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I mean I have no idea without studying the assembly and/or benchmarking 😄
In your version, we depend on the JVM optimizing away the conditional and branching. In my version there is no branching, just a static method call, which would ideally be inlined.
} else if (isVirtualThread(Thread.currentThread())) { | ||
var error: Throwable = null | ||
val r = | ||
try { | ||
cur.thunk() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might be cases in which limiting the number of possible virtual threads is desirable. For example, for limiting parallelism in certain instances. But I don't see the problem, because it would work as the user intended.
The only question is if we should integrate with BlockContext
. I think we can integrate, as I don't think you necessarily need to inherit Threads from it. But for now, I don't think it's necessary; mostly because I can't think of any reason for wanting to limit virtual threads while allowing more threads in case of blocking operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good 👍
} | ||
|
||
val next = if (error eq null) succeeded(r, 0) else failed(error, 0) | ||
runLoop(next, nextCancelation, nextAutoCede) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optimization is good to have in the current implementation, but there's something I don't like...
I would have expected the ability to override runtime.blocking
, and evalOn
doesn't do that. Also, WorkStealingThreadPool
is special-cased, and this forces virtual threads to be special cased, too. And in both cases, runtime.blocking
is ignored, which smells like a semantic issue that causes confusion.
An alternative would have been to look at the runtime.blocking
thread-pool and notice that we are already on it. In which case no thread shift would be necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative would have been to look at the
runtime.blocking
thread-pool and notice that we are already on it.
It's an interesting idea, but this would apply only to virtual threads anyway: this is the only pool on which it is safe to submit/run tasks whether they are blocking or not. There is no other ExecutionContext
that can directly handle both: the WorkStealingThreadPool
and ForkJoinPool
can handle blocking but only if it is appropriately wrapped in scala.concurrent.blocking(...)
so in general it is not safe to submit arbitrary blocking tasks directly to these ExecutionContext
s.
And in both cases,
runtime.blocking
is ignored, which smells like a semantic issue that causes confusion.
I agree, it's confusing 😕 currently, runtime.blocking
operates as a fallback, when you are not on the WSTP or a virtual thread.
Also,
WorkStealingThreadPool
is special-cased, and this forces virtual threads to be special cased, too.
We could relax the special-casing a little bit by detecting if the current thread implements BlockContext
, in which case it would be safe to use the blocking(...)
mechanism. Besides encompassing the WSTP this would allow us to support the FJP as well.
But it doesn't help with virtual threads which do not and cannot implement BlockContext
so is out-of-scope for this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ty for tackling this!
The merge-base changed after approval.
Closes #3869, h/t Daniel for the idea.
If a fiber detects that it is running on a virtual thread then it will invoke a blocking operation in-place (similar to the
scala.concurrent.blocking
mechanism). This allows to do stuff like:This PR is targeted at
series/3.x
because we are pinned to Scala 3.2 inseries/3.5.x
and that doesn't support JDK 21. We could backport the implementation changes here to3.5.x
but we cannot test them.@alexandru wdyt?