-
Notifications
You must be signed in to change notification settings - Fork 681
/
multi_test_helpers.sql
668 lines (628 loc) · 24.1 KB
/
multi_test_helpers.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
-- File to create functions and helpers needed for subsequent tests
-- Helper to create objects on each node: runs the given command in the
-- current session first and then hands the same text to
-- run_command_on_workers().
--
-- p_sql: text of the SQL command to run.
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void
AS $$
BEGIN
    -- local execution comes first; an error here aborts before workers are touched
    EXECUTE p_sql;

    -- the rows returned by run_command_on_workers() are discarded
    PERFORM run_command_on_workers(p_sql);
END;
$$ LANGUAGE plpgsql;
-- Runs the given query and normalizes task-execution failures.
--
-- If the query fails with a message that starts with 'failed to execute task',
-- a fixed 'Task failed to execute' error is raised instead. Any other error
-- raised by the query is swallowed and the function returns normally.
--
-- query: text of the SQL statement to run.
CREATE OR REPLACE FUNCTION raise_failed_execution(query text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    EXECUTE query;
EXCEPTION
    WHEN OTHERS THEN
        -- only the task-failure family of errors is surfaced to the caller
        IF SQLERRM LIKE 'failed to execute task%' THEN
            RAISE EXCEPTION 'Task failed to execute';
        END IF;
END;
$$;
-- Returns the output of the given explain command up to and including the
-- first line that contains 'Task Count:'; everything after that line (the
-- worker plans) is dropped. If no such line exists, all lines are returned.
--
-- explain_command: statement to execute; each returned row is one plan line.
-- query_plan (out): the plan line emitted for each returned row.
CREATE OR REPLACE FUNCTION coordinator_plan(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        RETURN NEXT;
        -- the Task Count line itself is emitted, then output stops
        EXIT WHEN query_plan LIKE '%Task Count:%';
    END LOOP;
    RETURN;
END;
$$;
-- Like a coordinator-only plan filter, but keeps the task counts of subplans:
-- every line is returned up to and including the first line that contains
-- 'Task Count:'; after that point only lines containing 'Task Count:' are
-- returned.
--
-- explain_command: statement to execute; each returned row is one plan line.
-- query_plan (out): the plan line emitted for each returned row.
CREATE OR REPLACE FUNCTION coordinator_plan_with_subplans(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
DECLARE
    seen_first_task_count boolean := false;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        IF query_plan LIKE '%Task Count:%' THEN
            -- Task Count lines are always part of the output
            RETURN NEXT;
            seen_first_task_count := true;
        ELSIF NOT seen_first_task_count THEN
            -- other lines are only kept before the first Task Count line
            RETURN NEXT;
        END IF;
    END LOOP;
    RETURN;
END;
$$;
-- Returns the output of the given explain command with the values that follow
-- 'Memory:', 'Memory Usage:', 'Buckets:' and 'Batches:' replaced by 'xxx'.
--
-- explain_command: statement to execute; each returned row is one plan line.
-- query_plan (out): the normalized plan line emitted for each returned row.
CREATE OR REPLACE FUNCTION plan_normalize_memory(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
DECLARE
    raw_line text;
BEGIN
    FOR raw_line IN EXECUTE explain_command LOOP
        -- 'g' replaces every occurrence on the line, not just the first one
        query_plan := regexp_replace(
            raw_line,
            '(Memory( Usage)?|Buckets|Batches): \S*',
            '\1: xxx',
            'g');
        RETURN NEXT;
    END LOOP;
END;
$$;
-- Returns true if any line produced by the given explain command contains
-- "is not null" (case-insensitive), false otherwise.
--
-- explain_command: statement to execute; each returned row is one plan line.
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_command text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
DECLARE
    plan_line text;
BEGIN
    FOR plan_line IN EXECUTE explain_command LOOP
        -- stop at the first matching line
        IF plan_line ILIKE '%is not null%' THEN
            RETURN true;
        END IF;
    END LOOP;

    RETURN false;
END;
$$;
-- Returns true if any line produced by the given explain command contains
-- "Distributed Subplan " followed by text that includes an underscore
-- (e.g. "Distributed Subplan 12_1"), compared case-insensitively.
--
-- explain_command: statement to execute; each returned row is one plan line.
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        -- "_" is a single-character wildcard in LIKE patterns, so it has to be
        -- escaped to match a literal underscore. An explicit ESCAPE character
        -- keeps this independent of backslash/string-literal settings.
        IF query_plan ILIKE '%Distributed Subplan %!_%' ESCAPE '!'
        THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END; $$ language plpgsql;
-- Returns true if any line produced by the given explain command reports
-- exactly one task, i.e. contains "Task Count: 1" that is not followed by
-- another digit (compared case-insensitively).
--
-- explain_command: statement to execute; each returned row is one plan line.
CREATE OR REPLACE FUNCTION explain_has_single_task(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        -- A plain '%Task Count: 1%' pattern would also accept 10, 11, 100, ...
        -- so require a non-digit or end of line after the "1".
        IF query_plan ~* 'Task Count: 1([^0-9]|$)'
        THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END; $$ language plpgsql;
-- Helper to quickly run SQL on the whole cluster: executes the given command
-- in the current session and then passes the same text to
-- run_command_on_workers().
--
-- p_sql: text of the SQL command to run.
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void
AS $$
BEGIN
    EXECUTE p_sql;                          -- current node
    PERFORM run_command_on_workers(p_sql);  -- workers; returned rows are ignored
END;
$$ LANGUAGE plpgsql;
-- Updates the pg_dist_object record(s) of the given procedure so that:
-- 1. the procedure is marked as colocated with the given table (its
--    colocationid is copied from the table's pg_dist_partition record), and
-- 2. distribution_argument_index is set to the given argument index, i.e. the
--    argument with which we route the procedure.
--
-- procname:       name of the procedure as stored in pg_proc.proname; every
--                 pg_proc entry with this name is affected (overloads included).
-- tablerelid:     table to colocate the procedure with.
-- argument_index: value stored in distribution_argument_index.
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
    UPDATE pg_catalog.pg_dist_object AS obj
    SET distribution_argument_index = argument_index,
        colocationid = part.colocationid
    FROM pg_proc AS proc
    CROSS JOIN pg_dist_partition AS part
    WHERE proc.proname = procname
      -- OIDs are only unique within one catalog, so make sure the matched
      -- pg_dist_object record really describes a pg_proc entry
      AND obj.classid = 'pg_catalog.pg_proc'::regclass
      AND obj.objid = proc.oid
      AND part.logicalrelid = tablerelid;
END;$$;
-- Verifies that the definition of a function on this node is the same as the
-- one reported for it by run_command_on_workers().
--
-- funcname: function signature accepted by a ::regprocedure cast,
--           e.g. 'my_schema.my_func(int,text)'.
--
-- Returns true when every returned worker result equals the local
-- pg_get_functiondef() output. On the first mismatch the two definitions are
-- reported via RAISE INFO and false is returned.
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
RETURNS bool
LANGUAGE plpgsql
AS $func$
DECLARE
    coordinator_def text;
    worker_def text;
BEGIN
    coordinator_def := pg_get_functiondef(funcname::regprocedure);

    FOR worker_def IN
        SELECT result
        FROM run_command_on_workers(
            'SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)')
    LOOP
        -- NULL-safe comparison: with "!=" a NULL on either side would yield
        -- NULL and the mismatch would silently be treated as "same"
        IF worker_def IS DISTINCT FROM coordinator_def THEN
            RAISE INFO 'functions are different, coordinator:% worker:%', coordinator_def, worker_def;
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END;
$func$;
--
-- Procedure for creating shards for a range partitioned distributed table.
--
-- For every position of minvalues, an empty shard is created through
-- master_create_empty_shard() and its pg_dist_shard record is updated with the
-- min / max values found at that position of minvalues / maxvalues.
--
-- rel:       table to create the shards for.
-- minvalues: shardminvalue of each shard; its length decides the shard count.
-- maxvalues: shardmaxvalue of each shard, read at the same positions.
--
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
LANGUAGE plpgsql
AS $$
DECLARE
    created_shardid bigint;
    -- array_length() yields NULL for an empty or NULL array; treat it as "no shards"
    range_count int := COALESCE(array_length(minvalues, 1), 0);
BEGIN
    FOR pos IN 1 .. range_count LOOP
        created_shardid := master_create_empty_shard(rel::text);

        UPDATE pg_dist_shard
        SET shardminvalue = minvalues[pos],
            shardmaxvalue = maxvalues[pos]
        WHERE shardid = created_shardid;
    END LOOP;
END;
$$;
-- For testing purposes: blocks until pg_catalog.pg_dist_cleanup has no
-- records left, calling citus_cleanup_orphaned_resources() as long as any
-- record remains. Runs with client_min_messages set to ERROR.
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup()
RETURNS void
LANGUAGE plpgsql
SET client_min_messages TO ERROR
AS $$
DECLARE
    pending_records integer;
BEGIN
    LOOP
        -- re-count on every iteration, before deciding whether to clean up again
        EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO pending_records;
        EXIT WHEN pending_records = 0;

        CALL pg_catalog.citus_cleanup_orphaned_resources();
    END LOOP;
END;
$$;
-- Returns the foreign keys where the referencing relation's name (as printed
-- by regclass::text) starts with the given prefix.
--
-- Foreign keys are grouped by their configuration (referencing / referenced
-- columns, deferrability, on update / on delete actions, match type and the
-- columns listed in confdelsetcols). For each distinct configuration the
-- constraint names, referencing tables and referenced tables are aggregated
-- into arrays ordered by constraint oid. The groups are returned as one jsonb
-- array ordered by constraint names; NULL is returned when nothing matches.
--
-- referencing_relname_prefix: prefix to match; it is embedded into the query
--                             as a properly quoted literal.
CREATE OR REPLACE FUNCTION get_grouped_fkey_constraints(referencing_relname_prefix text)
RETURNS jsonb AS $func$
DECLARE
    confdelsetcols_column_ref text;
    result jsonb;
BEGIN
    -- Read confdelsetcols as null if no such column exists.
    -- This can only be the case for PG versions < 15.
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_constraint'::regclass AND attname='confdelsetcols')
    THEN
        confdelsetcols_column_ref := '(SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(confdelsetcols))';
    ELSE
        confdelsetcols_column_ref := '(SELECT null::smallint[])';
    END IF;

    -- %1$L quotes the prefix as a literal (safe for embedded quotes),
    -- %2$s splices in the SQL fragment chosen above.
    EXECUTE format(
        $$
        SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.constraint_names) AS fkeys_with_different_config FROM (
            SELECT array_agg(constraint_name ORDER BY constraint_oid) AS constraint_names,
                   array_agg(referencing_table::regclass::text ORDER BY constraint_oid) AS referencing_tables,
                   array_agg(referenced_table::regclass::text ORDER BY constraint_oid) AS referenced_tables,
                   referencing_columns, referenced_columns, deferable, deferred, on_update, on_delete, match_type, referencing_columns_set_null_or_default
            FROM (
                SELECT
                    oid AS constraint_oid,
                    conname AS constraint_name,
                    conrelid AS referencing_table,
                    (SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(conkey)) AS referencing_columns,
                    confrelid AS referenced_table,
                    (SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = confrelid AND attnum = ANY(confkey)) AS referenced_columns,
                    condeferrable AS deferable,
                    condeferred AS deferred,
                    confupdtype AS on_update,
                    confdeltype AS on_delete,
                    confmatchtype AS match_type,
                    %2$s AS referencing_columns_set_null_or_default
                FROM pg_constraint WHERE starts_with(conrelid::regclass::text, %1$L) AND contype = 'f'
            ) q2
            GROUP BY referencing_columns, referenced_columns, deferable, deferred, on_update, on_delete, match_type, referencing_columns_set_null_or_default
        ) q1
        $$,
        referencing_relname_prefix,
        confdelsetcols_column_ref
    ) INTO result;
    RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns the definitions of the indexes that live in the given schema and
-- belong to tables whose name starts with the given table name.
--
-- Indexes are grouped by their pg_index properties (number of attributes,
-- uniqueness, validity, key columns, collations, operator classes,
-- expressions, predicate, ...). For each group the index names and index
-- definitions are aggregated into arrays ordered by index oid. The groups are
-- returned as one jsonb array ordered by index names; NULL is returned when
-- nothing matches.
--
-- schemaname: schema to look into (exact match).
-- tablename:  table name prefix.
CREATE OR REPLACE FUNCTION get_index_defs(schemaname text, tablename text)
RETURNS jsonb AS $func$
DECLARE
    result jsonb;
    indnullsnotdistinct_column_ref text;
BEGIN
    -- Not use indnullsnotdistinct in group by clause if no such column exists.
    -- This can only be the case for PG versions < 15.
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_index'::regclass AND attname='indnullsnotdistinct')
    THEN
        indnullsnotdistinct_column_ref := ',indnullsnotdistinct';
    ELSE
        indnullsnotdistinct_column_ref := '';
    END IF;

    -- %1$L / %2$L embed the arguments as quoted literals. Inside the query,
    -- schemaname / indexname are pg_indexes columns; they go through
    -- quote_ident() so that names needing quotes (mixed case, spaces, ...)
    -- survive the cast to regclass.
    EXECUTE format(
        $$
        SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.indexnames) AS index_defs FROM (
            SELECT array_agg(indexname ORDER BY indexrelid) AS indexnames,
                   array_agg(indexdef ORDER BY indexrelid) AS indexdefs
            FROM pg_indexes
            JOIN pg_index
            ON (indexrelid = (quote_ident(schemaname) || '.' || quote_ident(indexname))::regclass)
            WHERE schemaname = %1$L AND starts_with(tablename, %2$L)
            GROUP BY indnatts, indnkeyatts, indisunique, indisprimary, indisexclusion,
                     indimmediate, indisclustered, indisvalid, indisready, indislive,
                     indisreplident, indkey, indcollation, indclass, indoption, indexprs,
                     indpred %3$s
        ) q1
        $$,
        schemaname, tablename, indnullsnotdistinct_column_ref) INTO result;
    RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns the columns of the given table that have a default value or a
-- generation expression, as a jsonb array of
-- {column_name, column_default, generation_expression} objects ordered by
-- column name. Returns NULL when the table has no such column.
--
-- schemaname: schema of the table (exact match).
-- tablename:  name of the table (exact match).
CREATE OR REPLACE FUNCTION get_column_defaults(schemaname text, tablename text)
RETURNS jsonb AS $func$
DECLARE
    result jsonb;
BEGIN
    -- The OR must be parenthesized: AND binds tighter than OR, so without the
    -- parentheses every generated column of every table would be returned.
    EXECUTE format(
        $$
        SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.column_name) AS column_defs FROM (
            SELECT column_name, column_default::text, generation_expression::text
            FROM information_schema.columns
            WHERE table_schema = %1$L AND table_name = %2$L AND
                  (column_default IS NOT NULL OR generation_expression IS NOT NULL)
        ) q1
        $$,
        schemaname, tablename) INTO result;
    RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns the column attributes (column name, type name, collation name,
-- compression method, not-null flag) of the ordinary tables (relkind 'r')
-- whose name, as printed by regclass::text, starts with the given prefix.
--
-- Columns are first grouped by their attributes, collecting the names of the
-- relations that share them; those groups are then grouped by the relation
-- name list. The result is a jsonb object of the form
-- {"relnames": [...], "column_attrs": [...]}.
--
-- Only the first row produced by the query is returned. When the matching
-- tables do not all share the same column attributes, the query yields several
-- rows and which one is returned is unspecified. NULL is returned when
-- nothing matches.
--
-- relname_prefix: prefix to match; it is embedded into the query as a
--                 properly quoted literal.
CREATE OR REPLACE FUNCTION get_column_attrs(relname_prefix text)
RETURNS jsonb AS $func$
DECLARE
    result jsonb;
BEGIN
    EXECUTE format(
        $$
        SELECT to_jsonb(q2.*) FROM (
            SELECT relnames, jsonb_agg(to_jsonb(q1.*) - 'relnames' ORDER BY q1.column_name) AS column_attrs FROM (
                SELECT array_agg(attrelid::regclass::text ORDER BY attrelid) AS relnames,
                       attname AS column_name, typname AS type_name, collname AS collation_name, attcompression AS compression_method, attnotnull AS not_null
                FROM pg_attribute pa
                LEFT JOIN pg_type pt ON (pa.atttypid = pt.oid)
                LEFT JOIN pg_collation pc1 ON (pa.attcollation = pc1.oid)
                JOIN pg_class pc2 ON (pa.attrelid = pc2.oid)
                WHERE starts_with(attrelid::regclass::text, %1$L) AND
                      attnum > 0 AND NOT attisdropped AND relkind = 'r'
                GROUP BY column_name, type_name, collation_name, compression_method, not_null
            ) q1
            GROUP BY relnames
        ) q2
        $$,
        relname_prefix) INTO result;
    RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns true if every result returned by run_command_on_placements() for
-- the given table reports the expected number of indexes, i.e. the number of
-- pg_index records of each shard placement equals n_expected_indexes.
--
-- qualified_table_name: name of the table whose placements are checked.
-- n_expected_indexes:   index count each placement has to report.
CREATE OR REPLACE FUNCTION verify_index_count_on_shard_placements(
    qualified_table_name text,
    n_expected_indexes int)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
BEGIN
    -- '%s' in the command is left for run_command_on_placements() to fill in
    RETURN n_expected_indexes = ALL (
        SELECT result::int
        FROM run_command_on_placements(
            qualified_table_name,
            $$SELECT COUNT(*) FROM pg_index WHERE indrelid::regclass = '%s'::regclass$$
        )
    );
END;
$func$;
-- Returns, for each shard placement of the given table, the names of the
-- foreign keys the shard is involved in (as the referencing or the referenced
-- relation).
--
-- qualified_table_name: name of the table whose placements are inspected.
--
-- Result columns:
--   on_node    - 'on_coordinator' if the node's groupid is 0, else 'on_worker'
--   shard_id   - id of the shard
--   fkey_names - constraint names sorted by name; an empty result text is
--                mapped to an empty array
CREATE OR REPLACE FUNCTION get_fkey_names_on_placements(
    qualified_table_name text)
RETURNS TABLE (
    on_node text,
    shard_id bigint,
    fkey_names text[]
)
LANGUAGE plpgsql
AS $func$
BEGIN
    RETURN QUERY
    SELECT
        CASE WHEN node.groupid = 0 THEN 'on_coordinator' ELSE 'on_worker' END AS on_node_col,
        placement.shardid,
        (CASE WHEN placement.result = '' THEN '{}' ELSE placement.result END)::text[] AS fkey_names_col
    FROM run_command_on_placements(
             qualified_table_name,
             $$SELECT array_agg(conname ORDER BY conname) FROM pg_constraint WHERE '%s'::regclass IN (conrelid, confrelid) AND contype = 'f'$$
         ) AS placement
    JOIN pg_dist_node AS node USING (nodename, nodeport);
END;
$func$;
-- Returns true if every result returned by run_command_on_placements() for
-- the given table reports the expected number of partitions, i.e. the number
-- of pg_inherits records having the shard placement as parent equals
-- n_expected_partitions.
--
-- qualified_table_name:  name of the table whose placements are checked.
-- n_expected_partitions: partition count each placement has to report.
CREATE OR REPLACE FUNCTION verify_partition_count_on_placements(
    qualified_table_name text,
    n_expected_partitions int)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
DECLARE
    all_placements_match boolean;
BEGIN
    -- '%s' in the command is left for run_command_on_placements() to fill in
    all_placements_match := n_expected_partitions = ALL (
        SELECT result::int
        FROM run_command_on_placements(
            qualified_table_name,
            $$SELECT COUNT(*) FROM pg_inherits WHERE inhparent = '%s'::regclass;$$
        )
    );

    RETURN all_placements_match;
END;
$func$;
-- Verifies the shard placement metadata of a single-shard table. Returns true
-- only if all of the following hold:
--   1. Locally, the given shard of the given table has a placement on a node
--      that is primary, active and has shouldhaveshards set. Otherwise a
--      NOTICE is raised and false is returned.
--   2. That node is the coordinator (groupid = 0) exactly when
--      expect_placement_on_coord is true. Otherwise a NOTICE is raised and
--      false is returned.
--   3. Every result returned by run_command_on_workers() confirms that the
--      node's own metadata has exactly one placement record for the same
--      table, shard id, node name, node port and group id.
--
-- qualified_table_name:      name of the single-shard table.
-- expected_shard_id:         shard id the table is expected to use.
-- expect_placement_on_coord: whether the placement is expected on the coordinator.
--
-- NOTE(review): the remote check goes through run_command_on_workers(), so it
-- presumably does not re-check the coordinator's own metadata - confirm.
CREATE OR REPLACE FUNCTION verify_shard_placement_for_single_shard_table(
qualified_table_name text,
expected_shard_id bigint,
expect_placement_on_coord boolean)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
nodename_nodeport_groupid record;
result boolean;
BEGIN
-- find the node that holds the placement, according to local metadata;
-- if several rows match, SELECT INTO keeps the first one
SELECT nodename, nodeport, groupid INTO nodename_nodeport_groupid
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = 'primary' AND shouldhaveshards AND isactive AND
logicalrelid = qualified_table_name::regclass AND shardid = expected_shard_id;
-- the record is NULL when no row matched
IF nodename_nodeport_groupid IS NULL
THEN
RAISE NOTICE 'Shard placement is not on a primary worker node';
RETURN false;
END IF;
-- groupid 0 is treated as the coordinator
IF (nodename_nodeport_groupid.groupid = 0) != expect_placement_on_coord
THEN
RAISE NOTICE 'Shard placement is on an unexpected node';
RETURN false;
END IF;
-- verify that metadata on workers is correct too
-- (the values found above are spliced into the command text via format())
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_workers($$
SELECT COUNT(*) = 1
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE logicalrelid = ''%s''::regclass AND
shardid = %s AND
nodename = ''%s'' AND
nodeport = %s AND
groupid = %s
$$)
);',
qualified_table_name, expected_shard_id,
nodename_nodeport_groupid.nodename,
nodename_nodeport_groupid.nodeport,
nodename_nodeport_groupid.groupid
)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Verifies the shard placement metadata of a reference table by running one
-- check through run_command_on_all_nodes() and returns true only if every
-- returned result is true. On each node the check requires that:
--   1. The number of placements of the given shard of the given table that
--      sit on primary, active nodes equals the number of primary, active
--      nodes in pg_dist_node.
--   2. Exactly one of those placements has the given placement id and sits
--      on the coordinator (groupid = 0).
--
-- qualified_table_name:        name of the reference table.
-- expected_shard_id:           shard id the table is expected to use.
-- expected_coord_placement_id: placement id expected for the coordinator placement.
CREATE OR REPLACE FUNCTION verify_shard_placements_for_reference_table(
qualified_table_name text,
expected_shard_id bigint,
expected_coord_placement_id bigint)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
-- build the command text; table name and shard id are used twice, once per
-- sub-check, which is why they appear twice in the argument list
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT
(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = ''primary'' AND isactive) =
(SELECT COUNT(*)
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = ''primary'' AND isactive AND
logicalrelid = ''%s''::regclass AND shardid = %s)
AND
(SELECT COUNT(*) = 1
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = ''primary'' AND isactive AND
logicalrelid = ''%s''::regclass AND shardid = %s AND
placementid = %s AND groupid = 0)
$$)
);',
qualified_table_name, expected_shard_id,
qualified_table_name, expected_shard_id,
expected_coord_placement_id
)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Checks pg_dist_partition through run_command_on_all_nodes() and returns
-- true only if every returned result confirms that the metadata record of the
-- given single-shard table is correct, i.e. there is exactly one record for
-- the table with:
--   partmethod = 'n', partkey IS NULL, colocationid > 0, repmodel = 's' and
--   autoconverted = false.
--
-- qualified_table_name: name of the single-shard table.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_single_shard_table(
qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
-- the table name is spliced into the command text via format()
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT COUNT(*) = 1
FROM pg_dist_partition
WHERE logicalrelid = ''%s''::regclass AND
partmethod = ''n'' AND
partkey IS NULL AND
colocationid > 0 AND
repmodel = ''s'' AND
autoconverted = false
$$)
);',
qualified_table_name)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Checks pg_dist_partition through run_command_on_all_nodes() and returns
-- true only if every returned result confirms that the metadata record of the
-- given reference table is correct, i.e. there is exactly one record for the
-- table with:
--   partmethod = 'n', partkey IS NULL, colocationid > 0, repmodel = 't' and
--   autoconverted = false.
--
-- qualified_table_name: name of the reference table.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_reference_table(
qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
-- the table name is spliced into the command text via format()
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT COUNT(*) = 1
FROM pg_dist_partition
WHERE logicalrelid = ''%s''::regclass AND
partmethod = ''n'' AND
partkey IS NULL AND
colocationid > 0 AND
repmodel = ''t'' AND
autoconverted = false
$$)
);',
qualified_table_name)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns the pg_seclabels entries (provider, objtype, label as jsonb text)
-- whose object name equals the input, as reported by
-- run_command_on_all_nodes(), ordered by node type.
--
-- object_name:   value matched against pg_seclabels.objname; it is embedded
--                into the remote command as a properly quoted literal.
-- master_port:   port of the node to label as 'coordinator'.
-- worker_1_port: port of the node to label as 'worker_1'.
-- worker_2_port: port of the node to label as 'worker_2'.
--
-- Result columns:
--   node_type - 'coordinator', 'worker_1', 'worker_2', or 'unexpected_node'
--               when the node's port matches none of the given ports
--   result    - result text returned for that node
CREATE OR REPLACE FUNCTION get_citus_tests_label_provider_labels(object_name text,
                                                                  master_port INTEGER DEFAULT 57636,
                                                                  worker_1_port INTEGER DEFAULT 57637,
                                                                  worker_2_port INTEGER DEFAULT 57638)
RETURNS TABLE (
    node_type text,
    result text
)
AS $func$
DECLARE
    -- quote_literal() keeps object names that contain quotes from breaking
    -- (or altering) the command
    pg_seclabels_cmd TEXT := 'SELECT to_jsonb(q.*) FROM (' ||
                             'SELECT provider, objtype, label FROM pg_seclabels ' ||
                             'WHERE objname = ' || quote_literal(object_name) || ') q';
BEGIN
    RETURN QUERY
    SELECT
        CASE
            WHEN nodeport = master_port THEN 'coordinator'
            WHEN nodeport = worker_1_port THEN 'worker_1'
            WHEN nodeport = worker_2_port THEN 'worker_2'
            ELSE 'unexpected_node'
        END AS node_type,
        a.result
    FROM run_command_on_all_nodes(pg_seclabels_cmd) a
    JOIN pg_dist_node USING (nodeid)
    ORDER BY node_type;
END;
$func$ LANGUAGE plpgsql;
-- For all nodes reached through run_command_on_all_nodes(), returns the
-- properties of the given database, except oid, datfrozenxid and datminmxid.
--
-- Also returns whether the node has a pg_dist_object record for the database
-- and whether there are any stale pg_dist_object records for a database.
--
-- p_database_name: name of the database to inspect; it is spliced into the
--                  remote command between single quotes without escaping.
--
-- Result columns:
--   node_type - 'coordinator (local)', 'coordinator (remote)',
--               'worker node (local)' or 'worker node (remote)'; a node is
--               the coordinator when its groupid is 0 and local when its
--               groupid equals the one in pg_dist_local_group
--   result    - jsonb text with the keys database_properties,
--               pg_dist_object_record_for_db_exists and
--               stale_pg_dist_object_record_for_a_db_exists
CREATE OR REPLACE FUNCTION check_database_on_all_nodes(p_database_name text)
RETURNS TABLE (node_type text, result text)
AS $func$
DECLARE
pg_ge_15_options text := '';
pg_ge_16_options text := '';
BEGIN
-- Columns that only exist in newer pg_database versions are selected when
-- the local catalog has them; otherwise constant placeholders are selected
-- so that the shape of the result stays the same.
IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'datlocprovider') THEN
pg_ge_15_options := ', daticulocale, datcollversion, datlocprovider';
ELSE
pg_ge_15_options := $$, null as daticulocale, null as datcollversion, 'c' as datlocprovider$$;
END IF;
IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'daticurules') THEN
pg_ge_16_options := ', daticurules';
ELSE
pg_ge_16_options := ', null as daticurules';
END IF;
-- NOTE(review): in the command below, classid = 1262 is presumably the OID of
-- the pg_database catalog - confirm. The pg_dist_object_record_for_db_exists
-- lookup matches on objid only, without a classid filter.
RETURN QUERY
SELECT
CASE WHEN (groupid = 0 AND groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'coordinator (local)'
WHEN (groupid = 0) THEN 'coordinator (remote)'
WHEN (groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'worker node (local)'
ELSE 'worker node (remote)'
END AS node_type,
q2.result
FROM run_command_on_all_nodes(
format(
$$
SELECT to_jsonb(q.*)
FROM (
SELECT
(
SELECT to_jsonb(database_properties.*)
FROM (
SELECT datname, pa.rolname as database_owner,
pg_encoding_to_char(pd.encoding) as encoding,
datistemplate, datallowconn, datconnlimit, datacl,
pt.spcname AS tablespace, datcollate, datctype
%2$s -- >= pg15 options
%3$s -- >= pg16 options
FROM pg_database pd
JOIN pg_authid pa ON pd.datdba = pa.oid
JOIN pg_tablespace pt ON pd.dattablespace = pt.oid
WHERE datname = '%1$s'
) database_properties
) AS database_properties,
(
SELECT COUNT(*)=1
FROM pg_dist_object WHERE objid = (SELECT oid FROM pg_database WHERE datname = '%1$s')
) AS pg_dist_object_record_for_db_exists,
(
SELECT COUNT(*) > 0
FROM pg_dist_object
WHERE classid = 1262 AND objid NOT IN (SELECT oid FROM pg_database)
) AS stale_pg_dist_object_record_for_a_db_exists
) q
$$,
p_database_name, pg_ge_15_options, pg_ge_16_options
)
) q2
JOIN pg_dist_node USING (nodeid);
END;
$func$ LANGUAGE plpgsql;
-- For each of the given permissions, asks every node reached through
-- run_command_on_all_nodes() whether the role has that privilege on the
-- database (via has_database_privilege) and returns one row per returned
-- result.
--
-- role_name:   role whose privileges are checked.
-- db_name:     database the privileges apply to.
-- permissions: privilege names to check, e.g. ARRAY['CREATE', 'CONNECT'].
--
-- Result columns:
--   permission - the permission that was checked
--   result     - result text returned for one node
CREATE OR REPLACE FUNCTION check_database_privileges(role_name text, db_name text, permissions text[])
RETURNS TABLE(permission text, result text)
LANGUAGE plpgsql
AS $func$
DECLARE
    requested_permission text;
BEGIN
    FOREACH requested_permission IN ARRAY permissions LOOP
        -- every value is passed through quote_literal() before being spliced
        -- into the command text
        RETURN QUERY EXECUTE format(
            $inner$SELECT %s, result FROM run_command_on_all_nodes($$select has_database_privilege(%s,%s,%s); $$)$inner$,
            quote_literal(requested_permission),
            quote_literal(role_name),
            quote_literal(db_name),
            quote_literal(requested_permission));
    END LOOP;
END;
$func$;