Skip to content

Commit

Permalink
fix(pg-cdc): fix support for partitioned table (#18456)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Sep 22, 2024
1 parent 98a2d41 commit bc65ffb
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 5 deletions.
54 changes: 54 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,57 @@ NULL NULL NULL
{} {} {}
NULL NULL NULL
{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL

query TTT
SELECT * FROM partitioned_timestamp_table ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
4 f 2024-01-04 13:00:00
5 f 2024-03-05 09:30:00
6 f 2024-06-06 14:20:00
7 f 2024-09-07 16:45:00
8 f 2025-01-08 18:30:00
9 f 2025-07-09 07:10:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00
14 t 2024-01-04 13:00:00
15 t 2024-03-05 09:30:00
16 t 2024-06-06 14:20:00
17 t 2024-09-07 16:45:00
18 t 2025-01-08 18:30:00
19 t 2025-07-09 07:10:00

query TTT
SELECT * FROM partitioned_timestamp_table_2023 ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00

query TTT
SELECT * FROM partitioned_timestamp_table_shared ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
4 f 2024-01-04 13:00:00
5 f 2024-03-05 09:30:00
6 f 2024-06-06 14:20:00
7 f 2024-09-07 16:45:00
8 f 2025-01-08 18:30:00
9 f 2025-07-09 07:10:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00
14 t 2024-01-04 13:00:00
15 t 2024-03-05 09:30:00
16 t 2024-06-06 14:20:00
17 t 2024-09-07 16:45:00
18 t 2025-01-08 18:30:00
19 t 2025-07-09 07:10:00
40 changes: 40 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,43 @@ create table upper_orders (
table.name = 'Orders',
slot.name = 'orders'
);

# for the partitioned table
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table',
publication.name = 'rw_publication_partition_root',
slot.name = 'my_slot_partition'
);

# for only one partition, as Postgres does not support adding both a partitioned tableand its individual partitions to the same publication, we use different publication for the partition
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023',
publication.name = 'rw_publication_partition',
slot.name = 'my_slot_partition_2'
);
8 changes: 8 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
FROM pg_source TABLE 'public.person';

statement ok
CREATE TABLE partitioned_timestamp_table_shared(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,22 @@ create table shipments (

statement ok
drop table shipments;

statement error Table 'partitioned_timestamp_table' has partitions, which requires publication 'rw_publication_pubviaroot_false' to be created with `publish_via_partition_root = true`.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table',
publication.name = 'rw_publication_pubviaroot_false',
slot.name = 'my_slot_partition'
);
31 changes: 31 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,34 @@ CREATE TABLE "Orders" (
name varchar
);
INSERT INTO "Orders" VALUES (1, 'happy');


CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31');

CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2024-01-01') TO ('2024-12-31');

CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2025-01-01') TO ('2025-12-31');

INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES
(1, false, '2023-02-01 10:30:00'),
(2, false, '2023-05-15 11:45:00'),
(3, false, '2023-11-03 12:15:00'),
(4, false, '2024-01-04 13:00:00'),
(5, false, '2024-03-05 09:30:00'),
(6, false, '2024-06-06 14:20:00'),
(7, false, '2024-09-07 16:45:00'),
(8, false, '2025-01-08 18:30:00'),
(9, false, '2025-07-09 07:10:00');

-- Here we create this publication without `WITH ( publish_via_partition_root = true )` only for tests. Normally, it should be added.
create publication rw_publication_pubviaroot_false for TABLE partitioned_timestamp_table;
10 changes: 10 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ INSERT INTO list_with_null VALUES (3, '{NULL,-3,-4}', '{NULL,nan,-inf}', '{NULL,
INSERT INTO list_with_null VALUES (4, '{-4,-5,-6}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,sad,ok}', '{b2e4636d-fa03-4ad4-bf16-029a79dca3e2}', '{\\x88,\\x99,\\xAA}');
INSERT INTO list_with_null VALUES (6, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES
(11, true, '2023-02-01 10:30:00'),
(12, true, '2023-05-15 11:45:00'),
(13, true, '2023-11-03 12:15:00'),
(14, true, '2024-01-04 13:00:00'),
(15, true, '2024-03-05 09:30:00'),
(16, true, '2024-06-06 14:20:00'),
(17, true, '2024-09-07 16:45:00'),
(18, true, '2025-01-08 18:30:00'),
(19, true, '2025-07-09 07:10:00');
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ public static void createPostgresPublicationIfNeeded(
if (schemaTableName.isPresent()) {
createPublicationSql =
String.format(
"CREATE PUBLICATION %s FOR TABLE %s;",
"CREATE PUBLICATION %s FOR TABLE %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName), schemaTableName.get());
} else {
createPublicationSql =
String.format("CREATE PUBLICATION %s", quotePostgres(pubName));
String.format(
"CREATE PUBLICATION %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName));
}
try (var stmt = jdbcConnection.createStatement()) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
private void validatePublicationConfig(boolean isSuperUser) throws SQLException {
boolean isPublicationCoversTable = false;
boolean isPublicationExists = false;
boolean isPublicationViaRoot = false;
boolean isPartialPublicationEnabled = false;

// Check whether publication exists
Expand All @@ -340,7 +341,8 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
stmt.setString(1, pubName);
var res = stmt.executeQuery();
while (res.next()) {
isPublicationExists = res.getBoolean(1);
isPublicationViaRoot = res.getBoolean(1);
isPublicationExists = true;
}
}

Expand Down Expand Up @@ -370,6 +372,31 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
isPartialPublicationEnabled = res.getBoolean(1);
}
}

List<String> partitions = new ArrayList<>();
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_names"))) {
stmt.setString(1, schemaName);
stmt.setString(2, tableName);
var res = stmt.executeQuery();
while (res.next()) {
partitions.add(res.getString(1));
}
}
if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) {
// make sure the publication are created with `publish_via_partition_root = true`, which
// is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` in the upstream Postgres to check.");
}

// PG 15 and up supports partial publication of table
// check whether publication covers all columns of the table schema
if (isPartialPublicationEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.publication_exist=SELECT COUNT(*) > 0 from pg_publication WHERE pubname = ?
postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?;
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void testPermissionCheck() throws SQLException {
"CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))";
SourceTestClient.performQuery(connDbz, query);
// create a partial publication, check whether error is reported
query = "CREATE PUBLICATION rw_publication FOR TABLE orders (o_key)";
query =
"CREATE PUBLICATION rw_publication FOR TABLE orders (o_key) WITH ( publish_via_partition_root = true );";
SourceTestClient.performQuery(connDbz, query);
ConnectorServiceProto.TableSchema tableSchema =
ConnectorServiceProto.TableSchema.newBuilder()
Expand Down

0 comments on commit bc65ffb

Please sign in to comment.