Skip to content

Commit

Permalink
[Bug Fix] [SEGFAULT] Querying distributed tables with window partitio…
Browse files Browse the repository at this point in the history
…n may cause segfault #7705

In function MasterAggregateMutator(), when the original Node is a Var node use makeVar() instead
of copyObject() when constructing the Var node for the target list of the combine query.
The varnullingrels field of the original Var node is ignored because it is not relevant for the
combine query; copying this cause the problem in issue 7705, where a coordinator query had
a Var with a reference to a non-existent join relation.

(cherry picked from commit c52f360)
  • Loading branch information
colm-mchugh authored and naisila committed Jan 13, 2025
1 parent 1893d9a commit 353033d
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 4 deletions.
7 changes: 4 additions & 3 deletions src/backend/distributed/planner/multi_logical_optimizer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1557,9 +1557,10 @@ MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerC
}
else if (IsA(originalNode, Var))
{
Var *newColumn = copyObject((Var *) originalNode);
newColumn->varno = masterTableId;
newColumn->varattno = walkerContext->columnId;
Var *origColumn = (Var *) originalNode;
Var *newColumn = makeVar(masterTableId, walkerContext->columnId,
origColumn->vartype, origColumn->vartypmod,
origColumn->varcollid, origColumn->varlevelsup);
walkerContext->columnId++;

newNode = (Node *) newColumn;
Expand Down
248 changes: 248 additions & 0 deletions src/test/regress/expected/issue_7705.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
--- Test for verifying that column references (var nodes) in targets that cannot be pushed down
--- do not cause issues for the postgres planner, in particular postgres versions 16+, where the
--- varnullingrels field of a VAR node may contain relids of join relations that can make the var
--- NULL; in a rewritten distributed query without a join such relids do not have a meaning.
--- Issue #7705: [SEGFAULT] Querying distributed tables with window partition causes segmentation fault
--- https://github.com/citusdata/citus/issues/7705
CREATE SCHEMA issue_7705;
SET search_path to 'issue_7705';
SET citus.next_shard_id TO 30070000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
CREATE TABLE t1 (id INT PRIMARY KEY);
INSERT INTO t1 VALUES (1), (2);
CREATE TABLE t2 (id INT, account_id INT, a2 INT, PRIMARY KEY(id, account_id));
INSERT INTO t2 VALUES (3, 1, 10), (4, 2, 20), (5, 1, NULL);
SELECT create_distributed_table('t1', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$issue_7705.t1$$)
create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT create_distributed_table('t2', 'account_id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$issue_7705.t2$$)
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- Test the issue seen in #7705; a target expression with
-- a window function that cannot be pushed down because the
-- partion by is not on the distribution column also includes
-- a column from the inner side of a left outer join, which
-- produces a non-empty varnullingrels set in PG 16 (and higher)
SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;
id | max
---------------------------------------------------------------------
1 | 10
2 | 20
1 |
(3 rows)

EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;
QUERY PLAN
---------------------------------------------------------------------
WindowAgg
Output: remote_scan.id, max(remote_scan.max) OVER (?), remote_scan.worker_column_3
-> Sort
Output: remote_scan.worker_column_3, remote_scan.id, remote_scan.max
Sort Key: remote_scan.worker_column_3
-> Custom Scan (Citus Adaptive)
Output: remote_scan.worker_column_3, remote_scan.id, remote_scan.max
Task Count: 4
Tasks Shown: One of 4
-> Task
Query: SELECT worker_column_1 AS id, worker_column_2 AS max, worker_column_3 FROM (SELECT t1.id AS worker_column_1, t2.a2 AS worker_column_2, t2.id AS worker_column_3 FROM (issue_7705.t1_30070000 t1 LEFT JOIN issue_7705.t2_30070004 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.account_id)))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Output: t1.id, t2.a2, t2.id
Inner Unique: true
Hash Cond: (t2.account_id = t1.id)
-> Seq Scan on issue_7705.t2_30070004 t2
Output: t2.id, t2.account_id, t2.a2
-> Hash
Output: t1.id
-> Seq Scan on issue_7705.t1_30070000 t1
Output: t1.id
(22 rows)

SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t2 RIGHT OUTER JOIN t1 ON t1.id = t2.account_id;
id | max
---------------------------------------------------------------------
1 | 10
2 | 20
1 |
(3 rows)

EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t2 RIGHT OUTER JOIN t1 ON t1.id = t2.account_id;
QUERY PLAN
---------------------------------------------------------------------
WindowAgg
Output: remote_scan.id, max(remote_scan.max) OVER (?), remote_scan.worker_column_3
-> Sort
Output: remote_scan.worker_column_3, remote_scan.id, remote_scan.max
Sort Key: remote_scan.worker_column_3
-> Custom Scan (Citus Adaptive)
Output: remote_scan.worker_column_3, remote_scan.id, remote_scan.max
Task Count: 4
Tasks Shown: One of 4
-> Task
Query: SELECT worker_column_1 AS id, worker_column_2 AS max, worker_column_3 FROM (SELECT t1.id AS worker_column_1, t2.a2 AS worker_column_2, t2.id AS worker_column_3 FROM (issue_7705.t2_30070004 t2 RIGHT JOIN issue_7705.t1_30070000 t1 ON ((t1.id OPERATOR(pg_catalog.=) t2.account_id)))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Output: t1.id, t2.a2, t2.id
Inner Unique: true
Hash Cond: (t2.account_id = t1.id)
-> Seq Scan on issue_7705.t2_30070004 t2
Output: t2.id, t2.account_id, t2.a2
-> Hash
Output: t1.id
-> Seq Scan on issue_7705.t1_30070000 t1
Output: t1.id
(22 rows)

SELECT DISTINCT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;
id | max
---------------------------------------------------------------------
1 |
1 | 10
2 | 20
(3 rows)

EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT DISTINCT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Output: remote_scan.id, (max(remote_scan.max) OVER (?)), remote_scan.worker_column_3
Group Key: remote_scan.id, max(remote_scan.max) OVER (?)
-> WindowAgg
Output: remote_scan.id, max(remote_scan.max) OVER (?), remote_scan.worker_column_3
-> Sort
Output: remote_scan.worker_column_3, remote_scan.id, remote_scan.max
Sort Key: remote_scan.worker_column_3
-> Custom Scan (Citus Adaptive)
Output: remote_scan.worker_column_3, remote_scan.id, remote_scan.max
Task Count: 4
Tasks Shown: One of 4
-> Task
Query: SELECT worker_column_1 AS id, worker_column_2 AS max, worker_column_3 FROM (SELECT t1.id AS worker_column_1, t2.a2 AS worker_column_2, t2.id AS worker_column_3 FROM (issue_7705.t1_30070000 t1 LEFT JOIN issue_7705.t2_30070004 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.account_id)))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Output: t1.id, t2.a2, t2.id
Inner Unique: true
Hash Cond: (t2.account_id = t1.id)
-> Seq Scan on issue_7705.t2_30070004 t2
Output: t2.id, t2.account_id, t2.a2
-> Hash
Output: t1.id
-> Seq Scan on issue_7705.t1_30070000 t1
Output: t1.id
(25 rows)

CREATE SEQUENCE test_seq START 101;
CREATE OR REPLACE FUNCTION TEST_F(int) returns INT language sql stable as $$ select $1 + 42; $$ ;
-- Issue #7705 also occurs if a target expression includes a column
-- of a distributed table that is on the inner side of a left outer
-- join and a call to nextval(), because nextval() cannot be pushed
-- down, and must be run on the coordinator
SELECT t1.id, TEST_F(t2.a2 + nextval('test_seq') :: int)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;
id | test_f
---------------------------------------------------------------------
1 | 153
1 |
2 | 165
(3 rows)

EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, TEST_F(t2.a2 + nextval('test_seq') :: int)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;
QUERY PLAN
---------------------------------------------------------------------
Result
Output: remote_scan.id, ((remote_scan.test_f + (nextval('test_seq'::regclass))::integer) + 42)
-> Sort
Output: remote_scan.id, remote_scan.test_f
Sort Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
Output: remote_scan.id, remote_scan.test_f
Task Count: 4
Tasks Shown: One of 4
-> Task
Query: SELECT worker_column_1 AS id, worker_column_2 AS test_f FROM (SELECT t1.id AS worker_column_1, t2.a2 AS worker_column_2 FROM (issue_7705.t1_30070000 t1 LEFT JOIN issue_7705.t2_30070004 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.account_id)))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Output: t1.id, t2.a2
Inner Unique: true
Hash Cond: (t2.account_id = t1.id)
-> Seq Scan on issue_7705.t2_30070004 t2
Output: t2.id, t2.account_id, t2.a2
-> Hash
Output: t1.id
-> Seq Scan on issue_7705.t1_30070000 t1
Output: t1.id
(22 rows)

SELECT t1.id, CASE nextval('test_seq') % 2 = 0 WHEN true THEN t2.a2 ELSE 1 END
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;
id | case
---------------------------------------------------------------------
1 | 10
1 | 1
2 | 20
(3 rows)

EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, CASE nextval('test_seq') %2 = 0 WHEN true THEN t2.a2 ELSE 1 END
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;
QUERY PLAN
---------------------------------------------------------------------
Result
Output: remote_scan.id, CASE ((nextval('test_seq'::regclass) % '2'::bigint) = 0) WHEN CASE_TEST_EXPR THEN remote_scan."case" ELSE 1 END
-> Sort
Output: remote_scan.id, remote_scan."case"
Sort Key: remote_scan.id
-> Custom Scan (Citus Adaptive)
Output: remote_scan.id, remote_scan."case"
Task Count: 4
Tasks Shown: One of 4
-> Task
Query: SELECT worker_column_1 AS id, worker_column_2 AS "case" FROM (SELECT t1.id AS worker_column_1, t2.a2 AS worker_column_2 FROM (issue_7705.t1_30070000 t1 LEFT JOIN issue_7705.t2_30070004 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.account_id)))) worker_subquery
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Output: t1.id, t2.a2
Inner Unique: true
Hash Cond: (t2.account_id = t1.id)
-> Seq Scan on issue_7705.t2_30070004 t2
Output: t2.id, t2.account_id, t2.a2
-> Hash
Output: t1.id
-> Seq Scan on issue_7705.t1_30070000 t1
Output: t1.id
(22 rows)

--- cleanup
\set VERBOSITY TERSE
DROP SCHEMA issue_7705 CASCADE;
NOTICE: drop cascades to 4 other objects
RESET all;
2 changes: 1 addition & 1 deletion src/test/regress/multi_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: binary_protocol
test: alter_table_set_access_method
test: alter_distributed_table
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
Expand Down
72 changes: 72 additions & 0 deletions src/test/regress/sql/issue_7705.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
--- Test for verifying that column references (var nodes) in targets that cannot be pushed down
--- do not cause issues for the postgres planner, in particular postgres versions 16+, where the
--- varnullingrels field of a VAR node may contain relids of join relations that can make the var
--- NULL; in a rewritten distributed query without a join such relids do not have a meaning.
--- Issue #7705: [SEGFAULT] Querying distributed tables with window partition causes segmentation fault
--- https://github.com/citusdata/citus/issues/7705

CREATE SCHEMA issue_7705;
SET search_path to 'issue_7705';
SET citus.next_shard_id TO 30070000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;

CREATE TABLE t1 (id INT PRIMARY KEY);
INSERT INTO t1 VALUES (1), (2);

CREATE TABLE t2 (id INT, account_id INT, a2 INT, PRIMARY KEY(id, account_id));
INSERT INTO t2 VALUES (3, 1, 10), (4, 2, 20), (5, 1, NULL);

SELECT create_distributed_table('t1', 'id');
SELECT create_distributed_table('t2', 'account_id');

-- Test the issue seen in #7705; a target expression with
-- a window function that cannot be pushed down because the
-- partion by is not on the distribution column also includes
-- a column from the inner side of a left outer join, which
-- produces a non-empty varnullingrels set in PG 16 (and higher)
SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;
EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;

SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t2 RIGHT OUTER JOIN t1 ON t1.id = t2.account_id;
EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t2 RIGHT OUTER JOIN t1 ON t1.id = t2.account_id;

SELECT DISTINCT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;
EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT DISTINCT t1.id, MAX(t2.a2) OVER (PARTITION BY t2.id)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id;

CREATE SEQUENCE test_seq START 101;
CREATE OR REPLACE FUNCTION TEST_F(int) returns INT language sql stable as $$ select $1 + 42; $$ ;

-- Issue #7705 also occurs if a target expression includes a column
-- of a distributed table that is on the inner side of a left outer
-- join and a call to nextval(), because nextval() cannot be pushed
-- down, and must be run on the coordinator
SELECT t1.id, TEST_F(t2.a2 + nextval('test_seq') :: int)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;
EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, TEST_F(t2.a2 + nextval('test_seq') :: int)
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;

SELECT t1.id, CASE nextval('test_seq') % 2 = 0 WHEN true THEN t2.a2 ELSE 1 END
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;
EXPLAIN (VERBOSE, COSTS OFF, TIMING OFF)
SELECT t1.id, CASE nextval('test_seq') %2 = 0 WHEN true THEN t2.a2 ELSE 1 END
FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.account_id
ORDER BY t1.id;

--- cleanup
\set VERBOSITY TERSE
DROP SCHEMA issue_7705 CASCADE;
RESET all;

0 comments on commit 353033d

Please sign in to comment.