Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optionalVia and unsafeOptionalDataVia #1422

Merged

Conversation

mdedetrich
Copy link
Contributor

This PR adds optionalVia/unsafeOptionalVia functions which allows you to apply a viaFlow to elements in the stream only if they are defined. This kind of utility function has cropped up due to needing to fulfill a pretty basic/common scenario, i.e. in my case I have a Source of elements and a Flow that performs a http request but I only want to apply the http request a subset of those elements.

Tests have been added to the PR but I haven't added documentation, @raboof @jrudolph @He-Pin maybe you can let me know where to put it.

@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch 3 times, most recently from 2c1d6a1 to 74d4c3b Compare August 2, 2024 07:22
@raboof
Copy link
Member

raboof commented Aug 2, 2024

This kind of utility function has cropped up due to needing to fulfill a pretty basic/common scenario, i.e. in my case I have a Source of elements and a Flow that performs a http request but I only want to apply the http request a subset of those elements.

Just for my understanding: why would you want to keep the None elements at all, rather than doing something like optionals.flatMapConcat(o => Source.fromIterable(o))? Is this because the None elements may have relevant contexts attached to them that you don't want to lose?

I haven't added documentation, @raboof @jrudolph @He-Pin maybe you can let me know where to put it.

I think the CI "doc check" should tell you what to do, let's see if that's indeed the case :)

@raboof
Copy link
Member

raboof commented Aug 2, 2024

I haven't added documentation, @raboof @jrudolph @He-Pin maybe you can let me know where to put it.

I think the CI "doc check" should tell you what to do, let's see if that's indeed the case :)

[error] java.lang.RuntimeException: Unable to extract details from /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md
(...)
[error] Caused by: java.io.FileNotFoundException: /home/runner/work/pekko/pekko/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md (No such file or directory)

OK so the error is slightly obscure, but that's where to put it. We might want to show a more friendly message in this case.

@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch 3 times, most recently from b383f34 to cc2775b Compare August 2, 2024 07:36
@mdedetrich
Copy link
Contributor Author

Just for my understanding: why would you want to keep the None elements at all, rather than doing something like optionals.flatMapConcat(o => Source.fromIterable(o))? Is this because the None elements may have relevant contexts attached to them that you don't want to lose?

Yes, this is especially true of SourceWithContext/FlowWithContext i.e. in my specific usecase the context is a Kafka cursor which we definitely do not want to lose since we have to submit that cursor (even if we don't use the viaFlow).

OK so the error is slightly obscure, but that's where to put it. We might want to show a more friendly message in this case.

Thanks, working on this now!

@pjfanning pjfanning modified the milestones: 1.0.x, 1.1.0-M2 Aug 2, 2024
@raboof
Copy link
Member

raboof commented Aug 2, 2024

Just for my understanding: why would you want to keep the None elements at all, rather than doing something like optionals.flatMapConcat(o => Source.fromIterable(o))? Is this because the None elements may have relevant contexts attached to them that you don't want to lose?

Yes, this is especially true of SourceWithContext/FlowWithContext i.e. in my specific usecase the context is a Kafka cursor which we definitely do not want to lose since we have to submit that cursor (even if we don't use the viaFlow).

Right, that is indeed a common use case. To nitpick: losing the cursor should never be a functional problem, but always be an optimization, right? Still, a worthwhile one.

We might want to take some time to consider whether adding a unsafeOptionalVia is the right way to deal with this scenario. I could see two alternative designs:

  • You could mapAsync and do the evaluation whether you want to call the HTTP service and the actual call within that function
  • We could have Seq[Offset] as context instead of just Offset, and introduce a .filter that adds the offsets of 'filtered' elements to the context of the next unfiltered element. The downside of this is that committing the offset of a filtered element may be delayed (until the next non-filtered element is processed), and that it only works well for infinite streams (a stream that completes would lose the offsets of filtered elements at the end of the stream). Both of these limitations might actually be fine for the typical Kafka use case.

The advantage of those approaches is that you avoid the unsafe aspect, where a Flow that might drop, duplicate or reorder elements would lead to subtle bugs. The mapAsync approach might also simplify your code but will not fit in all cases.

I don't have a strong opinion and am not a priori opposed to adding unsafeOptionalVia, but wanted to present these for consideration.

I wonder if the regular optionalVia has use cases, but I guess it might make sense to have it for consistency.

@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch from cc2775b to e4e5143 Compare August 2, 2024 08:42
@mdedetrich
Copy link
Contributor Author

mdedetrich commented Aug 2, 2024

Right, that is indeed a common use case. To nitpick: losing the cursor should never be a functional problem, but always be an optimization, right? Still, a worthwhile one.

This isn't true in the case of Kafka if you perform side effects, if you don't submit a cursor for an element then it will continuously retry that element (while it is true that you can expect this with Kafka due to its architecture since exactly once isn't a thing, we don't want to unnecessarily perform side effects more than we have to)

You could mapAsync and do the evaluation whether you want to call the HTTP service and the actual call within that function

We are using an actual Flow i.e. Http.get(system).superPool which basically creates a Flow<HttpRequest, HttpResponse> and would like to keep it that way (i.e. we have a pure stream that is backpressured all the way through)

We could have Seq[Offset] as context instead of just Offset, and introduce a .filter that adds the offsets of 'filtered' elements to the context of the next unfiltered element. The downside of this is that committing the offset of a filtered element may be delayed (until the next non-filtered element is processed), and that it only works well for infinite streams (a stream that completes would lose the offsets of filtered elements at the end of the stream). Both of these limitations might actually be fine for the typical Kafka use case.

This could work but not only is it more complicated, it can unnecessarily delay the side effects that my usecase relies on because as you note it creates gaps and if those gaps are long enough it can cause problems. Futhermore if the gap is longer than the commit time interval it creates the problem described before where the kafka topics will get resent (because cursor offset is not submitted) therefore creating multiple side effects.

I wonder if the regular optionalVia has use cases, but I guess it might make sense to have it for consistency.

I agree with this sentiment, but as you noted I added it in for consistency

@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch 3 times, most recently from a260bbe to a78643f Compare August 2, 2024 08:51
@mdedetrich
Copy link
Contributor Author

mdedetrich commented Aug 2, 2024

I have just repushed with the following changes

  • Added since("1.1.0") to all of the introduced methods
  • Removed @ApiMayChange from ONLY the optionalVia methods since this API is likely not to ever change. The unsafe versions (i.e. unsafeOptionalVia) still have @ApiMayChange for the same reasons unsafeDataVia has @ApiMayChange)
  • Renamed unsafeOptionalVia to unsafeOptionalDataVia for consistency
  • Added docs to optionalVia.md (still need to add signature methods, have to figure this out)
  • Correctly using combinerToScala/Function2 for javadsl

@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch from a78643f to 1da14f7 Compare August 2, 2024 08:54
@raboof
Copy link
Member

raboof commented Aug 2, 2024

Right, that is indeed a common use case. To nitpick: losing the cursor should never be a functional problem, but always be an optimization, right? Still, a worthwhile one.

This isn't true in the case of Kafka if you perform side effects, if you don't submit a cursor for an element then it will continuously retry that element.

Sure, but that would strictly speaking still be an optimization, not a correctness issue, right? It is always possible that the side effect happens but the offset commit doesn't, so your application needs to be robust against that. I don't mean to diminish the fact that this optimization is useful, though ;) .

You could mapAsync and do the evaluation whether you want to call the HTTP service and the actual call within that function

We are using an actual Flow i.e. Http.get(system).superPool which basically creates a Flow<HttpRequest, HttpResponse> and would like to keep it that way (i.e. we have a pure stream that is backpressured all the way through)

I'm not entirely sure superPool gives guarantees that a mapAsync with a sensible parallelism doesn't, but possibly.

We could have Seq[Offset] as context instead of just Offset, and introduce a .filter that adds the offsets of 'filtered' elements to the context of the next unfiltered element. The downside of this is that committing the offset of a filtered element may be delayed (until the next non-filtered element is processed), and that it only works well for infinite streams (a stream that completes would lose the offsets of filtered elements at the end of the stream). Both of these limitations might actually be fine for the typical Kafka use case.

This could work but not only is it more complicated, it can unnecessarily delay the side effects that my usecase relies on because as you note it creates gaps and if those gaps are long enough it can cause problems. Futhermore if the gap is longer than the commit time interval it creates the problem described before where the kafka topics will get resent (because cursor offset is not submitted) therefore creating multiple side effects.

I don't see how it would delay side effects, but indeed it would delay the offset commits. I didn't know that would cause resends during normal operation, that seems like a reasonable motivation to have this 'unsafe' pattern.

@mdedetrich
Copy link
Contributor Author

mdedetrich commented Aug 2, 2024

So to clarify technically in terms of correctness there indeed isn't an issue, what I was meaning to say is that creating additional side effects (because we don't commit a cursor in the specific time slice causing Kafka to replay topics) is something I want to avoid, especially since the service we are making a request to is rate limited one (futhermore in my case, the subet of elements that will make requests is a very small subset further exacerbating the issue)

And yes you are right in that it won't delay side effects, was in a rush when writing the message and didn't think clearly

@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch 4 times, most recently from f1a3867 to 83d4171 Compare August 2, 2024 14:37
@mdedetrich mdedetrich changed the title Add optionalVia and unsafeOptionalVia Add optionalVia and unsafeOptionalDataVia Aug 2, 2024
@mdedetrich mdedetrich force-pushed the add-optionalvia-unsafeoptionalVia branch 2 times, most recently from 38b49b7 to cba69b6 Compare August 2, 2024 15:00
@mdedetrich
Copy link
Contributor Author

So the PR is now complete, I have double checked the generated docs and went through the PR a couple of times to make sure that everything looks okay.

* applied.
* @since 1.1.0
*/
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat],
Copy link
Member

@He-Pin He-Pin Aug 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we can just change it to something like predicate: Out =>Boolen for conditional transformation, instead of been forcing to lift to an Optional/Option, which can introduce additional allocation.

I have not checked other stream implementations for this, but I would like to avoid of the Option/Optional but use a predicate instead, and it's some kind of groupBy/`partition operator,

Copy link
Member

@jxnu-liguobin jxnu-liguobin Aug 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

BTW, do we need to confirm that the return type of Flow Out is Optional?

Copy link
Contributor Author

@mdedetrich mdedetrich Aug 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optional is necessary for the problem I am solving since

  1. I need to keep the elements that don't fulfill the predicate (i.e. I am using StreamWithContext/FlowWithContext and I don't want to lose the context). Adding this to Stream/Flow was largely for consistency reasons but it can still be useful for other scenarious.
  2. I need to know which elements were filtered and which weren't after the matter of fact, so even if this was changed to a predicate it wouldn't ultimately save on performance as I would manually have to add the functionality to achieve this.

I am not against adding an alternative version that avoids boxing due to using a predicate but I would suggest to do that in a different PR, and as @raboof pointed out earlier that may not even need a specific utility function depending on whether point 2 is important or not.

It goes without saying that I have no issue with performance improvements that keep the same api/behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optional is just a kind of marker field, you can have an object with a field additionalProcessEnabled:Boolean for that case.

I suggest we look at fs2/zio/reactor-core for the same case before we merge this.

Copy link
Contributor Author

@mdedetrich mdedetrich Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optional is just a kind of marker field, you can have an object with a field additionalProcessEnabled:Boolean for that case.

But in that case the returned types would then be different which means it would be a different API/overloaded function etc etc (which I don't have a problem with, we can just do it separately then?).

I suggest we look at fs2/zio/reactor-core for the same case before we merge this.

Sure, not that familiar with these API's but I can have a look at them a bit later

Copy link
Member

@jxnu-liguobin jxnu-liguobin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I don't like this kind of optional operation, considering that the return type must be optional, I can also accept it.

@mdedetrich
Copy link
Contributor Author

mdedetrich commented Aug 5, 2024

@He-Pin @jxnu-liguobin If you aren't completely happy with the API, I can always add @ApiMayChange, its already on the unsafe variants but I am open to adding it to the Source/Flow variants as well.

As the annotation suggests, its specifically designed for functions where we aren't 100% sure if its the right design.

@He-Pin
Copy link
Member

He-Pin commented Aug 5, 2024

I will find a time to look into it and other implementation, sorry for the delay, a little busy at work

s ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the elements' order of the new flow, will it keep the origin order?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's some kind of the

Source.flatmapConcat { element =>
 if predicate(element) 
    Source.single(element).via(yourFlow) 
 else 
    Source.single(element) 
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the elements' order of the new flow, will it keep the origin order?

Yes, this is verified in tests

I think it's some kind of the

Source.flatmapConcat { element =>
 if predicate(element) 
    Source.single(element).via(yourFlow) 
 else 
    Source.single(element) 
}

This only works if the via leaves the output type unchanged. If the output type changes then the true branch will have a different element compared to the false branch.

Thats why the wrapping in an Option/Optional type is needed because in the case where you don't apply the via its just None/Optional.empty

Copy link
Contributor

@pjfanning pjfanning left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@mdedetrich
Copy link
Contributor Author

@pjfanning Thanks for approval. @He-Pin if there are issues with this approach we always have option to change it afterwards before main release

@mdedetrich mdedetrich merged commit 60c480a into apache:main Aug 21, 2024
18 checks passed
@mdedetrich mdedetrich deleted the add-optionalvia-unsafeoptionalVia branch August 21, 2024 12:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants