diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md index 4a0ae7611b9..9571bdda364 100644 --- a/docs/src/main/paradox/release-notes/releases-1.1.md +++ b/docs/src/main/paradox/release-notes/releases-1.1.md @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions. * add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989)) * add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244)) * added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269)) +* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) The Stream Testkit Java DSL has some extra functions. diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md new file mode 100644 index 00000000000..170bac8d817 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md @@ -0,0 +1,34 @@ +# optionalVia + +For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.optionalVia](Source$) { scala="#optionalVia%5BSOut,FOut,SMat,FMat,Mat](source:org.apache.pekko.stream.scaladsl.Source%5BOption%5BSOut],SMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BSOut,FOut,FMat])(combine:(SMat,FMat)=%3EMat):org.apache.pekko.stream.scaladsl.Source%5BOption%5BFOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Source,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" } +@apidoc[Flow.optionalVia](Flow$) { scala="#optionalVia%5BFIn,FOut,FViaOut,FMat,FViaMat,Mat](flow:org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFOut],FMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BFOut,FViaOut,FViaMat])(combine:(FMat,FViaMat)=%3EMat):org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFViaOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" } + +## Description + +For a stream containing optional elements, transforms each element by applying +the given `viaFlow` and passing the value downstream as an optional value. + +Scala +: @@snip [OptionalVia.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala) { #optionalVia } + +Java +: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #optionalVia } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** while the provided viaFlow is runs with defined elements + +**backpressures** when the viaFlow runs for the defined elements and downstream backpressures + +**completes** when the upstream completes + +@@@ + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index cc6d698a5b6..3de82bd35fd 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -180,6 +180,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.| |Source/Flow|@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.| +|Source/Flow|@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.| |Source/Flow|@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.| |Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| |Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.| @@ -558,6 +559,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [onErrorComplete](Source-or-Flow/onErrorComplete.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) +* [optionalVia](Source-or-Flow/optionalVia.md) * [orElse](Source-or-Flow/orElse.md) * [Partition](Partition.md) * [prefixAndTail](Source-or-Flow/prefixAndTail.md) diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 4385dd2de1f..3bab8fb3de8 100644 --- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -532,6 +532,23 @@ void foldAsyncExample() { // #foldAsync } + void optionalViaExample() { + + // #optionalVia + Flow flow = Flow.fromFunction(Integer::parseInt); + + Source, NotUsed> source = + Source.from( + Arrays.asList(Optional.of("1"), Optional.empty(), Optional.empty(), Optional.of("4"))); + + Source.optionalVia(source, flow, Keep.none()).runForeach(System.out::println, system); + // Optional[1] + // Optional.empty + // Optional.empty + // Optional[4] + // #optionalVia + } + void takeExample() { // #take Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system); diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala new file mode 100644 index 00000000000..64714df49ec --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.sourceorflow + +object OptionalVia { + def optionalViaExample(): Unit = { + import org.apache.pekko.actor.ActorSystem + import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Source } + + implicit val system: ActorSystem = ActorSystem() + + // #optionalVia + Source.optionalVia( + Source(List(Some("1"), None, None, Some("4"))), + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none).runForeach(println) + // Some(1) + // None + // None + // Some(4) + // #optionalVia + } + +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index 632f98a320e..3c918db1ecb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r "should be created from a function easily" in { Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10) } + + "Apply a viaFlow with optional elements using optionalVia" in { + val data = List(Some("1"), None, None, Some("4")) + + val flow = Flow[Option[String]] + + Source(data).via( + Flow.optionalVia( + flow, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + ).runWith(TestSink.probe[Option[Int]]) + .request(4) + .expectNext(Some(1), None, None, Some(4)) + .expectComplete() + } } /** diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 0036cf34a13..9ac07e2a39b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec { .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) .expectComplete() } + + "Apply a viaFlow with optional elements using unsafeOptionalVia" in { + val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4)) + + val flow = Flow[(Option[String], Int)] + .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) + .map(_._1) + + SourceWithContext + .fromTuples(Source(data)).via( + FlowWithContext.unsafeOptionalDataVia( + flow, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + ) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 2444e724298..a54a17a4c7f 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout { import Attributes._ val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("") } + + "Apply a viaFlow with optional elements using optionalVia" in { + val data = List(Some("1"), None, None, Some("4")) + + Source.optionalVia( + Source(data), + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + .runWith(TestSink.probe[Option[Int]]) + .request(4) + .expectNext(Some(1), None, None, Some(4)) + .expectComplete() + } } "A Source.run" must { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 137b83ee888..62dda8a96fa 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) .expectComplete() } + + "Apply a viaFlow with optional elements using unsafeOptionalVia" in { + val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4)) + + val source = SourceWithContext.fromTuples(Source(data)) + + SourceWithContext.unsafeOptionalDataVia( + source, + Flow.fromFunction { (string: String) => string.toInt } + )(Keep.none) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1963a97f584..043b5fe50cb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -69,6 +69,32 @@ object Flow { def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] = Flow.create[I]().map(f) + /** + * Creates a Flow from an existing base Flow outputting an optional element and + * applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of flow and viaFlow + * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Optional which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat], + combine: function.Function2[FMat, FViaMat, Mat] + ): Flow[FIn, Optional[FViaOut], Mat] = + scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava + /** Create a `Flow` which can process elements of type `T`. */ def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index cb76749dbd9..2e058f8d11c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance @@ -25,6 +26,7 @@ import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ object FlowWithContext { @@ -39,6 +41,38 @@ object FlowWithContext { under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(under) + /** + * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of flow and viaFlow + * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Optional which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat]( + flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat], + combine: function.Function2[FMat, FViaMat, Mat] + ): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] = + scaladsl.FlowWithContext.unsafeOptionalDataVia(flow.map(_.toScala).asScala, viaFlow.asScala)( + combinerToScala(combine)).map( + _.toJava).asJava + } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index bf384639d4f..9affc6af150 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -137,6 +137,32 @@ object Source { def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.cycle(() => f.create().asScala)) + /** + * Creates a Source from an existing base Source outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of source and viaFlow + * @return a Source with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Optional which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat], + viaFlow: Flow[SOut, FOut, FMat], + combine: function.Function2[SMat, FMat, Mat] + ): Source[Optional[FOut], Mat] = + scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava + /** * Helper to create [[Source]] from `Iterable`. * Example usage: diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index 39e2c4e4d96..f41b7babfad 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance @@ -28,6 +29,7 @@ import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ object SourceWithContext { @@ -38,6 +40,38 @@ object SourceWithContext { def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = { new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala))) } + + /** + * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of source and viaFlow + * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Optional which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat], + viaFlow: Flow[SOut, FOut, FMat], + combine: function.Function2[SMat, FMat, Mat] + ): SourceWithContext[Optional[FOut], Ctx, Mat] = + scaladsl.SourceWithContext.unsafeOptionalDataVia(source.map(_.toScala).asScala, viaFlow.asScala)( + combinerToScala(combine)).map( + _.toJava).asJava + } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 07ba28c9bfe..3b3876dbc80 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -428,6 +428,55 @@ object Flow { */ def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f) + /** + * Creates a FlowW from an existing base Flow outputting an optional element and + * applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of flow and viaFlow + * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat])( + combine: (FMat, FViaMat) => Mat + ): Flow[FIn, Option[FViaOut], Mat] = + Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[Option[FOut]](2)) + val merge = b.add(Merge[Option[FViaOut]](2)) + + val filterAvailable = Flow[Option[FOut]].collect { + case Some(f) => f + } + + val filterUnavailable = Flow[Option[FOut]].collect { + case None => Option.empty[FViaOut] + } + + val mapIntoOption = Flow[FViaOut].map { + f => Some(f) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + FlowShape(s.in, merge.out) + }) + /** * A graph with the shape of a flow logically is a flow, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index b2e9bced042..94141c95b94 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko +import pekko.annotation.ApiMayChange import pekko.NotUsed import pekko.japi.Pair import pekko.stream._ @@ -36,6 +37,69 @@ object FlowWithContext { def fromTuples[In, CtxIn, Out, CtxOut, Mat]( flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(flow) + + /** + * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of flow and viaFlow + * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat]( + flow: FlowWithContext[FIn, Ctx, Option[FOut], Ctx, FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat])( + combine: (FMat, FViaMat) => Mat + ): FlowWithContext[FIn, Ctx, Option[FViaOut], Ctx, Mat] = + FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { + implicit b => (f, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2)) + val merge = b.add(Merge[(Option[FViaOut], Ctx)](2)) + + val unzip = b.add(Unzip[FOut, Ctx]()) + val zipper = b.add(Zip[FViaOut, Ctx]()) + + val filterAvailable = Flow[(Option[FOut], Ctx)].collect { + case (Some(f), ctx) => (f, ctx) + } + + val filterUnavailable = Flow[(Option[FOut], Ctx)].collect { + case (None, ctx) => (Option.empty[FViaOut], ctx) + } + + val mapIntoOption = Flow[(FViaOut, Ctx)].map { + case (f, ctx) => (Some(f), ctx) + } + + f ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> unzip.in + + unzip.out0 ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + + zipper.out ~> mapIntoOption ~> merge.in(0) + + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + FlowShape(f.in, merge.out) + })) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index a39a15df59b..43ce87e3a20 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -308,6 +308,55 @@ object Source { fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource) } + /** + * Creates a Source from an existing base Source outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of source and viaFlow + * @return a Source with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Option[SOut], SMat], + viaFlow: Flow[SOut, FOut, FMat])( + combine: (SMat, FMat) => Mat + ): Source[Option[FOut], Mat] = + Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) { implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[Option[SOut]](2)) + val merge = b.add(Merge[Option[FOut]](2)) + + val filterAvailable = Flow[Option[SOut]].collect { + case Some(f) => f + } + + val filterUnavailable = Flow[Option[SOut]].collect { + case None => Option.empty[FOut] + } + + val mapIntoOption = Flow[FOut].map { + f => Some(f) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + SourceShape(merge.out) + }) + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 55a6ce316c8..59e7af530dc 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko +import pekko.annotation.ApiMayChange import pekko.stream._ object SourceWithContext { @@ -25,6 +26,68 @@ object SourceWithContext { */ def fromTuples[Out, CtxOut, Mat](source: Source[(Out, CtxOut), Mat]): SourceWithContext[Out, CtxOut, Mat] = new SourceWithContext(source) + + /** + * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of source and viaFlow + * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Option[SOut], Ctx, SMat], + viaFlow: Flow[SOut, FOut, FMat])( + combine: (SMat, FMat) => Mat + ): SourceWithContext[Option[FOut], Ctx, Mat] = + SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) { + implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2)) + val merge = b.add(Merge[(Option[FOut], Ctx)](2)) + + val unzip = b.add(Unzip[SOut, Ctx]()) + val zipper = b.add(Zip[FOut, Ctx]()) + + val filterAvailable = Flow[(Option[SOut], Ctx)].collect { + case (Some(f), ctx) => (f, ctx) + } + + val filterUnavailable = Flow[(Option[SOut], Ctx)].collect { + case (None, ctx) => (Option.empty[FOut], ctx) + } + + val mapIntoOption = Flow[(FOut, Ctx)].map { + case (f, ctx) => (Some(f), ctx) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> unzip.in + + unzip.out0 ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + + zipper.out ~> mapIntoOption ~> merge.in(0) + + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + SourceShape(merge.out) + })) } /**