Enable async processing for SDF on Spark runner #23852 #24837
Conversation
R: @je-ik
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Thanks @JozoVilcek, that's awesome. I won't get to this today, but I'll have a look early next week!
I think the approach is correct overall, there are a few questions and mostly, we need to test this against complete @ValidatesRunner suite. Plus it would be good to test the iterators independently using unit tests.
Thanks so much for your contribution @JozoVilcek!
I've added a few more comments. Note, I've also pointed out a few things that are not issues of this PR, but existed before. Feel free to ignore those if you prefer.
I'm still scratching my head a bit about naming of the different components (lol, as always 🙈). But to be fair I also can't propose anything that feels more intuitive ...
@@ -180,14 +186,14 @@ public TimerInternals timerInternals() {
      DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
          new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);

-     return new SparkProcessContext<>(
+     SparkProcessContext<Object, InputT, OutputT> ctx =
+         new SparkProcessContext<>(
Just throwing it out here as an option to consider: given there are already a lot of moving pieces involved and the context has just become a container without functionality, everything in there except for the runner could already be passed to the processor when initializing it (same for the input iterator as well). The only thing that then has to be passed to process is the runner itself.
It actually cannot be put into the constructor, because the processor is responsible for providing the correct instance of the output manager, which is needed for doFnRunner construction. Therefore I chose to wrap these into a container and pass it to the processor as a context object.
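The circular dependency described above can be sketched in miniature. This is a hypothetical illustration with simplified names (`Processor`, `Runner`, `Context`, `OutputManager` here are stand-ins, not the actual Beam classes): the processor must exist first because it supplies the output manager, the runner is built from that output manager, and the remaining pieces travel to `process()` inside a context object.

```java
import java.util.ArrayList;
import java.util.List;

interface OutputManager { void output(String value); }

class Processor {
  private final List<String> buffer = new ArrayList<>();

  // The processor decides which OutputManager implementation is correct,
  // so it must be created before the runner can be constructed.
  OutputManager getOutputManager() { return buffer::add; }

  // Everything else the processor needs arrives via the context object.
  List<String> process(Context ctx, Runner runner) {
    for (String in : ctx.inputs) { runner.processElement(in); }
    return buffer;
  }
}

class Runner {
  private final OutputManager out;
  Runner(OutputManager out) { this.out = out; }
  void processElement(String e) { out.output(e.toUpperCase()); }
}

class Context {
  final List<String> inputs;
  Context(List<String> inputs) { this.inputs = inputs; }
}

public class ContextSketch {
  public static void main(String[] args) {
    Processor p = new Processor();
    // Runner depends on the processor's output manager: p must come first.
    Runner r = new Runner(p.getOutputManager());
    List<String> out = p.process(new Context(List.of("a", "b")), r);
    System.out.println(out);
  }
}
```

The ordering constraint is the whole point: swapping construction order is impossible without the container, which is why the context object exists.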
> because processor is responsible for providing correct instance of output manager which is needed doFnRunner construction

That's more or less what I meant: you can put everything into the constructor of the processor except the runner itself. E.g. process could then look like this:

processor.process(iter, doFnRunnerWithMetrics)

or even as below, if you pass the input iterator into the constructor as well:

processor.process(doFnRunnerWithMetrics)
@JozoVilcek Please feel free to discard or ignore! This code here is quickly hacked together and just meant to demonstrate what I had in mind above: SparkInputDataProcessor was replaced by SparkOutputManager; no SparkProcessContext or output iterator is needed anymore.
Thanks so much @JozoVilcek. I'm off tomorrow, but I'll have a look on Monday.
Run Java PreCommit
Run Spark ValidatesRunner
@JozoVilcek Thanks so much for addressing all the comments, even the existing tech debt I pointed out 🎉 A couple of small things left, please have a look.
@je-ik Anything else left from your side?
@@ -80,6 +77,7 @@
      private final boolean stateful;
      private final DoFnSchemaInformation doFnSchemaInformation;
      private final Map<String, PCollectionView<?>> sideInputMapping;
+     private final boolean useBoundedOutput;
nit, maybe even useBoundedConcurrentOutput or useBoundedParallelOutput?
Addressed
      if (outputProducerTask == null) {
        outputProducerTask =
            executorService.submit(
Please move this initialization code into a private helper startOutputProducerTask() or similar for better readability.
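The suggested refactor might look like the following sketch. The class, field, and queue contents here are illustrative (the actual Beam code differs); the point is only that the lazy `executorService.submit(...)` call moves out of the hot path into a named private helper.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class LazyProducer {
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
  private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  private Future<?> outputProducerTask;

  public Integer next() throws InterruptedException {
    if (outputProducerTask == null) {
      startOutputProducerTask(); // readable call site instead of an inline submit
    }
    return queue.take();
  }

  // The extracted helper: starts the background task exactly once.
  private void startOutputProducerTask() {
    outputProducerTask = executorService.submit(() -> {
      for (int i = 0; i < 3; i++) {
        queue.add(i);
      }
    });
  }

  public void shutdown() {
    executorService.shutdownNow();
  }
}
```

Extracting the submit keeps `next()` focused on consumption, and the helper name documents the lazy-start behaviour.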
Addressed
@JozoVilcek also note, there's one conflict with the master branch.
Hi @mosche, just quick feedback: I appreciate all the comments and suggestions. I do plan to address them all by the beginning of next week. Apologies for the delay due to some other priorities on my side.
Thanks, no worries @JozoVilcek :)
…kCommonPipelineOptions.java Co-authored-by: Jan Lukavský <je.ik@seznam.cz>
…slation/SparkInputDataProcessor.java Co-authored-by: Jan Lukavský <je.ik@seznam.cz>
Run Java PreCommit
Run Portable_Python PreCommit
Run Spark ValidatesRunner
Run Spark ValidatesRunner
Run Portable_Python PreCommit
LGTM, thanks a lot @JozoVilcek 🎉
Just two final optional suggestions:
- How about naming the experiment as done in MultiDoFnFunction, so use_bounded_concurrent_output_for_sdf instead of use_bounded_output_for_sdf?
- And how about mentioning the experiment as an improvement in CHANGES.md with instructions on how to enable it?
But I'm also happy to merge as is, let me know.
Run Spark ValidatesRunner
Thanks so much @JozoVilcek, merged 🎉
Enables the SparkRunner to process SDF functions which can generate large output (such as ParquetIO) without the need to fit the output into memory.
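The core idea behind the change can be illustrated with a minimal sketch, assuming a producer/consumer pair decoupled by a bounded queue (this is standalone demo code, not the runner's actual implementation): the thread running the DoFn emits into a fixed-capacity buffer and blocks once it is full, so total output can be arbitrarily larger than memory while the consumer drains it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedOutputDemo {
  static final long POISON = -1L;

  // Produces n elements through a buffer of the given capacity and
  // returns their sum as consumed on the calling thread.
  static long produceAndConsume(long n, int capacity) throws InterruptedException {
    BlockingQueue<Long> output = new ArrayBlockingQueue<>(capacity); // bounded buffer

    Thread producer = new Thread(() -> {
      try {
        for (long i = 0; i < n; i++) {
          output.put(i); // blocks when the buffer is full -> bounded memory
        }
        output.put(POISON); // signal end of output
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    long sum = 0;
    long v;
    while ((v = output.take()) != POISON) {
      sum += v;
    }
    producer.join();
    return sum;
  }

  public static void main(String[] args) throws InterruptedException {
    // 100_000 elements flow through a 16-slot buffer: output never
    // needs to be materialized in memory all at once.
    System.out.println(produceAndConsume(100_000, 16));
  }
}
```

The buffer capacity is the knob the experiment flag effectively controls: a bounded queue trades a little producer blocking for a hard cap on buffered output.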
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.