Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parTraverseN and parSequenceN #498

Merged
merged 4 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion core/shared/src/main/scala/cats/effect/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import simulacrum._
import cats.implicits._
import cats.data._
import cats.effect.IO.{Delay, Pure, RaiseError}
import cats.effect.concurrent.{Ref, Deferred}
import cats.effect.concurrent.{Deferred, Ref, Semaphore}
import cats.effect.internals.{Callback, IORunLoop}

import scala.annotation.implicitNotFound
Expand Down Expand Up @@ -319,6 +319,26 @@ object Async {
}
}

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
def parTraverseN[T[_]: Traverse, M[_], F[_], A, B](n: Long)(ta: T[A])(f: A => M[B])(implicit M: Async[M], P: Parallel[M, F]): M[T[B]] =
for {
semaphore <- Semaphore.uncancelable(n)(M)
sideeffffect marked this conversation as resolved.
Show resolved Hide resolved
tb <- ta.parTraverse { a =>
semaphore.withPermit(f(a))
}
} yield tb

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_]: Traverse, M[_], F[_], A](n: Long)(tma: T[M[A]])(implicit M: Async[M], P: Parallel[M, F]): M[T[A]] =
for {
semaphore <- Semaphore.uncancelable(n)(M)
mta <- tma.map(semaphore.withPermit).parSequence
} yield mta

/**
* [[Async]] instance built for `cats.data.EitherT` values initialized
* with any `F` data type that also implements `Async`.
Expand Down
22 changes: 21 additions & 1 deletion core/shared/src/main/scala/cats/effect/Concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package cats
package effect

import cats.data._
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.concurrent.{Deferred, Ref, Semaphore}
import cats.effect.ExitCase.Canceled
import cats.effect.IO.{Delay, Pure, RaiseError}
import cats.effect.internals.Callback.{rightUnit, successUnit}
Expand Down Expand Up @@ -584,6 +584,26 @@ object Concurrent {
}
}

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
def parTraverseN[T[_]: Traverse, M[_], F[_], A, B](n: Long)(ta: T[A])(f: A => M[B])(implicit M: Concurrent[M], P: Parallel[M, F]): M[T[B]] =
for {
semaphore <- Semaphore(n)(M)
tb <- ta.parTraverse { a =>
semaphore.withPermit(f(a))
}
} yield tb

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_]: Traverse, M[_], F[_], A](n: Long)(tma: T[M[A]])(implicit M: Concurrent[M], P: Parallel[M, F]): M[T[A]] =
for {
semaphore <- Semaphore(n)(M)
mta <- tma.map(semaphore.withPermit).parSequence
} yield mta

/**
* [[Concurrent]] instance built for `cats.data.EitherT` values initialized
* with any `F` data type that also implements `Concurrent`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package cats.effect.syntax

trait AllCatsEffectSyntax
extends BracketSyntax
with AsyncSyntax
with ConcurrentSyntax
with EffectSyntax
with ConcurrentEffectSyntax
with ParallelNSyntax
41 changes: 41 additions & 0 deletions core/shared/src/main/scala/cats/effect/syntax/AsyncSyntax.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.syntax

import cats.effect.Async
import cats.{Parallel, Traverse}

trait AsyncSyntax extends Async.ToAsyncOps {

implicit def catsEffectSyntaxAsyncObj[F[_]](F: Async[F]): AsyncObjOps[F] =
new AsyncObjOps[F](F)
}

final class AsyncObjOps[F[_]](private val F: Async[F]) extends AnyVal {

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
def parTraverseN[T[_], G[_], A, B](n: Long)(ta: T[A])(f: A => F[B])(implicit T: Traverse[T], P: Parallel[F, G]): F[T[B]] =
Async.parTraverseN(n)(ta)(f)(T, F, P)

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_], G[_], A](n: Long)(tma: T[F[A]])(implicit T: Traverse[T], P: Parallel[F, G]): F[T[A]] =
Async.parSequenceN(n)(tma)(T, F, P)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package cats.effect.syntax

import scala.concurrent.duration.FiniteDuration
import cats.{Parallel, Traverse}

import scala.concurrent.duration.FiniteDuration
import cats.effect.{Concurrent, Timer}


trait ConcurrentSyntax extends Concurrent.ToConcurrentOps {
implicit def catsEffectSyntaxConcurrent[F[_], A](fa: F[A]): ConcurrentOps[F, A] =
new ConcurrentOps[F, A](fa)

implicit def catsEffectSyntaxConcurrentObj[F[_]](F: Concurrent[F]): ConcurrentObjOps[F] =
new ConcurrentObjOps[F](F)
}

final class ConcurrentOps[F[_], A](val self: F[A]) extends AnyVal {
Expand All @@ -33,3 +37,18 @@ final class ConcurrentOps[F[_], A](val self: F[A]) extends AnyVal {
def timeoutTo(duration: FiniteDuration, fallback: F[A])(implicit F: Concurrent[F], timer: Timer[F]): F[A] =
Concurrent.timeoutTo(self, duration, fallback)
}

final class ConcurrentObjOps[F[_]](private val F: Concurrent[F]) extends AnyVal {

/**
* Like `Parallel.parTraverse`, but limits the degree of parallelism.
*/
def parTraverseN[T[_], G[_], A, B](n: Long)(ta: T[A])(f: A => F[B])(implicit T: Traverse[T], P: Parallel[F, G]): F[T[B]] =
Concurrent.parTraverseN(n)(ta)(f)(T, F, P)

/**
* Like `Parallel.parSequence`, but limits the degree of parallelism.
*/
def parSequenceN[T[_], G[_], A](n: Long)(tma: T[F[A]])(implicit T: Traverse[T], P: Parallel[F, G]): F[T[A]] =
Concurrent.parSequenceN(n)(tma)(T, F, P)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.syntax

import cats.effect.Concurrent
import cats.effect.implicits._
import cats.{Monad, Parallel, Traverse}

import scala.language.{higherKinds, implicitConversions}

trait ParallelNSyntax {

implicit final def catsSyntaxParallelTraverseNConcurrent[T[_]: Traverse, A](ta: T[A]): ParallelTraversableNConcurrentOps[T, A] =
new ParallelTraversableNConcurrentOps[T, A](ta)

implicit final def catsSyntaxParallelSequenceNConcurrent[T[_]: Traverse, M[_]: Monad, A](
tma: T[M[A]]
): ParallelSequenceNConcurrentOps[T, M, A] = new ParallelSequenceNConcurrentOps[T, M, A](tma)

}

final class ParallelSequenceNConcurrentOps[T[_], M[_], A](private val tma: T[M[A]]) extends AnyVal {

def parSequenceN[F[_]](n: Long)(implicit M: Concurrent[M], T: Traverse[T], P: Parallel[M, F]): M[T[A]] =
M.parSequenceN(n)(tma)
}

final class ParallelTraversableNConcurrentOps[T[_], A](private val ta: T[A]) extends AnyVal {

def parTraverseN[M[_], F[_], B](n: Long)(f: A => M[B])(implicit M: Concurrent[M], T: Traverse[T], P: Parallel[M, F]): M[T[B]] =
M.parTraverseN(n)(ta)(f)
}
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/cats/effect/syntax/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package cats.effect
package object syntax {
object all extends AllCatsEffectSyntax
object bracket extends BracketSyntax
object async extends AsyncSyntax
object concurrent extends ConcurrentSyntax
object concurrentEffect extends ConcurrentEffectSyntax
object effect extends EffectSyntax
object paralleln extends ParallelNSyntax
}
59 changes: 59 additions & 0 deletions core/shared/src/test/scala/cats/effect/AsyncTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

import cats.Eq
import cats.effect.concurrent.Ref
import cats.effect.implicits._
import cats.implicits._
import org.scalatest.compatible.Assertion
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.{Matchers, Succeeded}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class AsyncTests extends AsyncFunSuite with Matchers {

implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit val timer: Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

private val smallDelay: IO[Unit] = timer.sleep(20.millis)

private def awaitEqual[A: Eq](t: IO[A], success: A): IO[Unit] =
t.flatMap(a => if (Eq[A].eqv(a, success)) IO.unit else smallDelay *> awaitEqual(t, success))

private def run(t: IO[Unit]): Future[Assertion] = t.as(Succeeded).unsafeToFuture

test("F.parTraverseN(n)(collection)(f)") {
val finalValue = 100
val r = Ref.unsafe[IO, Int](0)
val list = List.range(0, finalValue)
val modifies = implicitly[Async[IO]].parTraverseN(3)(list)(_ => IO.shift *> r.update(_ + 1))
run(IO.shift *> modifies.start *> awaitEqual(r.get, finalValue))
}

test("F.parSequenceN(n)(collection)") {
val finalValue = 100
val r = Ref.unsafe[IO, Int](0)
val list = List.fill(finalValue)(IO.shift *> r.update(_ + 1))
val modifies = implicitly[Async[IO]].parSequenceN(3)(list)
run(IO.shift *> modifies.start *> awaitEqual(r.get, finalValue))
}

}
75 changes: 75 additions & 0 deletions core/shared/src/test/scala/cats/effect/ConcurrentTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

import cats.Eq
import cats.effect.concurrent.Ref
import cats.effect.implicits._
import cats.implicits._
import org.scalatest.compatible.Assertion
import org.scalatest.funsuite.AsyncFunSuite
import org.scalatest.{Matchers, Succeeded}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class ConcurrentTests extends AsyncFunSuite with Matchers {

implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit val timer: Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

private val smallDelay: IO[Unit] = timer.sleep(20.millis)

private def awaitEqual[A: Eq](t: IO[A], success: A): IO[Unit] =
t.flatMap(a => if (Eq[A].eqv(a, success)) IO.unit else smallDelay *> awaitEqual(t, success))

private def run(t: IO[Unit]): Future[Assertion] = t.as(Succeeded).unsafeToFuture

test("F.parTraverseN(n)(collection)(f)") {
val finalValue = 100
val r = Ref.unsafe[IO, Int](0)
val list = List.range(0, finalValue)
val modifies = implicitly[Concurrent[IO]].parTraverseN(3)(list)(_ => IO.shift *> r.update(_ + 1))
run(IO.shift *> modifies.start *> awaitEqual(r.get, finalValue))
}

test("collection.parTraverseN(n)(f)") {
val finalValue = 100
val r = Ref.unsafe[IO, Int](0)
val list = List.range(0, finalValue)
val modifies = list.parTraverseN(3)(_ => IO.shift *> r.update(_ + 1))
run(IO.shift *> modifies.start *> awaitEqual(r.get, finalValue))
}

test("F.parSequenceN(n)(collection)") {
val finalValue = 100
val r = Ref.unsafe[IO, Int](0)
val list = List.fill(finalValue)(IO.shift *> r.update(_ + 1))
val modifies = implicitly[Concurrent[IO]].parSequenceN(3)(list)
run(IO.shift *> modifies.start *> awaitEqual(r.get, finalValue))
}

test("collection.parSequenceN(n)") {
val finalValue = 100
val r = Ref.unsafe[IO, Int](0)
val list = List.fill(finalValue)(IO.shift *> r.update(_ + 1))
val modifies = list.parSequenceN(3)
run(IO.shift *> modifies.start *> awaitEqual(r.get, finalValue))
}

}
22 changes: 21 additions & 1 deletion core/shared/src/test/scala/cats/effect/SyntaxTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package cats.effect

import cats.{Parallel, Traverse}
import cats.effect.syntax.AllCatsEffectSyntax

import scala.concurrent.duration._

object SyntaxTests extends AllCatsEffectSyntax {
Expand All @@ -40,7 +42,17 @@ object SyntaxTests extends AllCatsEffectSyntax {
typed[F[A]](acquire.guaranteeCase(finalCase))
}

def concurrentSyntax[F[_]: Concurrent, A, B](implicit timer: Timer[F]) = {
def asyncSyntax[T[_]: Traverse, F[_], G[_], A, B](implicit F: Async[F], P: Parallel[F, G]) = {
val n = mock[Long]
val ta = mock[T[A]]
val f = mock[A => F[B]]
val tma = mock[T[F[A]]]

typed[F[T[B]]](F.parTraverseN(n)(ta)(f))
typed[F[T[A]]](F.parSequenceN(n)(tma))
}

def concurrentSyntax[T[_]: Traverse, F[_], G[_], A, B](implicit F: Concurrent[F], P: Parallel[F, G], timer: Timer[F]) = {
val fa = mock[F[A]]
val fa2 = mock[F[A]]
val fb = mock[F[B]]
Expand All @@ -50,6 +62,14 @@ object SyntaxTests extends AllCatsEffectSyntax {
typed[F[Either[(A, Fiber[F, B]), (Fiber[F, A], B)]]](fa.racePair(fb))
typed[F[A]](fa.timeout(1.second))
typed[F[A]](fa.timeoutTo(1.second, fa2))

val n = mock[Long]
val ta = mock[T[A]]
val f = mock[A => F[B]]
val tma = mock[T[F[A]]]

typed[F[T[B]]](F.parTraverseN(n)(ta)(f))
typed[F[T[A]]](F.parSequenceN(n)(tma))
}

def effectSyntax[F[_]: Effect, A] = {
Expand Down