
Enable async processing for SDF on Spark runner #23852 #24837

Merged: 13 commits into apache:master, Feb 2, 2023

Conversation

JozoVilcek
Contributor

Enables SparkRunner to process SDF functions which can generate large output (such as ParquetIO) without the need to fit the output into memory.
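The core idea can be illustrated with a minimal standalone sketch (all names here are hypothetical, not the actual runner code): a producer thread stands in for the DoFn emitting a large SDF output, and a consumer drains a bounded queue, so the full output never has to be buffered in memory at once:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
  public static void main(String[] args) throws Exception {
    // Bounded queue: the producer blocks once 16 elements are buffered,
    // so the full output never has to fit into memory at once.
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
    final int total = 100_000;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // Producer: stands in for the DoFn emitting a large SDF output.
    executor.submit(() -> {
      try {
        for (int i = 0; i < total; i++) {
          queue.put(i); // blocks while the queue is full
        }
        queue.put(-1); // sentinel signalling end of output
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    // Consumer: stands in for the output iterator handed to Spark.
    long sum = 0;
    long count = 0;
    for (int v = queue.take(); v != -1; v = queue.take()) {
      sum += v;
      count++;
    }
    executor.shutdown();
    System.out.println(count + " " + sum);
  }
}
```

The consumer sees all 100,000 elements even though at most 16 are ever buffered at a time.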


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@JozoVilcek
Contributor Author

R: @je-ik
R: @aromanenko-dev
R: @mosche

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@mosche
Member

mosche commented Dec 30, 2022

Thanks @JozoVilcek, that's awesome. I won't get to this today, but I'll have a look early next week!

Contributor

@je-ik left a comment


I think the approach is correct overall, there are a few questions and mostly, we need to test this against complete @ValidatesRunner suite. Plus it would be good to test the iterators independently using unit tests.

Member

@mosche left a comment


Thanks so much for your contribution @JozoVilcek!
I've added a few more comments. Note, I've also pointed out a few things that are not issues of this PR, but existed before. Feel free to ignore those if you prefer.

I'm still scratching my head a bit about naming of the different components (lol, as always 🙈). But to be fair I also can't propose anything that feels more intuitive ...

@@ -180,14 +186,14 @@ public TimerInternals timerInternals() {
DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);

return new SparkProcessContext<>(
SparkProcessContext<Object, InputT, OutputT> ctx =
new SparkProcessContext<>(
Member


Just throwing it out here as an option to consider ... given there are already a lot of moving pieces involved and the context has just become a container without functionality, everything in there except for the runner could already be passed to the processor when initializing it (same for the input iterator). The only thing that then has to be passed to process is the runner itself.

Contributor Author


It actually cannot be put into the constructor, because the processor is responsible for providing the correct instance of the output manager, which is needed for doFnRunner construction. Therefore I chose to wrap these into a container and pass it to the processor as a context object.

Member


because processor is responsible for providing correct instance of output manager which is needed doFnRunner construction

That's more or less what I meant; you can put everything into the constructor of the processor except the runner itself. E.g. process could look like this then:

processor.process(iter, doFnRunnerWithMetrics)

or even as below if you pass the input iterator into the constructor as well.

processor.process(doFnRunnerWithMetrics)
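The construction order being discussed can be sketched roughly as follows; all names are hypothetical and only illustrate the shape of the suggested API, where the processor is built first and owns the output manager, the runner is built against that output manager, and only the runner is passed to process():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Main {
  // Hypothetical stand-in for Beam's output manager interface.
  interface OutputManager {
    void output(String value);
  }

  // The processor is constructed first and owns the output manager.
  static class Processor {
    private final List<String> collected = new ArrayList<>();
    private final OutputManager outputManager = collected::add;

    OutputManager getOutputManager() {
      return outputManager;
    }

    // Only the runner is passed to process().
    List<String> process(Consumer<OutputManager> doFnRunner) {
      doFnRunner.accept(outputManager);
      return collected;
    }
  }

  public static void main(String[] args) {
    Processor processor = new Processor();
    // The DoFn runner is built against the processor's output manager...
    Consumer<OutputManager> doFnRunnerWithMetrics = om -> om.output("hello");
    // ...and is then the only argument to process().
    System.out.println(processor.process(doFnRunnerWithMetrics));
  }
}
```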

Member


@JozoVilcek Please feel free to discard or ignore! This code here is quickly hacked together and just meant to demonstrate what I had in mind above. SparkInputDataProcessor was replaced by SparkOutputManager, no SparkProcessContext and output iterator needed anymore.

@JozoVilcek
Contributor Author

@je-ik, @mosche I hope I have addressed all suggested changes except unit tests for the iterators (on my todo list for later). ValidatesRunner did pass for me locally.

@mosche
Member

mosche commented Jan 5, 2023

Thanks so much @JozoVilcek. I'm off tomorrow, but I'll have a look on Monday.

@mosche
Member

mosche commented Jan 5, 2023

Run Java PreCommit

@mosche
Member

mosche commented Jan 5, 2023

Run Spark ValidatesRunner

Member

@mosche left a comment


@JozoVilcek Thanks so much for addressing all the comments, even the existing tech debt I pointed out 🎉 A couple of small things are left, please have a look.
@je-ik Anything else left from your side?

@@ -80,6 +77,7 @@
private final boolean stateful;
private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<String, PCollectionView<?>> sideInputMapping;
private final boolean useBoundedOutput;
Member


nit, maybe even useBoundedConcurrentOutput or useBoundedParallelOutput?

Contributor Author


Addressed


if (outputProducerTask == null) {
outputProducerTask =
executorService.submit(
Member


Please move this initialization code into a private helper startOutputProducerTask() or similar for better readability.
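A minimal sketch of the suggested extraction, with hypothetical names (not the PR's actual code): the lazy null-check stays at the call site, while the submission details move into a named helper for readability:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
  private Future<?> outputProducerTask;

  // Lazy start stays with the caller; the submission details live in a
  // named helper, as suggested in the review comment.
  void ensureOutputProducerStarted() {
    if (outputProducerTask == null) {
      outputProducerTask = startOutputProducerTask();
    }
  }

  private Future<?> startOutputProducerTask() {
    return executorService.submit(() -> System.out.println("producing"));
  }

  public static void main(String[] args) throws Exception {
    Main m = new Main();
    m.ensureOutputProducerStarted();
    m.outputProducerTask.get(); // wait for the task to finish
    m.executorService.shutdown();
  }
}
```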

Contributor Author


Addressed

@mosche
Member

mosche commented Jan 10, 2023

@JozoVilcek also note, there's one conflict with the master branch

@JozoVilcek
Contributor Author

Hi @mosche, just quick feedback: I appreciate all the comments and suggestions. I do plan to address them all by the beginning of next week. Apologies for the delay due to some other priorities on my side.

@mosche
Member

mosche commented Jan 18, 2023

Thanks, no worries @JozoVilcek :)

@JozoVilcek JozoVilcek force-pushed the spark-async-support-for-multido-fn branch from e6523ea to 3a52456 Compare January 31, 2023 11:35
@JozoVilcek
Contributor Author

Run Java PreCommit

@JozoVilcek
Contributor Author

Run Portable_Python PreCommit

@JozoVilcek
Contributor Author

Run Spark ValidatesRunner

@JozoVilcek
Contributor Author

Run Spark ValidatesRunner

@JozoVilcek JozoVilcek requested review from mosche and je-ik and removed request for mosche and je-ik January 31, 2023 15:40
@mosche
Member

mosche commented Feb 1, 2023

Run Portable_Python PreCommit

Member

@mosche left a comment


LGTM, thanks a lot @JozoVilcek 🎉

Just two final optional suggestions:

  • How about naming the experiment as done in MultiDoFnFunction, so
    use_bounded_concurrent_output_for_sdf instead of use_bounded_output_for_sdf?
  • And how about mentioning the experiment as improvement in CHANGES.md with instructions how to enable?

But I'm also happy to merge as is, let me know.
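For illustration, a hedged sketch of how such an experiment flag is typically checked; the helper below is hypothetical (not Beam's actual ExperimentalOptions API), and only the experiment name use_bounded_concurrent_output_for_sdf comes from this thread:

```java
import java.util.Arrays;
import java.util.List;

public class Main {
  // Hypothetical helper, not Beam's actual ExperimentalOptions API.
  static boolean hasExperiment(List<String> experiments, String name) {
    return experiments != null && experiments.contains(name);
  }

  public static void main(String[] args) {
    // What a user would pass as --experiments=... on the pipeline options.
    List<String> experiments =
        Arrays.asList("use_bounded_concurrent_output_for_sdf");
    System.out.println(
        hasExperiment(experiments, "use_bounded_concurrent_output_for_sdf"));
  }
}
```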

@mosche
Member

mosche commented Feb 2, 2023

Run Spark ValidatesRunner

@mosche mosche merged commit 01aa470 into apache:master Feb 2, 2023
@mosche
Member

mosche commented Feb 2, 2023

Thanks so much @JozoVilcek, merged 🎉

@JozoVilcek JozoVilcek deleted the spark-async-support-for-multido-fn branch February 2, 2023 11:40