FlinkSQL Nebula as Sink #57
Conversation
@Nicole00 Thanks for the build approval. Meanwhile I have fixed the build error caused by check-style and updated the pull request. Please review again. By the way, this was my mistake: check-style had not been enabled in my development environment.
@Nicole00 We believe you are very busy. Would you please spare us some time to review this PR? We are enthusiastic about this project, and our colleagues would like to contribute more code based on this PR. Or is it possible to assign someone else to help us out?

Please wait a moment. I have communicated with Nicole; she is on a long vacation, so PR processing has not been very timely. Sorry for our slow reply, and thank you for your patience. (supported by Google Translate)

@QingZ11 it is great to hear from you and thanks for letting us know.

So sorry for the late reply. I'm on maternity leave, so I could not review it in time. We'll review this PR as soon as possible.
@@ -0,0 +1,175 @@
package org.apache.flink.connector.nebula.sink;
please append the license
Done.
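For reference, the header presumably looks like the one visible in another file in this PR (shown further down), with the year updated to 2022 as requested later in this review; the license line itself is an assumption based on vesoft's usual header:

```java
/* Copyright (c) 2022 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License.
 */
```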
    throw new IllegalArgumentException(
            String.format("The value of '%s' option should not be negative, but is %s.",
                    EXECUTE_RETRY.key(), config.get(EXECUTE_RETRY)));
}
Maybe we can keep just one parameter for timeout and remove the retry parameter, because Java client 3.0 only exposes timeout on NebulaPoolConfig.
This is existing code. I agree with the review and have removed the EXECUTE_RETRY related code.
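For context, a minimal sketch of how the timeout is configured on the 3.0 Java client's NebulaPoolConfig; the host/port and the timeout value below are placeholders, not values from this PR:

```java
import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.net.NebulaPool;

import java.util.Collections;

public class PoolTimeoutSketch {
    public static void main(String[] args) throws Exception {
        NebulaPoolConfig poolConfig = new NebulaPoolConfig();
        // The 3.0 client exposes a single socket timeout (milliseconds) on the
        // pool config; there is no separate execute-retry knob, hence the review comment.
        poolConfig.setTimeout(3000);

        NebulaPool pool = new NebulaPool();
        // 9669 is the default graphd port; the address is a placeholder.
        pool.init(Collections.singletonList(new HostAddress("127.0.0.1", 9669)), poolConfig);
        pool.close();
    }
}
```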
@@ -0,0 +1,38 @@
package org.apache.flink.connector.nebula.sink;
please append the license
Fixed the license headers in all related files.
options.add(CONNECT_TIMEOUT);
options.add(CONNECT_RETRY);
options.add(TIMEOUT);
options.add(EXECUTE_RETRY);
No need for CONNECT_TIMEOUT, CONNECT_RETRY, or EXECUTE_RETRY.
Fixed.
@@ -0,0 +1,44 @@
package org.apache.flink.connector.nebula.sink;
useless class? can we delete it?
set.add(CONNECT_TIMEOUT);
set.add(CONNECT_RETRY);
set.add(TIMEOUT);
set.add(EXECUTE_RETRY);
set.add(SRC_INDEX);
set.add(DST_INDEX);
set.add(RANK_INDEX);
what about ID_INDEX?
Good idea! Done.
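For readers unfamiliar with where these sets end up: in a Flink DynamicTableFactory, optional options are collected into a Set&lt;ConfigOption&lt;?&gt;&gt; and returned from optionalOptions(). A minimal sketch; the option key strings and default below are hypothetical, not the connector's real keys:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class NebulaOptionsSketch {
    // Hypothetical option definitions mirroring the names discussed in this thread.
    public static final ConfigOption<Integer> TIMEOUT =
            ConfigOptions.key("timeout").intType().defaultValue(3000);
    public static final ConfigOption<Integer> ID_INDEX =
            ConfigOptions.key("id-index").intType().noDefaultValue();
    public static final ConfigOption<Integer> SRC_INDEX =
            ConfigOptions.key("src-index").intType().noDefaultValue();
    public static final ConfigOption<Integer> DST_INDEX =
            ConfigOptions.key("dst-index").intType().noDefaultValue();
    public static final ConfigOption<Integer> RANK_INDEX =
            ConfigOptions.key("rank-index").intType().noDefaultValue();

    // What a DynamicTableFactory would return from optionalOptions().
    public static Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> set = new HashSet<>();
        set.add(TIMEOUT);
        set.add(ID_INDEX);
        set.add(SRC_INDEX);
        set.add(DST_INDEX);
        set.add(RANK_INDEX);
        return set;
    }
}
```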
It's a great PR! Thanks again for your work and attention to nebula. 👍🏻
@Test
public void sinkVertexData() throws ExecutionException, InterruptedException {
    TableEnvironment tableEnvironment =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Please make sure the unit tests work. We can use docker-compose to start up the nebula service and create a space before executing these tests.
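A rough sketch of such a docker-compose setup, assuming the standard vesoft images and default ports; the real docker-compose.yaml used for this PR is linked in the description at the bottom, and the service flags here are simplified:

```yaml
version: '3.4'
services:
  metad:
    image: vesoft/nebula-metad:v3.0.0
    command:
      - --meta_server_addrs=metad:9559
      - --local_ip=metad
      - --port=9559
  storaged:
    image: vesoft/nebula-storaged:v3.0.0
    command:
      - --meta_server_addrs=metad:9559
      - --local_ip=storaged
      - --port=9779
    depends_on:
      - metad
  graphd:
    image: vesoft/nebula-graphd:v3.0.0
    command:
      - --meta_server_addrs=metad:9559
      - --port=9669
    ports:
      - "9669:9669"
    depends_on:
      - metad
```

A test harness would then connect to 127.0.0.1:9669 and run CREATE SPACE before the sink tests execute.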
great tests
It makes our day to hear from you :) Please forgive us for pushing you during your leave.

For clearer functionality and more convenient review, could you please submit the source function in another PR? Thanks~
> 2. And is there a point to define the vid (src id, dst id) data type in the CREATE SQL?
>
> [Spike] Based on my understanding, srcId and dstId have to be included in insert statements, while all the columns included in insert statements have to be included in the CREATE TABLE statement. Hence this seems inevitable. Correct me if I am wrong ^-^

Yes, you are right. CREATE DATABASE is exactly what you want. However, in our use case, production is highly stability-sensitive, and creating a database during single data stream processing is forbidden (not only for nebula, but also for mysql, etc.). Maybe we could create a separate issue to track this requirement.
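To make the point above concrete, a sketch of what an edge-sink job looks like when srcId/dstId are declared in the CREATE TABLE schema; the connector option keys below are placeholders, not the connector's actual WITH options:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EdgeSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // srcId and dstId must appear in the CREATE TABLE schema, because every
        // column referenced by the INSERT has to be declared in the table.
        tableEnv.executeSql(
                "CREATE TABLE friend ("
                        + "  srcId STRING,"
                        + "  dstId STRING,"
                        + "  degree INT"
                        + ") WITH ("
                        + "  'connector' = 'nebula'" // placeholder option, not the real key set
                        + ")");

        tableEnv.executeSql("INSERT INTO friend VALUES ('vid1', 'vid2', 95)");
    }
}
```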
Sorry for the extra complexity. There is some duplicated code between Source & Sink, so it is hard to separate them. However, I have reverted this commit and may bring it back in the future if it is still needed.
(force-pushed from b847b5b to 31f72ea)
I totally agree to submit a separate issue and PR for CREATE SPACE.
@@ -0,0 +1,270 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
Please update the date to 2022; the same for other new files.
Fixed.
Issue has been created as below:
Codecov Report
@@ Coverage Diff @@
## master #57 +/- ##
=============================================
+ Coverage 38.43% 49.72% +11.28%
- Complexity 152 190 +38
=============================================
Files 49 50 +1
Lines 1517 1613 +96
Branches 142 153 +11
=============================================
+ Hits 583 802 +219
+ Misses 881 743 -138
- Partials 53 68 +15
Continue to review full report at Codecov.
Great work, @spike-liu! Thanks again for your contribution~
It is my pleasure to work with you :)
Hello, @spike-liu. I am new to Flink and Nebula, and your work inspires me. I have a few questions about the implementation of Flink SQL as sink.

Looking forward to your reply. If this is really a problem, I would like to try to refine it, because that is the job of OSPP. Thank you.
@liuxiaocs7 replied in #58
Thank you for your reply. I created a comment here but then noticed that this is a closed PR, and I was not sure whether you would see it in time, so I also mentioned it in #58. Sorry about that.
FlinkSQL: take Nebula as Sink.
Please feel free to review.
BTW, the integration tests have been run locally against the docker-compose.yaml below:
https://github.com/vesoft-inc/nebula-flink-connector/pull/57/files#diff-881de9edd148f6d70ee8c0ebc96cc09268471ba279c5d0b953b0c8abcf9d2e43