
FlinkSQL Nebula as Sink #57

Merged: 8 commits merged into master on Jul 12, 2022

Conversation

spike-liu
Contributor

@spike-liu spike-liu commented Jun 16, 2022

FlinkSQL: use Nebula as a sink.

Please feel free to review.

By the way, the integration test has been run locally with the docker-compose.yaml below:

https://github.com/vesoft-inc/nebula-flink-connector/pull/57/files#diff-881de9edd148f6d70ee8c0ebc96cc09268471ba279c5d0b953b0c8abcf9d2e43

[screenshot]

@CLAassistant

CLAassistant commented Jun 16, 2022

CLA assistant check
All committers have signed the CLA.

@spike-liu
Contributor Author

@Nicole00 Thanks for the build approval.

Meanwhile, I have fixed the build error caused by check-style and updated the pull request. Please review again.

By the way, that was my mistake; check-style has now been enabled in my development environment.

@spike-liu
Contributor Author

spike-liu commented Jun 24, 2022

@Nicole00 We know you are very busy, but could you spare some time to review this PR?

We are enthusiastic about this project, and our colleagues would like to contribute more code based on this PR.

Alternatively, could someone else be assigned to help us out?

@QingZ11

QingZ11 commented Jun 29, 2022

> @Nicole00 We know you are very busy, but could you spare some time to review this PR?
>
> We are enthusiastic about this project, and our colleagues would like to contribute more code based on this PR.
>
> Alternatively, could someone else be assigned to help us out?

Please wait a moment. I have checked with Nicole: she is on a long vacation, so the PR processing has not been very timely.

Sorry for our slow reply. And thank you for your patience.

(supported by Google Translate)

@spike-liu
Contributor Author

@QingZ11 it is great to hear from you and thanks for letting us know.

@Nicole00
Contributor

Nicole00 commented Jul 4, 2022

> @QingZ11 it is great to hear from you and thanks for letting us know.

So sorry for the late reply. I'm on maternity leave, so I could not review it in time. We'll review this PR as soon as possible.

@@ -0,0 +1,175 @@
package org.apache.flink.connector.nebula.sink;
Contributor

please append the license

Contributor Author

Done.

throw new IllegalArgumentException(
String.format("The value of '%s' option should not be negative, but is %s.",
EXECUTE_RETRY.key(), config.get(EXECUTE_RETRY)));
}
Contributor

Maybe we can keep just one timeout parameter and remove the retry parameter, because Java client 3.0 only exposes timeout in NebulaPoolConfig.

Contributor Author

This is existing code. I agree with the review and have removed the EXECUTE_RETRY-related code.
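As an aside, the non-negative option check in the diff above can be sketched as a standalone method. This is a minimal sketch only: the class and method names below are illustrative and are not part of the connector's actual API.

```java
// Standalone sketch of the non-negative option validation pattern.
// Class and method names are illustrative, not part of the connector.
public class OptionValidation {

    /** Returns the value unchanged if it is non-negative, otherwise throws. */
    public static int requireNonNegative(String key, int value) {
        if (value < 0) {
            throw new IllegalArgumentException(String.format(
                    "The value of '%s' option should not be negative, but is %s.",
                    key, value));
        }
        return value;
    }

    public static void main(String[] args) {
        // A valid value passes through unchanged.
        System.out.println(requireNonNegative("timeout", 3000));
        // A negative value is rejected with a descriptive message.
        try {
            requireNonNegative("execute-retry", -1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```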

@@ -0,0 +1,38 @@
package org.apache.flink.connector.nebula.sink;
Contributor

please append the license

Contributor Author

Fixed the license headers in all related files.

options.add(CONNECT_TIMEOUT);
options.add(CONNECT_RETRY);
options.add(TIMEOUT);
options.add(EXECUTE_RETRY);
Contributor

No need for CONNECT_TIMEOUT, CONNECT_RETRY, EXECUTE_RETRY.

Contributor Author

Fixed.

@@ -0,0 +1,44 @@
package org.apache.flink.connector.nebula.sink;
Contributor

useless class? can we delete it?

Contributor Author

It is used in NebulaDynamicTableSink as below:

[screenshot]

set.add(CONNECT_TIMEOUT);
set.add(CONNECT_RETRY);
set.add(TIMEOUT);
set.add(EXECUTE_RETRY);
set.add(SRC_INDEX);
set.add(DST_INDEX);
set.add(RANK_INDEX);
Contributor

what about ID_INDEX

Contributor Author

Good idea! Done.

@Nicole00
Contributor

Nicole00 commented Jul 4, 2022

It's a great PR! Thanks again for your work and attention to Nebula. 👍🏻

@Nicole00
Contributor

Nicole00 commented Jul 4, 2022

  1. How can we create a graph space using Flink SQL?
  2. And is there a point in defining the vid (src id, dst id) data type in the CREATE TABLE statement?

@Test
public void sinkVertexData() throws ExecutionException, InterruptedException {
TableEnvironment tableEnvironment =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Contributor

please make sure the unit tests work.
We can use docker-compose to start up the nebula service and create a space before executing these tests.
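A minimal sketch of that idea is shown below, assuming the standard single-replica nebula images and service names; the PR's actual docker-compose.yaml, linked in the description, is the authoritative version.

```yaml
# Minimal sketch only: image tags, service names, and ports are assumptions.
version: '3.4'
services:
  metad:
    image: vesoft/nebula-metad:v3.0.0
    command:
      - --meta_server_addrs=metad:9559
      - --local_ip=metad
      - --port=9559
  storaged:
    image: vesoft/nebula-storaged:v3.0.0
    command:
      - --meta_server_addrs=metad:9559
      - --local_ip=storaged
      - --port=9779
    depends_on:
      - metad
  graphd:
    image: vesoft/nebula-graphd:v3.0.0
    command:
      - --meta_server_addrs=metad:9559
      - --port=9669
    ports:
      - "9669:9669"
    depends_on:
      - metad
      - storaged
```

With the services up, a test-setup step would connect to graphd on port 9669 and create the test space before the suite runs.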

Contributor Author

Fixed as below:

[screenshot]

Contributor

great tests

@spike-liu
Contributor Author

> @QingZ11 it is great to hear from you and thanks for letting us know.
>
> So sorry for the late reply. I'm on maternity leave, so I could not review it in time. We'll review this PR as soon as possible.

It brightens our day to hear from you :)

Please forgive us for pushing you during your leave.

@spike-liu
Contributor Author

spike-liu commented Jul 5, 2022

> 1. How can we create a graph space using Flink SQL?

[Spike] Currently Flink SQL supports only a few statements (listed below), so there is no way for us to create a graph space:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/overview/

[screenshot]

> 2. And is there a point in defining the vid (src id, dst id) data type in the CREATE TABLE statement?

[Spike] Based on my understanding, srcId and dstId have to be included in the INSERT statements, and all columns in the INSERT statements have to be declared in the CREATE TABLE statement. Hence this seems inevitable. Correct me if I am wrong ^-^

@Nicole00
Contributor

Nicole00 commented Jul 7, 2022

> 1. How can we create a graph space using Flink SQL?
>
> [Spike] Currently Flink SQL supports only a few statements (listed below), so there is no way for us to create a graph space:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/overview/
>
> 2. And is there a point in defining the vid (src id, dst id) data type in the CREATE TABLE statement?
>
> [Spike] Based on my understanding, srcId and dstId have to be included in the INSERT statements, and all columns in the INSERT statements have to be declared in the CREATE TABLE statement. Hence this seems inevitable. Correct me if I am wrong ^-^

  1. The statements supported by Flink SQL include CREATE DATABASE, which for nebula is equivalent to CREATE SPACE.
  2. I agree that srcid and dstid should be included in INSERT statements, but how would the data type of vid be defined in the CREATE TABLE statement: vid STRING or vid BIGINT? In fact, the data type in the CREATE TABLE statement is unused; it only matters in the CREATE SPACE statement.
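For illustration, a sink-table definition in the style under discussion might look as follows. This is a sketch only: the connector option keys, table name, and column names are assumptions made for the example, not the merged option set (NebulaDynamicTableFactory in this PR defines the real one). It also shows the point being debated: the Flink-side type declared for src_id says nothing about the vid type nebula actually uses.

```sql
-- Illustrative sketch of a Flink SQL sink table for a nebula edge.
-- Option keys and names below are assumptions, not the connector's
-- actual option set; see NebulaDynamicTableFactory in this PR.
CREATE TABLE friend_sink (
    src_id  STRING,   -- declared type here does not set the nebula vid type
    dst_id  STRING,
    rel_rank BIGINT,
    degree  BIGINT
) WITH (
    'connector'     = 'nebula',
    'graph-address' = '127.0.0.1:9669',
    'meta-address'  = '127.0.0.1:9559',
    'username'      = 'root',
    'password'      = 'nebula',
    'graph-space'   = 'flink_test',
    'label-name'    = 'friend'
);

INSERT INTO friend_sink VALUES ('Tom', 'Jerry', 0, 85);
```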

@Nicole00
Contributor

Nicole00 commented Jul 7, 2022

To keep the functionality clearer and make the review more convenient, could you submit the source function in another PR? Thanks~

@spike-liu
Contributor Author

spike-liu commented Jul 7, 2022

> 1. How can we create a graph space using Flink SQL?
>
> [Spike] Currently Flink SQL supports only a few statements (listed below), so there is no way for us to create a graph space:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/overview/
>
> 2. And is there a point in defining the vid (src id, dst id) data type in the CREATE TABLE statement?
>
> [Spike] Based on my understanding, srcId and dstId have to be included in the INSERT statements, and all columns in the INSERT statements have to be declared in the CREATE TABLE statement. Hence this seems inevitable. Correct me if I am wrong ^-^
>
> 1. The statements supported by Flink SQL include CREATE DATABASE, which for nebula is equivalent to CREATE SPACE.
> 2. I agree that srcid and dstid should be included in INSERT statements, but how would the data type of vid be defined in the CREATE TABLE statement: vid STRING or vid BIGINT? In fact, the data type in the CREATE TABLE statement is unused; it only matters in the CREATE SPACE statement.

Yes, you are right: CREATE DATABASE is exactly what we want. However, in our use case production is highly stability-sensitive, and creating a database during stream processing is forbidden (not only for nebula, but also for mysql, etc.).

Maybe we could create a separate issue to track this requirement.

@spike-liu
Contributor Author

spike-liu commented Jul 7, 2022

> To keep the functionality clearer and make the review more convenient, could you submit the source function in another PR? Thanks~

Sorry for the extra complexity. There is some duplicated code between the Source and the Sink, so it is hard to separate them.

However, I have reverted that commit and may bring it back in the future if it is still needed.

@spike-liu spike-liu force-pushed the flinksql_nebula_as_sink branch from b847b5b to 31f72ea on July 7, 2022 11:57
@Nicole00
Contributor

Nicole00 commented Jul 7, 2022

> Yes, you are right: CREATE DATABASE is exactly what we want. However, in our use case production is highly stability-sensitive, and creating a database during stream processing is forbidden (not only for nebula, but also for mysql, etc.).
>
> Maybe we could create a separate issue to track this requirement.

I totally agree with submitting a separate issue and PR for CREATE SPACE.
And for the Flink SQL connector, there is no need to forbid the create-space operation (most of the time, creating a space goes together with creating tags/edges). MySQL's database cannot be created this way because the connection to MySQL needs a database name, but nebula's does not.
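In nGQL itself, the vid type is indeed fixed once, at CREATE SPACE time, which is why it cannot be carried by a per-table declaration. A sketch of the statements involved (the space, tag, and edge names here are illustrative):

```sql
-- nGQL: the vid type is declared once, when the space is created.
CREATE SPACE IF NOT EXISTS flink_test
    (partition_num = 10, replica_factor = 1, vid_type = FIXED_STRING(32));
USE flink_test;
-- Tags and edges declare property types, but never the vid type.
CREATE TAG IF NOT EXISTS player(name string, age int);
CREATE EDGE IF NOT EXISTS friend(degree int);
```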

Nicole00 previously approved these changes on Jul 9, 2022
@@ -0,0 +1,270 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
Contributor

please change the date to 2022, the same for other new files.

Contributor Author

Fixed.

@Test
public void sinkVertexData() throws ExecutionException, InterruptedException {
TableEnvironment tableEnvironment =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Contributor

great tests

@spike-liu
Contributor Author

> Yes, you are right: CREATE DATABASE is exactly what we want. However, in our use case production is highly stability-sensitive, and creating a database during stream processing is forbidden (not only for nebula, but also for mysql, etc.). Maybe we could create a separate issue to track this requirement.

> I totally agree with submitting a separate issue and PR for CREATE SPACE. And for the Flink SQL connector, there is no need to forbid the create-space operation (most of the time, creating a space goes together with creating tags/edges). MySQL's database cannot be created this way because the connection to MySQL needs a database name, but nebula's does not.

The issue has been created:
#62

@codecov-commenter

Codecov Report

Merging #57 (6388c1a) into master (b94c0a6) will increase coverage by 11.28%.
The diff coverage is 73.68%.

@@              Coverage Diff              @@
##             master      #57       +/-   ##
=============================================
+ Coverage     38.43%   49.72%   +11.28%     
- Complexity      152      190       +38     
=============================================
  Files            49       50        +1     
  Lines          1517     1613       +96     
  Branches        142      153       +11     
=============================================
+ Hits            583      802      +219     
+ Misses          881      743      -138     
- Partials         53       68       +15     
Impacted Files Coverage Δ
...r/nebula/catalog/factory/NebulaCatalogFactory.java 0.00% <0.00%> (ø)
...connector/nebula/sink/NebulaEdgeBatchExecutor.java 64.10% <ø> (ø)
...nnector/nebula/sink/NebulaVertexBatchExecutor.java 63.15% <ø> (ø)
...connector/nebula/sink/NebulaBatchOutputFormat.java 49.41% <62.50%> (+10.17%) ⬆️
...nector/nebula/table/NebulaDynamicTableFactory.java 80.85% <74.44%> (+80.85%) ⬆️
...ctor/nebula/sink/NebulaEdgeTableBatchExecutor.java 83.33% <83.33%> (ø)
...or/nebula/sink/NebulaVertexTableBatchExecutor.java 83.33% <83.33%> (ø)
...nnector/nebula/connection/NebulaClientOptions.java 90.00% <100.00%> (+0.12%) ⬆️
...ctor/nebula/sink/NebulaBatchTableOutputFormat.java 100.00% <100.00%> (ø)
...connector/nebula/table/NebulaDynamicTableSink.java 90.00% <100.00%> (+90.00%) ⬆️
... and 7 more

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b94c0a6...6388c1a. Read the comment docs.

Contributor

@Nicole00 Nicole00 left a comment

Great work, @spike-liu! Thanks again for your contribution~

@Nicole00 Nicole00 merged commit 93df7a1 into vesoft-inc:master Jul 12, 2022
@spike-liu
Contributor Author

> Great work, @spike-liu! Thanks again for your contribution~

It is my pleasure to work with you:)

@liuxiaocs7
Contributor

Hello, @spike-liu. I am new to Flink and Nebula; your work inspires me. I have a few questions about the implementation of Flink SQL as sink.

  1. I think the table is currently a temporary table whose full name is default_catalog.default_database.table_name, and in your implementation the table name in the Flink SQL CREATE statement is the same as the vertex/edge name in nebula. If that is true, how can we create two tables with the same name in different nebula graph spaces?

@liuxiaocs7
Contributor

  2. I noticed that in your sink-table example the data types are all STRING, but if I create a source table with different data types and select data from it, I cannot sink the data because the data types are incompatible. It may not be a real issue; I just don't know how you use it in your work.
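One common way to reconcile differing types is to cast inside the INSERT ... SELECT so that the query schema matches the sink table's declared types. A sketch (the table and column names here are hypothetical):

```sql
-- Cast source columns so the SELECT schema matches the sink table's
-- declared column types (table and column names are illustrative).
INSERT INTO nebula_sink
SELECT CAST(id AS STRING), CAST(age AS STRING)
FROM source_table;
```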

@liuxiaocs7
Contributor

Looking forward to your reply. If this really is a problem, I would like to try to refine it, since that is the task of my OSPP project. Thank you.

@spike-liu
Contributor Author

@liuxiaocs7 replied in #58

@liuxiaocs7
Contributor

Thank you for your reply. I posted a comment here but then noticed that this is a closed PR, and I was not sure you would see it in time, so I also mentioned it in #58. Sorry for the noise.
