Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GenConcurrent#parSequenceN_ and GenConcurrent#parTraverseN_. #3916

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import cats.{
Applicative,
CommutativeApplicative,
Eval,
Foldable,
Functor,
Id,
Monad,
Expand Down Expand Up @@ -1420,6 +1421,18 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def parTraverseN[T[_]: Traverse, A, B](n: Int)(ta: T[A])(f: A => IO[B]): IO[T[B]] =
_asyncForIO.parTraverseN(n)(ta)(f)

/**
* Like `Parallel.parTraverse_`, but limits the degree of parallelism.
*/
def parTraverseN_[T[_]: Foldable, A, B](n: Int)(ta: T[A])(f: A => IO[B]): IO[Unit] =
_asyncForIO.parTraverseN_(n)(ta)(f)

/**
* Like `Parallel.parSequence_`, but limits the degree of parallelism.
*/
def parSequenceN_[T[_]: Foldable, A](n: Int)(tma: T[IO[A]]): IO[Unit] =
_asyncForIO.parSequenceN_(n)(tma)

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect.kernel

import cats.{Monoid, Semigroup, Traverse}
import cats.{Foldable, Monoid, Semigroup, Traverse}
import cats.data.{EitherT, IorT, Kleisli, OptionT, WriterT}
import cats.effect.kernel.instances.spawn._
import cats.effect.kernel.syntax.all._
Expand Down Expand Up @@ -123,6 +123,12 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
def parSequenceN[T[_]: Traverse, A](n: Int)(tma: T[F[A]]): F[T[A]] =
parTraverseN(n)(tma)(identity)

/**
* Like `Parallel.parSequence_`, but limits the degree of parallelism.
*/
def parSequenceN_[T[_]: Foldable, A](n: Int)(tma: T[F[A]]): F[Unit] =
parTraverseN_(n)(tma)(identity)

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism. Note that the semantics
* of this operation aim to maximise fairness: when a spot to execute becomes available, every
Expand All @@ -136,6 +142,19 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
MiniSemaphore[F](n).flatMap { sem => ta.parTraverse { a => sem.withPermit(f(a)) } }
}

/**
* Like `Parallel.parTraverse_`, but limits the degree of parallelism. Note that the semantics
* of this operation aim to maximise fairness: when a spot to execute becomes available, every
* task has a chance to claim it, and not only the next `n` tasks in `ta`
*/
def parTraverseN_[T[_]: Foldable, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[Unit] = {
require(n >= 1, s"Concurrency limit should be at least 1, was: $n")

implicit val F: GenConcurrent[F, E] = this

MiniSemaphore[F](n).flatMap { sem => ta.parTraverse_ { a => sem.withPermit(f(a)) } }
}

override def racePair[A, B](fa: F[A], fb: F[B])
: F[Either[(Outcome[F, E, A], Fiber[F, E, B]), (Fiber[F, E, A], Outcome[F, E, B])]] = {
implicit val F: GenConcurrent[F, E] = this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect.kernel.syntax

import cats.Traverse
import cats.{Foldable, Traverse}
import cats.effect.kernel.GenConcurrent

trait GenConcurrentSyntax {
Expand Down Expand Up @@ -52,11 +52,19 @@ final class ConcurrentParTraverseNOps[T[_], A] private[syntax] (
f: A => F[B]
)(implicit T: Traverse[T], F: GenConcurrent[F, _]): F[T[B]] =
F.parTraverseN(n)(wrapped)(f)

def parTraverseN_[F[_], B](n: Int)(
f: A => F[B]
)(implicit T: Foldable[T], F: GenConcurrent[F, _]): F[Unit] =
F.parTraverseN_(n)(wrapped)(f)
}

final class ConcurrentParSequenceNOps[T[_], F[_], A] private[syntax] (
private val wrapped: T[F[A]]
) extends AnyVal {
def parSequenceN(n: Int)(implicit T: Traverse[T], F: GenConcurrent[F, _]): F[T[A]] =
F.parSequenceN(n)(wrapped)

def parSequenceN_(n: Int)(implicit T: Foldable[T], F: GenConcurrent[F, _]): F[Unit] =
F.parSequenceN_(n)(wrapped)
}
10 changes: 10 additions & 0 deletions kernel/shared/src/test/scala/cats/effect/kernel/SyntaxSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,21 @@ class SyntaxSpec extends Specification {
result: F[List[Int]]
}

{
val result = List(1).parTraverseN_(3)(F.pure)
result: F[Unit]
}

{
val result = List(target).parSequenceN(3)
result: F[List[A]]
}

{
val result = List(target).parSequenceN_(3)
result: F[Unit]
}

{
val result = target.parReplicateAN(3)(5)
result: F[List[A]]
Expand Down
27 changes: 27 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOPropSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,33 @@ class IOPropSpec extends BaseSpec with Discipline {
}
}

"parTraverseN_" should {

"never exceed the maximum bound of concurrent tasks" in realProp {
for {
length <- Gen.chooseNum(0, 50)
limit <- Gen.chooseNum(1, 15, 2, 5)
} yield length -> limit
} {
case (length, limit) =>
Queue.unbounded[IO, Int].flatMap { q =>
val task = q.offer(1) >> IO.sleep(7.millis) >> q.offer(-1)
val testRun = List.fill(length)(task).parSequenceN_(limit)
def check(acc: Int = 0): IO[Unit] =
q.tryTake.flatMap {
case None => IO.unit
case Some(n) =>
val newAcc = acc + n
if (newAcc > limit)
IO.raiseError(new Exception(s"Limit of $limit exceeded, was $newAcc"))
else check(newAcc)
}

testRun >> check().mustEqual(())
}
}
}

"parSequenceN" should {
"give the same result as parSequence" in realProp(
Gen.posNum[Int].flatMap(n => arbitrary[List[Int]].map(n -> _))) {
Expand Down
28 changes: 28 additions & 0 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,34 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {

}

"parTraverseN_" should {

"throw when n < 1" in real {
IO.defer {
List.empty[Int].parTraverseN_(0)(_.pure[IO])
}.mustFailWith[IllegalArgumentException]
}

"propagate errors" in real {
List(1, 2, 3)
.parTraverseN_(2) { (n: Int) =>
if (n == 2) IO.raiseError(new RuntimeException) else n.pure[IO]
}
.mustFailWith[RuntimeException]
}

"be cancelable" in ticked { implicit ticker =>
val p = for {
f <- List(1, 2, 3).parTraverseN_(2)(_ => IO.never).start
_ <- IO.sleep(100.millis)
_ <- f.cancel
} yield true

p must completeAs(true)
}

}

"parallel" should {
"run parallel actually in parallel" in real {
val x = IO.sleep(2.seconds) >> IO.pure(1)
Expand Down
Loading