Skip to content

Commit

Permalink
Merge pull request #3532 from kamilkloch/io-implicits
Browse files Browse the repository at this point in the history
Embed `cats.syntax.all._` into `IO`.
  • Loading branch information
djspiewak authored Mar 5, 2024
2 parents 776528c + 7cc6d71 commit df63f8b
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cats.effect.benchmarks
import cats.effect.IO
import cats.effect.std._
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

import org.openjdk.jmh.annotations._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cats.effect.benchmarks
import cats.effect.IO
import cats.effect.std._
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

import org.openjdk.jmh.annotations._

Expand Down
142 changes: 127 additions & 15 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import cats.{
Id,
Monad,
Monoid,
NonEmptyParallel,
Now,
Parallel,
Semigroup,
Expand All @@ -42,16 +43,11 @@ import cats.effect.kernel.GenTemporal.handleDuration
import cats.effect.std.{Backpressure, Console, Env, Supervisor, UUIDGen}
import cats.effect.tracing.{Tracing, TracingEvent}
import cats.effect.unsafe.IORuntime
import cats.syntax._
import cats.syntax.all._

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{
CancellationException,
ExecutionContext,
Future,
Promise,
TimeoutException
}
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -164,6 +160,15 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def <&[B](that: IO[B]): IO[A] =
both(that).map { case (a, _) => a }

/**
* Transform certain errors using `pf` and rethrow them. Non matching errors and successful
* values are not affected by this function.
*
* Implements `ApplicativeError.adaptError`.
*/
def adaptError[E](pf: PartialFunction[Throwable, Throwable]): IO[A] =
recoverWith(pf.andThen(IO.raiseError[A] _))

/**
* Replaces the result of this IO with the given value.
*/
Expand All @@ -188,6 +193,15 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def attempt: IO[Either[Throwable, A]] =
IO.Attempt(this)

/**
* Reifies the value or error of the source and performs an effect on the result, then
* recovers the original value or error back into `IO`.
*
* Implements `MonadError.attemptTap`.
*/
def attemptTap[B](f: Either[Throwable, A] => IO[B]): IO[A] =
attempt.flatTap(f).rethrow

/**
* Replaces failures in this IO with an empty Option.
*/
Expand Down Expand Up @@ -577,6 +591,36 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def onError(f: Throwable => IO[Unit]): IO[A] =
handleErrorWith(t => f(t).voidError *> IO.raiseError(t))

/**
* Like `Parallel.parProductL`
*/
def parProductL[B](iob: IO[B])(implicit P: NonEmptyParallel[IO]): IO[A] =
P.parProductL[A, B](this)(iob)

/**
* Like `Parallel.parProductR`
*/
def parProductR[B](iob: IO[B])(implicit P: NonEmptyParallel[IO]): IO[B] =
P.parProductR[A, B](this)(iob)

/**
* Like `Parallel.parProduct`
*/
def parProduct[B](iob: IO[B])(implicit P: NonEmptyParallel[IO]): IO[(A, B)] =
Parallel.parProduct(this, iob)(P)

/**
* Like `Parallel.parReplicateA`
*/
def parReplicateA(n: Int): IO[List[A]] =
List.fill(n)(this).parSequence

/**
* Like `Parallel.parReplicateA_`
*/
def parReplicateA_(n: Int): IO[Unit] =
List.fill(n)(this).parSequence_

def race[B](that: IO[B]): IO[Either[A, B]] =
IO.race(this, that)

Expand Down Expand Up @@ -1109,7 +1153,45 @@ private[effect] trait IOLowPriorityImplicits {
}
}

object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
object IO extends IOCompanionPlatform with IOLowPriorityImplicits with TupleParallelSyntax {

implicit final def catsSyntaxParallelSequence1[T[_], A](
toia: T[IO[A]]): ParallelSequenceOps1[T, IO, A] = new ParallelSequenceOps1(toia)

This comment has been minimized.

Copy link
@SimY4

SimY4 Mar 6, 2024

nit: typo toia -> tioa


implicit final def catsSyntaxParallelSequence_[T[_], A](
tioa: T[IO[A]]): ParallelSequence_Ops[T, IO, A] =
new ParallelSequence_Ops(tioa)

implicit final def catsSyntaxParallelUnorderedSequence[T[_], A](
tioa: T[IO[A]]): ParallelUnorderedSequenceOps[T, IO, A] =
new ParallelUnorderedSequenceOps(tioa)

implicit final def catsSyntaxParallelFlatSequence1[T[_], A](
tioa: T[IO[T[A]]]): ParallelFlatSequenceOps1[T, IO, A] =
new ParallelFlatSequenceOps1(tioa)

implicit final def catsSyntaxParallelUnorderedFlatSequence[T[_], A](
tiota: T[IO[T[A]]]): ParallelUnorderedFlatSequenceOps[T, IO, A] =
new ParallelUnorderedFlatSequenceOps(tiota)

implicit final def catsSyntaxParallelSequenceFilter[T[_], A](
x: T[IO[Option[A]]]): ParallelSequenceFilterOps[T, IO, A] =
new ParallelSequenceFilterOps(x)

implicit class IOFlatSequenceOps[T[_], A](tiota: T[IO[T[A]]]) {
def flatSequence(
implicit T: Traverse[T],
G: Applicative[IO],
F: cats.FlatMap[T]): IO[T[A]] = {
tiota.sequence(T, G).map(F.flatten)
}
}

implicit class IOSequenceOps[T[_], A](tioa: T[IO[A]]) {
def sequence(implicit T: Traverse[T], G: Applicative[IO]): IO[T[A]] = T.sequence(tioa)(G)

def sequence_(implicit F: Foldable[T], G: Applicative[IO]): IO[Unit] = F.sequence_(tioa)(G)
}

@static private[this] val _alignForIO = new IOAlign
@static private[this] val _asyncForIO: kernel.Async[IO] = new IOAsync
Expand Down Expand Up @@ -1420,6 +1502,18 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
*/
def some[A](a: A): IO[Option[A]] = pure(Some(a))

/**
* Like `Parallel.parTraverse`
*/
def parTraverse[T[_]: Traverse, A, B](ta: T[A])(f: A => IO[B]): IO[T[B]] =
ta.parTraverse(f)

/**
* Like `Parallel.parTraverse_`
*/
def parTraverse_[T[_]: Foldable, A, B](ta: T[A])(f: A => IO[B]): IO[Unit] =
ta.parTraverse_(f)

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
Expand All @@ -1433,22 +1527,34 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
_asyncForIO.parTraverseN_(n)(ta)(f)

/**
* Like `Parallel.parSequence_`, but limits the degree of parallelism.
* Like `Parallel.parSequence`
*/
def parSequenceN_[T[_]: Foldable, A](n: Int)(tma: T[IO[A]]): IO[Unit] =
_asyncForIO.parSequenceN_(n)(tma)
def parSequence[T[_]: Traverse, A](tioa: T[IO[A]]): IO[T[A]] =
tioa.parSequence

/**
* Like `Parallel.parSequence_`
*/
def parSequence_[T[_]: Foldable, A](tioa: T[IO[A]]): IO[Unit] =
tioa.parSequence_

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_]: Traverse, A](n: Int)(tma: T[IO[A]]): IO[T[A]] =
_asyncForIO.parSequenceN(n)(tma)
def parSequenceN[T[_]: Traverse, A](n: Int)(tioa: T[IO[A]]): IO[T[A]] =
_asyncForIO.parSequenceN(n)(tioa)

/**
* Like `Parallel.parSequence_`, but limits the degree of parallelism.
*/
def parSequenceN_[T[_]: Foldable, A](n: Int)(tma: T[IO[A]]): IO[Unit] =
_asyncForIO.parSequenceN_(n)(tma)

/**
* Like `Parallel.parReplicateA`, but limits the degree of parallelism.
*/
def parReplicateAN[A](n: Int)(replicas: Int, ma: IO[A]): IO[List[A]] =
_asyncForIO.parReplicateAN(n)(replicas, ma)
def parReplicateAN[A](n: Int)(replicas: Int, ioa: IO[A]): IO[List[A]] =
_asyncForIO.parReplicateAN(n)(replicas, ioa)

/**
* Lifts a pure value into `IO`.
Expand Down Expand Up @@ -1519,6 +1625,12 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def trace: IO[Trace] =
IOTrace

def traverse[T[_]: Traverse, A, B](ta: T[A])(f: A => IO[B]): IO[T[B]] =
ta.traverse(f)(_asyncForIO)

def traverse_[T[_]: Foldable, A, B](ta: T[A])(f: A => IO[B]): IO[Unit] =
ta.traverse_(f)(_asyncForIO)

private[effect] def runtime: IO[IORuntime] = ReadRT

def pollers: IO[List[Any]] =
Expand Down
1 change: 0 additions & 1 deletion tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package cats.effect

import cats.effect.unsafe.IORuntime
import cats.syntax.all._

import scala.concurrent.duration._

Expand Down
53 changes: 53 additions & 0 deletions tests/shared/src/test/scala-2.13+/cats/effect/IOImplicitSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

class IOImplicitSpec extends BaseSpec {

"Can resolve IO sequence ops without import of cats.syntax.all" in { // compilation test
for {
_ <- List(IO(1)).sequence_
_ <- Option(IO(1)).sequence
_ <- Option(IO(1)).sequence_
_ <- List(IO(List(1))).flatSequence
} yield ()
true
}

"Can resolve IO.Par ops without import of cats.syntax.all" in { // compilation test
for {
_ <- Option(IO(1)).parSequence
_ <- Option(IO(1)).parSequence_
_ <- IO(1).parReplicateA(2)
_ <- IO(1).parReplicateA_(2)
_ <- IO(1).parProduct(IO(2))
_ <- IO(1).parProductL(IO(2))
_ <- IO(1).parProductR(IO(2))
_ <- List(IO(Option(1))).parSequenceFilter
_ <- List(IO(1)).parUnorderedSequence
_ <- List(IO(List(1))).parFlatSequence
_ <- List(IO(List(1))).parUnorderedFlatSequence
_ <- (IO(1), IO(2)).parMapN(_ + _)
_ <- (IO(1), IO(2)).parTupled
_ <- (IO(1), IO(2)).parFlatMapN { case (x, y) => IO.pure(x + y) }
_ <- (IO(1), IO(2), IO(3)).parMapN(_ + _ + _)
_ <- (IO(1), IO(2), IO(3)).parTupled
_ <- (IO(1), IO(2), IO(3)).parFlatMapN { case (x, y, z) => IO.pure(x + y + z) }
} yield ()
true
}
}
26 changes: 26 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
(IO.pure(42) orElse IO.raiseError[Int](TestException)) must completeAs(42)
}

"adaptError is a no-op for a successful effect" in ticked { implicit ticker =>
IO(42).adaptError { case x => x } must completeAs(42)
}

"adaptError is a no-op for a non-matching error" in ticked { implicit ticker =>
case object TestException1 extends RuntimeException
case object TestException2 extends RuntimeException
IO.raiseError[Unit](TestException1).adaptError {
case TestException2 => TestException2
} must failAs(TestException1)
}

"adaptError transforms the error in a failed effect" in ticked { implicit ticker =>
case object TestException1 extends RuntimeException
case object TestException2 extends RuntimeException
IO.raiseError[Unit](TestException1).adaptError {
case TestException1 => TestException2
} must failAs(TestException2)
}

"attempt is redeem with Left(_) for recover and Right(_) for map" in ticked {
implicit ticker =>
forAll { (io: IO[Int]) => io.attempt eqv io.redeem(Left(_), Right(_)) }
Expand All @@ -126,6 +146,12 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
}
}

"attemptTap(f) is an alias for attempt.flatTap(f).rethrow" in ticked { implicit ticker =>
forAll { (io: IO[Int], f: Either[Throwable, Int] => IO[Int]) =>
io.attemptTap(f) eqv io.attempt.flatTap(f).rethrow
}
}

"rethrow is inverse of attempt" in ticked { implicit ticker =>
forAll { (io: IO[Int]) => io.attempt.rethrow eqv io }
}
Expand Down

0 comments on commit df63f8b

Please sign in to comment.