From b4ecd33042c371bd74b85372d68cb8b1c24a5cb0 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:38:19 +0000 Subject: [PATCH 01/11] Execute `IO.blocking` on WSTP without `BlockContext` indirection --- .../effect/unsafe/WorkStealingThreadPool.scala | 8 +++++--- .../scala/cats/effect/unsafe/WorkerThread.scala | 15 ++++++++++----- .../src/main/scala/cats/effect/IOFiber.scala | 2 +- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index cd740b0b41..e17ae8ac61 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -485,14 +485,16 @@ private[effect] final class WorkStealingThreadPool( } /** - * Checks if the blocking code can be executed in the current context (only returns true for - * worker threads that belong to this execution context). + * Checks if the blocking code can be executed in the current context and, if so, prepares it + * for blocking. Only returns true for worker threads that belong to this execution context. */ private[effect] def canExecuteBlockingCode(): Boolean = { val thread = Thread.currentThread() if (thread.isInstanceOf[WorkerThread]) { val worker = thread.asInstanceOf[WorkerThread] - worker.canExecuteBlockingCodeOn(this) + if (worker.canExecuteBlockingCodeOn(this)) + worker.prepareBlocking() + true } else { false } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 849e71a2d4..e3e6fb96ae 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -776,7 +776,7 @@ private final class WorkerThread( } /** - * A mechanism for executing support code before executing a blocking action. + * Support code that must be run before executing a blocking action on this thread. * * The current thread creates a replacement worker thread (or reuses a cached one) that will * take its place in the pool and does a complete transfer of ownership of the data structures @@ -797,7 +797,7 @@ private final class WorkerThread( * There is no reason to enclose any code in a `try/catch` block because the only way this * code path can be exercised is through `IO.delay`, which already handles exceptions. */ - override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + def prepareBlocking(): Unit = { val rnd = random pool.notifyParked(rnd) @@ -806,7 +806,6 @@ private final class WorkerThread( // This `WorkerThread` is already inside an enclosing blocking region. // There is no need to spawn another `WorkerThread`. Instead, directly // execute the blocking action. - thunk } else { // Spawn a new `WorkerThread` to take the place of this thread, as the // current thread prepares to execute a blocking action. @@ -853,11 +852,17 @@ private final class WorkerThread( pool.blockedWorkerThreadCounter.incrementAndGet() clone.start() } - - thunk } } + /** + * A mechanism for executing support code before executing a blocking action. + */ + override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + prepareBlocking() + thunk + } + private[this] def init(newIdx: Int): Unit = { _index = newIdx queue = pool.localQueues(newIdx) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index ffea35d49c..c3b59b3724 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -994,7 +994,7 @@ private final class IOFiber[A]( var error: Throwable = null val r = try { - scala.concurrent.blocking(cur.thunk()) + cur.thunk() } catch { case t if NonFatal(t) => error = t From e4a30c5f7598db2091aca8bf7af76126742591bb Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:44:33 +0000 Subject: [PATCH 02/11] Remove stale comment --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index e3e6fb96ae..05be8fc974 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -804,8 +804,7 @@ private final class WorkerThread( if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. - // There is no need to spawn another `WorkerThread`. Instead, directly - // execute the blocking action. + // There is no need to spawn another `WorkerThread`. } else { // Spawn a new `WorkerThread` to take the place of this thread, as the // current thread prepares to execute a blocking action. From 9e8f282eebe3eeda7183603baa8d6273739cf252 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:48:00 +0000 Subject: [PATCH 03/11] Update comments --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 05be8fc974..99c3ffeff4 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -803,7 +803,7 @@ private final class WorkerThread( pool.notifyParked(rnd) if (blocking) { - // This `WorkerThread` is already inside an enclosing blocking region. + // This `WorkerThread` has already been prepared for blocking. // There is no need to spawn another `WorkerThread`. } else { // Spawn a new `WorkerThread` to take the place of this thread, as the @@ -817,7 +817,7 @@ private final class WorkerThread( cedeBypass = null } - // Logically enter the blocking region. + // Logically become a blocking thread blocking = true val prefix = pool.blockerThreadPrefix From de501427201ff2fcd56513da02f26c4b291ae090 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:50:34 +0000 Subject: [PATCH 04/11] We like full sentences here. --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 99c3ffeff4..893df5d223 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -817,7 +817,7 @@ private final class WorkerThread( cedeBypass = null } - // Logically become a blocking thread + // Logically become a blocking thread. blocking = true val prefix = pool.blockerThreadPrefix From c2ef8bc67f1cc6862d5c9c737f6e107406c27c40 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 10:47:57 -0800 Subject: [PATCH 05/11] Fix conditionals --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index e17ae8ac61..dd0c564ecd 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -492,12 +492,12 @@ private[effect] final class WorkStealingThreadPool( val thread = Thread.currentThread() if (thread.isInstanceOf[WorkerThread]) { val worker = thread.asInstanceOf[WorkerThread] - if (worker.canExecuteBlockingCodeOn(this)) + if (worker.canExecuteBlockingCodeOn(this)) { worker.prepareBlocking() - true - } else { - false + return true + } } + false } /** From 7f05819684c4e374f504444ef04f847da4f5dc99 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:02:24 +0000 Subject: [PATCH 06/11] Fix off-by-1 in `CallbackStack#pack` --- .../scala/cats/effect/CallbackStack.scala | 2 +- .../scala/cats/effect/CallbackStackSpec.scala | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 8ba7b08a2d..0a23745921 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -127,7 +127,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) } else { if (child == null) { // bottomed out - removed + removed + 1 } else { // note this can cause the bound to go negative, which is fine child.packInternal(bound - 1, removed + 1, parent) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala new file mode 100644 index 0000000000..806f2eb40b --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +class CallbackStackSpec extends BaseSpec { + + "CallbackStack" should { + "correctly report the number removed" in { + val stack = CallbackStack[Unit](null) + val pushed = stack.push(_ => ()) + val handle = pushed.currentHandle() + pushed.clearCurrent(handle) + stack.pack(1) must beEqualTo(1) + } + } + +} From 444512f0ddda7ebce125e44de1e9ec9b6f5218c9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:54:56 +0000 Subject: [PATCH 07/11] Isolate thread state change to `prepareBlocking()` --- .../effect/unsafe/WorkStealingThreadPool.scala | 1 + .../effect/unsafe/WorkStealingThreadPool.scala | 18 +++++++++++++----- .../src/main/scala/cats/effect/IOFiber.scala | 2 ++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index f47fc7889a..702fa612a8 100644 --- a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -36,6 +36,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private () task: Runnable, fallback: Scheduler): Runnable private[effect] def canExecuteBlockingCode(): Boolean + private[effect] def prepareBlocking(): Unit private[unsafe] def liveTraces(): ( Map[Runnable, Trace], Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])], diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index dd0c564ecd..a3e66f897d 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -492,12 +492,20 @@ private[effect] final class WorkStealingThreadPool( val thread = Thread.currentThread() if (thread.isInstanceOf[WorkerThread]) { val worker = thread.asInstanceOf[WorkerThread] - if (worker.canExecuteBlockingCodeOn(this)) { - worker.prepareBlocking() - return true - } + worker.canExecuteBlockingCodeOn(this) + } else { + false } - false + } + + /** + * Prepares the current thread for running blocking code. This should be called only if + * [[canExecuteBlockingCode]] returns `true`. + */ + private[effect] def prepareBlocking(): Unit = { + val thread = Thread.currentThread() + val worker = thread.asInstanceOf[WorkerThread] + worker.prepareBlocking() } /** diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index c3b59b3724..402cbd75f2 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -991,6 +991,8 @@ private final class IOFiber[A]( if (ec.isInstanceOf[WorkStealingThreadPool]) { val wstp = ec.asInstanceOf[WorkStealingThreadPool] if (wstp.canExecuteBlockingCode()) { + wstp.prepareBlocking() + var error: Throwable = null val r = try { From bf03fb3004943071c049e7f06684e7e3f15d4950 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:57:57 +0000 Subject: [PATCH 08/11] Restore scaladoc --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index a3e66f897d..88e0cceff3 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -485,8 +485,8 @@ private[effect] final class WorkStealingThreadPool( } /** - * Checks if the blocking code can be executed in the current context and, if so, prepares it - * for blocking. Only returns true for worker threads that belong to this execution context. + * Checks if the blocking code can be executed in the current context (only returns true for + * worker threads that belong to this execution context). */ private[effect] def canExecuteBlockingCode(): Boolean = { val thread = Thread.currentThread() From 862b60cd1f27369881b2c320a3d06a975c703d75 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:59:20 +0000 Subject: [PATCH 09/11] Bikeshed --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 2 +- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 4 ++-- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 4 ++-- core/shared/src/main/scala/cats/effect/IOFiber.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 702fa612a8..1b0ff7b5a2 100644 --- a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -36,7 +36,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private () task: Runnable, fallback: Scheduler): Runnable private[effect] def canExecuteBlockingCode(): Boolean - private[effect] def prepareBlocking(): Unit + private[effect] def prepareForBlocking(): Unit private[unsafe] def liveTraces(): ( Map[Runnable, Trace], Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])], diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 88e0cceff3..ab815b25e3 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -502,10 +502,10 @@ private[effect] final class WorkStealingThreadPool( * Prepares the current thread for running blocking code. This should be called only if * [[canExecuteBlockingCode]] returns `true`. */ - private[effect] def prepareBlocking(): Unit = { + private[effect] def prepareForBlocking(): Unit = { val thread = Thread.currentThread() val worker = thread.asInstanceOf[WorkerThread] - worker.prepareBlocking() + worker.prepareForBlocking() } /** diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 893df5d223..35d265d251 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -797,7 +797,7 @@ private final class WorkerThread( * There is no reason to enclose any code in a `try/catch` block because the only way this * code path can be exercised is through `IO.delay`, which already handles exceptions. */ - def prepareBlocking(): Unit = { + def prepareForBlocking(): Unit = { val rnd = random pool.notifyParked(rnd) @@ -858,7 +858,7 @@ private final class WorkerThread( * A mechanism for executing support code before executing a blocking action. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - prepareBlocking() + prepareForBlocking() thunk } diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 402cbd75f2..d0f86707a6 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -991,7 +991,7 @@ private final class IOFiber[A]( if (ec.isInstanceOf[WorkStealingThreadPool]) { val wstp = ec.asInstanceOf[WorkStealingThreadPool] if (wstp.canExecuteBlockingCode()) { - wstp.prepareBlocking() + wstp.prepareForBlocking() var error: Throwable = null val r = From 0450001e9150de9b268604db473ac1a254bd49c6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 22:06:45 +0000 Subject: [PATCH 10/11] Relocate scaladoc note --- .../src/main/scala/cats/effect/unsafe/WorkerThread.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 35d265d251..ab3a068407 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -792,10 +792,6 @@ private final class WorkerThread( * continue, it will be cached for a period of time instead. Finally, the `blocking` flag is * useful when entering nested blocking regions. In this case, there is no need to spawn a * replacement worker thread. - * - * @note - * There is no reason to enclose any code in a `try/catch` block because the only way this - * code path can be exercised is through `IO.delay`, which already handles exceptions. */ def prepareForBlocking(): Unit = { val rnd = random @@ -856,6 +852,10 @@ private final class WorkerThread( /** * A mechanism for executing support code before executing a blocking action. + * + * @note + * There is no reason to enclose any code in a `try/catch` block because the only way this + * code path can be exercised is through `IO.delay`, which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { prepareForBlocking() From b442eb2e3516a17592be9083c77748e2ce0549a6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 22:42:22 +0000 Subject: [PATCH 11/11] Reset auto-cede counter after `blocking` --- core/shared/src/main/scala/cats/effect/IOFiber.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index ffea35d49c..f39d6114d7 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -1003,7 +1003,8 @@ private final class IOFiber[A]( } val next = if (error eq null) succeeded(r, 0) else failed(error, 0) - runLoop(next, nextCancelation, nextAutoCede) + // reset auto-cede counter + runLoop(next, nextCancelation, runtime.autoYieldThreshold) } else { blockingFallback(cur) }