Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Parallel traverseFilter functions #3467

Merged
merged 2 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions core/src/main/scala/cats/Parallel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,74 @@ object Parallel extends ParallelArityFunctions2 {
def apply[M[_], F[_]](implicit P: Parallel.Aux[M, F]): Parallel.Aux[M, F] = P
def apply[M[_]](implicit P: Parallel[M], D: DummyImplicit): Parallel.Aux[M, P.F] = P

/**
* Like `TraverseFilter#traverseFilter`, but uses the applicative instance
* corresponding to the Parallel instance instead.
*
* Example:
* {{{
* scala> import cats.implicits._
* scala> import cats.data._
* scala> val list: List[Int] = List(1, 2, 3, 4)
* scala> def validate(n: Int): EitherNec[String, Option[Int]] =
* | if (n > 100) Left(NonEmptyChain.one("Too large"))
* | else if (n % 3 =!= 0) Right(Some(n))
* | else Right(None)
* scala> list.parTraverseFilter(validate)
* res0: EitherNec[String, List[Int]] = Right(List(1, 2, 4))
* }}}
*/
def parTraverseFilter[T[_], M[_], A, B](
ta: T[A]
)(f: A => M[Option[B]])(implicit T: TraverseFilter[T], P: Parallel[M]): M[T[B]] = {
val ftb: P.F[T[B]] = T.traverseFilter[P.F, A, B](ta)(a => P.parallel(f(a)))(P.applicative)

P.sequential(ftb)
}

/**
* Like `TraverseFilter#sequenceFilter`, but uses the applicative instance
* corresponding to the Parallel instance instead.
*
* Example:
* {{{
* scala> import cats.implicits._
* scala> import cats.data._
* scala> val list: List[EitherNec[String, Option[Int]]] = List(Left(NonEmptyChain.one("Error")), Left(NonEmptyChain.one("Warning!")))
* scala> list.parSequenceFilter
* res0: EitherNec[String, List[Int]] = Left(Chain(Error, Warning!))
* }}}
*/
def parSequenceFilter[T[_], M[_], A](ta: T[M[Option[A]]])(implicit T: TraverseFilter[T], P: Parallel[M]): M[T[A]] = {
val fta: P.F[T[A]] = T.traverseFilter[P.F, M[Option[A]], A](ta)(P.parallel.apply(_))(P.applicative)

P.sequential(fta)
}

/**
* Like `TraverseFilter#filterA`, but uses the applicative instance
* corresponding to the Parallel instance instead.
*
* Example:
* {{{
* scala> import cats.implicits._
* scala> import cats.data._
* scala> val list: List[Int] = List(1, 2, 3, 4)
* scala> def validate(n: Int): EitherNec[String, Boolean] =
* | if (n > 100) Left(NonEmptyChain.one("Too large"))
* | else Right(n % 3 =!= 0)
* scala> list.parFilterA(validate)
* res0: EitherNec[String, List[Int]] = Right(List(1, 2, 4))
* }}}
*/
def parFilterA[T[_], M[_], A](
ta: T[A]
)(f: A => M[Boolean])(implicit T: TraverseFilter[T], P: Parallel[M]): M[T[A]] = {
val fta: P.F[T[A]] = T.filterA(ta)(a => P.parallel(f(a)))(P.applicative)

P.sequential(fta)
}

/**
* Like `Traverse[A].sequence`, but uses the applicative instance
* corresponding to the Parallel instance instead.
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/cats/syntax/all.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ trait AllSyntax
with VectorSyntax
with WriterSyntax
with ParallelFoldMapASyntax
with ParallelTraverseFilterSyntax

trait AllSyntaxBinCompat0 extends UnorderedTraverseSyntax with ApplicativeErrorExtension with TrySyntax

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/cats/syntax/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ package object syntax {
with ParallelBitraverseSyntax
with ParallelUnorderedTraverseSyntax
with ParallelFoldMapASyntax
with ParallelTraverseFilterSyntax
object partialOrder extends PartialOrderSyntax
object profunctor extends ProfunctorSyntax
object reducible extends ReducibleSyntax with ReducibleSyntaxBinCompat0
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/scala/cats/syntax/parallel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cats.{
Monoid,
Parallel,
Traverse,
TraverseFilter,
UnorderedTraverse
}

Expand Down Expand Up @@ -41,6 +42,18 @@ trait ParallelFlatSyntax {
): ParallelFlatSequenceOps[T, M, A] = new ParallelFlatSequenceOps[T, M, A](tmta)
}

trait ParallelTraverseFilterSyntax {
implicit final def catsSyntaxParallelTraverseFilter[T[_]: TraverseFilter, A](
ta: T[A]
): ParallelTraverseFilterOps[T, A] =
new ParallelTraverseFilterOps[T, A](ta)

implicit final def catsSyntaxParallelSequenceFilter[T[_]: TraverseFilter, M[_]: Parallel, A](
tmoa: T[M[Option[A]]]
): ParallelSequenceFilterOps[T, M, A] =
new ParallelSequenceFilterOps[T, M, A](tmoa)
}

trait ParallelTraverseSyntax {
implicit final def catsSyntaxParallelTraverse_[T[_]: Foldable, A](ta: T[A]): ParallelTraversable_Ops[T, A] =
new ParallelTraversable_Ops[T, A](ta)
Expand Down Expand Up @@ -100,6 +113,19 @@ final class ParallelTraversableOps[T[_], A](private val ta: T[A]) extends AnyVal

}

final class ParallelTraverseFilterOps[T[_], A](private val ta: T[A]) extends AnyVal {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be better to consolidate these, so that all enrichment methods for T[A] are on a single enrichment class, mostly in order to reduce the number of implicits in cats.syntax.all, but also just to simplify this code. In this case that would mean putting these methods on something named ParallelTraversableOps or ParallelFoldMapAOps, though, which also seems kind of bad.

I think for now this version is probably best, but we should definitely revisit this in Cats 3.

def parTraverseFilter[M[_]: Parallel, B](f: A => M[Option[B]])(implicit T: TraverseFilter[T]): M[T[B]] =
Parallel.parTraverseFilter(ta)(f)

def parFilterA[M[_]: Parallel](f: A => M[Boolean])(implicit T: TraverseFilter[T]): M[T[A]] =
Parallel.parFilterA(ta)(f)
}

final class ParallelSequenceFilterOps[T[_], M[_], A](private val tmoa: T[M[Option[A]]]) extends AnyVal {
def parSequenceFilter(implicit P: Parallel[M], T: TraverseFilter[T]): M[T[A]] =
Parallel.parSequenceFilter(tmoa)
}

final class ParallelTraversable_Ops[T[_], A](private val ta: T[A]) extends AnyVal {
def parTraverse_[M[_], B](f: A => M[B])(implicit T: Foldable[T], P: Parallel[M]): M[Unit] =
Parallel.parTraverse_(ta)(f)
Expand Down