Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block in-place if running on a virtual thread #3870

Merged
merged 11 commits into from
Nov 23, 2023

Conversation

armanbilge
Copy link
Member

Closes #3869, h/t Daniel for the idea.

If a fiber detects that it is running on a virtual thread then it will invoke a blocking operation in-place (similar to the scala.concurrent.blocking mechanism). This allows to do stuff like:

IO.interruptible(...).evalOnExecutor(Executors.newVirtualThreadPerTaskExecutor())

This PR is targeted at series/3.x because we are pinned to Scala 3.2 in series/3.5.x and that doesn't support JDK 21. We could backport the implementation changes here to 3.5.x but we cannot test them.

@alexandru wdyt?

classOf[Thread]
.getDeclaredMethod("isVirtual")
.invoke(Thread.currentThread())
.asInstanceOf[Boolean]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not used method handle too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is test code.

@wb14123
Copy link

wb14123 commented Oct 11, 2023

Will evalOnExecutor shift both blocking and unblocking IOs to the virtual threads pool?

@armanbilge
Copy link
Member Author

@wb14123 yes. To be more precise:

  1. evalOn (and evalOnExecutor) modify the compute execution context for a task, but not the blocking EC.
  2. After the changes in this PR, a fiber running on a virtual thread will run blocking operations in-place i.e. on the same virtual thread that it is already running on.

var error: Throwable = null
val r =
try {
cur.thunk()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also use scala.concurrent.blocking(cur.thunk()) here so that if thunk() is an operation that blocks the OS thread, JVM can spawn another OS thread? If it doesn't block the OS thread, since val ec = currentCtx is virtual thread pool, JVM will still use the same OS thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala.concurrent.blocking doesn't do anything for virtual threads, because virtual threads do not extend scala.concurrent.BlockContext :)

if thunk() is an operation that blocks the OS thread, JVM can spawn another OS thread

The entire point of using virtual threads is that the JVM will manage all of this for you. Further reading.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, maybe I am wrong about that 🤔

The scheduler does not compensate for pinning by expanding its parallelism

https://openjdk.org/jeps/425

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think it only creates a virtual thread if you submit the task to virtual thread executor. So maybe something like currentCtx.execute(cur.thunk) (suppose currentCtx is the virtual thread executor)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I read the wrong section.

However, some blocking operations in the JDK do not unmount the virtual thread, and thus block both its carrier and the underlying OS thread. This is because of limitations either at the OS level (e.g., many filesystem operations) or at the JDK level (e.g., Object.wait()). The implementation of these blocking operations will compensate for the capture of the OS thread by temporarily expanding the parallelism of the scheduler. Consequently, the number of platform threads in the scheduler's ForkJoinPool may temporarily exceed the number of available processors.

It's very confusing. But the point is that the JVM is handling it 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that blockingOp and nonblockingOp share the same virtual thread?

Yes they will share the same virtual thread. But that is not a problem: the second IO should not run until the first IO completes, so it does not matter if the virtual thread is blocked.

Copy link

@wb14123 wb14123 Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry my bad... I meant to run them in parallel: IO.blocking(blockingOp) &> IO(nonblockingOp)

Copy link
Member Author

@armanbilge armanbilge Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, an independent fiber will be created for each IO, and those fibers will be submitted to the execution context. If you are using newVirtualThreadPerTaskExecutor that means that each of those fibers will run on its own virtual thread.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, in this case it should be fine then, thanks for the patient explanation!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, the JVM is handling it. Also, some limits may be lifted in time, such as Object.wait.

Comment on lines +999 to +1003
} else if (isVirtualThread(Thread.currentThread())) {
var error: Throwable = null
val r =
try {
cur.thunk()
Copy link
Member Author

@armanbilge armanbilge Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very important to call out: there is a big assumption here that if we are running on a virtual thread, the model being used is that each fiber is running on a dedicated virtual thread. While this seems like the most sensible way to use virtual threads, it's not the only way.

For example, someone could build an execution context with a fixed number of virtual threads that poll from a shared task queue. In that case, blocking the virtual thread as we are doing here would be very problematic for them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

someone could build an execution context with a fixed number of virtual threads

I think that is looking for trouble. The documentation literally says "Never Pool Virtual Threads"...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be cases in which limiting the number of possible virtual threads is desirable. For example, for limiting parallelism in certain instances. But I don't see the problem, because it would work as the user intended.

The only question is if we should integrate with BlockContext. I think we can integrate, as I don't think you necessarily need to inherit Threads from it. But for now, I don't think it's necessary; mostly because I can't think of any reason for wanting to limit virtual threads while allowing more threads in case of blocking operations.

Copy link
Member Author

@armanbilge armanbilge Oct 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be cases in which limiting the number of possible virtual threads is desirable.

To be clear: there is actually no issue with limiting the number of virtual threads. The concern is if a virtual thread may be responsible for running more than one fiber. So long as every fiber has its own virtual thread, then there is no problem.

edit: I wrote that too late at night and it doesn't make sense, sorry :)


I think we can integrate, as I don't think you necessarily need to inherit Threads from it.

You can also install the BlockContext as a thread-local, but making this work will add some allocations and overhead.

mh = lookup.findVirtual(Thread.class, "isVirtual", mt);
} catch (Throwable t) {
mh =
MethodHandles.dropArguments(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems wasteful to produce a method handle that always returns false when invoked. Maybe I'm missing something, maybe this ends up as being inlined, and it doesn't matter, but I would see this method more like:

  static boolean isVirtualThread(final Thread thread) {
    if (THREAD_IS_VIRTUAL_HANDLE != null)
        try {
          return (boolean) THREAD_IS_VIRTUAL_HANDLE.invokeExact(thread);
        } catch (Throwable t) {}
    return false;
  }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I mean I have no idea without studying the assembly and/or benchmarking 😄

In your version, we depend on the JVM optimizing away the conditional and branching. In my version there is no branching, just a static method call, which would ideally be inlined.

Comment on lines +999 to +1003
} else if (isVirtualThread(Thread.currentThread())) {
var error: Throwable = null
val r =
try {
cur.thunk()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be cases in which limiting the number of possible virtual threads is desirable. For example, for limiting parallelism in certain instances. But I don't see the problem, because it would work as the user intended.

The only question is if we should integrate with BlockContext. I think we can integrate, as I don't think you necessarily need to inherit Threads from it. But for now, I don't think it's necessary; mostly because I can't think of any reason for wanting to limit virtual threads while allowing more threads in case of blocking operations.

Copy link
Member

@alexandru alexandru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good 👍

}

val next = if (error eq null) succeeded(r, 0) else failed(error, 0)
runLoop(next, nextCancelation, nextAutoCede)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optimization is good to have in the current implementation, but there's something I don't like...

I would have expected the ability to override runtime.blocking, and evalOn doesn't do that. Also, WorkStealingThreadPool is special-cased, and this forces virtual threads to be special cased, too. And in both cases, runtime.blocking is ignored, which smells like a semantic issue that causes confusion.

An alternative would have been to look at the runtime.blocking thread-pool and notice that we are already on it. In which case no thread shift would be necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative would have been to look at the runtime.blocking thread-pool and notice that we are already on it.

It's an interesting idea, but this would apply only to virtual threads anyway: this is the only pool on which it is safe to submit/run tasks whether they are blocking or not. There is no other ExecutionContext that can directly handle both: the WorkStealingThreadPool and ForkJoinPool can handle blocking but only if it is appropriately wrapped in scala.concurrent.blocking(...) so in general it is not safe to submit arbitrary blocking tasks directly to these ExecutionContexts.

And in both cases, runtime.blocking is ignored, which smells like a semantic issue that causes confusion.

I agree, it's confusing 😕 currently, runtime.blocking operates as a fallback, when you are not on the WSTP or a virtual thread.

Also, WorkStealingThreadPool is special-cased, and this forces virtual threads to be special cased, too.

We could relax the special-casing a little bit by detecting if the current thread implements BlockContext, in which case it would be safe to use the blocking(...) mechanism. Besides encompassing the WSTP this would allow us to support the FJP as well.

But it doesn't help with virtual threads which do not and cannot implement BlockContext so is out-of-scope for this PR.

@armanbilge armanbilge added this to the v3.6.0 milestone Nov 15, 2023
djspiewak
djspiewak previously approved these changes Nov 23, 2023
Copy link
Member

@djspiewak djspiewak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ty for tackling this!

@armanbilge armanbilge dismissed djspiewak’s stale review November 23, 2023 18:56

The merge-base changed after approval.

@djspiewak djspiewak merged commit bd93f74 into typelevel:series/3.x Nov 23, 2023
32 of 36 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Virtual Threads (Project Loom) in IO.blocking / IO.interruptible
6 participants