Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed cats.syntax.all._ into IO. #3532

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cats.effect.benchmarks
import cats.effect.IO
import cats.effect.std._
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

import org.openjdk.jmh.annotations._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cats.effect.benchmarks
import cats.effect.IO
import cats.effect.std._
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

import org.openjdk.jmh.annotations._

Expand Down
142 changes: 127 additions & 15 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import cats.{
Id,
Monad,
Monoid,
NonEmptyParallel,
Now,
Parallel,
Semigroup,
Expand All @@ -42,16 +43,11 @@ import cats.effect.kernel.GenTemporal.handleDuration
import cats.effect.std.{Backpressure, Console, Env, Supervisor, UUIDGen}
import cats.effect.tracing.{Tracing, TracingEvent}
import cats.effect.unsafe.IORuntime
import cats.syntax._
import cats.syntax.all._

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{
CancellationException,
ExecutionContext,
Future,
Promise,
TimeoutException
}
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -162,6 +158,15 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def <&[B](that: IO[B]): IO[A] =
both(that).map { case (a, _) => a }

/**
* Transform certain errors using `pf` and rethrow them. Non matching errors and successful
* values are not affected by this function.
*
* Implements `ApplicativeError.adaptError`.
*/
def adaptError[E](pf: PartialFunction[Throwable, Throwable]): IO[A] =
recoverWith(pf.andThen(IO.raiseError[A] _))

/**
* Replaces the result of this IO with the given value.
*/
Expand All @@ -186,6 +191,15 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def attempt: IO[Either[Throwable, A]] =
IO.Attempt(this)

/**
* Reifies the value or error of the source and performs an effect on the result, then
* recovers the original value or error back into `IO`.
*
* Implements `MonadError.attemptTap`.
*/
def attemptTap[B](f: Either[Throwable, A] => IO[B]): IO[A] =
attempt.flatTap(f).rethrow

/**
* Replaces failures in this IO with an empty Option.
*/
Expand Down Expand Up @@ -575,6 +589,36 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def onError(f: Throwable => IO[Unit]): IO[A] =
handleErrorWith(t => f(t).voidError *> IO.raiseError(t))

/**
* Like `Parallel.parProductL`
*/
def parProductL[B](iob: IO[B])(implicit P: NonEmptyParallel[IO]): IO[A] =
P.parProductL[A, B](this)(iob)

/**
* Like `Parallel.parProductR`
*/
def parProductR[B](iob: IO[B])(implicit P: NonEmptyParallel[IO]): IO[B] =
P.parProductR[A, B](this)(iob)

/**
* Like `Parallel.parProduct`
*/
def parProduct[B](iob: IO[B])(implicit P: NonEmptyParallel[IO]): IO[(A, B)] =
Parallel.parProduct(this, iob)(P)

/**
* Like `Parallel.parReplicateA`
*/
def parReplicateA(n: Int): IO[List[A]] =
List.fill(n)(this).parSequence

/**
* Like `Parallel.parReplicateA_`
*/
def parReplicateA_(n: Int): IO[Unit] =
List.fill(n)(this).parSequence_

def race[B](that: IO[B]): IO[Either[A, B]] =
IO.race(this, that)

Expand Down Expand Up @@ -1107,7 +1151,45 @@ private[effect] trait IOLowPriorityImplicits {
}
}

object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
object IO extends IOCompanionPlatform with IOLowPriorityImplicits with TupleParallelSyntax {

implicit final def catsSyntaxParallelSequence1[T[_], A](
toia: T[IO[A]]): ParallelSequenceOps1[T, IO, A] = new ParallelSequenceOps1(toia)

implicit final def catsSyntaxParallelSequence_[T[_], A](
tioa: T[IO[A]]): ParallelSequence_Ops[T, IO, A] =
new ParallelSequence_Ops(tioa)

implicit final def catsSyntaxParallelUnorderedSequence[T[_], A](
tioa: T[IO[A]]): ParallelUnorderedSequenceOps[T, IO, A] =
new ParallelUnorderedSequenceOps(tioa)

implicit final def catsSyntaxParallelFlatSequence1[T[_], A](
tioa: T[IO[T[A]]]): ParallelFlatSequenceOps1[T, IO, A] =
new ParallelFlatSequenceOps1(tioa)

implicit final def catsSyntaxParallelUnorderedFlatSequence[T[_], A](
tiota: T[IO[T[A]]]): ParallelUnorderedFlatSequenceOps[T, IO, A] =
new ParallelUnorderedFlatSequenceOps(tiota)

implicit final def catsSyntaxParallelSequenceFilter[T[_], A](
x: T[IO[Option[A]]]): ParallelSequenceFilterOps[T, IO, A] =
new ParallelSequenceFilterOps(x)

implicit class IOFlatSequenceOps[T[_], A](tiota: T[IO[T[A]]]) {
def flatSequence(
implicit T: Traverse[T],
G: Applicative[IO],
F: cats.FlatMap[T]): IO[T[A]] = {
tiota.sequence(T, G).map(F.flatten)
}
}

implicit class IOSequenceOps[T[_], A](tioa: T[IO[A]]) {
def sequence(implicit T: Traverse[T], G: Applicative[IO]): IO[T[A]] = T.sequence(tioa)(G)

def sequence_(implicit F: Foldable[T], G: Applicative[IO]): IO[Unit] = F.sequence_(tioa)(G)
}

/**
* Newtype encoding for an `IO` datatype that has a `cats.Applicative` capable of doing
Expand Down Expand Up @@ -1415,6 +1497,18 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
*/
def some[A](a: A): IO[Option[A]] = pure(Some(a))

/**
* Like `Parallel.parTraverse`
*/
def parTraverse[T[_]: Traverse, A, B](ta: T[A])(f: A => IO[B]): IO[T[B]] =
ta.parTraverse(f)

/**
* Like `Parallel.parTraverse_`
*/
def parTraverse_[T[_]: Foldable, A, B](ta: T[A])(f: A => IO[B]): IO[Unit] =
ta.parTraverse_(f)

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
Expand All @@ -1428,22 +1522,34 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
_asyncForIO.parTraverseN_(n)(ta)(f)

/**
* Like `Parallel.parSequence_`, but limits the degree of parallelism.
* Like `Parallel.parSequence`
*/
def parSequenceN_[T[_]: Foldable, A](n: Int)(tma: T[IO[A]]): IO[Unit] =
_asyncForIO.parSequenceN_(n)(tma)
def parSequence[T[_]: Traverse, A](tioa: T[IO[A]]): IO[T[A]] =
tioa.parSequence

/**
* Like `Parallel.parSequence_`
*/
def parSequence_[T[_]: Foldable, A](tioa: T[IO[A]]): IO[Unit] =
tioa.parSequence_

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_]: Traverse, A](n: Int)(tma: T[IO[A]]): IO[T[A]] =
_asyncForIO.parSequenceN(n)(tma)
def parSequenceN[T[_]: Traverse, A](n: Int)(tioa: T[IO[A]]): IO[T[A]] =
_asyncForIO.parSequenceN(n)(tioa)

/**
* Like `Parallel.parSequence_`, but limits the degree of parallelism.
*/
def parSequenceN_[T[_]: Foldable, A](n: Int)(tma: T[IO[A]]): IO[Unit] =
_asyncForIO.parSequenceN_(n)(tma)

/**
* Like `Parallel.parReplicateA`, but limits the degree of parallelism.
*/
def parReplicateAN[A](n: Int)(replicas: Int, ma: IO[A]): IO[List[A]] =
_asyncForIO.parReplicateAN(n)(replicas, ma)
def parReplicateAN[A](n: Int)(replicas: Int, ioa: IO[A]): IO[List[A]] =
_asyncForIO.parReplicateAN(n)(replicas, ioa)

/**
* Lifts a pure value into `IO`.
Expand Down Expand Up @@ -1514,6 +1620,12 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def trace: IO[Trace] =
IOTrace

def traverse[T[_]: Traverse, A, B](ta: T[A])(f: A => IO[B]): IO[T[B]] =
ta.traverse(f)(_asyncForIO)

def traverse_[T[_]: Foldable, A, B](ta: T[A])(f: A => IO[B]): IO[Unit] =
ta.traverse_(f)(_asyncForIO)

private[effect] def runtime: IO[IORuntime] = ReadRT

def pollers: IO[List[Any]] =
Expand Down
1 change: 0 additions & 1 deletion tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package cats.effect

import cats.effect.unsafe.IORuntime
import cats.syntax.all._

import scala.concurrent.duration._

Expand Down
53 changes: 53 additions & 0 deletions tests/shared/src/test/scala-2.13+/cats/effect/IOImplicitSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

class IOImplicitSpec extends BaseSpec {

"Can resolve IO sequence ops without import of cats.syntax.all" in { // compilation test
for {
_ <- List(IO(1)).sequence_
_ <- Option(IO(1)).sequence
_ <- Option(IO(1)).sequence_
_ <- List(IO(List(1))).flatSequence
} yield ()
true
}

"Can resolve IO.Par ops without import of cats.syntax.all" in { // compilation test
for {
_ <- Option(IO(1)).parSequence
_ <- Option(IO(1)).parSequence_
_ <- IO(1).parReplicateA(2)
_ <- IO(1).parReplicateA_(2)
_ <- IO(1).parProduct(IO(2))
_ <- IO(1).parProductL(IO(2))
_ <- IO(1).parProductR(IO(2))
_ <- List(IO(Option(1))).parSequenceFilter
_ <- List(IO(1)).parUnorderedSequence
_ <- List(IO(List(1))).parFlatSequence
_ <- List(IO(List(1))).parUnorderedFlatSequence
_ <- (IO(1), IO(2)).parMapN(_ + _)
_ <- (IO(1), IO(2)).parTupled
_ <- (IO(1), IO(2)).parFlatMapN { case (x, y) => IO.pure(x + y) }
_ <- (IO(1), IO(2), IO(3)).parMapN(_ + _ + _)
_ <- (IO(1), IO(2), IO(3)).parTupled
_ <- (IO(1), IO(2), IO(3)).parFlatMapN { case (x, y, z) => IO.pure(x + y + z) }
} yield ()
true
}
}
26 changes: 26 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
(IO.pure(42) orElse IO.raiseError[Int](TestException)) must completeAs(42)
}

"adaptError is a no-op for a successful effect" in ticked { implicit ticker =>
IO(42).adaptError { case x => x } must completeAs(42)
}

"adaptError is a no-op for a non-matching error" in ticked { implicit ticker =>
case object TestException1 extends RuntimeException
case object TestException2 extends RuntimeException
IO.raiseError[Unit](TestException1).adaptError {
case TestException2 => TestException2
} must failAs(TestException1)
}

"adaptError transforms the error in a failed effect" in ticked { implicit ticker =>
case object TestException1 extends RuntimeException
case object TestException2 extends RuntimeException
IO.raiseError[Unit](TestException1).adaptError {
case TestException1 => TestException2
} must failAs(TestException2)
}

"attempt is redeem with Left(_) for recover and Right(_) for map" in ticked {
implicit ticker =>
forAll { (io: IO[Int]) => io.attempt eqv io.redeem(Left(_), Right(_)) }
Expand All @@ -126,6 +146,12 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
}
}

"attemptTap(f) is an alias for attempt.flatTap(f).rethrow" in ticked { implicit ticker =>
forAll { (io: IO[Int], f: Either[Throwable, Int] => IO[Int]) =>
io.attemptTap(f) eqv io.attempt.flatTap(f).rethrow
}
}

"rethrow is inverse of attempt" in ticked { implicit ticker =>
forAll { (io: IO[Int]) => io.attempt.rethrow eqv io }
}
Expand Down
Loading