Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optionalVia and unsafeOptionalDataVia #1422

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions.
* add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989))
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))

The Stream Testkit Java DSL has some extra functions.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# optionalVia

For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Source.optionalVia](Source$) { scala="#optionalVia%5BSOut,FOut,SMat,FMat,Mat](source:org.apache.pekko.stream.scaladsl.Source%5BOption%5BSOut],SMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BSOut,FOut,FMat])(combine:(SMat,FMat)=%3EMat):org.apache.pekko.stream.scaladsl.Source%5BOption%5BFOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Source,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" }
@apidoc[Flow.optionalVia](Flow$) { scala="#optionalVia%5BFIn,FOut,FViaOut,FMat,FViaMat,Mat](flow:org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFOut],FMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BFOut,FViaOut,FViaMat])(combine:(FMat,FViaMat)=%3EMat):org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFViaOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" }

## Description

For a stream containing optional elements, transforms each element by applying
the given `viaFlow` and passing the value downstream as an optional value.

Scala
: @@snip [OptionalVia.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala) { #optionalVia }

Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #optionalVia }

## Reactive Streams semantics

@@@div { .callout }

**emits** while the provided viaFlow is runs with defined elements

**backpressures** when the viaFlow runs for the defined elements and downstream backpressures

**completes** when the upstream completes

@@@

2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|Source/Flow|<a name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.|
|Source/Flow|<a name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
Expand Down Expand Up @@ -558,6 +559,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [optionalVia](Source-or-Flow/optionalVia.md)
* [orElse](Source-or-Flow/orElse.md)
* [Partition](Partition.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md)
Expand Down
17 changes: 17 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,23 @@ void foldAsyncExample() {
// #foldAsync
}

void optionalViaExample() {

// #optionalVia
Flow<String, Integer, NotUsed> flow = Flow.fromFunction(Integer::parseInt);

Source<Optional<String>, NotUsed> source =
Source.from(
Arrays.asList(Optional.of("1"), Optional.empty(), Optional.empty(), Optional.of("4")));

Source.optionalVia(source, flow, Keep.none()).runForeach(System.out::println, system);
// Optional[1]
// Optional.empty
// Optional.empty
// Optional[4]
// #optionalVia
}

void takeExample() {
// #take
Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sourceorflow

object OptionalVia {
def optionalViaExample(): Unit = {
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Source }

implicit val system: ActorSystem = ActorSystem()

// #optionalVia
Source.optionalVia(
Source(List(Some("1"), None, None, Some("4"))),
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none).runForeach(println)
// Some(1)
// None
// None
// Some(4)
// #optionalVia
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

val flow = Flow[Option[String]]

Source(data).via(
Flow.optionalVia(
flow,
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
).runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val flow = Flow[(Option[String], Int)]
.asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
.map(_._1)

SourceWithContext
.fromTuples(Source(data)).via(
FlowWithContext.unsafeOptionalDataVia(
flow,
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
import Attributes._
val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("")
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

Source.optionalVia(
Source(data),
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
.runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

"A Source.run" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val source = SourceWithContext.fromTuples(Source(data))

SourceWithContext.unsafeOptionalDataVia(
source,
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
26 changes: 26 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ object Flow {
def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] =
Flow.create[I]().map(f)

/**
* Creates a Flow from an existing base Flow outputting an optional element and
* applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of flow and viaFlow
* @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
mdedetrich marked this conversation as resolved.
Show resolved Hide resolved
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combine: function.Function2[FMat, FViaMat, Mat]
): Flow[FIn, Optional[FViaOut], Mat] =
scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava

/** Create a `Flow` which can process elements of type `T`. */
def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -25,6 +26,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

object FlowWithContext {
Expand All @@ -39,6 +41,38 @@ object FlowWithContext {
under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(under)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combine How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
@ApiMayChange
def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combine: function.Function2[FMat, FViaMat, Mat]
): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] =
scaladsl.FlowWithContext.unsafeOptionalDataVia(flow.map(_.toScala).asScala, viaFlow.asScala)(
combinerToScala(combine)).map(
_.toJava).asJava

}

/**
Expand Down
26 changes: 26 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,32 @@ object Source {
def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.cycle(() => f.create().asScala))

/**
* Creates a Source from an existing base Source outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of source and viaFlow
* @return a Source with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original source's element had viaFlow
* applied.
* @since 1.1.0
*/
def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat],
viaFlow: Flow[SOut, FOut, FMat],
combine: function.Function2[SMat, FMat, Mat]
): Source[Optional[FOut], Mat] =
scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava

/**
* Helper to create [[Source]] from `Iterable`.
* Example usage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -28,6 +29,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

object SourceWithContext {
Expand All @@ -38,6 +40,38 @@ object SourceWithContext {
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}

/**
* Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combine How to combine the materialized values of source and viaFlow
* @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original source's element had viaFlow
* applied.
* @since 1.1.0
*/
@ApiMayChange
def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat],
viaFlow: Flow[SOut, FOut, FMat],
combine: function.Function2[SMat, FMat, Mat]
): SourceWithContext[Optional[FOut], Ctx, Mat] =
scaladsl.SourceWithContext.unsafeOptionalDataVia(source.map(_.toScala).asScala, viaFlow.asScala)(
combinerToScala(combine)).map(
_.toJava).asJava

}

/**
Expand Down
Loading
Loading