Create a table in PostgreSQL and add a unique index that uses a function such as COALESCE:
create table test_tbl
(
    id         text not null,
    parent_id  text,
    cnt        integer default 0,
    created_at timestamp default LOCALTIMESTAMP,
    updated_at timestamp default LOCALTIMESTAMP
);
create unique index test_tbl_idx
    on test_tbl (id, COALESCE(parent_id, ''::text));
Start the Flink application to receive events from this table.
What did you expect to see?
I expected the Flink application to receive the data.
What did you see instead?
The connector throws this error at startup:
14:51:09.530 [debezium-postgresconnector-postgres_cdc_source-change-event-source-coordinator] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
java.lang.IllegalArgumentException: The column "COALESCE(parent_id, ''::text)" is referenced as PRIMARY KEY, but a matching column is not defined in table "public.test_tbl"!
at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:106) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.ArrayList.removeIf(ArrayList.java:1702) ~[?:?]
at java.util.ArrayList.removeIf(ArrayList.java:1690) ~[?:?]
at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:102) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:267) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.lambda$overwriteTable$2(Tables.java:192) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:84) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.overwriteTable(Tables.java:186) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1214) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:87) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:68) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.init(PostgresStreamingChangeEventSource.java:118) ~[flink-connector-postgres-cdc-2.4.2.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:182) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
14:51:10.004 [debezium-engine] ERROR com.ververica.cdc.debezium.internal.Handover - Reporting error:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: The column "COALESCE(parent_id, ''::text)" is referenced as PRIMARY KEY, but a matching column is not defined in table "public.test_tbl"!
at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:106) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at java.util.ArrayList.removeIf(ArrayList.java:1702) ~[?:?]
at java.util.ArrayList.removeIf(ArrayList.java:1690) ~[?:?]
at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:102) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:267) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.lambda$overwriteTable$2(Tables.java:192) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.util.FunctionalReadWriteLock.write(FunctionalReadWriteLock.java:84) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.relational.Tables.overwriteTable(Tables.java:186) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1214) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:87) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresTaskContext.refreshSchema(PostgresTaskContext.java:68) ~[debezium-connector-postgres-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.init(PostgresStreamingChangeEventSource.java:118) ~[flink-connector-postgres-cdc-2.4.2.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:182) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
... 5 more
Anything else?
The fix is already implemented in Debezium (debezium/debezium#3718). However, it is available only in Debezium 2.x, which this connector cannot adopt because it is a major version change.
This connector also modifies some Debezium classes, and the fix is in one of those classes. Applying the same fix to the connector's copies of these classes should solve the problem.
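To illustrate the idea, here is a minimal Java sketch of one way to avoid the failure. It is not the actual patch from debezium/debezium#3718. It assumes that, for a table without a primary key, the key columns are taken from a unique index, and it simply skips any unique index that contains an entry that is not a real column of the table, such as the COALESCE expression above. The class UniqueIndexKeySketch and the method pickKeyColumns are hypothetical names and do not exist in Debezium or Flink CDC.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Debezium or Flink CDC.
final class UniqueIndexKeySketch {

    /**
     * Returns the columns of the first unique index whose entries all name
     * real table columns, or an empty list if no such index exists.
     * Indexes that contain expressions (e.g. "COALESCE(parent_id, ''::text)")
     * are skipped because the expression text is not a column name.
     */
    static List<String> pickKeyColumns(Set<String> tableColumns,
                                       Map<String, List<String>> uniqueIndexes) {
        for (List<String> indexColumns : uniqueIndexes.values()) {
            if (!indexColumns.isEmpty() && tableColumns.containsAll(indexColumns)) {
                return new ArrayList<>(indexColumns);
            }
        }
        // No usable index: treat the table as having no key columns.
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        // Columns and index taken from the test_tbl example in this issue.
        Set<String> columns = Set.of("id", "parent_id", "cnt", "created_at", "updated_at");
        Map<String, List<String>> indexes = new LinkedHashMap<>();
        indexes.put("test_tbl_idx", List.of("id", "COALESCE(parent_id, ''::text)"));

        // The expression index is skipped, so no key columns are reported.
        System.out.println(pickKeyColumns(columns, indexes)); // prints []
    }
}
```

With this approach the table from the reproduce step would be handled as a table without key columns instead of failing at startup. Whether that matches the behavior of the upstream fix would need to be checked against the Debezium change itself.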
Are you willing to submit a PR?
I'm willing to submit a PR!
Search before asking
Flink version
1.16.0
Flink CDC version
2.4.2
Database and its version
AWS RDS - PostgreSQL 11.22 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-12), 64-bit
Minimal reproduce step
See original bug - https://issues.redhat.com/browse/DBZ-5398