Skip to content

Commit

Permalink
[#19189] xClusterDDLRepl: Handle Create Index
Browse files Browse the repository at this point in the history
Summary:
Adding support for create index. Cleaning up handling of ddl_command_end event trigger on the
source, swapping to use command_tags in pg_event_trigger_ddl_commands instead of the tag that
EventTriggerData provides. This approach is more flexible for processing queries that produce
multiple commands.

Combining CREATE TABLE and CREATE INDEX processing as they share the same checks required for the
extension (temp, primary indexes, colocated).
Jira: DB-7982

Test Plan:
```
ybd --cxx-test xcluster_ddl_replication-test --gtest_filter "XClusterDDLReplicationTest.CreateIndex"
ybd --java-test "org.yb.pgsql.TestPgRegressYbExtensionsYbXclusterDdlReplication"
```

Reviewers: hsunder, xCluster

Reviewed By: hsunder

Subscribers: ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34683
  • Loading branch information
hulien22 committed May 3, 2024
1 parent 7a8fc9b commit 6ddff2e
Show file tree
Hide file tree
Showing 13 changed files with 313 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ EXTENSION = yb_xcluster_ddl_replication
DATA = yb_xcluster_ddl_replication--1.0.sql

MODULE_big = yb_xcluster_ddl_replication
OBJS = yb_xcluster_ddl_replication.o extension_util.o json_util.o
OBJS = \
yb_xcluster_ddl_replication.o \
extension_util.o \
json_util.o \
source_ddl_end_handler.o

ifdef USE_PGXS
PG_CONFIG = pg_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CREATE TEMP TABLE temp_foo(i int PRIMARY KEY);
-- Verify that colocated tables are blocked.
CREATE TABLE coloc_foo(i int PRIMARY KEY);
ERROR: Colocated objects are not yet supported by yb_xcluster_ddl_replication
To manually replicate, run DDL with SET yb_xcluster_ddl_replication.enable_manual_ddl_replication = true
SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
yb_data
---------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
CALL TEST_reset();
SET yb_xcluster_ddl_replication.replication_role = DISABLED;
CREATE SCHEMA create_index;
SET search_path TO create_index;
-- Test temp table and index.
SET yb_xcluster_ddl_replication.replication_role = SOURCE;
CREATE TEMP TABLE temp_foo(i int PRIMARY KEY, a int);
CREATE INDEX foo_idx_temp on temp_foo(a);
SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
yb_data
---------
(0 rows)

SET yb_xcluster_ddl_replication.replication_role = BIDIRECTIONAL;
-- Create base table.
CREATE TABLE foo(i int PRIMARY KEY, a int, b text, c int);
-- Create indexes.
CREATE INDEX foo_idx_simple ON foo(a);
CREATE UNIQUE INDEX foo_idx_unique ON foo(b);
CREATE INDEX foo_idx_filtered ON foo(c ASC, a) WHERE a > c;
CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;
SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
yb_data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"user": "yugabyte", "query": "CREATE TABLE foo(i int PRIMARY KEY, a int, b text, c int);", "schema": "create_index", "version": 1, "command_tag": "CREATE TABLE"}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_simple ON foo(a);", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
{"user": "yugabyte", "query": "CREATE UNIQUE INDEX foo_idx_unique ON foo(b);", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_filtered ON foo(c ASC, a) WHERE a > c;", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
{"user": "yugabyte", "query": "CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;", "schema": "create_index", "version": 1, "command_tag": "CREATE INDEX"}
(5 rows)

SELECT * FROM yb_xcluster_ddl_replication.replicated_ddls ORDER BY start_time;
start_time | query_id | yb_data
------------+----------+----------------------------------------------------------------------------------------------
1 | 1 | {"query": "CREATE TABLE foo(i int PRIMARY KEY, a int, b text, c int);"}
2 | 1 | {"query": "CREATE INDEX foo_idx_simple ON foo(a);"}
3 | 1 | {"query": "CREATE UNIQUE INDEX foo_idx_unique ON foo(b);"}
4 | 1 | {"query": "CREATE INDEX foo_idx_filtered ON foo(c ASC, a) WHERE a > c;"}
5 | 1 | {"query": "CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;"}
(5 rows)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include "utils/fmgroids.h"
#include "utils/relcache.h"

// Hint text appended to error messages raised for DDLs that the extension
// refuses to replicate automatically; it names the setting that switches the
// session to manual DDL replication.
const char *kManualReplicationErrorMsg =
"To manually replicate, run DDL with "
"SET yb_xcluster_ddl_replication.enable_manual_ddl_replication = true";

int64
GetInt64FromVariable(const char *var, const char *var_name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,8 @@
#define table_close(r, l) heap_close(r, l)
#endif

// Replication roles that the extension distinguishes between.
typedef enum ClusterReplicationRole
{
REPLICATION_ROLE_DISABLED,
REPLICATION_ROLE_SOURCE,
REPLICATION_ROLE_TARGET,
REPLICATION_ROLE_BIDIRECTIONAL,
} ClusterReplicationRole;

// Maps the textual name of each role to its ClusterReplicationRole value.
// The third field of every entry is the "hidden" flag (see the annotation on
// the BIDIRECTIONAL entry); the all-NULL entry terminates the table.
// NOTE(review): presumably this is the option table for the
// yb_xcluster_ddl_replication.replication_role setting - confirm at the place
// where the setting is registered.
static const struct config_enum_entry replication_roles[] = {
{"DISABLED", REPLICATION_ROLE_DISABLED, false},
{"SOURCE", REPLICATION_ROLE_SOURCE, false},
{"TARGET", REPLICATION_ROLE_TARGET, false},
{"BIDIRECTIONAL", REPLICATION_ROLE_BIDIRECTIONAL, /* hidden */ true},
{NULL, 0, false}};
// Global variables.
extern const char *kManualReplicationErrorMsg;

// Get int64 value from string extension variable.
int64 GetInt64FromVariable(const char *var, const char *var_name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "source_ddl_end_handler.h"

#include "executor/spi.h"
#include "lib/stringinfo.h"

#include "extension_util.h"
#include "pg_yb_utils.h"

#define OBJID_COLUMN_ID 1
#define COMMAND_TAG_COLUMN_ID 2

/*
 * Decides whether the relation created by a CREATE TABLE / CREATE INDEX
 * command has to be replicated.
 *
 * rel_oid: OID of the relation that the command created.
 *
 * Returns false when the relation is not YB backed (e.g. a temporary table)
 * or when it is a primary key index; returns true otherwise.
 *
 * Raises an ERROR if no relation with the given OID can be opened, or if the
 * relation is colocated (colocated objects are not yet supported).
 */
bool
ShouldReplicateCreateRelation(Oid rel_oid)
{
	Relation rel = RelationIdGetRelation(rel_oid);

	// RelationIdGetRelation returns NULL when no relation with this OID
	// exists; fail cleanly instead of dereferencing a NULL pointer below.
	if (rel == NULL)
		elog(ERROR, "Could not open relation with OID %u", rel_oid);

	// Ignore temporary tables and primary indexes (same as main table).
	if (!IsYBBackedRelation(rel) ||
		(rel->rd_rel->relkind == RELKIND_INDEX && rel->rd_index->indisprimary))
	{
		RelationClose(rel);
		return false;
	}

	// Also need to disallow colocated objects until that is supported.
	// Copy the flag into a local so the relation is closed before the error
	// (if any) is reported.
	YbTableProperties table_props = YbGetTableProperties(rel);
	bool is_colocated = table_props->is_colocated;
	RelationClose(rel);
	if (is_colocated)
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Colocated objects are not yet supported by "
							   "yb_xcluster_ddl_replication\n%s",
							   kManualReplicationErrorMsg)));

	return true;
}

/*
 * Walks the rows of pg_catalog.pg_event_trigger_ddl_commands() for the DDL
 * that fired the event trigger and decides whether the DDL must be replicated.
 *
 * state: JSON parse state of the row being built by the caller; it is not
 *        read or modified by this function at present.
 *
 * Returns true if at least one command in the DDL needs to be replicated, in
 * which case the caller replicates the entire query string. Returns false if
 * none of the commands needs replication.
 *
 * Raises an ERROR if the SPI query fails, if objid or command_tag is NULL, if
 * a command tag is not supported, or if a created relation is rejected by
 * ShouldReplicateCreateRelation().
 */
bool
ProcessSourceEventTriggerDDLCommands(JsonbParseState *state)
{
	static const char *const kDDLCommandsQuery =
		"SELECT objid, command_tag FROM "
		"pg_catalog.pg_event_trigger_ddl_commands()";

	(void) state;	// Unused for now; kept for interface stability.

	int exec_res = SPI_execute(kDDLCommandsQuery, true, 0);
	if (exec_res != SPI_OK_SELECT)
		elog(ERROR, "SPI_exec failed (error %d): %s", exec_res,
			 kDDLCommandsQuery);

	TupleDesc spiTupDesc = SPI_tuptable->tupdesc;

	// As long as there is at least one command that needs to be replicated, we
	// will set this to true and replicate the entire query string.
	bool should_replicate_ddl = false;

	// SPI_processed is unsigned, so use a matching index type to avoid a
	// signed/unsigned comparison.
	for (uint64 row = 0; row < SPI_processed; row++)
	{
		HeapTuple spiTuple = SPI_tuptable->vals[row];
		bool is_null;
		Oid objid = DatumGetObjectId(
			SPI_getbinval(spiTuple, spiTupDesc, OBJID_COLUMN_ID, &is_null));
		if (is_null)
			elog(ERROR, "Found NULL value when parsing objid");

		// SPI_getvalue returns NULL for a NULL column; passing that to
		// strncmp would be undefined behaviour, so reject it explicitly.
		const char *command_tag =
			SPI_getvalue(spiTuple, spiTupDesc, COMMAND_TAG_COLUMN_ID);
		if (command_tag == NULL)
			elog(ERROR, "Found NULL value when parsing command_tag");

		// Only the first 12 characters are compared, so any tag that starts
		// with one of these prefixes takes this branch as well.
		if (strncmp(command_tag, "CREATE TABLE", 12) == 0 ||
			strncmp(command_tag, "CREATE INDEX", 12) == 0)
		{
			should_replicate_ddl |= ShouldReplicateCreateRelation(objid);
		}
		else
		{
			elog(ERROR, "Unsupported DDL: %s\n%s", command_tag,
				 kManualReplicationErrorMsg);
		}
	}

	return should_replicate_ddl;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#ifndef YB_XCLUSTER_DDL_REPLICATION_SOURCE_DDL_END
#define YB_XCLUSTER_DDL_REPLICATION_SOURCE_DDL_END

#include "postgres.h"
#include "utils/jsonb.h"

/*
* Iterate over pg_catalog.pg_event_trigger_ddl_commands() and process each base
* command (a single query may be composed of multiple base commands).
*
* There are three return cases for this function:
* 1. The DDL contains some unsupported command - This function throws an error
* which aborts the transaction.
* 2. The DDL is valid but does not need to be replicated (eg all the objects in
* the DDL are temp) - This function returns false.
* 3. The DDL is valid and should be replicated - This function returns true.
*/
bool ProcessSourceEventTriggerDDLCommands(JsonbParseState *state);

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
CALL TEST_reset();

SET yb_xcluster_ddl_replication.replication_role = DISABLED;
CREATE SCHEMA create_index;
SET search_path TO create_index;

-- Test temp table and index.
SET yb_xcluster_ddl_replication.replication_role = SOURCE;
CREATE TEMP TABLE temp_foo(i int PRIMARY KEY, a int);
CREATE INDEX foo_idx_temp on temp_foo(a);

SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
SET yb_xcluster_ddl_replication.replication_role = BIDIRECTIONAL;

-- Create base table.
CREATE TABLE foo(i int PRIMARY KEY, a int, b text, c int);

-- Create indexes.
CREATE INDEX foo_idx_simple ON foo(a);

CREATE UNIQUE INDEX foo_idx_unique ON foo(b);

CREATE INDEX foo_idx_filtered ON foo(c ASC, a) WHERE a > c;

CREATE INDEX foo_idx_include ON foo(lower(b)) INCLUDE (a) SPLIT INTO 2 TABLETS;

SELECT yb_data FROM yb_xcluster_ddl_replication.ddl_queue ORDER BY start_time;
SELECT * FROM yb_xcluster_ddl_replication.replicated_ddls ORDER BY start_time;
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# contrib/yb_xcluster_ddl_replication/yb_schedule

test: setup
test: create_table
test: create_index

test: colocated_setup
test: create_colocated_table
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,26 @@

#include "extension_util.h"
#include "json_util.h"
#include "source_ddl_end_handler.h"

PG_MODULE_MAGIC;

/* Extension variables. */
// Replication roles that the extension distinguishes between.
typedef enum ClusterReplicationRole
{
REPLICATION_ROLE_DISABLED,
REPLICATION_ROLE_SOURCE,
REPLICATION_ROLE_TARGET,
REPLICATION_ROLE_BIDIRECTIONAL,
} ClusterReplicationRole;

// Maps the textual name of each role to its ClusterReplicationRole value.
// The third field of every entry is the "hidden" flag (see the annotation on
// the BIDIRECTIONAL entry); the all-NULL entry terminates the table.
// NOTE(review): presumably this is the option table for the
// yb_xcluster_ddl_replication.replication_role setting - confirm at the place
// where the setting is registered.
static const struct config_enum_entry replication_roles[] = {
{"DISABLED", REPLICATION_ROLE_DISABLED, false},
{"SOURCE", REPLICATION_ROLE_SOURCE, false},
{"TARGET", REPLICATION_ROLE_TARGET, false},
{"BIDIRECTIONAL", REPLICATION_ROLE_BIDIRECTIONAL, /* hidden */ true},
{NULL, 0, false}};

static int ReplicationRole = REPLICATION_ROLE_DISABLED;
static bool EnableManualDDLReplication = false;
char *DDLQueuePrimaryKeyStartTime = NULL;
Expand Down Expand Up @@ -134,55 +150,6 @@ InsertIntoDDLQueue(Jsonb *yb_data)
InsertIntoTable(DDL_QUEUE_TABLE_NAME, epoch_time, random(), yb_data);
}

/*
 * Inspects every object reported by pg_event_trigger_ddl_commands() for the
 * current DDL and returns whether or not to continue with processing the DDL.
 *
 * Returns true if at least one reported relation is YB backed and is not a
 * primary key index; returns false otherwise. Raises an ERROR if the SPI
 * query fails or if any such relation is colocated.
 */
bool
HandleCreateTable()
{
// TODO(jhe): Is there an alternate method to get this info?
// TODO(jhe): Can we use ddl_deparse on command to handle each separately?
StringInfoData query_buf;
initStringInfo(&query_buf);
appendStringInfo(
&query_buf, "SELECT objid FROM pg_catalog.pg_event_trigger_ddl_commands()");
int exec_res = SPI_execute(query_buf.data, true, 0);
if (exec_res != SPI_OK_SELECT)
elog(ERROR, "SPI_exec failed (error %d): %s", exec_res, query_buf.data);

TupleDesc spiTupDesc = SPI_tuptable->tupdesc;
bool found_yb_relation = false;
for (int row = 0; row < SPI_processed; row++)
{
HeapTuple spiTuple = SPI_tuptable->vals[row];
// NOTE(review): is_null is filled in by SPI_getbinval but never inspected
// here, so a NULL objid would be used as-is.
bool is_null;
Oid objid =
DatumGetObjectId(SPI_getbinval(spiTuple, spiTupDesc, 1, &is_null));

// NOTE(review): rel is used below without a NULL check - confirm that
// RelationIdGetRelation cannot fail for OIDs coming from this query.
Relation rel = RelationIdGetRelation(objid);
// Ignore temporary tables and primary indexes (same as main table).
if (!IsYBBackedRelation(rel) ||
(rel->rd_rel->relkind == RELKIND_INDEX && rel->rd_index->indisprimary))
{
RelationClose(rel);
continue;
}

// Also need to check colocated until that is supported.
// The flag is copied into a local so the relation is closed before the
// error (if any) is reported.
YbTableProperties table_props = YbGetTableProperties(rel);
bool is_colocated = table_props->is_colocated;
RelationClose(rel);
if (is_colocated)
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Colocated objects are not yet supported by "
"yb_xcluster_ddl_replication")));

found_yb_relation = true;
}

// If all the objects are temporary, then stop processing early as we don't
// need to replicate this ddl query at all.
return found_yb_relation;
}

void
HandleSourceDDLEnd(EventTriggerData *trig_data)
{
Expand Down Expand Up @@ -219,17 +186,11 @@ HandleSourceDDLEnd(EventTriggerData *trig_data)
{
(void) AddBoolJsonEntry(state, "manual_replication", true);
}
else if (strcmp(trig_data->tag, "CREATE TABLE") == 0)
{
if (!HandleCreateTable())
goto exit;
}
else
{
elog(ERROR,
"Unsupported DDL: %s\nTo manually replicate, run DDL with "
"SET yb_xcluster_ddl_replication.enable_manual_ddl_replication = true",
trig_data->tag);
bool should_replicate_ddl = ProcessSourceEventTriggerDDLCommands(state);
if (!should_replicate_ddl)
goto exit;
}

// Construct the jsonb and insert completed row into ddl_queue table.
Expand Down Expand Up @@ -308,9 +269,9 @@ handle_ddl_end(PG_FUNCTION_ARGS)
static bool
IsInIgnoreList(EventTriggerData *trig_data)
{
if (strcmp(trig_data->tag, "CREATE EXTENSION") == 0 ||
strcmp(trig_data->tag, "DROP EXTENSION") == 0 ||
strcmp(trig_data->tag, "ALTER EXTENSION") == 0)
if (strncmp(trig_data->tag, "CREATE EXTENSION", 16) == 0 ||
strncmp(trig_data->tag, "DROP EXTENSION", 14) == 0 ||
strncmp(trig_data->tag, "ALTER EXTENSION", 15) == 0)
{
return true;
}
Expand Down
Loading

0 comments on commit 6ddff2e

Please sign in to comment.