
support nebula source for flink sql connector #58

Closed
Nicole00 opened this issue Jul 4, 2022 · 12 comments

Comments

@Nicole00
Contributor

Nicole00 commented Jul 4, 2022

As the title says.

@liuxiaocs7
Contributor

/assign

@spike-liu
Contributor

@Nicole00 An implementation of Nebula as a source has been pushed in the following commit:

b847b5b

@liuxiaocs7
Contributor

liuxiaocs7 commented Jul 7, 2022

This issue is created for better tracking of OSPP 2022.

Connector Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| meta-address | required | none | String | The Nebula meta server address. |
| graph-address | required | none | String | The Nebula graph server address. |
| username | required | none | String | The Nebula server user name. |
| password | required | none | String | The Nebula server password. |
| graph-space | optional | none | String | The Nebula graph space name. |
| label-name | optional | none | String | The Nebula graph space label name. |
| data-type | optional | none | Enum | The Nebula graph data type. |
| timeout | optional | 1000ms | Integer | The Nebula execution timeout duration. |
| src-id-index | optional | -1 | Integer | The index of the edge source ID field. |
| dst-id-index | optional | -1 | Integer | The index of the edge destination ID field. |
| rank-id-index | optional | -1 | Integer | The index of the edge rank field. |
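For illustration, the options above might appear in a Flink SQL DDL roughly like the sketch below. This is only a hedged sketch: the connector identifier `nebula` and the column schema are assumptions based on the option table, not a confirmed final API.

```sql
-- Hypothetical source table backed by the Nebula connector.
-- Option names follow the table above; the 'connector' value,
-- schema, and addresses are illustrative assumptions.
CREATE TABLE person_source (
    vid  STRING,
    name STRING,
    age  BIGINT
) WITH (
    'connector'     = 'nebula',
    'meta-address'  = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username'      = 'root',
    'password'      = 'nebula',
    'graph-space'   = 'test',
    'label-name'    = 'person',
    'data-type'     = 'vertex',
    'timeout'       = '1000'
);
```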

Data Type Mapping

| Flink Type | Nebula Type |
| --- | --- |
| CHAR | FIXED_STRING / GEOGRAPHY |
| VARCHAR | STRING |
| STRING | STRING |
| BOOLEAN | BOOL |
| TINYINT | INT8 |
| SMALLINT | INT16 |
| INTEGER | INT32 |
| BIGINT | INT64 / INT / TIMESTAMP |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | DATETIME |
| TIMESTAMP_LTZ | DATETIME |
| BYTES | Not supported |
| DECIMAL | Not supported |
| INTERVAL_YEAR_MONTH | Not supported |
| INTERVAL_DAY_TIME | Not supported |
| ARRAY | Not supported |
| MAP | Not supported |
| ROW | Not supported |
| MULTISET | Not supported |
| RAW | Not supported |

More information coming soon.

@spike-liu
Contributor

The PR has been submitted as #67.

@liuxiaocs7
Contributor

Hello, this PR #60 already includes a Nebula source for the Flink SQL connector, but it doesn't seem to be compatible with the current sink function implementation (#57); I will try to fix it.

@liuxiaocs7
Contributor

Hello, @spike-liu. I am new to Flink and Nebula, and your work (#57) has inspired me. I have a few questions about the implementation of Flink SQL as a sink.

  1. I think the table created by Flink SQL is currently a temporary table named default_catalog.default_database.table_name, and in your implementation the table name in the Flink SQL CREATE statement is the same as the vertex/edge name in Nebula. If that's true, how can we create two tables in Flink SQL from different Nebula graph spaces that have the same name?

For example:

  • create statement: CREATE TABLE person ...
  • nebula execution options: setTag(context.getObjectIdentifier().getObjectName())

Should we add a parameter in the WITH clause?

At the same time, I noticed the listTables function in NebulaCatalog: tag names start with VERTEX. and edge names start with EDGE.. Should we stay compatible with this table-name design if we want to use our own catalog?
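For concreteness, the kind of WITH-clause parameter in question might look like the sketch below: two Flink tables with different names that both map to a Nebula label called person, just in different graph spaces. All option names here are hypothetical illustrations of the question, not part of the current implementation.

```sql
-- Hypothetical: decouple the Flink table name from the Nebula label
-- via WITH options, so two spaces can share a label name.
CREATE TABLE person_space1 (vid STRING, name STRING) WITH (
    'connector'   = 'nebula',
    'graph-space' = 'space1',
    'label-name'  = 'person'
);

CREATE TABLE person_space2 (vid STRING, name STRING) WITH (
    'connector'   = 'nebula',
    'graph-space' = 'space2',
    'label-name'  = 'person'
);
```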

  2. I noticed that in your sink table example the data type is always String, but if I create a source table (e.g., upstream from JDBC) with different data types (CREATE TABLE xxx (col1 INT, col2 DATE, col3 TIME, ...)) and select data from it, I can't sink the data because the data types are incompatible.

Should we customize the type conversions instead of relying on the internal RowData-to-Row conversion?

Looking forward to your reply, thank you.

@spike-liu
Contributor

> Hello, this PR #60 already includes a Nebula source for the Flink SQL connector, but it doesn't seem to be compatible with the current sink function implementation (#57); I will try to fix it.

Oops, this is awkward. It seems there is some duplicate work here. Anyway, first come, first served: #67 has been closed. Go ahead, @liuxiaocs7.

@spike-liu
Contributor

spike-liu commented Jul 15, 2022

Hello, @liuxiaocs7. As a matter of fact, we have also been using Flink in our project recently and are glad to discuss these details with you.

For question 1: I agree with you. Would you please create an issue to track this enhancement?
For question 2: Would you please check the Flink native data type casting documentation below? I am not sure whether it resolves the incompatibility issue you mention above.

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/#casting

If not, I think there are two recipes for resolving the data type cast issue: 1. put the casts in Flink SQL; 2. put them in nebula-flink-connector. If these casts are specific to Nebula, I would prefer recipe 2; otherwise recipe 1 would be better, because it is shared among all connectors, such as MySQL, Kafka, HBase, etc.
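To illustrate recipe 1, the casts could live entirely in the Flink SQL job, converting each column to STRING before it reaches the sink. This is a hedged sketch: the table names `jdbc_source` and `nebula_sink` are made up for the example, while CAST itself is standard Flink SQL.

```sql
-- Recipe 1: cast in Flink SQL before writing to the sink, so every
-- column arrives as STRING (table and column names are illustrative).
INSERT INTO nebula_sink
SELECT
    CAST(col1 AS STRING),  -- INT  -> STRING
    CAST(col2 AS STRING),  -- DATE -> STRING
    CAST(col3 AS STRING)   -- TIME -> STRING
FROM jdbc_source;
```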

@spike-liu
Contributor

@liuxiaocs7 just a friendly suggestion: how about creating separate issues for discussion in the future? I think your questions are valuable for us.

@liuxiaocs7
Contributor

> @liuxiaocs7 just a friendly suggestion: how about creating separate issues for discussion in the future? I think your questions are valuable for us.

Thanks for your suggestion; I'll create a new issue to discuss this question, and develop the habit of discussing only one question per issue.

@liuxiaocs7
Contributor

> > Hello, this PR #60 already includes a Nebula source for the Flink SQL connector, but it doesn't seem to be compatible with the current sink function implementation (#57); I will try to fix it.
>
> Oops, this is awkward. It seems there is some duplicate work here. Anyway, first come, first served: #67 has been closed. Go ahead, @liuxiaocs7.

I'll try my best to get it done, thanks a lot for your help.

@liuxiaocs7
Contributor

liuxiaocs7 commented Jul 15, 2022

> Hello, @liuxiaocs7. As a matter of fact, we have also been using Flink in our project recently and are glad to discuss these details with you.
>
> For question 1: I agree with you. Would you please create an issue to track this enhancement? For question 2: Would you please check the Flink native data type casting documentation below? I am not sure whether it resolves the incompatibility issue you mention above.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/#casting
>
> If not, I think there are two recipes for resolving the data type cast issue: 1. put the casts in Flink SQL; 2. put them in nebula-flink-connector. If these casts are specific to Nebula, I would prefer recipe 2; otherwise recipe 1 would be better, because it is shared among all connectors, such as MySQL, Kafka, HBase, etc.

@spike-liu, sorry for the late reply. We can now discuss question 1 in #70; you are welcome to join the discussion there.
As for question 2, I will first study the data types and casting linked above; it may also need a new issue.
