[cdc-pipeline-connector][starrocks] Introduce starrocks cdc pipeline DataSink #2765

Closed
wants to merge 20 commits into from

Contributor

@banmoy banmoy commented Nov 28, 2023

This closes #2645. It adds an implementation of DataSink for StarRocks that supports:

  • StarRocks primary key tables with at-least-once delivery, relying on idempotent writes
  • schema changes, including CreateTableEvent, AddColumnEvent and DropColumnEvent. Other types of schema change will be supported in a follow-up PR.
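As a rough illustration of why at-least-once delivery is enough when the target is a primary key table, the sketch below replays the same ordered batch several times against an in-memory map keyed by primary key. The `Change` class, the `UPSERT`/`DELETE` encoding and the map are hypothetical stand-ins and are not part of this PR or of the StarRocks connector.

```java
import java.util.Map;
import java.util.TreeMap;

public class IdempotentReplaySketch {

    /** One change record: op is "UPSERT" or "DELETE" (hypothetical encoding). */
    static final class Change {
        final String op;
        final int key;
        final String value;

        Change(String op, int key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    /** Applies changes in order to a map that stands in for a primary key table. */
    static void apply(Map<Integer, String> table, Change[] batch) {
        for (Change c : batch) {
            if ("DELETE".equals(c.op)) {
                table.remove(c.key);
            } else {
                table.put(c.key, c.value); // upsert: the last write for a key wins
            }
        }
    }

    /** Returns the table state after delivering the same batch the given number of times. */
    static Map<Integer, String> deliver(int times) {
        Change[] batch = {
            new Change("UPSERT", 1, "a"),
            new Change("UPSERT", 2, "b"),
            new Change("UPSERT", 1, "c"),
            new Change("DELETE", 2, null)
        };
        Map<Integer, String> table = new TreeMap<>();
        for (int i = 0; i < times; i++) {
            apply(table, batch);
        }
        return table;
    }

    public static void main(String[] args) {
        // Delivering the batch once or three times ends in the same state.
        System.out.println(deliver(1).equals(deliver(3)));
    }
}
```

The point of the sketch is only that replaying an ordered batch of keyed upserts and deletes converges to the same final state, which is the property a retry after failure depends on.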

@banmoy banmoy force-pushed the starrocks_pipeline branch 2 times, most recently from 528bfda to cd19d56 Compare November 28, 2023 03:51
Contributor

@lvyanquan lvyanquan left a comment

Thanks for contributing this high-quality connector. I left some comments.

<groupId>com.ververica</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${revision}</version>
</dependency>
Contributor

The connector should not depend on flink-cdc-composer; we can remove FactoryDiscoveryUtils from StarRocksDataSinkFactoryTest to get rid of the dependency.

.put("password", "")
.build());
DataSink dataSink =
sinkFactory.createDataSink(
Contributor

I hit a NullPointerException when running this test.

Contributor Author

Fixed

public StarRocksRowData serialize(Event record) {
if (record instanceof SchemaChangeEvent) {
applySchemaChangeEvent((SchemaChangeEvent) record);
return null;
Contributor

@lvyanquan lvyanquan Nov 28, 2023

Just a worry: will this lead to a NullPointerException?

Contributor Author

Thanks for the reminder. The connector will deal with the null value.
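The thread does not show how the connector handles the null return, so the following is only a sketch of one common pattern: the serializer returns null for events that carry no row, and the caller skips those results instead of dereferencing them. All types here (`Event`, `SchemaChange`, `DataChange`) are hypothetical stand-ins, not the classes from this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullSkippingWriterSketch {

    interface Event {}

    /** Stand-in for a schema change event: it updates state but produces no row. */
    static final class SchemaChange implements Event {}

    /** Stand-in for a data change event that carries one row. */
    static final class DataChange implements Event {
        final String row;

        DataChange(String row) {
            this.row = row;
        }
    }

    /** Returns the row to write, or null when the event produces nothing to write. */
    static String serialize(Event event) {
        if (event instanceof SchemaChange) {
            return null; // schema would be updated here; there is no row to emit
        }
        return ((DataChange) event).row;
    }

    /** Collects serialized rows and skips null results rather than dereferencing them. */
    static List<String> writeAll(List<Event> events) {
        List<String> written = new ArrayList<>();
        for (Event event : events) {
            String row = serialize(event);
            if (row == null) {
                continue;
            }
            written.add(row);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new DataChange("r1"), new SchemaChange(), new DataChange("r2"));
        System.out.println(writeAll(events));
    }
}
```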

record ->
DATE_FORMATTER.format(
Date.valueOf(
LocalDate.ofEpochDay(record.getInt(fieldPos))));
Contributor

Was this method rewritten, instead of using createFieldGetter in RecordData, because time values are handled differently?

Contributor Author

Yes, we need to convert the data to a string so that it can be sent to StarRocks in JSON format.
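To make the conversion concrete, here is a minimal sketch of turning an epoch-day integer into a date string and placing it in a JSON object. It uses `java.time` formatting rather than the `DATE_FORMATTER` from the diff, whose definition is not shown in this thread. The column name and the hand-built JSON are illustrative assumptions, and the column name is assumed not to need escaping.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DateToJsonSketch {

    /** Formats a day count since 1970-01-01 as yyyy-MM-dd. */
    static String formatEpochDay(int epochDay) {
        return LocalDate.ofEpochDay(epochDay).format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    /** Builds a one-field JSON object; the column name is a hypothetical example. */
    static String toJsonField(String column, int epochDay) {
        return "{\"" + column + "\":\"" + formatEpochDay(epochDay) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJsonField("order_date", 0)); // {"order_date":"1970-01-01"}
    }
}
```

Sending the date as a string avoids relying on the receiver to interpret a raw integer as a day count.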

@banmoy banmoy force-pushed the starrocks_pipeline branch 4 times, most recently from 1e9b23d to f4689be Compare December 2, 2023 13:23
@github-actions github-actions bot added the build label Dec 2, 2023
@lvyanquan
Contributor

@banmoy Sorry to bother you. I hit the following error while running the test:

Caused by: java.lang.NoClassDefFoundError: com/starrocks/shade/org/apache/commons/compress/utils/Lists
        at com.starrocks.connector.flink.catalog.StarRocksCatalog.executeSingleColumnStatement(StarRocksCatalog.java:456)
        at com.starrocks.connector.flink.catalog.StarRocksCatalog.databaseExists(StarRocksCatalog.java:107)
        at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applyCreateTable(StarRocksMetadataApplier.java:92)
        at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:71)
        at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:86)
        at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:145)
        at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:123)
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:204)
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:121)
        ... 33 more
Caused by: java.lang.ClassNotFoundException: com.starrocks.shade.org.apache.commons.compress.utils.Lists
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        ... 42 more

@banmoy banmoy force-pushed the starrocks_pipeline branch from 3110d27 to ca52990 Compare December 3, 2023 16:34
@banmoy
Contributor Author

banmoy commented Dec 4, 2023

@lvyanquan Thanks for the reminder. I have fixed it, and the test now passes.
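The thread does not say how the missing shaded class was fixed. As a generic diagnostic for this kind of NoClassDefFoundError, a small check like the one below can confirm whether a class name is visible to a given class loader before the job runs. The helper and its class name are hypothetical; only the class name passed in `main` is taken from the stack trace above.

```java
public class ShadedClassCheckSketch {

    /** Returns true if the named class can be located without initializing it. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ShadedClassCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised while resolving the class.
            return false;
        }
    }

    public static void main(String[] args) {
        String shaded = "com.starrocks.shade.org.apache.commons.compress.utils.Lists";
        System.out.println(shaded + " loadable: " + isLoadable(shaded));
    }
}
```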

@banmoy banmoy force-pushed the starrocks_pipeline branch from ca52990 to 4db2da4 Compare December 4, 2023 13:11
@banmoy banmoy force-pushed the starrocks_pipeline branch from 49420ba to 7688367 Compare December 4, 2023 15:04
Contributor

@lvyanquan lvyanquan left a comment

LGTM, +1.

@leonardBang leonardBang added this to the V3.0.0 milestone Dec 5, 2023
Contributor

@leonardBang leonardBang left a comment

Thanks @banmoy for the great work and @lvyanquan for the continuous review, LGTM.

@leonardBang
Contributor

Resolved by 33b7c85

@leonardBang leonardBang closed this Dec 5, 2023
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025

Successfully merging this pull request may close these issues.

[flink-cdc-pipeline-connectors] Add Implementation of DataSink In StarRocks