Skip to content

Commit

Permalink
Merge pull request #595 from rossabaker/1.4.0-backports
Browse files Browse the repository at this point in the history
1.4.0: omnibus bug fix, feature, and scaladoc backport
  • Loading branch information
rossabaker authored Jul 31, 2019
2 parents 609a69b + a947318 commit c30d75b
Show file tree
Hide file tree
Showing 17 changed files with 502 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ benchmarks/results
.java-version
.DS_Store
.metals
.bloop
28 changes: 28 additions & 0 deletions core/js/src/main/scala/cats/effect/internals/BlockerPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package internals

import scala.concurrent.ExecutionContext

private[effect] trait BlockerPlatform {

/** Blocker that delegates to the global execution context. */
lazy val global: Blocker = liftExecutionContext(ExecutionContext.Implicits.global)

def liftExecutionContext(ec: ExecutionContext): Blocker
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package internals

import scala.concurrent.ExecutionContext
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import cats.data.NonEmptyList

private[effect] trait BlockerPlatform {

/**
* Creates a blocker that is backed by a cached thread pool.
*/
def apply[F[_]](implicit F: Sync[F]): Resource[F, Blocker] =
fromExecutorService(F.delay(Executors.newCachedThreadPool(new ThreadFactory {
def newThread(r: Runnable) = {
val t = new Thread(r, "cats-effect-blocker")
t.setDaemon(true)
t
}
})))

/**
* Creates a blocker backed by the `ExecutorService` returned by the
* supplied task. The executor service is shut down upon finalization
* of the returned resource.
*
* If there are pending tasks in the thread pool at time the returned
* `Blocker` is finalized, the finalizer fails with a `Blocker.OutstandingTasksAtShutdown`
* exception.
*/
def fromExecutorService[F[_]](makeExecutorService: F[ExecutorService])(implicit F: Sync[F]): Resource[F, Blocker] =
Resource.make(makeExecutorService) { ec =>
val tasks = F.delay {
val tasks = ec.shutdownNow()
val b = List.newBuilder[Runnable]
val itr = tasks.iterator
while (itr.hasNext)
b += itr.next
NonEmptyList.fromList(b.result)
}
F.flatMap(tasks) {
case Some(t) => F.raiseError(new OutstandingTasksAtShutdown(t))
case None => F.unit
}
}.map(es => liftExecutorService(es))

/**
* Creates a blocker that delegates to the supplied executor service.
*/
def liftExecutorService(es: ExecutorService): Blocker =
liftExecutionContext(ExecutionContext.fromExecutorService(es))

/**
* Creates a blocker that delegates to the supplied execution context.
*
* This must not be used with general purpose contexts like
* `scala.concurrent.ExecutionContext.Implicits.global'.
*/
def liftExecutionContext(ec: ExecutionContext): Blocker

/** Thrown if there are tasks queued in the thread pool at the time a `Blocker` is finalized. */
final class OutstandingTasksAtShutdown(val tasks: NonEmptyList[Runnable]) extends IllegalStateException("There were outstanding tasks at time of shutdown of the thread pool")
}
43 changes: 40 additions & 3 deletions core/shared/src/main/scala/cats/effect/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import simulacrum._
import cats.implicits._
import cats.data._
import cats.effect.IO.{Delay, Pure, RaiseError}
import cats.effect.concurrent.{Ref, Deferred}
import cats.effect.concurrent.{Deferred, Ref, Semaphore}
import cats.effect.internals.{Callback, IORunLoop}
import cats.effect.internals.TrampolineEC.immediate

import scala.annotation.implicitNotFound
import scala.concurrent.ExecutionContext
import scala.util.Either
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Either, Failure, Success}

/**
* A monad that can describe asynchronous or synchronous computations
Expand Down Expand Up @@ -276,6 +277,22 @@ object Async {
})
}

def fromFuture[F[_], A](fa: F[Future[A]])(implicit F: Async[F], cs: ContextShift[F]): F[A] =
F.guarantee(fa.flatMap(f => f.value match {
case Some(result) =>
result match {
case Success(a) => F.pure(a)
case Failure(e) => F.raiseError[A](e)
}
case _ =>
F.async[A] { cb =>
f.onComplete(r => cb(r match {
case Success(a) => Right(a)
case Failure(e) => Left(e)
}))(immediate)
}
}))(cs.shift)

/**
* Lifts any `IO` value into any data type implementing [[Async]].
*
Expand Down Expand Up @@ -319,6 +336,26 @@ object Async {
}
}

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
def parTraverseN[T[_]: Traverse, M[_], F[_], A, B](n: Long)(ta: T[A])(f: A => M[B])(implicit M: Async[M], P: Parallel[M, F]): M[T[B]] =
for {
semaphore <- Semaphore.uncancelable(n)(M)
tb <- ta.parTraverse { a =>
semaphore.withPermit(f(a))
}
} yield tb

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_]: Traverse, M[_], F[_], A](n: Long)(tma: T[M[A]])(implicit M: Async[M], P: Parallel[M, F]): M[T[A]] =
for {
semaphore <- Semaphore.uncancelable(n)(M)
mta <- tma.map(semaphore.withPermit).parSequence
} yield mta

/**
* [[Async]] instance built for `cats.data.EitherT` values initialized
* with any `F` data type that also implements `Async`.
Expand Down
58 changes: 58 additions & 0 deletions core/shared/src/main/scala/cats/effect/Blocker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats
package effect

import scala.concurrent.ExecutionContext

import cats.effect.internals.BlockerPlatform

/**
* An execution context that is safe to use for blocking operations.
*
* Used in conjunction with [[ContextShift]], this type allows us to write functions
* that require a special `ExecutionContext` for evaluation, while discouraging the
* use of a shared, general purpose pool (e.g. the global context).
*
* Instances of this class should *not* be passed implicitly.
*/
final class Blocker private (val blockingContext: ExecutionContext) extends AnyVal {

/**
* Like `Sync#delay` but the supplied thunk is evaluated on the blocking
* execution context.
*/
def delay[F[_], A](thunk: => A)(implicit F: Sync[F], cs: ContextShift[F]): F[A] =
blockOn(F.delay(thunk))

/**
* Evaluates the supplied task on the blocking execution context via `blockOn`.
*/
def blockOn[F[_], A](fa: F[A])(implicit cs: ContextShift[F]): F[A] =
cs.evalOn(this.blockingContext)(fa)
}

object Blocker extends BlockerPlatform {

/**
* Creates a blocker that delegates to the supplied execution context.
*
* This must not be used with general purpose contexts like
* `scala.concurrent.ExecutionContext.Implicits.global'.
*/
def liftExecutionContext(ec: ExecutionContext): Blocker = new Blocker(ec)
}
43 changes: 42 additions & 1 deletion core/shared/src/main/scala/cats/effect/Concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package cats
package effect

import cats.data._
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.concurrent.{Deferred, Ref, Semaphore}
import cats.effect.ExitCase.Canceled
import cats.effect.IO.{Delay, Pure, RaiseError}
import cats.effect.internals.Callback.{rightUnit, successUnit}
Expand Down Expand Up @@ -516,6 +516,47 @@ object Concurrent {
CancelableF(k)
}

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
def parTraverseN[T[_]: Traverse, M[_], F[_], A, B](n: Long)(ta: T[A])(f: A => M[B])(implicit M: Concurrent[M], P: Parallel[M, F]): M[T[B]] =
for {
semaphore <- Semaphore(n)(M)
tb <- ta.parTraverse { a =>
semaphore.withPermit(f(a))
}
} yield tb

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_]: Traverse, M[_], F[_], A](n: Long)(tma: T[M[A]])(implicit M: Concurrent[M], P: Parallel[M, F]): M[T[A]] =
for {
semaphore <- Semaphore(n)(M)
mta <- tma.map(semaphore.withPermit).parSequence
} yield mta

/**
* This is the default [[Concurrent.continual]] implementation.
*/
def continual[F[_], A, B](fa: F[A])(f: Either[Throwable, A] => F[B])
(implicit F: Concurrent[F]): F[B] = {
import cats.effect.implicits._
import scala.util.control.NoStackTrace

Deferred.uncancelable[F, Either[Throwable, B]].flatMap { r =>
fa.start.bracket( fiber =>
fiber.join.guaranteeCase {
case ExitCase.Completed | ExitCase.Error(_) =>
(fiber.join.attempt.flatMap(f)).attempt.flatMap(r.complete)
case _ => fiber.cancel >>
r.complete(Left(new Exception("Continual fiber cancelled") with NoStackTrace))
}.attempt
)(_ => r.get.void)
.flatMap(_ => r.get.rethrow)
}
}

/**
* [[Concurrent]] instance built for `cats.data.EitherT` values initialized
* with any `F` data type that also implements `Concurrent`.
Expand Down
5 changes: 3 additions & 2 deletions core/shared/src/main/scala/cats/effect/ContextShift.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ trait ContextShift[F[_]] {
* back to the default execution environment of `F` at the completion of `fa`,
* regardless of success or failure.
*
* The primary use case for this method is executing blocking code on a
* dedicated execution context.
* The primary use case for this method is executing code on a
* specific execution context. To execute blocking code, consider using
* the `blockOn(blocker)` method instead.
*
* @param ec Execution context where the evaluation has to be scheduled
* @param fa Computation to evaluate using `ec`
Expand Down
Loading

0 comments on commit c30d75b

Please sign in to comment.