diff --git a/build.sbt b/build.sbt index 793356165f..f6be07f102 100644 --- a/build.sbt +++ b/build.sbt @@ -362,7 +362,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform) // changes to `cats.effect.unsafe` package private code ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.IORuntime.this"), ProblemFilters.exclude[DirectMissingMethodProblem]( - "cats.effect.unsafe.IORuntime.$default$6") + "cats.effect.unsafe.IORuntime.$default$6"), + // introduced by #3182, Address issues with the blocking mechanism of the thread pool + // changes to `cats.effect.unsafe` package private code + ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.unsafe.LocalQueue.drain") ) ) .jvmSettings( diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala index dac2cca23d..0ca26c802d 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/HelperThread.scala @@ -17,10 +17,12 @@ package cats.effect package unsafe +import scala.annotation.tailrec import scala.concurrent.{BlockContext, CanAwait} import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.LockSupport /** * A helper thread which is spawned whenever a blocking action is being executed @@ -77,8 +79,15 @@ private final class HelperThread( * Signalling mechanism through which the [[WorkerThread]] which spawned this * [[HelperThread]] signals that it has successfully exited the blocking code * region and that this [[HelperThread]] should finalize. + * + * This atomic integer encodes a state machine with 3 states. + * Value 0: the thread is parked + * Value 1: the thread is unparked and executing fibers + * Value 2: the thread has been signalled to finish up and exit + * + * The thread is spawned in the running state. */ - private[this] val signal: AtomicBoolean = new AtomicBoolean(false) + private[this] val signal: AtomicInteger = new AtomicInteger(1) /** * A flag which is set whenever a blocking code region is entered. This is @@ -103,7 +112,7 @@ private final class HelperThread( * and die. */ def setSignal(): Unit = { - signal.lazySet(true) + signal.set(2) } /** @@ -113,8 +122,20 @@ private final class HelperThread( * @param fiber the fiber to be scheduled on the `overflow` queue */ def schedule(fiber: IOFiber[_]): Unit = { - overflow.offer(fiber, random) - () + val rnd = random + overflow.offer(fiber, rnd) + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } + } + + /** + * Marks the [[HelperThread]] as eligible for resuming work. + */ + @tailrec + def unpark(): Unit = { + if (signal.get() == 0 && !signal.compareAndSet(0, 1)) + unpark() } /** @@ -146,37 +167,62 @@ private final class HelperThread( random = ThreadLocalRandom.current() val rnd = random + def parkLoop(): Unit = { + var cont = true + while (cont && !isInterrupted()) { + // Park the thread until further notice. + LockSupport.park(pool) + + // Spurious wakeup check. + cont = signal.get() == 0 + } + } + // Check for exit condition. Do not continue if the `WorkStealingPool` has // been shut down, or the `WorkerThread` which spawned this `HelperThread` // has finished blocking. - while (!isInterrupted() && !signal.get()) { - val fiber = overflow.poll(rnd) - if (fiber eq null) { - // Fall back to checking the batched queue. - val batch = batched.poll(rnd) - if (batch eq null) { - // There are no more fibers neither in the overflow queue, nor in the - // batched queue. Since the queues are not a blocking queue, there is - // no point in busy waiting, especially since there is no guarantee - // that the `WorkerThread` which spawned this `HelperThread` will ever - // exit the blocking region, and new external work may never arrive on - // the `overflow` queue. This pathological case is not handled as it - // is a case of uncontrolled blocking on a fixed thread pool, an - // inherently careless and unsafe situation. - return - } else { - overflow.offerAll(batch, rnd) + while (!isInterrupted() && signal.get() != 2) { + // Check the batched queue. + val batch = batched.poll(rnd) + if (batch ne null) { + overflow.offerAll(batch, rnd) + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) } - } else { + } + + val fiber = overflow.poll(rnd) + if (fiber ne null) { fiber.run() + } else if (signal.compareAndSet(1, 0)) { + // There are currently no more fibers available on the overflow or + // batched queues. However, the thread that originally started this + // helper thread has not been unblocked. The fibers that will + // eventually unblock that original thread might not have arrived on + // the pool yet. The helper thread should suspend and await for a + // notification of incoming work. + pool.transitionHelperToParked(this, rnd) + pool.notifyIfWorkPending(rnd) + parkLoop() } } } /** * A mechanism for executing support code before executing a blocking action. + * + * @note There is no reason to enclose any code in a `try/catch` block because + * the only way this code path can be exercised is through `IO.delay`, + * which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // Try waking up a `WorkerThread` to handle fibers from the overflow and + // batched queues. + val rnd = random + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } + if (blocking) { // This `HelperThread` is already inside an enclosing blocking region. // There is no need to spawn another `HelperThread`. Instead, directly @@ -198,26 +244,15 @@ private final class HelperThread( // action. val result = thunk - // Blocking is finished. Time to signal the spawned helper thread. + // Blocking is finished. Time to signal the spawned helper thread and + // unpark it. Furthermore, the thread needs to be removed from the + // parked helper threads queue in the pool so that other threads don't + // mistakenly depend on it to bail them out of blocking situations, and + // of course, this also removes the last strong reference to the fiber, + // which needs to be released for gc purposes. + pool.removeParkedHelper(helper, rnd) helper.setSignal() - - // Do not proceed until the helper thread has fully died. This is terrible - // for performance, but it is justified in this case as the stability of - // the `WorkStealingThreadPool` is of utmost importance in the face of - // blocking, which in itself is **not** what the pool is optimized for. - // In practice however, unless looking at a completely pathological case - // of propagating blocking actions on every spawned helper thread, this is - // not an issue, as the `HelperThread`s are all executing `IOFiber[_]` - // instances, which mostly consist of non-blocking code. - try helper.join() - catch { - case _: InterruptedException => - // Propagate interruption to the helper thread. - Thread.interrupted() - helper.interrupt() - helper.join() - this.interrupt() - } + LockSupport.unpark(helper) // Logically exit the blocking region. blocking = false diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala index 743d52cbaf..fb6fffb219 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala @@ -137,6 +137,7 @@ import java.util.concurrent.atomic.AtomicInteger */ private final class LocalQueue { + import LocalQueue._ import LocalQueueConstants._ /** @@ -739,10 +740,10 @@ private final class LocalQueue { * * @note Can '''only''' be correctly called by the owner [[WorkerThread]]. * - * @param dst the destination array in which all remaining fibers are - * transferred + * @return the destination array which contains all of the fibers previously + * enqueued on the local queue */ - def drain(dst: Array[IOFiber[_]]): Unit = { + def drain(): Array[IOFiber[_]] = { // A plain, unsynchronized load of the tail of the local queue. val tl = tail @@ -758,7 +759,7 @@ private final class LocalQueue { if (tl == real) { // The tail and the "real" value of the head are equal. The queue is // empty. There is nothing more to be done. - return + return EmptyDrain } // Make sure to preserve the "steal" tag in the presence of a concurrent @@ -771,6 +772,7 @@ private final class LocalQueue { // secured. Proceed to null out the references to the fibers and // transfer them to the destination list. val n = unsignedShortSubtraction(tl, real) + val dst = new Array[IOFiber[_]](n) var i = 0 while (i < n) { val idx = index(real + i) @@ -781,9 +783,12 @@ private final class LocalQueue { } // The fibers have been transferred. Break out of the loop. - return + return dst } } + + // Unreachable code. + EmptyDrain } /** @@ -1051,3 +1056,7 @@ private final class LocalQueue { */ def getTailTag(): Int = tailPublisher.get() } + +private object LocalQueue { + private[LocalQueue] val EmptyDrain: Array[IOFiber[_]] = new Array(0) +} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala index dfb9e6e740..7bae4715de 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala @@ -110,10 +110,8 @@ private final class ScalQueue[A <: AnyRef](threadCount: Int) { var i = 0 while (i < len) { val fiber = as(i) - if (fiber ne null) { - val idx = random.nextInt(nq) - queues(idx).offer(fiber) - } + val idx = random.nextInt(nq) + queues(idx).offer(fiber) i += 1 } } @@ -140,6 +138,31 @@ private final class ScalQueue[A <: AnyRef](threadCount: Int) { a } + /** + * Removes an element from this queue. + * + * @note The implementation delegates to the + * [[java.util.concurrent.ConcurrentLinkedQueue#remove remove]] method. + * + * @note This method runs in linear time relative to the size of the queue, + * which is not ideal and generally should not be used. However, this + * functionality is necessary for the blocking mechanism of the + * [[WorkStealingThreadPool]]. The runtime complexity of this method is + * acceptable for that purpose because threads are limited resources. + * + * @param a the element to be removed + */ + def remove(a: A): Unit = { + val nq = numQueues + var i = 0 + var done = false + + while (!done && i < nq) { + done = queues(i).remove(a) + i += 1 + } + } + /** * Checks if this Scal queue is empty. * diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 5ac09a05ab..51f6997b8e 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -75,6 +75,21 @@ private[effect] final class WorkStealingThreadPool( private[this] val localQueues: Array[LocalQueue] = new Array(threadCount) private[this] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount) + /** + * References to helper threads. Helper threads must have a parking mechanism + * to address the situation where all worker threads are executing blocking + * actions, but the pool has been completely exhausted and the work that + * will unblock the worker threads has not arrived on the pool yet. Obviously, + * the helper threads cannot busy wait in this case, so they need to park and + * await a notification of newly arrived work. This queue also helps with the + * thundering herd problem. Namely, when only a single unit of work arrives + * and needs to be executed by a helper thread because all worker threads are + * blocked, only a single helper thread can be woken up. That thread can wake + * other helper threads in the future as the work available on the pool + * increases. + */ + private[this] val helperThreads: ScalQueue[HelperThread] = new ScalQueue(threadCount) + /** * The batched queue on which spillover work from other local queues can end * up. @@ -222,6 +237,21 @@ private[effect] final class WorkStealingThreadPool( false } + /** + * Potentially unparks a helper thread. + * + * @param random a reference to an uncontended source of randomness, to be + * passed along to the striped concurrent queues when executing + * their operations + */ + private[unsafe] def notifyHelper(random: ThreadLocalRandom): Unit = { + val helper = helperThreads.poll(random) + if (helper ne null) { + helper.unpark() + LockSupport.unpark(helper) + } + } + /** * Checks the number of active and searching worker threads and decides * whether another thread should be notified of new work. @@ -258,15 +288,15 @@ private[effect] final class WorkStealingThreadPool( // If no work was found in the local queues of the worker threads, look for // work in the batched queue. - if (batchedQueue.nonEmpty()) { - notifyParked(random) + if (batchedQueue.nonEmpty() && !notifyParked(random)) { + notifyHelper(random) + return } // If no work was found in the local queues of the worker threads or in the // batched queue, look for work in the external queue. - if (overflowQueue.nonEmpty()) { - notifyParked(random) - () + if (overflowQueue.nonEmpty() && !notifyParked(random)) { + notifyHelper(random) } } @@ -339,6 +369,44 @@ private[effect] final class WorkStealingThreadPool( () } + /** + * Enqueues the provided helper thread on the queue of parked helper threads. + * + * @param helper the helper thread to enqueue on the queue of parked threads + * @param random a reference to an uncontended source of randomness, to be + * passed along to the striped concurrent queues when executing + * their operations + */ + private[unsafe] def transitionHelperToParked( + helper: HelperThread, + random: ThreadLocalRandom): Unit = { + helperThreads.offer(helper, random) + } + + /** + * Removes the provided helper thread from the parked helper thread queue. + * + * This method is necessary for the situation when a worker/helper thread has + * finished executing the blocking actions and needs to signal its helper + * thread to end. At that point in time, the helper thread might be parked and + * enqueued. Furthermore, this method signals to other worker and helper + * threads that there could still be some leftover work on the pool and that + * they need to replace the exiting helper thread. + * + * @param helper the helper thread to remove from the queue of parked threads + * @param random a reference to an uncontended source of randomness, to be + * passed along to the striped concurrent queues when executing + * their operations + */ + private[unsafe] def removeParkedHelper( + helper: HelperThread, + random: ThreadLocalRandom): Unit = { + helperThreads.remove(helper) + if (!notifyParked(random)) { + notifyHelper(random) + } + } + /** * Schedules a fiber on this thread pool. * @@ -420,8 +488,9 @@ private[effect] final class WorkStealingThreadPool( private[this] def scheduleExternal(fiber: IOFiber[_]): Unit = { val random = ThreadLocalRandom.current() overflowQueue.offer(fiber, random) - notifyParked(random) - () + if (!notifyParked(random)) { + notifyHelper(random) + } } /** diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 663a7f1402..bf4140ab23 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -56,7 +56,6 @@ private final class WorkerThread( extends Thread with BlockContext { - import LocalQueueConstants._ import WorkStealingThreadPoolConstants._ /** @@ -132,12 +131,16 @@ private final class WorkerThread( * matches the reference of the pool provided when this [[WorkerThread]] was * constructed. * + * @note When blocking code is being executed on this worker thread, it is + * important to delegate all scheduling operation to the overflow queue + * from which all [[HelperThread]] instances operate. + * * @param threadPool a work stealing thread pool reference * @return `true` if this worker thread is owned by the provided work stealing * thread pool, `false` otherwise */ def isOwnedBy(threadPool: WorkStealingThreadPool): Boolean = - pool eq threadPool + (pool eq threadPool) && !blocking /** * The run loop of the [[WorkerThread]]. @@ -271,6 +274,7 @@ private final class WorkerThread( // Enqueue the batch at the back of the local queue and execute // the first fiber. val fiber = queue.enqueueBatch(batch) + pool.notifyParked(random) fiber.run() } fairness = 2 @@ -443,7 +447,7 @@ private final class WorkerThread( * A mechanism for executing support code before executing a blocking action. * * This is a slightly more involved implementation of the support code in - * anticipation of running blocking code, also implemented in [[WorkerThread]]. + * anticipation of running blocking code, also implemented in [[HelperThread]]. * * For a more detailed discussion on the design principles behind the support * for running blocking actions on the [[WorkStealingThreadPool]], check the @@ -455,85 +459,62 @@ private final class WorkerThread( * * The reason why this code is duplicated, instead of inherited is to keep the * monomorphic callsites in the `IOFiber` runloop. + * + * @note There is no reason to enclose any code in a `try/catch` block because + * the only way this code path can be exercised is through `IO.delay`, + * which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - // Try waking up a `WorkerThread` to steal fibers from the `LocalQueue` of - // this thread. + // Drain the local queue to the `overflow` queue. val rnd = random - if (pool.notifyParked(rnd)) { - // Successfully woke up another `WorkerThread` to help out with the - // anticipated blocking. Even if this thread ends up being blocked for - // some time, the other worker would be able to steal its fibers. - - if (blocking) { - // This `WorkerThread` is already inside an enclosing blocking region. - thunk - } else { - // Logically enter the blocking region. - blocking = true - - val result = thunk + val drain = queue.drain() + overflow.offerAll(drain, rnd) + val cedeFiber = cedeBypass + if (cedeFiber ne null) { + cedeBypass = null + overflow.offer(cedeFiber, rnd) + } - // Logically exit the blocking region. - blocking = false + if (!pool.notifyParked(rnd)) { + pool.notifyHelper(rnd) + } - // Return the computed result from the blocking operation. - result - } + if (blocking) { + // This `WorkerThread` is already inside an enclosing blocking region. + // There is no need to spawn another `HelperThread`. Instead, directly + // execute the blocking action. + thunk } else { - // Drain the local queue to the `overflow` queue. - val drain = new Array[IOFiber[_]](LocalQueueCapacity) - queue.drain(drain) - overflow.offerAll(drain, random) - - if (blocking) { - // This `WorkerThread` is already inside an enclosing blocking region. - // There is no need to spawn another `HelperThread`. Instead, directly - // execute the blocking action. - thunk - } else { - // Spawn a new `HelperThread` to take the place of this thread, as the - // current thread prepares to execute a blocking action. - - // Logically enter the blocking region. - blocking = true - - // Spawn a new `HelperThread`. - val helper = - new HelperThread(threadPrefix, blockingThreadCounter, batched, overflow, pool) - helper.start() - - // With another `HelperThread` started, it is time to execute the blocking - // action. - val result = thunk - - // Blocking is finished. Time to signal the spawned helper thread. - helper.setSignal() - - // Do not proceed until the helper thread has fully died. This is terrible - // for performance, but it is justified in this case as the stability of - // the `WorkStealingThreadPool` is of utmost importance in the face of - // blocking, which in itself is **not** what the pool is optimized for. - // In practice however, unless looking at a completely pathological case - // of propagating blocking actions on every spawned helper thread, this is - // not an issue, as the `HelperThread`s are all executing `IOFiber[_]` - // instances, which mostly consist of non-blocking code. - try helper.join() - catch { - case _: InterruptedException => - // Propagate interruption to the helper thread. - Thread.interrupted() - helper.interrupt() - helper.join() - this.interrupt() - } - - // Logically exit the blocking region. - blocking = false - - // Return the computed result from the blocking operation - result - } + // Spawn a new `HelperThread` to take the place of this thread, as the + // current thread prepares to execute a blocking action. + + // Logically enter the blocking region. + blocking = true + + // Spawn a new `HelperThread`. + val helper = + new HelperThread(threadPrefix, blockingThreadCounter, batched, overflow, pool) + helper.start() + + // With another `HelperThread` started, it is time to execute the blocking + // action. + val result = thunk + + // Blocking is finished. Time to signal the spawned helper thread and + // unpark it. Furthermore, the thread needs to be removed from the + // parked helper threads queue in the pool so that other threads don't + // mistakenly depend on it to bail them out of blocking situations, and + // of course, this also removes the last strong reference to the fiber, + // which needs to be released for gc purposes. + pool.removeParkedHelper(helper, random) + helper.setSignal() + LockSupport.unpark(helper) + + // Logically exit the blocking region. + blocking = false + + // Return the computed result from the blocking operation + result } } } diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala new file mode 100644 index 0000000000..946f1e5c83 --- /dev/null +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/BlockingStressSpec.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.syntax.traverse._ + +import scala.concurrent.{blocking, Await, Promise} +import scala.concurrent.duration._ +import scala.util.Random + +import java.util.concurrent.CountDownLatch + +class BlockingStressSpec extends BaseSpec { + + override def executionTimeout: FiniteDuration = 30.seconds + + // This test spawns a lot of helper threads. + private val count = 1000 + + "Blocking" should { + "work properly with many blocking actions and helper threads" in realWithRuntime { rt => + def io(latch: CountDownLatch) = for { + p <- IO(Promise[Unit]()) + d1 <- IO(Random.nextInt(50)) + d2 <- IO(Random.nextInt(100)) + _ <- (IO.sleep(d1.millis) *> IO( + rt.scheduler.sleep(d2.millis, () => p.success(())))).start + _ <- IO(Await.result(p.future, Duration.Inf)) + _ <- IO(blocking(latch.countDown())) + } yield () + + for { + latch <- IO(new CountDownLatch(count)) + _ <- List.fill(count)(io(latch).start.void).sequence.void + _ <- IO(blocking(latch.await())) + res <- IO(ok) + } yield res + } + } +} diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala new file mode 100644 index 0000000000..73ecbe6beb --- /dev/null +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/HelperThreadParkSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.syntax.all._ + +import scala.concurrent.{Await, Promise} +import scala.concurrent.duration._ + +class HelperThreadParkSpec extends BaseSpec { + + "HelperThread" should { + "not give up when fibers are late" in real { + def smallRuntime(): IORuntime = { + lazy val rt: IORuntime = { + val (blocking, blockDown) = + IORuntime.createDefaultBlockingExecutionContext(threadPrefix = + s"io-blocking-${getClass.getName}") + val (scheduler, schedDown) = + IORuntime.createDefaultScheduler(threadPrefix = s"io-scheduler-${getClass.getName}") + val (compute, compDown) = + IORuntime.createDefaultComputeThreadPool( + rt, + threadPrefix = s"io-compute-${getClass.getName}", + threads = 2) + + IORuntime( + compute, + blocking, + scheduler, + { () => + compDown() + blockDown() + schedDown() + }, + IORuntimeConfig() + ) + } + + rt + } + + Resource.make(IO(smallRuntime()))(rt => IO(rt.shutdown())).use { rt => + val io = for { + p <- IO(Promise[Unit]()) + _ <- (IO.sleep(50.millis) *> IO( + rt.scheduler.sleep(100.millis, () => p.success(())))).start + _ <- IO(Await.result(p.future, Duration.Inf)) + } yield () + + List + .fill(10)(io.start) + .sequence + .flatMap(_.traverse(_.joinWithNever)) + .evalOn(rt.compute) >> IO(ok) + } + } + } +}