Skip to content

Commit

Permalink
Add optionalVia and unsafeOptionalVia
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Aug 2, 2024
1 parent 5bf60da commit 2c1d6a1
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions.
* add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989))
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))

The Stream Testkit Java DSL has some extra functions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

val flow = Flow[Option[String]]

Source(data).via(
Flow.optionalVia(
flow,
Flow[String].map(_.toInt)
)(Keep.none)
).runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val flow = Flow[(Option[String], Int)]
.asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
.map(_._1)

SourceWithContext
.fromTuples(Source(data)).via(
FlowWithContext.unsafeOptionalVia(
flow,
Flow[String].map(_.toInt)
)(Keep.none)
)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
import Attributes._
val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("")
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

Source.optionalVia(
Source(data),
Flow[String].map(_.toInt)
)(Keep.none)
.runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

"A Source.run" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val source = SourceWithContext.fromTuples(Source(data))

SourceWithContext.unsafeOptionalVia(
source,
Flow[String].map(_.toInt)
)(Keep.none)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -39,6 +40,29 @@ object FlowWithContext {
under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(under)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combineMat How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
*/
@ApiMayChange
def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combineMat: (FMat, FViaMat) => Mat
): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] = {
import pekko.util.OptionConverters._
scaladsl.FlowWithContext.unsafeOptionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(_.toJava).asJava
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -38,6 +39,37 @@ object SourceWithContext {
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}

/**
* Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combineMat How to combine the materialized values of source and viaFlow
* @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original source's element had viaFlow
* applied.
*/
@ApiMayChange
def unsafeOptionalVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat],
viaFlow: Flow[SOut, FOut, FMat],
combineMat: (SMat, FMat) => Mat
): SourceWithContext[Optional[FOut], Ctx, Mat] = {
import pekko.util.OptionConverters._
scaladsl.SourceWithContext.unsafeOptionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combineMat).map(
_.toJava).asJava
}
}

/**
Expand Down
51 changes: 51 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,57 @@ object Flow {
*/
def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f)

/**
* Creates a FlowW from an existing base Flow outputting an optional element and
* applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combineMat How to combine the materialized values of flow and viaFlow
* @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
*/
@ApiMayChange
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat])(
combineMat: (FMat, FViaMat) => Mat
): Flow[FIn, Option[FViaOut], Mat] =
Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combineMat) { implicit b => (s, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Option[FOut]](2))
val merge = b.add(Merge[Option[FViaOut]](2))

val filterAvailable = Flow[Option[FOut]].collect {
case Some(f) => f
}

val filterUnavailable = Flow[Option[FOut]].filter { opt =>
opt.isEmpty
}.map {
_ => Option.empty[FViaOut]
}

val mapIntoOption = Flow[FViaOut].map {
f => Some(f)
}

s ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

FlowShape(s.in, merge.out)
})

/**
* A graph with the shape of a flow logically is a flow, this method makes
* it so also in type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance

import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.NotUsed
import pekko.japi.Pair
import pekko.stream._
Expand All @@ -36,6 +37,70 @@ object FlowWithContext {
def fromTuples[In, CtxIn, Out, CtxOut, Mat](
flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(flow)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combineMat How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
*/
@ApiMayChange
def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Option[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat])(
combineMat: (FMat, FViaMat) => Mat
): FlowWithContext[FIn, Ctx, Option[FViaOut], Ctx, Mat] =
FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combineMat) {
implicit b => (f, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2))
val merge = b.add(Merge[(Option[FViaOut], Ctx)](2))

val unzip = b.add(Unzip[FOut, Ctx]())
val zipper = b.add(Zip[FViaOut, Ctx]())

val filterAvailable = Flow[(Option[FOut], Ctx)].collect {
case (Some(f), ctx) => (f, ctx)
}

val filterUnavailable = Flow[(Option[FOut], Ctx)].filter { case (opt, _) =>
opt.isEmpty
}.map {
case (_, ctx) => (Option.empty[FViaOut], ctx)
}

val mapIntoOption = Flow[(FViaOut, Ctx)].map {
case (f, ctx) => (Some(f), ctx)
}

f ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> unzip.in

unzip.out0 ~> viaF ~> zipper.in0
unzip.out1 ~> zipper.in1

zipper.out ~> mapIntoOption ~> merge.in(0)

broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

FlowShape(f.in, merge.out)
}))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable }
import pekko.annotation.InternalApi
import pekko.annotation.{ ApiMayChange, InternalApi }
import pekko.stream._
import pekko.stream.impl._
import pekko.stream.impl.Stages.DefaultAttributes
Expand Down Expand Up @@ -308,6 +308,57 @@ object Source {
fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource)
}

/**
* Creates a Source from an existing base Source outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combineMat How to combine the materialized values of source and viaFlow
* @return a Source with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original source's element had viaFlow
* applied.
*/
@ApiMayChange
def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Option[SOut], SMat],
viaFlow: Flow[SOut, FOut, FMat])(
combineMat: (SMat, FMat) => Mat
): Source[Option[FOut], Mat] =
Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combineMat) { implicit b => (s, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Option[SOut]](2))
val merge = b.add(Merge[Option[FOut]](2))

val filterAvailable = Flow[Option[SOut]].collect {
case Some(f) => f
}

val filterUnavailable = Flow[Option[SOut]].filter { opt =>
opt.isEmpty
}.map {
_ => Option.empty[FOut]
}

val mapIntoOption = Flow[FOut].map {
f => Some(f)
}

s ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

SourceShape(merge.out)
})

/**
* A graph with the shape of a source logically is a source, this method makes
* it so also in type.
Expand Down
Loading

0 comments on commit 2c1d6a1

Please sign in to comment.