From eb13fa6dee43b43cbdef2a29df3ff9b2353a54b6 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 16:54:08 +0200 Subject: [PATCH 01/16] Add a reproduction unit test for the blocking issue --- .../effect/unsafe/HelperThreadParkSpec.scala | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala new file mode 100644 index 0000000000..73ecbe6beb --- /dev/null +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package cats.effect +package unsafe + +import cats.syntax.all._ + +import scala.concurrent.{Await, Promise} +import scala.concurrent.duration._ + +class HelperThreadParkSpec extends BaseSpec { + + "HelperThread" should { + "not give up when fibers are late" in real { + def smallRuntime(): IORuntime = { + lazy val rt: IORuntime = { + val (blocking, blockDown) = + IORuntime.createDefaultBlockingExecutionContext(threadPrefix = + s"io-blocking-${getClass.getName}") + val (scheduler, schedDown) = + IORuntime.createDefaultScheduler(threadPrefix = s"io-scheduler-${getClass.getName}") + val (compute, compDown) = + IORuntime.createDefaultComputeThreadPool( + rt, + threadPrefix = s"io-compute-${getClass.getName}", + threads = 2) + + IORuntime( + compute, + blocking, + scheduler, + { () => + compDown() + blockDown() + schedDown() + }, + IORuntimeConfig() + ) + } + + rt + } + + Resource.make(IO(smallRuntime()))(rt => IO(rt.shutdown())).use { rt => + val io = for { + p <- IO(Promise[Unit]()) + _ <- (IO.sleep(50.millis) *> IO( + rt.scheduler.sleep(100.millis, () => p.success(())))).start + _ <- IO(Await.result(p.future, Duration.Inf)) + } yield () + + List + .fill(10)(io.start) + .sequence + .flatMap(_.traverse(_.joinWithNever)) + .evalOn(rt.compute) >> IO(ok) + } + } + } +} From 9b0a530f43224b38e7c111bad70630b3f7de173e Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 16:54:28 +0200 Subject: [PATCH 02/16] Add a blocking stress test --- .../effect/unsafe/BlockingStressSpec.scala | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala new file mode 100644 index 0000000000..a2f08c6b78 --- /dev/null +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2020-2021 
Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.syntax.traverse._ + +import scala.concurrent.{blocking, Await, Promise} +import scala.concurrent.duration._ +import scala.util.Random + +import java.util.concurrent.CountDownLatch + +class BlockingStressSpec extends BaseSpec { + + private val count = 171 + + "Blocking" should { + "work properly with many blocking actions and helper threads" in realWithRuntime { rt => + def io(latch: CountDownLatch) = for { + p <- IO(Promise[Unit]()) + d1 <- IO(Random.nextInt(50)) + d2 <- IO(Random.nextInt(100)) + _ <- (IO.sleep(d1.millis) *> IO( + rt.scheduler.sleep(d2.millis, () => p.success(())))).start + _ <- IO(Await.result(p.future, Duration.Inf)) + _ <- IO(latch.countDown()) + } yield () + + for { + latch <- IO(new CountDownLatch(count)) + _ <- List.fill(count)(io(latch).start.void).sequence.void + _ <- IO(blocking(latch.await())) + res <- IO(ok) + } yield res + } + } +} From 2be5bcac43624c832602ed4f0d92a6abbf8a07dc Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 17:05:18 +0200 Subject: [PATCH 03/16] Implement `LocalQueue#drain` properly (quote below) - "I didn't have time to write a short letter, so I wrote a long one instead." 
-- Mark Twain --- .../scala/cats/effect/unsafe/LocalQueue.scala | 19 ++++++++++++++----- .../scala/cats/effect/unsafe/ScalQueue.scala | 6 ++---- .../cats/effect/unsafe/WorkerThread.scala | 4 +--- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala index 743d52cbaf..fb6fffb219 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala @@ -137,6 +137,7 @@ import java.util.concurrent.atomic.AtomicInteger */ private final class LocalQueue { + import LocalQueue._ import LocalQueueConstants._ /** @@ -739,10 +740,10 @@ private final class LocalQueue { * * @note Can '''only''' be correctly called by the owner [[WorkerThread]]. * - * @param dst the destination array in which all remaining fibers are - * transferred + * @return the destination array which contains all of the fibers previously + * enqueued on the local queue */ - def drain(dst: Array[IOFiber[_]]): Unit = { + def drain(): Array[IOFiber[_]] = { // A plain, unsynchronized load of the tail of the local queue. val tl = tail @@ -758,7 +759,7 @@ private final class LocalQueue { if (tl == real) { // The tail and the "real" value of the head are equal. The queue is // empty. There is nothing more to be done. - return + return EmptyDrain } // Make sure to preserve the "steal" tag in the presence of a concurrent @@ -771,6 +772,7 @@ private final class LocalQueue { // secured. Proceed to null out the references to the fibers and // transfer them to the destination list. val n = unsignedShortSubtraction(tl, real) + val dst = new Array[IOFiber[_]](n) var i = 0 while (i < n) { val idx = index(real + i) @@ -781,9 +783,12 @@ private final class LocalQueue { } // The fibers have been transferred. Break out of the loop. - return + return dst } } + + // Unreachable code. 
+ EmptyDrain } /** @@ -1051,3 +1056,7 @@ private final class LocalQueue { */ def getTailTag(): Int = tailPublisher.get() } + +private object LocalQueue { + private[LocalQueue] val EmptyDrain: Array[IOFiber[_]] = new Array(0) +} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala index dfb9e6e740..e2856f8cd5 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala @@ -110,10 +110,8 @@ private final class ScalQueue[A <: AnyRef](threadCount: Int) { var i = 0 while (i < len) { val fiber = as(i) - if (fiber ne null) { - val idx = random.nextInt(nq) - queues(idx).offer(fiber) - } + val idx = random.nextInt(nq) + queues(idx).offer(fiber) i += 1 } } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 663a7f1402..252ed52fe5 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -56,7 +56,6 @@ private final class WorkerThread( extends Thread with BlockContext { - import LocalQueueConstants._ import WorkStealingThreadPoolConstants._ /** @@ -482,8 +481,7 @@ private final class WorkerThread( } } else { // Drain the local queue to the `overflow` queue. 
- val drain = new Array[IOFiber[_]](LocalQueueCapacity) - queue.drain(drain) + val drain = queue.drain() overflow.offerAll(drain, random) if (blocking) { From 17b268c371337eb2221bff0b66d7670075e25cdb Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 17:15:25 +0200 Subject: [PATCH 04/16] Schedule fibers on the overflow queue when blocking --- .../src/main/scala/cats/effect/unsafe/WorkerThread.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 252ed52fe5..36a25e4736 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -131,12 +131,16 @@ private final class WorkerThread( * matches the reference of the pool provided when this [[WorkerThread]] was * constructed. * + * @note When blocking code is being executed on this worker thread, it is + * important to delegate all scheduling operation to the overflow queue + * from which all [[HelperThread]] instances operate. + * * @param threadPool a work stealing thread pool reference * @return `true` if this worker thread is owned by the provided work stealing * thread pool, `false` otherwise */ def isOwnedBy(threadPool: WorkStealingThreadPool): Boolean = - pool eq threadPool + (pool eq threadPool) && !blocking /** * The run loop of the [[WorkerThread]]. @@ -442,7 +446,7 @@ private final class WorkerThread( * A mechanism for executing support code before executing a blocking action. * * This is a slightly more involved implementation of the support code in - * anticipation of running blocking code, also implemented in [[WorkerThread]]. + * anticipation of running blocking code, also implemented in [[HelperThread]]. 
* * For a more detailed discussion on the design principles behind the support * for running blocking actions on the [[WorkStealingThreadPool]], check the From 0b9d9cf84b24a3fdf02425c62360aab52b5b7b66 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 17:20:23 +0200 Subject: [PATCH 05/16] Do not rely on worker threads helping with blocking --- .../cats/effect/unsafe/WorkerThread.scala | 123 +++++++----------- 1 file changed, 49 insertions(+), 74 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 36a25e4736..c456c9f8fc 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -460,82 +460,57 @@ private final class WorkerThread( * monomorphic callsites in the `IOFiber` runloop. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - // Try waking up a `WorkerThread` to steal fibers from the `LocalQueue` of - // this thread. - val rnd = random - if (pool.notifyParked(rnd)) { - // Successfully woke up another `WorkerThread` to help out with the - // anticipated blocking. Even if this thread ends up being blocked for - // some time, the other worker would be able to steal its fibers. - - if (blocking) { - // This `WorkerThread` is already inside an enclosing blocking region. - thunk - } else { - // Logically enter the blocking region. - blocking = true - - val result = thunk - - // Logically exit the blocking region. - blocking = false - - // Return the computed result from the blocking operation. - result - } + // Drain the local queue to the `overflow` queue. + val drain = queue.drain() + overflow.offerAll(drain, random) + + if (blocking) { + // This `WorkerThread` is already inside an enclosing blocking region. + // There is no need to spawn another `HelperThread`. Instead, directly + // execute the blocking action. 
+ thunk } else { - // Drain the local queue to the `overflow` queue. - val drain = queue.drain() - overflow.offerAll(drain, random) - - if (blocking) { - // This `WorkerThread` is already inside an enclosing blocking region. - // There is no need to spawn another `HelperThread`. Instead, directly - // execute the blocking action. - thunk - } else { - // Spawn a new `HelperThread` to take the place of this thread, as the - // current thread prepares to execute a blocking action. - - // Logically enter the blocking region. - blocking = true - - // Spawn a new `HelperThread`. - val helper = - new HelperThread(threadPrefix, blockingThreadCounter, batched, overflow, pool) - helper.start() - - // With another `HelperThread` started, it is time to execute the blocking - // action. - val result = thunk - - // Blocking is finished. Time to signal the spawned helper thread. - helper.setSignal() - - // Do not proceed until the helper thread has fully died. This is terrible - // for performance, but it is justified in this case as the stability of - // the `WorkStealingThreadPool` is of utmost importance in the face of - // blocking, which in itself is **not** what the pool is optimized for. - // In practice however, unless looking at a completely pathological case - // of propagating blocking actions on every spawned helper thread, this is - // not an issue, as the `HelperThread`s are all executing `IOFiber[_]` - // instances, which mostly consist of non-blocking code. - try helper.join() - catch { - case _: InterruptedException => - // Propagate interruption to the helper thread. - Thread.interrupted() - helper.interrupt() - helper.join() - this.interrupt() - } - - // Logically exit the blocking region. - blocking = false - - // Return the computed result from the blocking operation - result + // Spawn a new `HelperThread` to take the place of this thread, as the + // current thread prepares to execute a blocking action. + + // Logically enter the blocking region. 
+ blocking = true + + // Spawn a new `HelperThread`. + val helper = + new HelperThread(threadPrefix, blockingThreadCounter, batched, overflow, pool) + helper.start() + + // With another `HelperThread` started, it is time to execute the blocking + // action. + val result = thunk + + // Blocking is finished. Time to signal the spawned helper thread. + helper.setSignal() + + // Do not proceed until the helper thread has fully died. This is terrible + // for performance, but it is justified in this case as the stability of + // the `WorkStealingThreadPool` is of utmost importance in the face of + // blocking, which in itself is **not** what the pool is optimized for. + // In practice however, unless looking at a completely pathological case + // of propagating blocking actions on every spawned helper thread, this is + // not an issue, as the `HelperThread`s are all executing `IOFiber[_]` + // instances, which mostly consist of non-blocking code. + try helper.join() + catch { + case _: InterruptedException => + // Propagate interruption to the helper thread. + Thread.interrupted() + helper.interrupt() + helper.join() + this.interrupt() } + + // Logically exit the blocking region. 
+ blocking = false + + // Return the computed result from the blocking operation + result } } } From fd9842c7f8187ad94853c11c5d5dfbf26a9e4d9f Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 17:30:04 +0200 Subject: [PATCH 06/16] Do not forget to drain the `cedeBypass` fiber --- .../src/main/scala/cats/effect/unsafe/WorkerThread.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index c456c9f8fc..ff03138ee0 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -461,8 +461,14 @@ private final class WorkerThread( */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { // Drain the local queue to the `overflow` queue. + val rnd = random val drain = queue.drain() - overflow.offerAll(drain, random) + overflow.offerAll(drain, rnd) + val cedeFiber = cedeBypass + if (cedeFiber ne null) { + cedeBypass = null + overflow.offer(cedeFiber, rnd) + } if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. 
From de056162943b5bbd1b3d09b13ec34832909aadd8 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 19:44:49 +0200 Subject: [PATCH 07/16] Try notifying worker threads when offering to the overflow queue --- .../jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala | 5 ++++- .../jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index dac2cca23d..87a145a6d1 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -113,7 +113,9 @@ private final class HelperThread( * @param fiber the fiber to be scheduled on the `overflow` queue */ def schedule(fiber: IOFiber[_]): Unit = { - overflow.offer(fiber, random) + val rnd = random + overflow.offer(fiber, rnd) + pool.notifyParked(rnd) () } @@ -166,6 +168,7 @@ private final class HelperThread( return } else { overflow.offerAll(batch, rnd) + pool.notifyParked(rnd) } } else { fiber.run() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index ff03138ee0..3e81e42a05 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -274,6 +274,7 @@ private final class WorkerThread( // Enqueue the batch at the back of the local queue and execute // the first fiber. val fiber = queue.enqueueBatch(batch) + pool.notifyParked(random) fiber.run() } fairness = 2 @@ -470,6 +471,8 @@ private final class WorkerThread( overflow.offer(cedeFiber, rnd) } + pool.notifyParked(rnd) + if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. // There is no need to spawn another `HelperThread`. 
Instead, directly From e41c58b62a199d83b82a6c6e1fdef6d9d6477cbf Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Mon, 6 Sep 2021 23:44:22 +0200 Subject: [PATCH 08/16] Unclog the batched queue before making progress on the overflow queue --- .../cats/effect/unsafe/HelperThread.scala | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 87a145a6d1..1109021a77 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -152,26 +152,19 @@ private final class HelperThread( // been shut down, or the `WorkerThread` which spawned this `HelperThread` // has finished blocking. while (!isInterrupted() && !signal.get()) { + // Check the batched queue. + val batch = batched.poll(rnd) + if (batch ne null) { + overflow.offerAll(batch, rnd) + pool.notifyParked(rnd) + } + val fiber = overflow.poll(rnd) - if (fiber eq null) { - // Fall back to checking the batched queue. - val batch = batched.poll(rnd) - if (batch eq null) { - // There are no more fibers neither in the overflow queue, nor in the - // batched queue. Since the queues are not a blocking queue, there is - // no point in busy waiting, especially since there is no guarantee - // that the `WorkerThread` which spawned this `HelperThread` will ever - // exit the blocking region, and new external work may never arrive on - // the `overflow` queue. This pathological case is not handled as it - // is a case of uncontrolled blocking on a fixed thread pool, an - // inherently careless and unsafe situation. - return - } else { - overflow.offerAll(batch, rnd) - pool.notifyParked(rnd) - } - } else { + if (fiber ne null) { fiber.run() + } else { + // Park. 
+ return } } } From 73c56bbff04322f2e5b0c5723eb0360546700d2c Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 00:16:18 +0200 Subject: [PATCH 09/16] `HelperThread` gets a parking mechanism --- .../cats/effect/unsafe/HelperThread.scala | 46 +++++++++++++++---- .../scala/cats/effect/unsafe/ScalQueue.scala | 11 +++++ .../unsafe/WorkStealingThreadPool.scala | 40 +++++++++++++--- .../cats/effect/unsafe/WorkerThread.scala | 6 ++- 4 files changed, 86 insertions(+), 17 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 1109021a77..43da432576 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -17,10 +17,12 @@ package cats.effect package unsafe +import scala.annotation.tailrec import scala.concurrent.{BlockContext, CanAwait} import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.LockSupport /** * A helper thread which is spawned whenever a blocking action is being executed @@ -78,7 +80,7 @@ private final class HelperThread( * [[HelperThread]] signals that it has successfully exited the blocking code * region and that this [[HelperThread]] should finalize. */ - private[this] val signal: AtomicBoolean = new AtomicBoolean(false) + private[this] val signal: AtomicInteger = new AtomicInteger(1) /** * A flag which is set whenever a blocking code region is entered. This is @@ -103,7 +105,7 @@ private final class HelperThread( * and die. 
*/ def setSignal(): Unit = { - signal.lazySet(true) + signal.set(2) } /** @@ -115,8 +117,15 @@ private final class HelperThread( def schedule(fiber: IOFiber[_]): Unit = { val rnd = random overflow.offer(fiber, rnd) - pool.notifyParked(rnd) - () + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } + } + + @tailrec + def unpark(): Unit = { + if (signal.get() == 0 && !signal.compareAndSet(0, 1)) + unpark() } /** @@ -148,23 +157,40 @@ private final class HelperThread( random = ThreadLocalRandom.current() val rnd = random + def parkLoop(): Unit = { + var cont = true + while (cont && !isInterrupted()) { + LockSupport.park(pool) + + cont = signal.get() == 0 + } + } + // Check for exit condition. Do not continue if the `WorkStealingPool` has // been shut down, or the `WorkerThread` which spawned this `HelperThread` // has finished blocking. - while (!isInterrupted() && !signal.get()) { + while (!isInterrupted() && signal.get() != 2) { // Check the batched queue. val batch = batched.poll(rnd) if (batch ne null) { overflow.offerAll(batch, rnd) - pool.notifyParked(rnd) + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } } val fiber = overflow.poll(rnd) if (fiber ne null) { fiber.run() } else { - // Park. - return + val cur = signal.get() + if (cur == 2) { + return + } else if (signal.compareAndSet(1, 0)) { + pool.transitionHelperToParked(this, rnd) + pool.notifyIfWorkPending(rnd) + parkLoop() + } } } } @@ -195,7 +221,9 @@ private final class HelperThread( val result = thunk // Blocking is finished. Time to signal the spawned helper thread. + pool.removeParkedHelper(helper, random) helper.setSignal() + LockSupport.unpark(helper) // Do not proceed until the helper thread has fully died. 
This is terrible // for performance, but it is justified in this case as the stability of diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala index e2856f8cd5..99d775b2ea 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala @@ -138,6 +138,17 @@ private final class ScalQueue[A <: AnyRef](threadCount: Int) { a } + def remove(a: A): Unit = { + val nq = numQueues + var i = 0 + var done = false + + while (!done && i < nq) { + done = queues(i).remove(a) + i += 1 + } + } + /** * Checks if this Scal queue is empty. * diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 5ac09a05ab..e33bc2f9be 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -75,6 +75,8 @@ private[effect] final class WorkStealingThreadPool( private[this] val localQueues: Array[LocalQueue] = new Array(threadCount) private[this] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount) + private[this] val helperThreads: ScalQueue[HelperThread] = new ScalQueue(threadCount) + /** * The batched queue on which spillover work from other local queues can end * up. @@ -222,6 +224,14 @@ private[effect] final class WorkStealingThreadPool( false } + private[unsafe] def notifyHelper(random: ThreadLocalRandom): Unit = { + val helper = helperThreads.poll(random) + if (helper ne null) { + helper.unpark() + LockSupport.unpark(helper) + } + } + /** * Checks the number of active and searching worker threads and decides * whether another thread should be notified of new work. 
@@ -258,15 +268,15 @@ private[effect] final class WorkStealingThreadPool( // If no work was found in the local queues of the worker threads, look for // work in the batched queue. - if (batchedQueue.nonEmpty()) { - notifyParked(random) + if (batchedQueue.nonEmpty() && !notifyParked(random)) { + notifyHelper(random) + return } // If no work was found in the local queues of the worker threads or in the // batched queue, look for work in the external queue. - if (overflowQueue.nonEmpty()) { - notifyParked(random) - () + if (overflowQueue.nonEmpty() && !notifyParked(random)) { + notifyHelper(random) } } @@ -339,6 +349,21 @@ private[effect] final class WorkStealingThreadPool( () } + private[unsafe] def transitionHelperToParked( + helper: HelperThread, + random: ThreadLocalRandom): Unit = { + helperThreads.offer(helper, random) + } + + private[unsafe] def removeParkedHelper( + helper: HelperThread, + random: ThreadLocalRandom): Unit = { + helperThreads.remove(helper) + if (!notifyParked(random)) { + notifyHelper(random) + } + } + /** * Schedules a fiber on this thread pool. * @@ -420,8 +445,9 @@ private[effect] final class WorkStealingThreadPool( private[this] def scheduleExternal(fiber: IOFiber[_]): Unit = { val random = ThreadLocalRandom.current() overflowQueue.offer(fiber, random) - notifyParked(random) - () + if (!notifyParked(random)) { + notifyHelper(random) + } } /** diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 3e81e42a05..6d8b9464b6 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -471,7 +471,9 @@ private final class WorkerThread( overflow.offer(cedeFiber, rnd) } - pool.notifyParked(rnd) + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. 
@@ -495,7 +497,9 @@ private final class WorkerThread( val result = thunk // Blocking is finished. Time to signal the spawned helper thread. + pool.removeParkedHelper(helper, random) helper.setSignal() + LockSupport.unpark(helper) // Do not proceed until the helper thread has fully died. This is terrible // for performance, but it is justified in this case as the stability of From 579b77d27d519927526e1628a9c4c9249d3f6a8a Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 00:21:29 +0200 Subject: [PATCH 10/16] Simplify the parking condition --- .../scala/cats/effect/unsafe/HelperThread.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 43da432576..1cd7fb655c 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -182,15 +182,10 @@ private final class HelperThread( val fiber = overflow.poll(rnd) if (fiber ne null) { fiber.run() - } else { - val cur = signal.get() - if (cur == 2) { - return - } else if (signal.compareAndSet(1, 0)) { - pool.transitionHelperToParked(this, rnd) - pool.notifyIfWorkPending(rnd) - parkLoop() - } + } else if (signal.compareAndSet(1, 0)) { + pool.transitionHelperToParked(this, rnd) + pool.notifyIfWorkPending(rnd) + parkLoop() } } } From a273b88ccc0430de726b8e037b34141661818d47 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 09:58:14 +0200 Subject: [PATCH 11/16] Always do some notifying before running blocking code --- .../src/main/scala/cats/effect/unsafe/HelperThread.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 1cd7fb655c..4f530d684c 100644 --- 
a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -194,6 +194,11 @@ private final class HelperThread( * A mechanism for executing support code before executing a blocking action. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + val rnd = random + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } + if (blocking) { // This `HelperThread` is already inside an enclosing blocking region. // There is no need to spawn another `HelperThread`. Instead, directly @@ -216,7 +221,7 @@ private final class HelperThread( val result = thunk // Blocking is finished. Time to signal the spawned helper thread. - pool.removeParkedHelper(helper, random) + pool.removeParkedHelper(helper, rnd) helper.setSignal() LockSupport.unpark(helper) From f6096dffe008b52692efb161069bdaa00609f4e1 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 10:00:02 +0200 Subject: [PATCH 12/16] Make the stress test even more stressful --- .../test/scala/cats/effect/unsafe/BlockingStressSpec.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala index a2f08c6b78..946f1e5c83 100644 --- a/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala @@ -27,7 +27,10 @@ import java.util.concurrent.CountDownLatch class BlockingStressSpec extends BaseSpec { - private val count = 171 + override def executionTimeout: FiniteDuration = 30.seconds + + // This test spawns a lot of helper threads. 
+ private val count = 1000 "Blocking" should { "work properly with many blocking actions and helper threads" in realWithRuntime { rt => @@ -38,7 +41,7 @@ class BlockingStressSpec extends BaseSpec { _ <- (IO.sleep(d1.millis) *> IO( rt.scheduler.sleep(d2.millis, () => p.success(())))).start _ <- IO(Await.result(p.future, Duration.Inf)) - _ <- IO(latch.countDown()) + _ <- IO(blocking(latch.countDown())) } yield () for { From a48bbf2314627036a6c2325df05398bf22de525c Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 10:03:30 +0200 Subject: [PATCH 13/16] Joining seems to bog down for as of yet unexplained reasons - I have confirmed that all helper threads fully die when blocking finishes by profiling the code --- .../cats/effect/unsafe/HelperThread.scala | 18 ------------------ .../cats/effect/unsafe/WorkerThread.scala | 18 ------------------ 2 files changed, 36 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 4f530d684c..67cf023308 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -225,24 +225,6 @@ private final class HelperThread( helper.setSignal() LockSupport.unpark(helper) - // Do not proceed until the helper thread has fully died. This is terrible - // for performance, but it is justified in this case as the stability of - // the `WorkStealingThreadPool` is of utmost importance in the face of - // blocking, which in itself is **not** what the pool is optimized for. - // In practice however, unless looking at a completely pathological case - // of propagating blocking actions on every spawned helper thread, this is - // not an issue, as the `HelperThread`s are all executing `IOFiber[_]` - // instances, which mostly consist of non-blocking code. 
- try helper.join() - catch { - case _: InterruptedException => - // Propagate interruption to the helper thread. - Thread.interrupted() - helper.interrupt() - helper.join() - this.interrupt() - } - // Logically exit the blocking region. blocking = false diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 6d8b9464b6..cb8df2fd48 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -501,24 +501,6 @@ private final class WorkerThread( helper.setSignal() LockSupport.unpark(helper) - // Do not proceed until the helper thread has fully died. This is terrible - // for performance, but it is justified in this case as the stability of - // the `WorkStealingThreadPool` is of utmost importance in the face of - // blocking, which in itself is **not** what the pool is optimized for. - // In practice however, unless looking at a completely pathological case - // of propagating blocking actions on every spawned helper thread, this is - // not an issue, as the `HelperThread`s are all executing `IOFiber[_]` - // instances, which mostly consist of non-blocking code. - try helper.join() - catch { - case _: InterruptedException => - // Propagate interruption to the helper thread. - Thread.interrupted() - helper.interrupt() - helper.join() - this.interrupt() - } - // Logically exit the blocking region. 
blocking = false From fa3914b603f868e0156ec1c8c8e9e364eebc4714 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 10:49:24 +0200 Subject: [PATCH 14/16] Add mima filters for the `LocalQueue` method signature change --- build.sbt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 793356165f..f6be07f102 100644 --- a/build.sbt +++ b/build.sbt @@ -362,7 +362,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform) // changes to `cats.effect.unsafe` package private code ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.IORuntime.this"), ProblemFilters.exclude[DirectMissingMethodProblem]( - "cats.effect.unsafe.IORuntime.$default$6") + "cats.effect.unsafe.IORuntime.$default$6"), + // introduced by #3182, Address issues with the blocking mechanism of the thread pool + // changes to `cats.effect.unsafe` package private code + ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.LocalQueue.drain") ) ) .jvmSettings( From bd1e2fe37a6523ce9664c2ad2b8d57cf273f5f0b Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 11:23:12 +0200 Subject: [PATCH 15/16] Add descriptive comments for the new code --- .../cats/effect/unsafe/HelperThread.scala | 24 ++++++++++- .../scala/cats/effect/unsafe/ScalQueue.scala | 14 ++++++ .../unsafe/WorkStealingThreadPool.scala | 43 +++++++++++++++++++ .../cats/effect/unsafe/WorkerThread.scala | 11 ++++- 4 files changed, 90 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 67cf023308..342c151703 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -122,6 +122,9 @@ private final class HelperThread( } } + /** + * Marks the [[HelperThread]] as eligible for resuming work. 
+ */ @tailrec def unpark(): Unit = { if (signal.get() == 0 && !signal.compareAndSet(0, 1)) @@ -160,8 +163,10 @@ private final class HelperThread( def parkLoop(): Unit = { var cont = true while (cont && !isInterrupted()) { + // Park the thread until further notice. LockSupport.park(pool) + // Spurious wakeup check. cont = signal.get() == 0 } } @@ -183,6 +188,12 @@ private final class HelperThread( if (fiber ne null) { fiber.run() } else if (signal.compareAndSet(1, 0)) { + // There are currently no more fibers available on the overflow or + // batched queues. However, the thread that originally started this + // helper thread has not been unblocked. The fibers that will + // eventually unblock that original thread might not have arrived on + // the pool yet. The helper thread should suspend and await a + // notification of incoming work. pool.transitionHelperToParked(this, rnd) pool.notifyIfWorkPending(rnd) parkLoop() @@ -192,8 +203,14 @@ private final class HelperThread( /** * A mechanism for executing support code before executing a blocking action. + * + * @note There is no reason to enclose any code in a `try/catch` block because + * the only way this code path can be exercised is through `IO.delay`, + * which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // Try waking up a `WorkerThread` to handle fibers from the overflow and + // batched queues. val rnd = random if (!pool.notifyParked(rnd)) { pool.notifyHelper(rnd) @@ -220,7 +237,12 @@ private final class HelperThread( // action. val result = thunk - // Blocking is finished. Time to signal the spawned helper thread. + // Blocking is finished. Time to signal the spawned helper thread and + // unpark it.
Furthermore, the thread needs to be removed from the + // parked helper threads queue in the pool so that other threads don't + // mistakenly depend on it to bail them out of blocking situations, and + // of course, this also removes the last strong reference to the fiber, + // which needs to be released for gc purposes. pool.removeParkedHelper(helper, rnd) helper.setSignal() LockSupport.unpark(helper) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala index 99d775b2ea..7bae4715de 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala @@ -138,6 +138,20 @@ private final class ScalQueue[A <: AnyRef](threadCount: Int) { a } + /** + * Removes an element from this queue. + * + * @note The implementation delegates to the + * [[java.util.concurrent.ConcurrentLinkedQueue#remove remove]] method. + * + * @note This method runs in linear time relative to the size of the queue, + * which is not ideal and generally should not be used. However, this + * functionality is necessary for the blocking mechanism of the + * [[WorkStealingThreadPool]]. The runtime complexity of this method is + * acceptable for that purpose because threads are limited resources. 
+ * + * @param a the element to be removed + */ def remove(a: A): Unit = { val nq = numQueues var i = 0 diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index e33bc2f9be..51f6997b8e 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -75,6 +75,19 @@ private[effect] final class WorkStealingThreadPool( private[this] val localQueues: Array[LocalQueue] = new Array(threadCount) private[this] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount) + /** + * References to helper threads. Helper threads must have a parking mechanism + * to address the situation where all worker threads are executing blocking + * actions, but the pool has been completely exhausted and the work that + * will unblock the worker threads has not arrived on the pool yet. Obviously, + * the helper threads cannot busy wait in this case, so they need to park and + * await a notification of newly arrived work. This queue also helps with the + * thundering herd problem. Namely, when only a single unit of work arrives + * and needs to be executed by a helper thread because all worker threads are + * blocked, only a single helper thread can be woken up. That thread can wake + * other helper threads in the future as the work available on the pool + * increases. + */ private[this] val helperThreads: ScalQueue[HelperThread] = new ScalQueue(threadCount) /** @@ -224,6 +237,13 @@ private[effect] final class WorkStealingThreadPool( false } + /** + * Potentially unparks a helper thread. 
+ * + * @param random a reference to an uncontended source of randomness, to be + * passed along to the striped concurrent queues when executing + * their operations + */ private[unsafe] def notifyHelper(random: ThreadLocalRandom): Unit = { val helper = helperThreads.poll(random) if (helper ne null) { @@ -349,12 +369,35 @@ private[effect] final class WorkStealingThreadPool( () } + /** + * Enqueues the provided helper thread on the queue of parked helper threads. + * + * @param helper the helper thread to enqueue on the queue of parked threads + * @param random a reference to an uncontended source of randomness, to be + * passed along to the striped concurrent queues when executing + * their operations + */ private[unsafe] def transitionHelperToParked( helper: HelperThread, random: ThreadLocalRandom): Unit = { helperThreads.offer(helper, random) } + /** + * Removes the provided helper thread from the parked helper thread queue. + * + * This method is necessary for the situation when a worker/helper thread has + * finished executing the blocking actions and needs to signal its helper + * thread to end. At that point in time, the helper thread might be parked and + * enqueued. Furthermore, this method signals to other worker and helper + * threads that there could still be some leftover work on the pool and that + * they need to replace the exiting helper thread. 
+ * + * @param helper the helper thread to remove from the queue of parked threads + * @param random a reference to an uncontended source of randomness, to be + * passed along to the striped concurrent queues when executing + * their operations + */ private[unsafe] def removeParkedHelper( helper: HelperThread, random: ThreadLocalRandom): Unit = { diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index cb8df2fd48..bf4140ab23 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -459,6 +459,10 @@ private final class WorkerThread( * * The reason why this code is duplicated, instead of inherited is to keep the * monomorphic callsites in the `IOFiber` runloop. + * + * @note There is no reason to enclose any code in a `try/catch` block because + * the only way this code path can be exercised is through `IO.delay`, + * which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { // Drain the local queue to the `overflow` queue. @@ -496,7 +500,12 @@ private final class WorkerThread( // action. val result = thunk - // Blocking is finished. Time to signal the spawned helper thread. + // Blocking is finished. Time to signal the spawned helper thread and + // unpark it. Furthermore, the thread needs to be removed from the + // parked helper threads queue in the pool so that other threads don't + // mistakenly depend on it to bail them out of blocking situations, and + // of course, this also removes the last strong reference to the fiber, + // which needs to be released for gc purposes. 
pool.removeParkedHelper(helper, random) helper.setSignal() LockSupport.unpark(helper) From ba3cfe1b497836abb8cd479ddf6c2b5b21693c7a Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Tue, 7 Sep 2021 11:26:59 +0200 Subject: [PATCH 16/16] Add a more detailed explanation of the helper thread signal variable --- .../src/main/scala/cats/effect/unsafe/HelperThread.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index 342c151703..0ca26c802d 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -79,6 +79,13 @@ private final class HelperThread( * Signalling mechanism through which the [[WorkerThread]] which spawned this * [[HelperThread]] signals that it has successfully exited the blocking code * region and that this [[HelperThread]] should finalize. + * + * This atomic integer encodes a state machine with 3 states. + * Value 0: the thread is parked + * Value 1: the thread is unparked and executing fibers + * Value 2: the thread has been signalled to finish up and exit + * + * The thread is spawned in the running state. */ private[this] val signal: AtomicInteger = new AtomicInteger(1)