-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[cdc-pipeline-connector][starrocks] Introduce starrocks cdc pipeline DataSink #2765
Conversation
528bfda
to
cd19d56
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for contribute this connector of high code quality. Left some comments.
<groupId>com.ververica</groupId> | ||
<artifactId>flink-cdc-composer</artifactId> | ||
<version>${revision}</version> | ||
</dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Connector would not depend on flink-cdc-composer, we can remove FactoryDiscoveryUtils from StarRocksDataSinkFactoryTest to get rid of it.
.put("password", "") | ||
.build()); | ||
DataSink dataSink = | ||
sinkFactory.createDataSink( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Met NullPointerException when running this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
public StarRocksRowData serialize(Event record) { | ||
if (record instanceof SchemaChangeEvent) { | ||
applySchemaChangeEvent((SchemaChangeEvent) record); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a worry, will this lead to NoPointException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the reminding. Connector will deal with the null value.
record -> | ||
DATE_FORMATTER.format( | ||
Date.valueOf( | ||
LocalDate.ofEpochDay(record.getInt(fieldPos)))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this because of different ways of handling time that this method was rewritten instead of using createFieldGetter in RecordData?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need to convert the data to a string so that it can be sent to starrocks in json format
1e9b23d
to
f4689be
Compare
@banmoy Sorry for bother you. Met the follow error while running test:
|
3110d27
to
ca52990
Compare
@lvyanquan Thanks for the reminding. Have fixed it and the test passed. |
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
ca52990
to
4db2da4
Compare
49420ba
to
7688367
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, +1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @banmoy for the great work and @lvyanquan for the continuous review, LGTM.
Resolved by 33b7c85 |
This close #2645. Add Implementation of DataSink for StarRocks which supports