Full insert/update/delete support for dynamic table sinks #77

Closed
linhr opened this issue Nov 18, 2022 · 13 comments
Labels
type/feature req Type: feature request

Comments

@linhr
Contributor

linhr commented Nov 18, 2022

I have an upstream data source and I'd like to replicate the data to NebulaGraph in real time. In my current setup I have one dynamic table A powered by the Kafka connector with the Debezium format, which captures data change events of the upstream MySQL database. I'd like to use Flink SQL INSERT statements to write to another dynamic table B powered by the Nebula Flink connector. If everything is working, any insert/update/delete operation on dynamic table A will trigger a write to dynamic table B, where the Nebula Flink connector is responsible for understanding the changelog and performing the corresponding insert/update/delete operations on the underlying graph database. Does the Nebula Flink connector support such a use case?

I was looking at the source code and it seems to me that the Nebula Flink connector does support Flink SQL, but I cannot find examples of how this could be used for streaming applications. Any guidance would be much appreciated.

Thanks a lot!

@wey-gu

wey-gu commented Nov 18, 2022

Ping @Nicole00

BTW. Have you checked this project?

https://github.com/nebula-contrib/nebula-real-time-exchange

@linhr
Contributor Author

linhr commented Nov 18, 2022

Thanks for the prompt reply! Let me take a look at the real time exchange project as well.

@linhr
Contributor Author

linhr commented Nov 18, 2022

I took a brief look at the real time exchange project, and it seems that it reads the MySQL binlog directly (using the Flink CDC connector which runs Debezium embedded inside the connector). It is definitely a viable solution for replicating MySQL databases to NebulaGraph. (We were aware of the Flink CDC project, and it is really exciting to learn that the NebulaGraph ecosystem already has something built on top of it!)

However, in our current production architecture, we already have Debezium running separately, which writes database change events to Kafka. In this way our Flink applications do not access MySQL binlog directly. In the future we may also have other Kafka-based streaming events (not from MySQL data changes) that we'd like to store in NebulaGraph, so the full dynamic table support would be very beneficial.

@linhr linhr changed the title Full Insert/Update/Delete Support for Dynamic Tables Full insert/update/delete support for dynamic table sinks Nov 18, 2022
@wey-gu

wey-gu commented Nov 18, 2022

Sure. I am not familiar with the Flink SQL part, so until @Nicole00 or others can help with that, note that nebula-exchange can also be wired to Kafka.

https://github.com/vesoft-inc/nebula-exchange

@linhr
Contributor Author

linhr commented Nov 21, 2022

Thanks @wey-gu for sharing the nebula-exchange project as well. It seems to support Spark use cases. Right now I'm working on Flink applications, but I'll definitely take a look at it when I need to work with Kafka and NebulaGraph in Spark.

@linhr
Contributor Author

linhr commented Nov 21, 2022

I looked into the source code in more detail, and it seems there is a per-executor WriteModeEnum configuration for the vertex/edge batch executor classes (NebulaVertexTableBatchExecutor and NebulaEdgeTableBatchExecutor). However, this does not take into account the RowKind value of each RowData object when processing them in batches. Therefore, when a changelog contains a mix of insert/update/delete events, the dynamic table sink does not behave correctly.

I think the proper way is to buffer the rows (and deduplicate them by primary keys) and delegate the execution to three executors (for insert, update, and delete, respectively) when committing the batch. The official JDBC connector can serve as an example for such an implementation (link).

I'm happy to contribute a PR if the above makes sense.
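
The buffering idea described above can be sketched in plain Java with no Flink dependency. This is only an illustration under stated assumptions, not the connector's code and not the implementation in any PR: `Kind` is a hypothetical stand-in that mirrors the four values of Flink's `RowKind`, and `Change`, `Batch`, and `ChangelogBufferSketch` are made-up names. The sketch keeps the latest change per primary key, drops `UPDATE_BEFORE` rows, and groups the remaining rows by operation at flush time. Folding an `INSERT` followed by an `UPDATE_AFTER` for the same key into a single insert is an assumption, made so that the update executor never sees a row that has not been written yet.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangelogBufferSketch {

    // Hypothetical stand-in mirroring the values of Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical row: change kind, primary key, and payload.
    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Rows grouped per operation, ready to hand to three separate executors.
    static final class Batch {
        final List<Change> inserts = new ArrayList<>();
        final List<Change> updates = new ArrayList<>();
        final List<Change> deletes = new ArrayList<>();
    }

    // One buffered change per primary key; a later change replaces an earlier one.
    private final Map<String, Change> buffer = new LinkedHashMap<>();

    void add(Change change) {
        if (change.kind == Kind.UPDATE_BEFORE) {
            return; // assumption: the matching UPDATE_AFTER carries the new state
        }
        Change previous = buffer.get(change.key);
        if (previous != null
                && previous.kind == Kind.INSERT
                && change.kind == Kind.UPDATE_AFTER) {
            // Assumption: the row was never flushed, so keep it as an insert
            // that carries the updated value.
            change = new Change(Kind.INSERT, change.key, change.value);
        }
        buffer.put(change.key, change);
    }

    // Groups the buffered changes by operation and empties the buffer.
    Batch flush() {
        Batch batch = new Batch();
        for (Change change : buffer.values()) {
            switch (change.kind) {
                case INSERT:
                    batch.inserts.add(change);
                    break;
                case UPDATE_AFTER:
                    batch.updates.add(change);
                    break;
                case DELETE:
                    batch.deletes.add(change);
                    break;
                default:
                    break; // UPDATE_BEFORE is never buffered
            }
        }
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        ChangelogBufferSketch sink = new ChangelogBufferSketch();
        sink.add(new Change(Kind.INSERT, "v1", "a"));
        sink.add(new Change(Kind.UPDATE_BEFORE, "v1", "a"));
        sink.add(new Change(Kind.UPDATE_AFTER, "v1", "b"));
        sink.add(new Change(Kind.INSERT, "v2", "c"));
        sink.add(new Change(Kind.DELETE, "v2", null));
        Batch batch = sink.flush();
        System.out.println("inserts=" + batch.inserts.size()
                + " updates=" + batch.updates.size()
                + " deletes=" + batch.deletes.size());
    }
}
```

In a real sink, each of the three lists would be passed to the executor for that operation when the batch is committed.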

@liuxiaocs7
Contributor

Hi @linhr, you can now find examples for the sink and the source, and I will try to enrich the documentation later.

@liuxiaocs7
Contributor

Yes, simultaneous insert/update/delete operations are not supported yet. Your suggestion looks good to me. I look forward to your contribution and to improving the Nebula Flink Connector with you. What do you think, @Nicole00?

@Nicole00
Contributor

Hi @linhr, sorry for the late reply. I have carefully considered your requirements. You are right: the Flink connector does not consider the RowKind and just performs the insert/update/delete operation according to the user's WriteMode config, which may be unreasonable for streaming delete operations.

  1. For now, to meet your requirement, could we try Flink side outputs to split the insert, update, and delete data and sink them into Nebula separately?
  2. Thanks for your continuous participation, @liuxiaocs7. I'm so glad to hear from you and look forward to your PR for handling dynamic data operations of different kinds.
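
The side-output workaround in item 1 amounts to routing each change to one of three outputs by its kind. The following plain-Java sketch only simulates that routing. In a real DataStream job the split would be done in a `ProcessFunction` that emits to `OutputTag`-identified side outputs, which is not shown here. `Kind`, `Change`, and `RowKindSplitSketch` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class RowKindSplitSketch {

    // Hypothetical change kinds that a sink needs to act on.
    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    // Hypothetical row: change kind and primary key.
    static final class Change {
        final Kind kind;
        final String key;

        Change(Kind kind, String key) {
            this.kind = kind;
            this.key = key;
        }
    }

    // Routes every change to the output for its kind, preserving arrival
    // order within each output.
    static Map<Kind, List<Change>> split(List<Change> changelog) {
        Map<Kind, List<Change>> outputs = new EnumMap<>(Kind.class);
        for (Kind kind : Kind.values()) {
            outputs.put(kind, new ArrayList<>());
        }
        for (Change change : changelog) {
            outputs.get(change.kind).add(change);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
                new Change(Kind.INSERT, "v1"),
                new Change(Kind.DELETE, "v1"),
                new Change(Kind.INSERT, "v2"));
        Map<Kind, List<Change>> outputs = split(changelog);
        for (Map.Entry<Kind, List<Change>> entry : outputs.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue().size());
        }
    }
}
```

One caveat with this design: once the changes are split, the relative order of an insert and a delete for the same key (`v1` in the example) is no longer encoded anywhere, so separate sinks may apply them in either order unless something else enforces ordering.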

@linhr
Contributor Author

linhr commented Nov 25, 2022

@Nicole00 Thanks for your reply! Side outputs seem a reasonable workaround for this issue (assuming we switch to the DataStream API).

Actually I have been working on this issue in the past few days. I've got something working locally. I'll send out a PR once my code change is in good shape (probably next week). (cc @liuxiaocs7)

@Nicole00
Contributor

Wow, looking forward to your PR 🎉. Don't feel pressured and do it on your own time.

@Sophie-Xie Sophie-Xie added the type/enhancement Type: make the code neat or more efficient label Nov 29, 2022
@Nicole00 Nicole00 added the type/feature req Type: feature request label Nov 29, 2022
@github-actions github-actions bot removed the type/enhancement Type: make the code neat or more efficient label Nov 29, 2022
@liuxiaocs7
Contributor

My pleasure, I'll think about it carefully. :)

@linhr
Contributor Author

linhr commented Dec 5, 2022

The PR is here: #81. It took a bit longer than I expected to finish writing the unit/integration tests, but I think my code change is now ready for review. @Nicole00 Looking forward to your feedback!
