Skip to content

Commit

Permalink
Merge pull request #3691 from homycdev/evalOnExecutor
Browse files Browse the repository at this point in the history
evalOnExecutor
  • Loading branch information
durban authored Jul 9, 2023
2 parents 86b65a9 + 08ec252 commit 93518d4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 1 deletion.
16 changes: 16 additions & 0 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,27 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
*/
def evalOn(ec: ExecutionContext): IO[A] = IO.EvalOn(this, ec)

/**
* Shifts the execution of the current IO to the specified [[java.util.concurrent.Executor]].
*
* @see
* [[evalOn]]
*/
def evalOnExecutor(executor: Executor): IO[A] =
IO.asyncForIO.evalOnExecutor(this, executor)

def startOn(ec: ExecutionContext): IO[FiberIO[A @uncheckedVariance]] = start.evalOn(ec)

def startOnExecutor(executor: Executor): IO[FiberIO[A @uncheckedVariance]] =
IO.asyncForIO.startOnExecutor(this, executor)

def backgroundOn(ec: ExecutionContext): ResourceIO[IO[OutcomeIO[A @uncheckedVariance]]] =
Resource.make(startOn(ec))(_.cancel).map(_.join)

def backgroundOnExecutor(
executor: Executor): ResourceIO[IO[OutcomeIO[A @uncheckedVariance]]] =
IO.asyncForIO.backgroundOnExecutor(this, executor)

/**
* Given an effect which might be [[uncancelable]] and a finalizer, produce an effect which
* can be canceled by running the finalizer. This combinator is useful for handling scenarios
Expand Down
43 changes: 43 additions & 0 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,23 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
*/
def evalOn[A](fa: F[A], ec: ExecutionContext): F[A]

/**
* [[Async.evalOn]] with provided [[java.util.concurrent.Executor]]
*/
def evalOnExecutor[A](fa: F[A], executor: Executor): F[A] = {
require(executor != null, "Cannot pass null Executor as an argument")
executor match {
case ec: ExecutionContext =>
evalOn[A](fa, ec: ExecutionContext)
case executor =>
flatMap(executionContext) { refEc =>
val newEc: ExecutionContext =
ExecutionContext.fromExecutor(executor, refEc.reportFailure)
evalOn[A](fa, newEc)
}
}
}

/**
* [[Async.evalOn]] as a natural transformation.
*/
Expand All @@ -174,6 +191,14 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
def apply[A](fa: F[A]): F[A] = evalOn(fa, ec)
}

/**
* [[Async.evalOnExecutor]] as a natural transformation.
*/
def evalOnExecutorK(executor: Executor): F ~> F =
new (F ~> F) {
def apply[A](fa: F[A]): F[A] = evalOnExecutor(fa, executor)
}

/**
* Start a new fiber on a different execution context.
*
Expand All @@ -182,6 +207,14 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
def startOn[A](fa: F[A], ec: ExecutionContext): F[Fiber[F, Throwable, A]] =
evalOn(start(fa), ec)

/**
* Start a new fiber on a different executor.
*
* See [[GenSpawn.start]] for more details.
*/
def startOnExecutor[A](fa: F[A], executor: Executor): F[Fiber[F, Throwable, A]] =
evalOnExecutor(start(fa), executor)

/**
* Start a new background fiber on a different execution context.
*
Expand All @@ -192,6 +225,16 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
ec: ExecutionContext): Resource[F, F[Outcome[F, Throwable, A]]] =
Resource.make(startOn(fa, ec))(_.cancel)(this).map(_.join)

/**
* Start a new background fiber on a different executor.
*
* See [[GenSpawn.background]] for more details.
*/
def backgroundOnExecutor[A](
fa: F[A],
executor: Executor): Resource[F, F[Outcome[F, Throwable, A]]] =
Resource.make(startOnExecutor(fa, executor))(_.cancel)(this).map(_.join)

/**
* Obtain a reference to the current execution context.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package cats.effect.kernel.syntax

import cats.effect.kernel.{Async, Fiber, Outcome, Resource, Sync}
import cats.effect.kernel._

import scala.concurrent.ExecutionContext

import java.util.concurrent.Executor

trait AsyncSyntax {
implicit def asyncOps[F[_], A](wrapped: F[A]): AsyncOps[F, A] =
new AsyncOps(wrapped)
Expand All @@ -30,13 +32,23 @@ final class AsyncOps[F[_], A] private[syntax] (private[syntax] val wrapped: F[A]
def evalOn(ec: ExecutionContext)(implicit F: Async[F]): F[A] =
Async[F].evalOn(wrapped, ec)

def evalOnExecutor(executor: Executor)(implicit F: Async[F]): F[A] =
Async[F].evalOnExecutor(wrapped, executor)

def startOn(ec: ExecutionContext)(implicit F: Async[F]): F[Fiber[F, Throwable, A]] =
Async[F].startOn(wrapped, ec)

def startOnExecutor(executor: Executor)(implicit F: Async[F]): F[Fiber[F, Throwable, A]] =
Async[F].startOnExecutor(wrapped, executor)

def backgroundOn(ec: ExecutionContext)(
implicit F: Async[F]): Resource[F, F[Outcome[F, Throwable, A]]] =
Async[F].backgroundOn(wrapped, ec)

def backgroundOnExecutor(executor: Executor)(
implicit F: Async[F]): Resource[F, F[Outcome[F, Throwable, A]]] =
Async[F].backgroundOnExecutor(wrapped, executor)

def syncStep[G[_]: Sync](limit: Int)(implicit F: Async[F]): G[Either[F[A], A]] =
Async[F].syncStep[G, A](wrapped, limit)
}

0 comments on commit 93518d4

Please sign in to comment.