Skip to content

Commit

Permalink
[#18095] YSQL: Enable Postgres parallel query
Browse files Browse the repository at this point in the history
Summary:
Enable Postgres' parallel query feature and implement parallel scan of
YB tables in YBSeqScan, IndexScan, IndexOnlyScan nodes.

Feature is enabled in preview mode, that is, it is disabled by default, to enable:
```
set yb_parallel_range_rows  to 10000;
```
indicates the number of estimated rows per parallel worker. Smaller table scans are not parallelized, default 0 effectively disables the feature. The parameter defines minimum number of parallel workers, while `max_parallel_workers_per_gather` works as the maximum.
```
set yb_enable_base_scans_cost_model to true;
```
since parallel query cost improvements are factored in Yugabyte costing functions.
Also make sure the target tables are large and ANALYZE was done for them. Due to
planned  parallelization overhead optimizer selects parallel plan only if it thinks the
target table is large, default 1000 rows would not be sufficient.

The feature depends on the DocDB ability to return key ranges for
parallel scan implemented in D26978. Those key ranges are stored in the
shared memory buffer from where they are taken one at a time by the
parallel workers.

Transaction consistency between parallel workers is ensured by main
backend sharing its session and transaction context.
Jira: DB-7135

Test Plan:
ybd --java-test org.yb.pgsql.TestPgRegressParallel
ybd --cxx-test pggate_test_select --gtest_filter PggateTestSelect.TestSelectHashRanges
ybd --cxx-test pggate_test_select --gtest_filter PggateTestSelect.TestSelectScanRanges

Reviewers: sergei, timur, jason, pjain, tnayak

Reviewed By: pjain, tnayak

Subscribers: ybase, smishra, yql, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D28398
  • Loading branch information
andrei-mart committed Nov 29, 2023
1 parent 33fdb83 commit 2f91e3e
Show file tree
Hide file tree
Showing 58 changed files with 3,906 additions and 343 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.YBTestRunner;

/**
* Runs the pg_regress test suite on YB code.
*/
@RunWith(value=YBTestRunner.class)
public class TestPgRegressParallel extends BasePgSQLTest {
@Override
public int getTestMethodTimeoutSec() {
return 1800;
}

@Test
public void testPgRegressParallel() throws Exception {
runPgRegressTest("yb_parallelquery_serial_schedule");
}
}
31 changes: 20 additions & 11 deletions src/postgres/src/backend/access/transam/parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "catalog/pg_enum.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/yb_catalog_version.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
Expand All @@ -30,6 +31,7 @@
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "pg_yb_utils.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "storage/spin.h"
Expand Down Expand Up @@ -88,6 +90,8 @@ typedef struct FixedParallelState
PGPROC *parallel_master_pgproc;
pid_t parallel_master_pid;
BackendId parallel_master_backend_id;
bool parallel_master_is_yb_session;
uint64_t parallel_master_yb_session_id;
TimestampTz xact_ts;
TimestampTz stmt_ts;

Expand Down Expand Up @@ -331,6 +335,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_pgproc = MyProc;
fps->parallel_master_pid = MyProcPid;
fps->parallel_master_backend_id = MyBackendId;
/* Capture our Session ID to share with the background workers. */
fps->parallel_master_is_yb_session =
YbGetCurrentSessionId(&fps->parallel_master_yb_session_id);
fps->xact_ts = GetCurrentTransactionStartTimestamp();
fps->stmt_ts = GetCurrentStatementStartTimestamp();
SpinLockInit(&fps->mutex);
Expand Down Expand Up @@ -1352,9 +1359,10 @@ ParallelWorkerMain(Datum main_arg)
entrypt = LookupParallelWorkerFunction(library_name, function_name);

/* Restore database connection. */
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
fps->authenticated_user_id,
0);
YbBackgroundWorkerInitializeConnectionByOid(
fps->database_id, fps->authenticated_user_id,
fps->parallel_master_is_yb_session ?
&fps->parallel_master_yb_session_id : NULL, 0);

/*
* Set the client encoding to the database encoding, since that is what
Expand Down Expand Up @@ -1433,6 +1441,15 @@ ParallelWorkerMain(Datum main_arg)
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace);

/*
* TODO Revisit initialization of the catalog cache version.
* DocDB scans running in background workers need a catalog cache version
* to put into the request. However, I'm not sure what is the right way to
* obtain it. Perhaps master scan should share the value it has.
*/
if (IsYugaByteEnabled())
YbUpdateCatalogCacheVersion(YbGetMasterCatalogVersion());

/*
* We've initialized all of our state now; nothing should change
* hereafter.
Expand Down Expand Up @@ -1491,14 +1508,6 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
static void
ParallelWorkerShutdown(int code, Datum arg)
{
/*
* We currently do not support parallel workers, and if we do, we will need to
* enable this behaviour in reaper as well to wrap up any leftovers from suddenly
* terminated processes.
*/
elog(ERROR, "ParallelWorkers are not supported");
Assert(false);

SendProcSignal(ParallelMasterPid,
PROCSIG_PARALLEL_MESSAGE,
ParallelMasterBackendId);
Expand Down
16 changes: 12 additions & 4 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -4970,7 +4970,7 @@ StartSubTransaction(void)

ShowTransactionState("StartSubTransaction");

/*
/*
* Update the value of the sticky objects from parent transaction
*/
if(CurrentTransactionState->parent)
Expand Down Expand Up @@ -5455,9 +5455,17 @@ SerializeTransactionState(Size maxsize, char *start_address)
{
if (TransactionIdIsValid(s->transactionId))
workspace[i++] = s->transactionId;
memcpy(&workspace[i], s->childXids,
s->nChildXids * sizeof(TransactionId));
i += s->nChildXids;
/*
* In Yugabyte it is valid if childXids is NULL, but memcpy's arguments
* are supposed to be not null. Even when Yugabyte is not enabled,
* if s->nChildXids is 0, it is good to skip those no-op lines.
*/
if (s->nChildXids)
{
memcpy(&workspace[i], s->childXids,
s->nChildXids * sizeof(TransactionId));
i += s->nChildXids;
}
}
Assert(i == nxids);

Expand Down
115 changes: 99 additions & 16 deletions src/postgres/src/backend/access/yb_access/yb_lsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,10 @@ ybcinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ScanKey orderbys
{
if (scan->opaque)
{
YbScanDesc ybScan = (YbScanDesc) scan->opaque;
/* For rescan, end the previous scan. */
if (ybScan->pscan)
yb_init_partition_key_data(ybScan->pscan);
ybcinendscan(scan);
scan->opaque = NULL;
}
Expand All @@ -382,6 +385,30 @@ ybcinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ScanKey orderbys
scan->yb_distinct_prefixlen,
scan->yb_exec_params);
scan->opaque = ybScan;
if (scan->parallel_scan)
{
ParallelIndexScanDesc target = scan->parallel_scan;
ScanDirection direction = ForwardScanDirection;
ybScan->pscan = (YBParallelPartitionKeys)
OffsetToPointer(target, target->ps_offset);
Relation rel = scan->indexRelation;
/* If scan is by the PK, use the main relation instead */
if (scan->heapRelation &&
scan->heapRelation->rd_pkindex == RelationGetRelid(rel))
{
elog(LOG, "Scan is by PK, get parallel ranges from the main table");
rel = scan->heapRelation;
}
if (scan->yb_scan_plan)
{
if IsA(scan->yb_scan_plan, IndexScan)
direction = ((IndexScan *) scan->yb_scan_plan)->indexorderdir;
else if IsA(scan->yb_scan_plan, IndexOnlyScan)
direction = ((IndexOnlyScan *) scan->yb_scan_plan)->indexorderdir;
}
ybParallelPrepare(ybScan->pscan, rel, scan->yb_exec_params,
!ScanDirectionIsBackward(direction));
}
}

/*
Expand Down Expand Up @@ -410,23 +437,79 @@ ybcingettuple(IndexScanDesc scan, ScanDirection dir)
* ybc_getnext_indextuple.
*/
if (ybscan->quit_scan)
return NULL;
return false;

scan->xs_recheck = YbNeedsRecheck(ybscan);
if (!ybscan->is_exec_done)
{
/* Request with aggregates does not care of scan direction */
HandleYBStatus(YBCPgExecSelect(ybscan->handle,
ybscan->exec_params));
ybscan->is_exec_done = true;
}

/*
* Aggregate pushdown directly modifies the scan slot rather than
* passing it through xs_hitup or xs_itup.
* In the case of parallel scan we need to obtain boundaries from the
* pscan before the scan is executed. Also empty row from parallel range
* scan does not mean scan is done, it means the range is done and we
* need to pick up next. No rows from parallel range is possible, hence
* the loop.
*/
return ybc_getnext_aggslot(scan, ybscan->handle,
ybscan->prepare_params.index_only_scan);
while (true)
{
/* Need to execute the request */
if (!ybscan->is_exec_done)
{
/* Parallel mode: pick up parallel block first */
if (ybscan->pscan != NULL)
{
YBParallelPartitionKeys parallel_scan = ybscan->pscan;
const char *low_bound;
size_t low_bound_size;
const char *high_bound;
size_t high_bound_size;
/*
* If range is found, apply the boundaries, false means the
* scan * is done for that worker.
*/
if (ybParallelNextRange(parallel_scan,
&low_bound, &low_bound_size,
&high_bound, &high_bound_size))
{
HandleYBStatus(YBCPgDmlBindRange(
ybscan->handle, low_bound, low_bound_size,
high_bound, high_bound_size));
if (low_bound)
pfree((void *) low_bound);
if (high_bound)
pfree((void *) high_bound);
}
else
return false;
/*
* Use unlimited fetch.
* Parallel scan range is already of limited size, it is
* unlikely to exceed the
* message size, but may save some RPCs.
*/
ybscan->exec_params->limit_use_default = true;
ybscan->exec_params->yb_fetch_row_limit = 0;
ybscan->exec_params->yb_fetch_size_limit = 0;
}
/* Request with aggregates does not care of scan direction */
HandleYBStatus(YBCPgExecSelect(ybscan->handle,
ybscan->exec_params));
ybscan->is_exec_done = true;
}

/*
* Aggregate pushdown directly modifies the scan slot rather than
* passing it through xs_hitup or xs_itup.
*/
if (ybc_getnext_aggslot(scan, ybscan->handle,
ybscan->prepare_params.index_only_scan))
return true;
/*
* Parallel scan needs to pick next range and reexecute, done
* otherwise.
*/
if (ybscan->pscan)
ybscan->is_exec_done = false;
else
return false;
}
}

/*
Expand Down Expand Up @@ -487,7 +570,7 @@ ybcinhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false; /* TODO: support parallel scan */
amroutine->amcanparallel = true;
amroutine->amcaninclude = true;
amroutine->amkeytype = InvalidOid;

Expand All @@ -508,8 +591,8 @@ ybcinhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = ybcinendscan;
amroutine->ammarkpos = NULL; /* TODO: support mark/restore pos with ordering */
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL; /* TODO: support parallel scan */
amroutine->aminitparallelscan = NULL;
amroutine->amestimateparallelscan = yb_estimate_parallel_size;
amroutine->aminitparallelscan = yb_init_partition_key_data;
amroutine->amparallelrescan = NULL;
amroutine->yb_aminsert = ybcininsert;
amroutine->yb_amdelete = ybcindelete;
Expand Down
Loading

0 comments on commit 2f91e3e

Please sign in to comment.