Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MERGE command for single_shard_distributed Target #7643

Merged
merged 14 commits into from
Jul 16, 2024
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,7 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
* Find the shard interval and id for the partition column value for
* non-reference tables.
*
* For reference table, this function blindly returns the tables single
* For reference table, and single shard distributed table this function blindly returns the tables single
* shard.
*/
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry);
Expand Down
42 changes: 24 additions & 18 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,27 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ

CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);

/*
* Get the index of the column in the source query that will be utilized
* to repartition the source rows, ensuring colocation with the target
*/
distributedPlan->sourceResultRepartitionColumnIndex =
SourceResultPartitionColumnIndex(mergeQuery,
sourceQuery->targetList,
targetRelation);

if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
{
/*
* if target table is SINGLE_SHARD_DISTRIBUTED let's set this to invalid -1
* so later in execution phase we don't rely on this value and try to find single shard of target instead.
*/
distributedPlan->sourceResultRepartitionColumnIndex = -1;
}
else
{
/*
* Get the index of the column in the source query that will be utilized
* to repartition the source rows, ensuring colocation with the target
*/

distributedPlan->sourceResultRepartitionColumnIndex =
SourceResultPartitionColumnIndex(mergeQuery,
sourceQuery->targetList,
targetRelation);
}

/*
* Make a copy of the source query, since following code scribbles it
Expand All @@ -262,11 +275,11 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions,
boundParams);
bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);
bool isRepartitionAllowed = IsRedistributablePlan(sourceRowsPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);

/* If plan is distributed, no work at the coordinator */
if (repartitioned)
if (isRepartitionAllowed)
{
distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION;
}
Expand Down Expand Up @@ -1273,13 +1286,6 @@ static int
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
CitusTableCacheEntry *targetRelation)
{
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("MERGE operation across distributed schemas "
"or with a row-based distributed table is "
"not yet supported")));
}

/* Get all the Join conditions from the ON clause */
List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree);
Var *targetColumn = targetRelation->partitionColumn;
Expand Down
1 change: 1 addition & 0 deletions src/test/regress/bin/normalize.sed
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ s/"t2_[0-9]+"/"t2_xxxxxxx"/g
# shard table names for MERGE tests
s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g
s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
s/merge_vcore_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g

# shard table names for multi_subquery
s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g
Expand Down
53 changes: 46 additions & 7 deletions src/test/regress/expected/merge_schema_sharding.out
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,26 @@ WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b)>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b)
DEBUG: Execute MERGE task list
-- with a distributed table
SET search_path TO schema_shard_table1;
MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
Expand All @@ -114,7 +126,12 @@ WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, sch
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b)>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b)
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
Expand Down Expand Up @@ -163,7 +180,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b;
DEBUG: A mix of distributed and reference table, try repartitioning
DEBUG: A mix of distributed and reference table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
Expand All @@ -174,7 +197,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b;
DEBUG: A mix of distributed and local table, try repartitioning
DEBUG: A mix of distributed and citus-local table, routable query is not possible
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b
DEBUG: Execute MERGE task list
MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
WHEN MATCHED THEN DELETE;
DEBUG: A mix of distributed and local table, try repartitioning
Expand Down Expand Up @@ -210,7 +239,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b
DEBUG: Execute MERGE task list
WITH cte AS materialized (
SELECT * FROM schema_shard_table.distributed_table
)
Expand All @@ -219,7 +253,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: Distributed tables are not co-located, try repartitioning
DEBUG: For MERGE command, all the distributed tables must be colocated
DEBUG: Creating MERGE repartition plan
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collect source query results on coordinator
DEBUG: Create a MERGE task list that needs to be routed
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b
DEBUG: Execute MERGE task list
SET client_min_messages TO WARNING;
DROP SCHEMA schema_shard_table1 CASCADE;
DROP SCHEMA schema_shard_table2 CASCADE;
Expand Down
Loading
Loading