[FEATURE] Incremental result fetching for Flink engine #4806
Currently, Beeline's incremental fetching works on the client side: the Kyuubi server pulls all result rows from the engine and returns them to the client in micro-batches. However, this is problematic for the Flink engine and other streaming scenarios, where the operation keeps running and producing more records after the Kyuubi operation has been executed asynchronously and is considered result-available. Consider a select-from-Kafka use case: if we simply forward fetch-result requests to the Flink engine, there are 3 possible situations:
In the first two situations, the Flink engine should, but can't, tell the Kyuubi server to fetch again later. The current workaround is that the Flink engine polls the result set until the requested number of rows is reached. We could fix this by adding a … (see kyuubi/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala, lines 530 to 533 at 056c5ef).
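For illustration, here is a minimal Scala sketch of the polling workaround mentioned above. The `StreamingResult` trait and every name in it are hypothetical, not actual Kyuubi or Flink classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical handle to a running Flink job's result set; tryPoll returns None
// when no row is available *yet* (the job is still running), Some(row) otherwise.
trait StreamingResult {
  def tryPoll(): Option[Seq[AnyRef]]
  def isJobTerminated: Boolean
}

// The polling workaround: a single fetch call keeps polling until it has collected
// maxRows rows or the job has finished, which is what makes it block on a slow source.
def fetchBlocking(result: StreamingResult, maxRows: Int, pollIntervalMs: Long = 50L): Seq[Seq[AnyRef]] = {
  val rows = new ArrayBuffer[Seq[AnyRef]](maxRows)
  while (rows.size < maxRows && !result.isJobTerminated) {
    result.tryPoll() match {
      case Some(row) => rows += row
      case None      => Thread.sleep(pollIntervalMs)
    }
  }
  rows.toSeq
}
```

The drawback is visible in the sketch: if the Kafka source produces rows slowly, the fetch call blocks until `maxRows` rows arrive or the job ends.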
WDYT? @pan3793 @bowenliang123
The Spark engine already supports engine-side incremental collection, so I think there is no blocker on the infrastructure side.
The difference is that in Spark scenarios the result is always ready and complete, so the client knows the result rows are fully fetched once a fetch returns empty. That is to say, … (see kyuubi/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiQueryResultSet.java, lines 333 to 348 at 056c5ef).
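Roughly paraphrased in Scala (this is not the actual KyuubiQueryResultSet.java code, just its assumed gist), the client-side behavior is:

```scala
// An empty FetchResults response is taken to mean the result set is exhausted,
// which is a safe assumption only when the result is already ready and complete.
class ClientResultSet(fetchBatch: () => Seq[Seq[AnyRef]]) {
  private var buffer: Iterator[Seq[AnyRef]] = Iterator.empty
  private var exhausted = false

  def next(): Option[Seq[AnyRef]] = {
    if (!buffer.hasNext && !exhausted) {
      val batch = fetchBatch()            // one FetchResults round trip to the server
      if (batch.isEmpty) exhausted = true // empty batch => assume no more rows
      else buffer = batch.iterator
    }
    if (buffer.hasNext) Some(buffer.next()) else None
  }
}
```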
However, in Flink scenarios, a fetched result could be temporarily empty because the result data is not ready yet. That means we need a clear sign of EOS, which could be …
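Since the comment is cut off, the following is only my assumption of what such an EOS sign could look like: an explicit `hasMoreRows` flag carried with every fetch response (similar in spirit to the field already defined on the Thrift fetch response), so that an empty batch alone no longer means end-of-stream:

```scala
// Hypothetical fetch contract with an explicit EOS flag.
final case class FetchResult(rows: Seq[Seq[AnyRef]], hasMoreRows: Boolean)

// Client-side loop under that contract: keep fetching on empty batches until
// the engine reports hasMoreRows == false.
def drain(fetch: () => FetchResult, pollIntervalMs: Long = 100L): Vector[Seq[AnyRef]] = {
  var acc = Vector.empty[Seq[AnyRef]]
  var last = fetch()
  acc ++= last.rows
  while (last.hasMoreRows) {
    if (last.rows.isEmpty) Thread.sleep(pollIntervalMs) // not ready yet, back off briefly
    last = fetch()
    acc ++= last.rows
  }
  acc
}
```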
Any progress on this feature is welcome. But …
@bowenliang123 Thanks for your input! IMHO, introducing a timeout seems to be a workaround rather than a fix for the root cause. For Flink jobs with heavy state or high parallelism, job initialization may take minutes, so it's likely we would fetch no rows before the timeout. Users could adjust the timeout, of course, but it's hard to determine an appropriate threshold for a single job without a few tries. I think …
SGTM. My priorities would be: 1. a timeout config for testing / ad-hoc one-time query situations (fetching stable results within a user-confident timeout), 2. …
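A minimal sketch of point 1, assuming the timeout is read from some engine/session config (the key name is omitted here, since the follow-up work defines the real one):

```scala
import scala.collection.mutable.ArrayBuffer

// Timeout-bounded fetch: return whatever rows arrived before the deadline
// instead of blocking until maxRows is reached.
def fetchWithTimeout(
    tryPoll: () => Option[Seq[AnyRef]],
    maxRows: Int,
    timeoutMs: Long,
    pollIntervalMs: Long = 50L): Seq[Seq[AnyRef]] = {
  val deadline = System.currentTimeMillis() + timeoutMs
  val rows = new ArrayBuffer[Seq[AnyRef]](maxRows)
  while (rows.size < maxRows && System.currentTimeMillis() < deadline) {
    tryPoll() match {
      case Some(row) => rows += row
      case None      => Thread.sleep(pollIntervalMs)
    }
  }
  rows.toSeq // possibly empty if the job wasn't ready within the timeout
}
```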
[KYUUBI #5080][FLINK] Fix EmbeddedExecutorFactory not thread-safe during bootstrap

### _Why are the changes needed?_

As titled.

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5082 from link3280/KYUUBI-5080.

Closes #5080

e8026b8 [Paul Lin] [KYUUBI #4806][FLINK] Improve logs
fd78f32 [Paul Lin] [KYUUBI #4806][FLINK] Fix gateway NPE
a0a7c44 [Cheng Pan] Update externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
50830d4 [Paul Lin] [KYUUBI #5080][FLINK] Fix EmbeddedExecutorFactory not thread-safe during bootstrap

Lead-authored-by: Paul Lin <paullin3280@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
@bowenliang123 Now I'm in favor of the simple timeout solution you proposed. The more I coded, the more I realized the divergence between the current result fetching and the streaming incremental one: the former assumes the results are ready, so when the result … I'm closing this issue in favor of #5088.
Yes, thanks for considering my suggestion.
Reopening the issue: after a discussion with @pan3793, we think the semantic problem is acceptable:
However, we still need to decide what to do when the timeout happens. There are 3 possible approaches:
Personally, I lean toward approach 1.
@link3280 Thanks for the summary of the offline discussion. Yes, I agree with you; throwing a dedicated exception is the better approach.
+1 for approach 1.
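For clarity, a small sketch of what approach 1 (throwing a dedicated exception on fetch timeout) could look like; `ResultFetchTimeoutException` is a hypothetical name, not an existing Kyuubi class:

```scala
// If the timeout expires with no rows at all, raise a dedicated exception so
// clients can tell "result not ready yet" apart from a genuinely empty result.
class ResultFetchTimeoutException(timeoutMs: Long)
  extends RuntimeException(
    s"No result rows became available within $timeoutMs ms; the job may still be initializing")

def fetchOrFail(fetchWithTimeout: Long => Seq[Seq[AnyRef]], timeoutMs: Long): Seq[Seq[AnyRef]] = {
  val rows = fetchWithTimeout(timeoutMs) // e.g. the timeout-bounded loop sketched earlier
  if (rows.isEmpty) throw new ResultFetchTimeoutException(timeoutMs)
  rows
}
```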
Describe the feature
Add incremental result fetching for Flink engine.
Motivation
Currently, Flink query results are buffered in 3 layers:
That leads to an unnecessary memory footprint and adds latency. Moreover, data consistency could be affected, since each layer has different size limits.
Describe the solution
No response
Additional context
No response
Are you willing to submit PR?