diff --git a/build.sbt b/build.sbt index 11404c4e77..726a0425ad 100644 --- a/build.sbt +++ b/build.sbt @@ -897,7 +897,7 @@ lazy val tests: CrossProject = crossProject(JSPlatform, JVMPlatform, NativePlatf ) .jvmSettings( Test / fork := true, - Test / javaOptions += s"-Dsbt.classpath=${(Test / fullClasspath).value.map(_.data.getAbsolutePath).mkString(File.pathSeparator)}", + Test / javaOptions += s"-Dsbt.classpath=${(Test / fullClasspath).value.map(_.data.getAbsolutePath).mkString(File.pathSeparator)}" // Test / javaOptions += "-XX:ActiveProcessorCount=2", ) @@ -986,7 +986,16 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform) // adds method to sealed Hotswap ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.std.Hotswap.get"), // #3972, private trait - ProblemFilters.exclude[IncompatibleTemplateDefProblem]("cats.effect.std.Supervisor$State"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "cats.effect.std.Supervisor$State"), + // introduced by #3923 + // Rewrote Dispatcher + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Dispatcher$Mode"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Dispatcher$Mode$"), + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.std.Dispatcher$Mode$Parallel$"), + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.std.Dispatcher$Mode$Sequential$") ) ) .jsSettings( diff --git a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala index c3ca288698..197bd79bbb 100644 --- a/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala +++ b/std/shared/src/main/scala/cats/effect/std/Dispatcher.scala @@ -16,14 +16,27 @@ package cats.effect.std -import cats.effect.kernel.{Async, Outcome, Resource} +import cats.{Applicative, MonadThrow} +import cats.effect.kernel.{ + Async, + Concurrent, + Cont, + Deferred, + MonadCancel, + MonadCancelThrow, + Outcome, + Ref, + Resource, + Spawn, + Sync +} +import cats.effect.kernel.syntax.all._ import cats.effect.std.Dispatcher.parasiticEC import cats.syntax.all._ import scala.annotation.tailrec -import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} +import scala.util.Failure import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} @@ -71,10 +84,13 @@ trait Dispatcher[F[_]] extends DispatcherPlatform[F] { */ def unsafeRunAndForget[A](fa: F[A]): Unit = unsafeToFuture(fa).onComplete { - case Failure(ex) => ex.printStackTrace() + case Failure(ex) => reportFailure(ex) case _ => () }(parasiticEC) + protected def reportFailure(t: Throwable): Unit = + t.printStackTrace() + // package-private because it's just an internal utility which supports specific implementations // anyone who needs this type of thing should use unsafeToFuture and then onComplete private[std] def unsafeRunAsync[A](fa: F[A])(cb: Either[Throwable, A] => Unit): Unit = @@ -91,11 +107,6 @@ object Dispatcher { private[this] val Cpus: Int = Runtime.getRuntime().availableProcessors() - private[this] val Noop: () => Unit = () => () - private[this] val Open: () => Unit = () => () - - private[this] val Completed: Either[Throwable, Unit] = Right(()) - @deprecated( message = "use '.parallel' or '.sequential' instead; the former corresponds to the current semantics of '.apply'", @@ -107,16 +118,14 @@ object Dispatcher { * exits, all active effects will be canceled, and attempts to submit new effects will throw * an exception. */ - def parallel[F[_]: Async]: Resource[F, Dispatcher[F]] = - parallel[F](await = false) + def parallel[F[_]: Async]: Resource[F, Dispatcher[F]] = parallel(false) /** * Create a [[Dispatcher]] that can be used within a resource scope. Once the resource scope * exits, all active effects will be canceled, and attempts to submit new effects will throw * an exception. */ - def sequential[F[_]: Async]: Resource[F, Dispatcher[F]] = - sequential[F](await = false) + def sequential[F[_]: Async]: Resource[F, Dispatcher[F]] = sequential(false) /** * Create a [[Dispatcher]] that can be used within a resource scope. Once the resource scope @@ -154,7 +163,7 @@ object Dispatcher { * - false - cancel the active fibers */ def parallel[F[_]: Async](await: Boolean): Resource[F, Dispatcher[F]] = - apply(Mode.Parallel, await) + impl[F](true, await, true) /** * Create a [[Dispatcher]] that can be used within a resource scope. Once the resource scope @@ -190,240 +199,483 @@ object Dispatcher { * - false - cancel the active fiber */ def sequential[F[_]: Async](await: Boolean): Resource[F, Dispatcher[F]] = - apply(Mode.Sequential, await) - - private[this] def apply[F[_]](mode: Mode, await: Boolean)( - implicit F: Async[F]): Resource[F, Dispatcher[F]] = { - final case class Registration(action: F[Unit], prepareCancel: F[Unit] => Unit) - extends AtomicBoolean(true) - - sealed trait CancelState - case object CancelInit extends CancelState - final case class CanceledNoToken(promise: Promise[Unit]) extends CancelState - final case class CancelToken(cancelToken: () => Future[Unit]) extends CancelState - - val (workers, makeFork) = - mode match { - case Mode.Parallel => - (Cpus, Supervisor[F](await).map(s => s.supervise(_: F[Unit]).map(_.cancel))) - - case Mode.Sequential => - ( - 1, - Resource - .pure[F, F[Unit] => F[F[Unit]]]((_: F[Unit]).as(F.unit).handleError(_ => F.unit))) - } - - for { - fork <- makeFork + impl[F](false, await, false) + + // TODO decide if we want other people to use this + private[std] def sequentialCancelable[F[_]: Async]( + await: Boolean): Resource[F, Dispatcher[F]] = + impl[F](false, await, true) + + /* + * There are three fundamental modes here: sequential, parallel, and sequential-cancelable. There + * is very little overlap in semantics between the three apart from the submission side. The whole thing is split up into + * a submission queue with impure enqueue and cancel functions which is drained by the `Worker` and an + * internal execution protocol which also involves a queue. The `Worker` encapsulates all of the + * race conditions and negotiations with impure code, while the `Executor` manages running the + * tasks with appropriate semantics. In parallel mode, we shard the `Worker`s according to the + * number of CPUs and select a random queue (in impure code) as a target. This reduces contention + * at the cost of ordering, which is not guaranteed in parallel mode. With the sequential modes, there + * is only a single worker. + * + * On the impure side, the queue bit is the easy part: it's just a `UnsafeUnbounded` (queue) which + * accepts Registration(s). It's easiest to think of this a bit like an actor model, where the + * `Worker` is the actor and the enqueue is the send. Whenever we send a unit of work, that + * message has an `AtomicReference` which allows us to back-propagate a cancelation action. That + * cancelation action can be used in impure code by sending it back to us using the Finalizer + * message. There are certain race conditions involved in canceling work on the queue and work + * which is in the process of being taken off the queue, and those race conditions are negotiated + * between the impure code and the `Worker`. + * + * On the pure side, the three different `Executor`s are very distinct. In parallel mode, it's easy: + * we have a separate `Supervisor` which doesn't respawn actions, and we use that supervisor to + * spawn a new fiber for each task unit. Cancelation in this mode is easy: we just cancel the fiber. + * + * Sequential mode is the simplest of all: all work is executed in-place and cannot be canceled. + * The cancelation action in all cases is simply `unit` because the impure submission will not be + * seen until after the work is completed *anyway*, so there's no point in being fancy. + * + * For sequential-cancelable mode, we spawn a *single* executor fiber on the main supervisor (which respawns). + * This fiber is paired with a pure unbounded queue and a shutoff latch. New work is placed on the + * queue, which the fiber takes from in order and executes in-place. If the work self-cancels or + * errors, the executor will be restarted. In the case of external cancelation, we shut off the + * latch (to hold new work), drain the entire work queue into a scratch space, then cancel the + * executor fiber in-place so long as we're sure it's actively working on the target task. Once + * that cancelation completes (which will ultimately restart the executor fiber), we re-fill the + * queue and unlock the latch to allow new work (from the `Worker`). + */ + private[this] def impl[F[_]: Async]( + parallel: Boolean, + await: Boolean, + cancelable: Boolean): Resource[F, Dispatcher[F]] = { + val always = Some((_: Outcome[F, Throwable, _]) => true) + + // the outer supervisor is for the worker fibers + // the inner supervisor is for tasks (if parallel) and finalizers + Supervisor[F](await = await, checkRestart = always) flatMap { supervisor => + // we only need this flag to raise the IllegalStateException after closure (Supervisor can't do it for us) + + val termination = Resource.make(Sync[F].delay(new AtomicBoolean(false)))(doneR => + Sync[F].delay(doneR.set(true))) + + val awaitTermination = Resource.make(Concurrent[F].deferred[Unit])(_.complete(()).void) + + (awaitTermination, termination) flatMapN { (terminationLatch, doneR) => + val executorF = + if (parallel) + Executor.parallel[F](await) + else if (cancelable) + Executor.sequential(supervisor) + else + Resource.pure[F, Executor[F]](Executor.inplace[F]) + + // note this scopes the executors *outside* the workers, meaning the workers shut down first + // I think this is what we want, since it avoids enqueue race conditions + executorF flatMap { executor => + val workerF = Worker[F](executor, terminationLatch) + val workersF = + if (parallel) + workerF.replicateA(Cpus).map(_.toArray) + else + workerF.map(w => Array(w)) + + workersF evalMap { workers => + Async[F].executionContext flatMap { ec => + val launchAll = 0.until(workers.length).toList traverse_ { i => + supervisor.supervise(workers(i).run) + } - latches <- Resource.eval(F delay { - val latches = new Array[AtomicReference[() => Unit]](workers) - var i = 0 - while (i < workers) { - latches(i) = new AtomicReference(Noop) - i += 1 - } - latches - }) - states <- Resource.eval(F delay { - val states = Array.ofDim[AtomicReference[List[Registration]]](workers, workers) - var i = 0 - while (i < workers) { - var j = 0 - while (j < workers) { - states(i)(j) = new AtomicReference(Nil) - j += 1 - } - i += 1 - } - states - }) - ec <- Resource.eval(F.executionContext) - - // supervisor for the main loop, which needs to always restart unless the Supervisor itself is canceled - // critically, inner actions can be canceled without impacting the loop itself - supervisor <- Supervisor[F](await, Some((_: Outcome[F, Throwable, _]) => true)) - - _ <- { - def step( - state: Array[AtomicReference[List[Registration]]], - await: F[Unit], - doneR: AtomicBoolean): F[Unit] = - for { - done <- F.delay(doneR.get()) - regs <- F delay { - val buffer = mutable.ListBuffer.empty[Registration] - var i = 0 - while (i < workers) { - val st = state(i) - if (st.get() ne null) { - val list = if (done) st.getAndSet(null) else st.getAndSet(Nil) - if ((list ne null) && (list ne Nil)) { - buffer ++= list.reverse // FIFO order here is a form of fairness + launchAll.as(new Dispatcher[F] { + def unsafeToFutureCancelable[A](fa: F[A]): (Future[A], () => Future[Unit]) = { + def inner[E](fe: F[E], result: Promise[E], finalizer: Boolean) + : () => Future[Unit] = { + if (doneR.get()) { + throw new IllegalStateException("Dispatcher already closed") + } + + val stateR = new AtomicReference[RegState[F]](RegState.Unstarted) + + // forward atomicity guarantees onto promise completion + val promisory = MonadCancel[F] uncancelable { poll => + // invalidate the cancel action when we're done + val completeState = Sync[F].delay { + stateR.getAndSet(RegState.Completed) match { + case st: RegState.CancelRequested[_] => + // we already have a cancel, must complete it: + st.latch.success(()) + () + + case RegState.Completed => + throw new AssertionError("unexpected Completed state") + + case _ => + () + } + } + poll(fe.guarantee(completeState)).redeemWith( + e => Sync[F].delay(result.failure(e)), + a => Sync[F].delay(result.success(a))) + } + + val worker = + if (parallel) + workers(ThreadLocalRandom.current().nextInt(Cpus)) + else + workers(0) + + if (finalizer) { + worker.queue.unsafeOffer(Registration.Finalizer(promisory.void)) + + // cannot cancel a cancel + () => Future.failed(new UnsupportedOperationException) + } else { + val reg = new Registration.Primary(promisory.void, stateR) + worker.queue.unsafeOffer(reg) + + @tailrec + def cancel(): Future[Unit] = { + stateR.get() match { + case RegState.Unstarted => + val latch = Promise[Unit]() + + reg.action = null.asInstanceOf[F[Unit]] + + if (stateR.compareAndSet( + RegState.Unstarted, + RegState.CancelRequested(latch))) { + latch.future + } else { + cancel() + } + + case r: RegState.Running[_] => + val cancel = r.cancel // indirection needed for Scala 2.12 + + val latch = Promise[Unit]() + val _ = inner(cancel, latch, true) + latch.future + + case r: RegState.CancelRequested[_] => + r.latch.future + + case RegState.Completed => + Future.successful(()) + } + } + + cancel _ + } } - } - i += 1 - } - buffer.toList - } - _ <- - if (regs.isEmpty) { - await - } else { - regs traverse_ { - case r @ Registration(action, prepareCancel) => - val supervise: F[Unit] = - fork(action).flatMap(cancel => F.delay(prepareCancel(cancel))) - - // Check for task cancelation before executing. - F.delay(r.get()).ifM(supervise, F.delay(prepareCancel(F.unit))) + val result = Promise[A]() + (result.future, inner(fa, result, false)) } - } - } yield () - - def dispatcher( - doneR: AtomicBoolean, - latch: AtomicReference[() => Unit], - state: Array[AtomicReference[List[Registration]]]): F[Unit] = { - - val await = - F.async_[Unit] { cb => - if (!latch.compareAndSet(Noop, () => cb(Completed))) { - // state was changed between when we last set the latch and now; complete the callback immediately - cb(Completed) - } - } - - F.delay(latch.set(Noop)) *> // reset latch - // if we're marked as done, yield immediately to give other fibers a chance to shut us down - // we might loop on this a few times since we're marked as done before the supervisor is canceled - F.delay(doneR.get()).ifM(F.cede, step(state, await, doneR)) - } - 0.until(workers).toList traverse_ { n => - Resource.eval(F.delay(new AtomicBoolean(false))) flatMap { doneR => - val latch = latches(n) - val worker = dispatcher(doneR, latch, states(n)) - val release = F.delay(latch.getAndSet(Open)()) - Resource.make(supervisor.supervise(worker)) { _ => - F.delay(doneR.set(true)) *> step(states(n), F.unit, doneR) *> release + override def reportFailure(t: Throwable): Unit = + ec.reportFailure(t) + }) } } } } - } yield { - new Dispatcher[F] { - override def unsafeRunAndForget[A](fa: F[A]): Unit = { - unsafeToFutureCancelable(fa) - ._1 - .onComplete { - case Failure(ex) => ec.reportFailure(ex) - case _ => () - }(parasiticEC) - } + } + } + + private sealed abstract class RegState[+F[_]] extends Product with Serializable - def unsafeToFutureCancelable[E](fe: F[E]): (Future[E], () => Future[Unit]) = { - val promise = Promise[E]() + private object RegState { + case object Unstarted extends RegState[Nothing] + final case class Running[F[_]](cancel: F[Unit]) extends RegState[F] + final case class CancelRequested[F[_]](latch: Promise[Unit]) extends RegState[F] + case object Completed extends RegState[Nothing] + } + + private sealed abstract class Registration[F[_]] - val action = fe - .flatMap(e => F.delay(promise.success(e))) - .handleErrorWith(t => F.delay(promise.failure(t))) - .void + private object Registration { + final class Primary[F[_]](var action: F[Unit], val stateR: AtomicReference[RegState[F]]) + extends Registration[F] - val cancelState = new AtomicReference[CancelState](CancelInit) + final case class Finalizer[F[_]](action: F[Unit]) extends Registration[F] - def registerCancel(token: F[Unit]): Unit = { - val cancelToken = () => unsafeToFuture(token) + final case class PoisonPill[F[_]]() extends Registration[F] + } - @tailrec - def loop(): Unit = { - val state = cancelState.get() - state match { - case CancelInit => - if (!cancelState.compareAndSet(state, CancelToken(cancelToken))) { - loop() + // the signal is just a skolem for the atomic references; we never actually run it + private final class Worker[F[_]: Async]( + val queue: UnsafeAsyncQueue[F, Registration[F]], + supervisor: Supervisor[F], + executor: Executor[F], + terminationLatch: Deferred[F, Unit]) { + + private[this] val doneR = new AtomicBoolean(false) + + def run: F[Unit] = { + val step = queue.take flatMap { + case reg: Registration.Primary[F] => + Sync[F] defer { + reg.stateR.get() match { + case RegState.Unstarted => + val action = reg.action + + if (action == null) { + // this corresponds to a memory race where we see action's write before stateR's + val check = Spawn[F].cede *> Sync[F].delay(reg.stateR.get()) + check.iterateWhile(_ == RegState.Unstarted).flatMap { + case cr @ RegState.CancelRequested(latch) => + Sync[F].delay { + if (reg.stateR.compareAndSet(cr, RegState.Completed)) { + latch.success(()) + () + } else { + val s = reg.stateR.get() + throw new AssertionError(s"d => $s") + } + } + case s => + MonadThrow[F].raiseError[Unit](new AssertionError(s"a => $s")) } - case CanceledNoToken(promise) => - if (!cancelState.compareAndSet(state, CancelToken(cancelToken))) { - loop() - } else { - cancelToken().onComplete { - case Success(_) => promise.success(()) - case Failure(ex) => promise.failure(ex) - }(ec) + } else { + + executor(action) { cancelF => + Sync[F] defer { + if (reg + .stateR + .compareAndSet(RegState.Unstarted, RegState.Running(cancelF))) { + Applicative[F].unit + } else { + reg.stateR.get() match { + case cr @ RegState.CancelRequested(latch) => + if (reg.stateR.compareAndSet(cr, RegState.Running(cancelF))) { + supervisor + .supervise(cancelF.guarantee(Sync[F].delay { + latch.success(()) + () + })) + .void + } else { + reg.stateR.get() match { + case RegState.Completed => + Applicative[F].unit + case s => + throw new AssertionError(s"e => $s") + } + } + + case RegState.Completed => + Applicative[F].unit + + case s => + throw new AssertionError(s"b => $s") + } + } + } } - case _ => () - } - } + } - loop() - } + case s @ (RegState.Running(_) | RegState.Completed) => + throw new AssertionError(s"c => $s") - @tailrec - def enqueue(state: AtomicReference[List[Registration]], reg: Registration): Unit = { - val curr = state.get() - if (curr eq null) { - throw new IllegalStateException("dispatcher already shutdown") - } else { - val next = reg :: curr - if (!state.compareAndSet(curr, next)) enqueue(state, reg) + case RegState.CancelRequested(latch) => + Sync[F].delay(latch.success(())).void } } - val (state, lt) = if (workers > 1) { - val rand = ThreadLocalRandom.current() - val dispatcher = rand.nextInt(workers) - val inner = rand.nextInt(workers) + case Registration.Finalizer(action) => + supervisor.supervise(action).void - (states(dispatcher)(inner), latches(dispatcher)) - } else { - (states(0)(0), latches(0)) - } + case Registration.PoisonPill() => + Sync[F].delay(doneR.set(true)) + } - val reg = Registration(action, registerCancel _) - enqueue(state, reg) + // we're poisoned *first* but our supervisor is killed *last* + // when this happens, we just block on the termination latch to + // avoid weirdness. there's still a small gap even then, so we + // toss in a cede to avoid starvation pathologies + Sync[F].delay(doneR.get()).ifM(terminationLatch.get >> Spawn[F].cede, step >> run) + } + } - if (lt.get() ne Open) { - val f = lt.getAndSet(Open) - f() - } + private object Worker { + + def apply[F[_]: Async]( + executor: Executor[F], + terminationLatch: Deferred[F, Unit]): Resource[F, Worker[F]] = { + // we make a new supervisor just for cancelation actions + Supervisor[F](false) flatMap { supervisor => + val initF = Sync[F].delay( + new Worker[F]( + new UnsafeAsyncQueue[F, Registration[F]](), + supervisor, + executor, + terminationLatch)) + + Resource.make(initF)(w => Sync[F].delay(w.queue.unsafeOffer(Registration.PoisonPill()))) + } + } + } + + private abstract class Executor[F[_]] { + def apply(task: F[Unit])(registerCancel: F[Unit] => F[Unit]): F[Unit] + } + + private object Executor { - val cancel = { () => - reg.lazySet(false) - - @tailrec - def loop(): Future[Unit] = { - val state = cancelState.get() - state match { - case CancelInit => - val promise = Promise[Unit]() - if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) { - loop() - } else { - promise.future + // default sequential executor (ignores cancelation) + def inplace[F[_]: Applicative]: Executor[F] = + new Executor[F] { + def apply(task: F[Unit])(registerCancel: F[Unit] => F[Unit]): F[Unit] = { + // we can use unit as a cancel action here since it must always sequence *after* the task + // thus, the task must complete before the cancel action will be picked up + registerCancel(Applicative[F].unit) *> task + } + } + + // sequential executor which respects cancelation (at the cost of additional overhead); not used + def sequential[F[_]: Concurrent](supervisor: Supervisor[F]): Resource[F, Executor[F]] = { + sealed trait TaskState extends Product with Serializable + + object TaskState { + final case class Ready(task: F[Unit]) extends TaskState + case object Executing extends TaskState + final case class Canceling(latch: Deferred[F, Unit]) extends TaskState + case object Dead extends TaskState + } + + Resource.eval(Queue.unbounded[F, Ref[F, TaskState]]) flatMap { tasks => + // knock it out of the task taking + val evict = Concurrent[F].ref[TaskState](TaskState.Dead).flatMap(tasks.offer(_)) + + Resource.make(Concurrent[F].ref(false))(r => r.set(true) >> evict) evalMap { doneR => + Concurrent[F].ref[Option[Deferred[F, Unit]]](None) flatMap { shutoff => + val step = tasks.take flatMap { taskR => + taskR.getAndSet(TaskState.Executing) flatMap { + case TaskState.Ready(task) => + task guarantee { + taskR.getAndSet(TaskState.Dead) flatMap { + // if we finished during cancelation, we need to catch it before it kills us + case TaskState.Canceling(latch) => latch.complete(()).void + case _ => Applicative[F].unit + } } - case CanceledNoToken(promise) => - promise.future - case CancelToken(cancelToken) => - cancelToken() + + // Executing should be impossible + case TaskState.Executing | TaskState.Canceling(_) | TaskState.Dead => + Applicative[F].unit } } - loop() + lazy val loop: F[Unit] = doneR.get.ifM(Applicative[F].unit, step >> loop) + val spawnExecutor = supervisor.supervise(loop) + + spawnExecutor flatMap { fiber => + Concurrent[F].ref(fiber) map { fiberR => + new Executor[F] { + def apply(task: F[Unit])(registerCancel: F[Unit] => F[Unit]): F[Unit] = { + Concurrent[F].ref[TaskState](TaskState.Ready(task)) flatMap { taskR => + val cancelF = + Concurrent[F].deferred[Unit] flatMap { cancelLatch => + taskR flatModify { + case TaskState.Ready(_) | TaskState.Dead => + (TaskState.Dead, Applicative[F].unit) + + case TaskState.Canceling(cancelLatch) => + (TaskState.Canceling(cancelLatch), cancelLatch.get) + + case TaskState.Executing => + // we won the race for cancelation and it's already executing + val eff = for { + // lock the door + latch <- Concurrent[F].deferred[Unit] + _ <- shutoff.set(Some(latch)) + + // drain the task queue + scratch <- tasks.tryTakeN(None) + + // double check that execution didn't finish while we drained + _ <- cancelLatch.tryGet flatMap { + case Some(_) => + Applicative[F].unit + + case None => + for { + // kill the current executor + _ <- fiberR.get.flatMap(_.cancel) + + // restore all of the tasks + _ <- scratch.traverse_(tasks.offer(_)) + + // start a new fiber + _ <- spawnExecutor.flatMap(fiberR.set(_)) + + // allow everyone else back in + _ <- latch.complete(()) + _ <- shutoff.set(None) + } yield () + } + + _ <- cancelLatch.complete(()) + } yield () + + (TaskState.Canceling(cancelLatch), eff) + } + } + + // in rare cases, this can create mutual ordering issues with quickly enqueued tasks + val optBlock = shutoff.get flatMap { + case Some(latch) => latch.get + case None => Applicative[F].unit + } + + optBlock >> tasks.offer(taskR) >> registerCancel(cancelF) + } + } + } + } + } } - - (promise.future, cancel) } } } + + def parallel[F[_]: Concurrent](await: Boolean): Resource[F, Executor[F]] = + Supervisor[F](await = await) map { supervisor => + new Executor[F] { + def apply(task: F[Unit])(registerCancel: F[Unit] => F[Unit]): F[Unit] = + supervisor.supervise(task).flatMap(fiber => registerCancel(fiber.cancel)) + } + } } - private sealed trait Mode extends Product with Serializable + private val RightUnit: Right[Nothing, Unit] = Right(()) + + // MPSC assumption + private final class UnsafeAsyncQueue[F[_]: Async, A] + extends AtomicReference[Either[Throwable, Unit] => Unit](null) { latchR => + + private[this] val buffer = new UnsafeUnbounded[A]() + + def unsafeOffer(a: A): Unit = { + val _ = buffer.put(a) + val back = latchR.get() + if (back ne null) back(RightUnit) + } + + def take: F[A] = Async[F].cont[Unit, A] { + new Cont[F, Unit, A] { + def apply[G[_]: MonadCancelThrow] = { (k, get, lift) => + val takeG = lift(Sync[F].delay(buffer.take())) + val setLatchG = lift(Sync[F].delay(latchR.set(k))) + val unsetLatchG = lift(Sync[F].delay(latchR.lazySet(null))) + + takeG.handleErrorWith { _ => // emptiness is reported as a FailureSignal error + setLatchG *> (takeG <* unsetLatchG).handleErrorWith { _ => // double-check + get *> unsetLatchG *> lift(take) // recurse + } + } + } + + } + } - private object Mode { - case object Parallel extends Mode - case object Sequential extends Mode } } diff --git a/tests/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala b/tests/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala index 2dd61d8cad..ca10194e11 100644 --- a/tests/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/DispatcherSpec.scala @@ -24,15 +24,17 @@ import cats.syntax.all._ import scala.concurrent.{ExecutionContext, Promise} import scala.concurrent.duration._ +import java.util.concurrent.atomic.AtomicInteger + class DispatcherSpec extends BaseSpec with DetectPlatform { override def executionTimeout = 30.seconds - "sequential dispatcher" should { + "sequential dispatcher (cancelable = false)" should { "await = true" >> { val D = Dispatcher.sequential[IO](await = true) - sequential(D) + sequential(D, false) awaitTermination(D) @@ -41,12 +43,40 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { .replicateA(if (isJS || isNative) 1 else 10000) .as(true) } + + "await work queue drain on shutdown" in real { + val count = 1000 + + IO.ref(0) flatMap { resultsR => + val increments = D use { runner => + IO { + 0.until(count).foreach(_ => runner.unsafeRunAndForget(resultsR.update(_ + 1))) + } + } + + increments *> resultsR.get.flatMap(r => IO(r mustEqual count)) + } + } + + "terminating worker preserves task order" in real { + val count = 10 + + IO.ref(Vector[Int]()) flatMap { resultsR => + val appends = D use { runner => + IO { + 0.until(count).foreach(i => runner.unsafeRunAndForget(resultsR.update(_ :+ i))) + } + } + + appends *> resultsR.get.flatMap(r => IO(r mustEqual 0.until(count).toVector)) + } + } } "await = false" >> { val D = Dispatcher.sequential[IO](await = false) - sequential(D) + sequential(D, false) "cancel all inner effects when canceled" in real { var canceled = false @@ -62,9 +92,43 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { } } - private def sequential(dispatcher: Resource[IO, Dispatcher[IO]]) = { + "sequential dispatcher (cancelable = true)" should { + "await = true" >> { + val D = Dispatcher.sequentialCancelable[IO](await = true) + + sequential(D, true) + + awaitTermination(D) + + "not hang" in real { + D.use(dispatcher => IO(dispatcher.unsafeRunAndForget(IO.unit))) + .replicateA(if (isJS || isNative) 1 else 10000) + .as(true) + } + } + + "await = false" >> { + val D = Dispatcher.sequentialCancelable[IO](await = false) - common(dispatcher) + sequential(D, true) + + "cancel all inner effects when canceled" in real { + var canceled = false + + val body = D use { runner => + IO(runner.unsafeRunAndForget(IO.never.onCancel(IO { canceled = true }))) *> IO.never + } + + val action = body.start.flatMap(f => IO.sleep(500.millis) *> f.cancel) + + TestControl.executeEmbed(action *> IO(canceled must beTrue)) + } + } + } + + private def sequential(dispatcher: Resource[IO, Dispatcher[IO]], cancelable: Boolean) = { + + common(dispatcher, cancelable) "strictly sequentialize multiple IOs" in real { val length = 1000 @@ -86,22 +150,67 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { } yield ok } - "ignore action cancelation" in real { - var canceled = false - - val rec = dispatcher flatMap { runner => - val run = IO { - runner - .unsafeToFutureCancelable(IO.sleep(500.millis).onCancel(IO { canceled = true })) - ._2 + "reject new tasks after release action is submitted as a task" in ticked { + implicit ticker => + val test = dispatcher.allocated.flatMap { + case (runner, release) => + IO(runner.unsafeRunAndForget(release)) *> + IO.sleep(100.millis) *> + IO(runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException]) } - Resource eval { - run.flatMap(ct => IO.sleep(200.millis) >> IO.fromFuture(IO(ct()))) + test.void must completeAs(()) + } + + "invalidate cancelation action of task when complete" in real { + val test = dispatcher use { runner => + for { + latch1 <- IO.deferred[Unit] + latch2 <- IO.deferred[Unit] + latch3 <- IO.deferred[Unit] + + pair <- IO(runner.unsafeToFutureCancelable(IO.unit)) + (_, cancel) = pair + + _ <- IO( + runner.unsafeRunAndForget(latch1.complete(()) *> latch2.get *> latch3.complete(()))) + + _ <- latch1.get + _ <- IO.fromFuture(IO(cancel())) + _ <- latch2.complete(()) + + _ <- latch3.get // this will hang if the test is failing + } yield ok + } + + test.parReplicateA_(1000).as(ok) + } + + "invalidate cancelation action when racing with task" in real { + val test = dispatcher use { runner => + IO.ref(false) flatMap { resultR => + for { + latch1 <- IO.deferred[Unit] + latch2 <- IO.deferred[Unit] + + pair <- IO(runner.unsafeToFutureCancelable(latch1.get)) + (_, cancel) = pair + + _ <- latch1.complete(()) + // the particularly scary case is where the cancel action gets in queue before the next action + f <- IO(cancel()) + + // we're testing to make sure this task runs and isn't canceled + _ <- IO(runner.unsafeRunAndForget(resultR.set(true) *> latch2.complete(()))) + _ <- IO.fromFuture(IO.pure(f)) + _ <- latch2.get + + b <- resultR.get + } yield b } } - TestControl.executeEmbed(rec.use(_ => IO(canceled must beFalse))) + test.flatMap(b => IO(b must beTrue)).parReplicateA_(1000).as(ok) } } @@ -157,7 +266,7 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { private def parallel(dispatcher: Resource[IO, Dispatcher[IO]]) = { - common(dispatcher) + common(dispatcher, true) "run multiple IOs in parallel" in real { val num = 10 @@ -201,37 +310,66 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { } yield ok } - "forward cancelation onto the inner action" in real { - var canceled = false - - val rec = dispatcher flatMap { runner => - val run = IO { - runner.unsafeToFutureCancelable(IO.never.onCancel(IO { canceled = true }))._2 + // https://github.com/typelevel/cats-effect/issues/3898 + "not hang when cancelling" in real { + val test = dispatcher.use { dispatcher => + val action = IO.fromFuture { + IO { + val (_, cancel) = dispatcher.unsafeToFutureCancelable(IO.never) + cancel() + } } - Resource eval { - run.flatMap(ct => IO.sleep(500.millis) >> IO.fromFuture(IO(ct()))) - } + action.replicateA_(if (isJVM) 1000 else 1) } - TestControl.executeEmbed(rec.use(_ => IO(canceled must beTrue))) + if (isJVM) + test.parReplicateA_(100).as(ok) + else + test.as(ok) } - // https://github.com/typelevel/cats-effect/issues/3898 - "not hang when cancelling" in real { - dispatcher.use { dispatcher => - IO.fromFuture { - IO { - val (_, cancel) = dispatcher.unsafeToFutureCancelable(IO.never) - cancel() + "cancelation does not block a worker" in real { + TestControl executeEmbed { + dispatcher use { runner => + (IO.deferred[Unit], IO.deferred[Unit]) flatMapN { (latch1, latch2) => + val task = (latch1.complete(()) *> latch2.get).uncancelable + + IO(runner.unsafeToFutureCancelable(task)._2) flatMap { cancel => + latch1.get *> + IO(cancel()) *> + IO(runner.unsafeRunAndForget(latch2.complete(()))) *> + latch2.get.as(ok) + } } - }.replicateA_(1000) - .as(ok) + } } } + + "cancelation race does not block a worker" in real { + dispatcher + .use { runner => + IO.deferred[Unit] flatMap { latch => + val clogUp = IO { + val task = latch.get.uncancelable + runner.unsafeToFutureCancelable(task)._2 + }.flatMap { cancel => + // cancel concurrently + // We want to trigger race condition where task starts but then discovers it was canceled + IO(cancel()) + } + + clogUp.parReplicateA_(1000) *> + // now try to run a new task + IO.fromFuture(IO(runner.unsafeToFuture(latch.complete(())))) + } + } + .replicateA_(if (isJVM) 1000 else 1) + .as(ok) + } } - private def common(dispatcher: Resource[IO, Dispatcher[IO]]) = { + private def common(dispatcher: Resource[IO, Dispatcher[IO]], cancelable: Boolean) = { "run a synchronous IO" in real { val ioa = IO(1).map(_ + 2) @@ -250,9 +388,8 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { } "run several IOs back to back" in real { - @volatile - var counter = 0 - val increment = IO(counter += 1) + val counter = new AtomicInteger(0) + val increment = IO(counter.getAndIncrement()).void val num = 10 @@ -260,7 +397,7 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { Resource.eval(IO.fromFuture(IO(runner.unsafeToFuture(increment))).replicateA(num).void) } - rec.use(_ => IO(counter mustEqual num)) + rec.use(_ => IO(counter.get() mustEqual num)) } "raise an error on leaked runner" in real { @@ -339,46 +476,24 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { } "reject new tasks while shutting down" in real { - (IO.ref(false), IO.ref(false)) - .flatMapN { (resultR, rogueResultR) => - dispatcher - .allocated - .flatMap { - case (runner, release) => - IO(runner.unsafeRunAndForget( - IO.sleep(1.second).uncancelable.guarantee(resultR.set(true)))) *> - IO.sleep(100.millis) *> - release.both( - IO.sleep(500.nanos) *> - IO(runner.unsafeRunAndForget(rogueResultR.set(true))).attempt - ) - } - .flatMap { - case (_, rogueSubmitResult) => - for { - result <- resultR.get - rogueResult <- rogueResultR.get - _ <- IO(result must beTrue) - _ <- IO(if (rogueResult == false) { - // if the rogue task is not completed then we must have failed to submit it - rogueSubmitResult must beLeft - }) - } yield ok - } - } - .replicateA(5) - } - - "issue 3501: reject new tasks after release action is submitted as a task" in ticked { - implicit ticker => - val test = dispatcher.allocated.flatMap { + val test = (IO.deferred[Unit], IO.deferred[Unit]) flatMapN { (latch1, latch2) => + dispatcher.allocated flatMap { case (runner, release) => - IO(runner.unsafeRunAndForget(release)) *> - IO.sleep(100.millis) *> - IO(runner.unsafeRunAndForget(IO(ko)) must throwAn[IllegalStateException]) + for { + _ <- IO( + runner.unsafeRunAndForget(IO.unit.guarantee(latch1.complete(()) >> latch2.get))) + _ <- latch1.get + + challenge = IO(runner.unsafeRunAndForget(IO.unit)) + .delayBy(500.millis) // gross sleep to make sure we're actually in the release + .guarantee(latch2.complete(()).void) + + _ <- release &> challenge + } yield ko } + } - test.void must completeAs(()) + test.attempt.flatMap(r => IO(r must beLeft)).parReplicateA_(50).as(ok) } "cancel inner awaits when canceled" in ticked { implicit ticker => @@ -387,6 +502,86 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { test must completeAs(()) } + + if (!cancelable) { + "ignore action cancelation" in real { + var canceled = false + + val rec = dispatcher flatMap { runner => + val run = IO { + runner + .unsafeToFutureCancelable(IO.sleep(500.millis).onCancel(IO { canceled = true })) + ._2 + } + + Resource eval { + run.flatMap(ct => IO.sleep(200.millis) >> IO.fromFuture(IO(ct()))) + } + } + + TestControl.executeEmbed(rec.use(_ => IO(canceled must beFalse))) + } + } else { + "forward cancelation onto the inner action" in real { + val test = dispatcher use { runner => + IO.ref(false) flatMap { resultsR => + val action = IO.never.onCancel(resultsR.set(true)) + IO(runner.unsafeToFutureCancelable(action)) flatMap { + case (_, cancel) => + IO.sleep(500.millis) *> IO.fromFuture(IO(cancel())) *> resultsR.get + } + } + } + + TestControl.executeEmbed(test).flatMap(b => IO(b must beTrue)) + } + + "support multiple concurrent cancelations" in real { + dispatcher use { runner => + val count = new AtomicInteger(0) + + for { + latch0 <- IO.deferred[Unit] + latch1 <- IO.deferred[Unit] + latch2 <- IO.deferred[Unit] + + action = (latch0.complete(()) *> IO.never) + .onCancel(latch1.complete(()) *> latch2.get) + + pair <- IO(runner.unsafeToFutureCancelable(action)) + (_, cancel) = pair + + _ <- latch0.get + + ec <- IO.executionContext + cancelAction = IO(cancel().onComplete(_ => count.getAndIncrement())(ec)) + _ <- cancelAction + _ <- cancelAction + _ <- cancelAction + + _ <- latch1.get + _ <- IO.sleep(100.millis) + _ <- IO(count.get() mustEqual 0) + + _ <- latch2.complete(()) + _ <- IO.sleep(100.millis) + _ <- IO(count.get() mustEqual 3) + } yield ok + } + } + + "complete / cancel race" in real { + val tsk = dispatcher.use { dispatcher => + IO.fromFuture(IO { + val (_, cancel) = dispatcher.unsafeToFutureCancelable(IO.unit) + val cancelFut = cancel() + cancelFut + }) + } + + tsk.replicateA_(if (isJVM) 10000 else 1).as(ok) + } + } } private def awaitTermination(dispatcher: Resource[IO, Dispatcher[IO]]) = { @@ -421,11 +616,13 @@ class DispatcherSpec extends BaseSpec with DetectPlatform { "issue #3506: await unsafeRunAndForget" in ticked { implicit ticker => val result = for { - resultR <- IO.ref(false) - _ <- dispatcher.use { runner => IO(runner.unsafeRunAndForget(resultR.set(true))) } - result <- resultR.get - } yield result - result must completeAs(true) + latch <- IO.deferred[Unit] + + repro = (latch.complete(()) >> IO.never).uncancelable + _ <- dispatcher.use(runner => IO(runner.unsafeRunAndForget(repro)) >> latch.get) + } yield () + + result must nonTerminate } "cancel active fibers when an error is produced" in real {