
[FEATURE] Incremental result fetching for Flink engine #4806

Closed · 3 of 4 tasks
link3280 opened this issue May 8, 2023 · 11 comments

Comments

@link3280
Contributor

link3280 commented May 8, 2023

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the feature

Add incremental result fetching for Flink engine.

Motivation

Currently, Flink query results are buffered in 3 layers:

  1. Flink result fetcher
  2. Flink engine
  3. Kyuubi server

This leads to unnecessary memory footprint and extra latency. Moreover, data consistency could be affected, since each layer has different size limits.

Describe the solution

No response

Additional context

No response

Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to improve.
  • No. I cannot submit a PR at this time.
@link3280
Contributor Author

link3280 commented Jun 24, 2023

Currently, Beeline's incremental fetching works on the client side. The Kyuubi server pulls all result rows from the engine and returns them to the client in a micro-batch manner.

However, this could be problematic for the Flink engine or other streaming scenarios, where the operation keeps running and producing more records after the Kyuubi operation has been executed asynchronously and is considered result-available.

Consider a select-from-Kafka use case: if we simply forward the fetch-results requests to the Flink engine, there are 3 possible situations:

  • the Kafka topic is temporarily empty, we get an empty rowset
  • the Kafka topic has 10 records, so we get a rowset with 10 rows (less than kyuubi.session.engine.flink.max.rows)
  • the Kafka topic has lots of records, so we get a rowset with ${kyuubi.session.engine.flink.max.rows} rows

In the first two situations, the Flink engine should, but currently can't, tell the Kyuubi server to fetch again later; thus the current workaround is that the Flink engine keeps polling the result set until the row count reaches ${kyuubi.session.engine.flink.max.rows}.

We could fix this by adding a hasMoreResults field to the result fetching response (i.e. TRowSet), but that would be an invasive change, as it touches the Thrift protocol. So I'm thinking of injecting metadata columns prefixed with __KYUUBI_ into the TRowSet to let the engines tell the server that there are more rows to fetch. The server would convert these metadata columns into TFetchResultsResp fields and drop them before returning the TRowSet to the clients. The relevant server-side code currently looks like this:

val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
resp.setResults(rowSet)
// hasMoreRows is currently hard-coded to false; the engine has no way to signal
// that more rows will arrive later
resp.setHasMoreRows(false)
resp.setStatus(OK_STATUS)
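
A rough sketch of how that could look on the server side (the __KYUUBI_HAS_MORE_ROWS column name and the stripMetadataColumn helper are hypothetical, not existing Kyuubi APIs):

// Sketch only: the engine appends a metadata column named "__KYUUBI_HAS_MORE_ROWS"
// to the TRowSet; the server strips it and maps it onto the existing
// TFetchResultsResp.hasMoreRows field before replying to the client.
val rawRowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
val (rowSet, hasMoreRows) = stripMetadataColumn(rawRowSet, "__KYUUBI_HAS_MORE_ROWS")
resp.setResults(rowSet)          // clients never see the injected metadata column
resp.setHasMoreRows(hasMoreRows) // engine-provided signal instead of a constant false
resp.setStatus(OK_STATUS)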

WDYT? @pan3793 @bowenliang123

@pan3793
Member

pan3793 commented Jun 24, 2023

The Spark engine already supports engine-side incremental collection, so I think there is no blocker in the infrastructure.

@link3280
Contributor Author

link3280 commented Jun 24, 2023

The difference is that in Spark scenarios the result is always ready and complete, so the client knows the result rows are fully fetched once a fetch returns empty. That is to say, TFetchResultsResp.hasMoreRows is not required (and it's actually not used, see the code below).

// From the JDBC driver's ResultSet#next(): when the local buffer is exhausted,
// another FetchResults call is issued; an empty response simply ends iteration,
// and the hasMoreRows flag of fetchResp is never consulted.
if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
  TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize);
  TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
  Utils.verifySuccessWithInfo(fetchResp.getStatus());
  TRowSet results = fetchResp.getResults();
  fetchedRows = RowSetFactory.create(results, protocol);
  fetchedRowsItr = fetchedRows.iterator();
}
if (fetchedRowsItr.hasNext()) {
  row = fetchedRowsItr.next();
} else {
  return false;
}

However, in Flink scenarios, the fetched result could be temporarily empty because the result data is not ready yet. That means we need a clear end-of-stream signal, which could be TFetchResultsResp.hasMoreRows.
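
To illustrate the intended semantics, here is a minimal client-side sketch (FetchPage and fetchOnce are hypothetical stand-ins for TFetchResultsResp and a FetchResults round trip, not real driver APIs), where an empty page only terminates iteration when hasMoreRows is false:

// Hypothetical model of one fetch round trip, mirroring TFetchResultsResp.{results, hasMoreRows}.
case class FetchPage(rows: Seq[Seq[AnyRef]], hasMoreRows: Boolean)

// fetchOnce stands in for one FetchResults call against the server.
def streamResults(fetchOnce: () => FetchPage)(consume: Seq[AnyRef] => Unit): Unit = {
  var eos = false
  while (!eos) {
    val page = fetchOnce()
    page.rows.foreach(consume)
    // An empty page alone is not end-of-stream; only an empty page with
    // hasMoreRows == false is. A real client would back off before retrying.
    eos = page.rows.isEmpty && !page.hasMoreRows
  }
}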

@bowenliang123
Contributor

Any progress on this feature is welcome, but hasMoreRows may be too heavy for it. For the first two situations, the server could simply wait until a timeout (with a new config added for this) and then return the result set with the statement closed, leaving no extra footprint in the whole fetching process.
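
A rough sketch of what that could look like inside the engine (the fetch-timeout parameter and the pollOnce helper are imagined for illustration, not existing Kyuubi code):

import java.time.{Duration, Instant}
import scala.collection.mutable.ArrayBuffer

// Sketch: keep polling the Flink result fetcher until either maxRows are collected
// or the (newly introduced) fetch timeout expires, then return whatever has been
// gathered and let the server close the statement.
def fetchWithTimeout[Row](pollOnce: () => Seq[Row], maxRows: Int, timeout: Duration): Seq[Row] = {
  val deadline = Instant.now.plus(timeout)
  val buffer = ArrayBuffer.empty[Row]
  while (buffer.size < maxRows && Instant.now.isBefore(deadline)) {
    buffer ++= pollOnce()
  }
  buffer.take(maxRows).toSeq
}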

@link3280
Contributor Author

link3280 commented Jun 26, 2023

@bowenliang123 Thanks for your input! IMHO, introducing a timeout seems to be a workaround for this problem, but it doesn't address the root cause.

For Flink jobs with heavy state or high parallelism, job initialization might take minutes, so it's likely that we fetch no rows before the timeout. Users could adjust the timeout, of course, but it's hard to determine an appropriate timeout threshold for a single job without a few tries.

I think hasMoreRows is not as heavy as we thought (BTW, hasMoreRows is an existing field that's simply not put to use), as it's false by default and should be compatible with batch/OLAP engines. WDYT?

@bowenliang123
Contributor

SGTM. My priorities would be: 1. a timeout config for testing / ad-hoc one-time queries (fetching stable results within a user-chosen timeout); 2. hasMoreRows for incremental result fetching. They can be implemented to satisfy either or both scenarios, and the first one could be easier. If the second one is a better solution for you, I'm good with it.

link3280 added a commit to link3280/kyuubi that referenced this issue Jul 15, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Jul 15, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Jul 21, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Jul 22, 2023
pan3793 added a commit that referenced this issue Jul 22, 2023
[KYUUBI #5080][FLINK] Fix EmbeddedExecutorFactory not thread-safe during bootstrap

### _Why are the changes needed?_
As titled.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

Closes #5082 from link3280/KYUUBI-5080.

Closes #5080

e8026b8 [Paul Lin] [KYUUBI #4806][FLINK] Improve logs
fd78f32 [Paul Lin] [KYUUBI #4806][FLINK] Fix gateway NPE
a0a7c44 [Cheng Pan] Update externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
50830d4 [Paul Lin] [KYUUBI #5080][FLINK] Fix EmbeddedExecutorFactory not thread-safe during bootstrap

Lead-authored-by: Paul Lin <paullin3280@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
link3280 added a commit to link3280/kyuubi that referenced this issue Jul 23, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Jul 23, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Jul 23, 2023
@link3280
Contributor Author

@bowenliang123 Now I'm in favor of the simple timeout solution you proposed. The more I coded, the more I realized the divergence between the current result fetching and the streaming incremental one.

The former assumes the results are ready, so when hasNext() == true, users expect that next() always returns a row. However, there's no guarantee of a new row for streaming incremental fetching, so hasNext() == true only means there COULD BE a new row within a fetch timeout. That muddies the semantics of result fetching and may surprise users.

I'm closing this issue in favor of #5088.

@bowenliang123
Contributor

Yes, thanks for considering my suggestion. hasNext does hint at an infinite result set with an unstable size in certain scenarios with a common unbounded stream. Let's see whether the timeout approach suits most cases.

zhaohehuhu pushed a commit to zhaohehuhu/incubator-kyuubi that referenced this issue Jul 24, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 6, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 6, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 6, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 6, 2023
@link3280 link3280 reopened this Aug 7, 2023
@link3280
Contributor Author

link3280 commented Aug 7, 2023

Reopening the issue because, after a discussion with @pan3793, we think the semantic problem is acceptable:

  • For Beeline users, Beeline supports an incremental mode that pulls rows in a micro-batch manner. If no row is produced, Beeline blocks on result.next() until some rows arrive or the timeout is reached (no timeout by default).
  • For JDBC users, calling result.next() could now block for a long time, and users could either set a fetch timeout or interrupt the thread externally.

However, we still need to decide what to do when the timeout happens. There are 3 possible approaches:

  1. throw a timeout exception with result.next()
  2. return false for result.next() which indicates there are no more rows (which is technically not true).
  3. return true for result.next() but the current row is set to null which indicates there are no rows yet (which is against normal use cases and may lead to surprises).

Personally, I lean toward approach 1.
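
For illustration, a client could handle approach 1 roughly as follows (the exact exception type surfaced by the driver is an assumption here, not something decided in this issue):

import java.sql.{ResultSet, SQLTimeoutException}

// Sketch: for a streaming query, a fetch timeout on next() means "no rows yet",
// so the client retries instead of treating the result set as exhausted.
def consume(rs: ResultSet)(handleRow: ResultSet => Unit): Unit = {
  var finished = false
  while (!finished) {
    try {
      if (rs.next()) handleRow(rs) else finished = true
    } catch {
      case _: SQLTimeoutException => () // no rows within the fetch timeout, poll again
    }
  }
}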

@pan3793
Member

pan3793 commented Aug 7, 2023

@link3280 Thanks for the summary of the offline discussion. Yes, I agree with you, throwing a dedicated exception is the better approach.

@bowenliang123
Contributor

+1 for approach 1.

link3280 added a commit to link3280/kyuubi that referenced this issue Aug 11, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 11, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 11, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 11, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 14, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 14, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 14, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 14, 2023
link3280 added a commit to link3280/kyuubi that referenced this issue Aug 15, 2023