Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ContextShift for Rerunnable #216

Merged
merged 1 commit into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.catbird.util.effect

import cats.effect.ContextShift
import com.twitter.util.{ Future, FuturePool, Promise }
import io.catbird.util.Rerunnable

import scala.Unit
import java.lang.Runnable
import java.util.concurrent.ExecutorService

import scala.concurrent.{ ExecutionContext, ExecutionContextExecutorService }

/**
* The goal here is to provide an implicit instance for `ContextShift[Rerunnable]`, so you can use libraries like
* `fs2` in a finagle-based application without converting between `Future` and `IO` everywhere.
*
* Usage:
* {{{
* implicit val rerunnableCS: ContextShift[Rerunnable] = RerunnableContextShift.global
* }}}
*/
object RerunnableContextShift {

final def fromExecutionContext(ec: ExecutionContext): ContextShift[Rerunnable] =
new RerunnableContextShift(ec)

final def fromExecutorService(es: ExecutorService): ContextShift[Rerunnable] =
fromExecutionContext(ExecutionContext.fromExecutorService(es))

final def fromExecutionContextExecutorService(eces: ExecutionContextExecutorService): ContextShift[Rerunnable] =
fromExecutorService(eces)

final lazy val global: ContextShift[Rerunnable] =
fromExecutionContext(scala.concurrent.ExecutionContext.global)

/**
* Mirrors the api of `scala.concurrent.ExecutionContext.Implicit.global`.
*
* Usage:
* {{{
* import io.catbird.util.effect.RerunnableContextShift.Implicits.global
* }}}
*/
object Implicits {
final implicit def global: ContextShift[Rerunnable] = RerunnableContextShift.global
}
}

final private[effect] class RerunnableContextShift private (ec: ExecutionContext) extends ContextShift[Rerunnable] {
private final lazy val futurePool = FuturePool.interruptible(ec.asInstanceOf[ExecutionContextExecutorService])

override def shift: Rerunnable[Unit] =
Rerunnable.withFuturePool(futurePool)(()) // This is a bit of a hack, but it will have to do

override def evalOn[A](targetEc: ExecutionContext)(fa: Rerunnable[A]): Rerunnable[A] =
for {
r <- executeOn(targetEc)(fa).liftToTry
_ <- shift
a <- Rerunnable.fromFuture(Future.value(r).lowerFromTry)
} yield a

private def executeOn[A](targetEc: ExecutionContext)(fa: Rerunnable[A]): Rerunnable[A] =
Rerunnable.fromFuture {
val p = Promise[A]()

targetEc.execute(new Runnable {
override def run(): Unit =
fa.run.proxyTo[A](p)
})

p
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.catbird.util.effect

import cats.effect.{ ContextShift, IO, Sync }
import com.twitter.util.{ Await, Future, FuturePool }
import io.catbird.util.Rerunnable
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuite

class RerunnableContextShiftSuite extends FixtureAnyFunSuite with ThreadPoolNamingSupport {

protected final class FixtureParam {
val futurePoolName = "future-pool"
val otherPoolName = "other-pool"
val ioPoolName = "io-pool"

val futurePool = FuturePool.interruptible(newNamedThreadPool(futurePoolName))
val otherPool = newNamedThreadPool(otherPoolName)
val ioPool = newNamedThreadPool(ioPoolName)

def newIO: IO[String] = IO(currentThreadName())

def newFuture: Future[String] = futurePool(currentThreadName())

def newRerunnable: Rerunnable[String] = Rerunnable(currentThreadName())
}

test("ContextShift[Rerunnable].shift shifts to the pool of the instance") { f =>
implicit val cs: ContextShift[Rerunnable] =
RerunnableContextShift.fromExecutionContext(f.ioPool)

val (poolName1, poolName2, poolName3) =
(for {
poolName1 <- Rerunnable.fromFuture(f.newFuture)

_ <- ContextShift[Rerunnable](cs).shift

poolName2 <- Sync[Rerunnable].delay(currentThreadName())

poolName3 <- Rerunnable.fromFuture(f.newFuture)
} yield (poolName1, poolName2, poolName3)).run.await

assert(poolName1 == f.futurePoolName)
assert(poolName2 == f.ioPoolName)
assert(poolName2 == f.ioPoolName)
}

test("ContextShift[Rerunnable].evalOn executes on correct pool and shifts back to previous pool") { f =>
implicit val cs: ContextShift[Rerunnable] =
RerunnableContextShift.fromExecutionContext(f.ioPool)

val (poolName1, poolName2, poolName3) =
(for {
poolName1 <- f.newRerunnable

poolName2 <- ContextShift[Rerunnable].evalOn(f.otherPool)(f.newRerunnable)

poolName3 <- f.newRerunnable
} yield (poolName1, poolName2, poolName3)).run.await

assert(poolName1 == currentThreadName()) // The first rerunnable is not explicitly evaluated on a dedicated pool
assert(poolName2 == f.otherPoolName)
assert(poolName3 == f.ioPoolName)
}

test("ContextShift[Rerunnable].evalOn executes on correct pool and shifts back to future pool") { f =>
implicit val cs: ContextShift[Rerunnable] =
RerunnableContextShift.fromExecutionContext(f.ioPool)

val (poolName1, poolName2, poolName3) =
(for {
poolName1 <- Rerunnable.fromFuture(f.newFuture) // The future was started on a dedicated pool (e.g. netty)

poolName2 <- ContextShift[Rerunnable].evalOn(f.otherPool)(f.newRerunnable)

poolName3 <- Rerunnable.fromFuture(f.newFuture)
} yield (poolName1, poolName2, poolName3)).run.await

assert(poolName1 == f.futurePoolName)
assert(poolName2 == f.otherPoolName)
assert(poolName3 == f.futurePoolName)
}

implicit private class FutureAwaitOps[A](future: Future[A]) {
def await: A = Await.result(future)
}

override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam))
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.catbird.util.effect

import java.lang.{ Runnable, Thread }
import java.util.concurrent.{ ExecutorService, Executors, ThreadFactory }
import java.util.concurrent.{ Executors, ThreadFactory }

import scala.concurrent.{ ExecutionContext, ExecutionContextExecutorService }

Expand Down