Skip to content

Commit

Permalink
[#17872] YSQL: Planner support for distinct pushdown
Browse files Browse the repository at this point in the history
Summary:
#### Objective

While the LSM index itself supports  prefix based distinct skip scans, the planning layer lacks awareness of such support. We make an Index Scan variant called “Distinct Index Scan'' to express such scans. The primary objective of this change is to integrate Distinct Index Scans into YSQL.

#### Design

Implementing distinct pushdown in YSQL requires three broad changes. First, the planner must generate distinct index scan path nodes. Then, these path nodes must be integrated properly into the overall query plan. Last, a few minor changes are required in the execution layer.

##### Generating Distinct Index Paths

Distinct Index Paths are generated along with other index paths. This is generally useful only  when generating paths for a DISTINCT query, bar a few exceptions such as semi joins and UNION queries (unsupported at the moment). To avoid creating another scan path node, the index scan path data structure is reused for distinct index scans as well. However, there are a few fundamental differences. First, distinct index scans are parameterized by the prefix length on the index. Hence, the planner can distinguish a distinct index scan from a normal index scan by simply checking if the prefix length is non-zero. Additionally, recall that a distinct index scan fetches fewer rows from the storage and as a consequence, has a different path cost. Finally, the distinct operation cannot always be pushed down to the base relations because of which distinct index scans cannot always be generated alongside the regular index paths. These differences are discussed in more detail in the following sections.

###### Computing the prefix length

To compute prefix length, first, we determine the set of keys that need to be unique. This change operates only on index-only scan scenarios.  Moreover, this change does not apply in the presence of  unsupported expressions such as aggregates and volatile functions within the requested  column references. With our distinct key set we have just determined, use the shortest prefix that encompasses all these key columns. We look for the shortest possible prefix since using a longer one leads to a poorly performant scan due to fetching more rows than necessary. We can use distinct index scans for queries that request any subset of index columns, not just prefixes, albeit requiring further processing on top.

###### Range partitioned tables

There is a small caveat with the current implementation of distinct pushdown. Range partitioned tables can return duplicate tuples since the DISTINCTification only happens within a tablet. To combat this, we generate a unique node on top of the distinct index scan, as a workaround. Hash partitioned tables do not exhibit this quirk.

###### Path Cost

Accurate costing of distinct index path is not the primary focus of the change. Regardless, we adjust the cost so that it makes a bit more sense. We simply scale the cost down by a rough estimation of the number of duplicate values of the prefix.

##### Distinct Index Scan Integration

In this section, we discuss how distinct queries were previously supported in YSQL, next we understand how pathkeys help in generating these query plans, then we reason why pathkeys do not mesh well with distinct pushdown, and finally we see how distinct queries are supported now with skip scan integration.

###### Distinct query plan generation before distinct pushdown

There are two primary mechanisms to create paths for distinct queries. One is sort based and the other hash based. The hash based method uses a hash aggregation method to remove duplicate values, not too relevant to distinct pushdown. The sort based method, on the other hand, is similar to a skip scan. Here, the input is first sorted and the duplicate values are now easily removed since they are adjacent to each other. This is useful when we expect the input to be already sorted like in the output of index scans or merge sort joins. The planner identifies the correct ordering of such scans/joins by looking at their pathkeys. Pathkeys provide useful functionality such as optimizing away constants and equivalent columns. However, they have their limitations when it comes to pushing down distinct.

###### Using skip scans with distinct queries

With skip scans, there is now a third way to generate a plan for distinct queries. A distinct query can use skip scans to generate a candidate distinct path once they are proven to be sufficiently distinct. While pathkeys provide a good mechanism for ordering proofs, they are inadequate for distinct pushdown. For example, not all distinct queries request leading columns of an index. In such cases, using a skip scan is still useful since they probably fetch fewer rows. We can then use the above techniques to deduplicate the output of the skip scan. Moreover, a distinct query need not request leading columns of an index in the same order as the index. To elaborate, unlike ordering, distinct does not care about the order in which the target keys are specified. This distinction is also visible when it comes to joins. Here, the distinct operation distributes over the join operation but the sort operation does not. For all these reasons, we use a different mechanism to propagate distinctness information to parent pathnodes.

##### Changes during execution time

The changes here are minimal. The primary objective here is to propagate the prefix length computed at the planning time to the underlying storage layer. Moreover, DocDB should use a HybridScan whenever the query layer requests for a prefix based skip scan.

#### Future Work

We will extend support for more complex queries that may involve join trees or those using DISTINCT ON queries.

Jira: DB-6955

Test Plan: ./yb_build.sh --java-test TestPgRegressDistinctPushdown

Reviewers: smishra, mihnea, tnayak

Reviewed By: tnayak

Subscribers: jason, yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D26566
  • Loading branch information
vbalvida committed Aug 9, 2023
1 parent 395c517 commit 5d84faa
Show file tree
Hide file tree
Showing 31 changed files with 3,001 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ public int getTestMethodTimeoutSec() {
}

/**
* Test the GUC variable yb_enable_distinct_pushdown
* Param controls whether to enable pushdown feature for SELECT DISTINCT clauses
* Sets the default value for both the number of masters and tservers.
*/
@Override
protected int getReplicationFactor() {
return 1;
}

/**
* Home for expected plans and regressions tests for
* numerous queries affected by DISTINCT pushdown
*/
@Test
public void testPgRegressDistinctPushdown() throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSelect.java
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ public void testDistinctOnNonPrefixScan() throws Exception {
assertEquals(0, metrics.seekCount);

metrics = assertFullDocDBFilter(statement, query, "idx");
assertEquals(2, metrics.seekCount);
assertEquals(1, metrics.seekCount);
}
}
}
Expand Down
41 changes: 40 additions & 1 deletion src/postgres/src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ static void YbAppendPgMemInfo(ExplainState *es, const Size peakMem);
static void
YbAggregateExplainableRPCRequestStat(ExplainState *es,
const YbInstrumentation *instr);
static void YbExplainDistinctPrefixLen(
int yb_distinct_prefixlen, ExplainState *es);

typedef enum YbStatLabel
{
Expand Down Expand Up @@ -1301,9 +1303,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
break;
case T_IndexScan:
pname = sname = "Index Scan";
if (((IndexScan *) plan)->yb_distinct_prefixlen > 0)
pname = sname = "Distinct Index Scan";
break;
case T_IndexOnlyScan:
pname = sname = "Index Only Scan";
if (((IndexOnlyScan *) plan)->yb_distinct_prefixlen > 0)
pname = sname = "Distinct Index Only Scan";
break;
case T_BitmapIndexScan:
pname = sname = "Bitmap Index Scan";
Expand Down Expand Up @@ -1770,6 +1776,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
*/
show_scan_qual(((IndexScan *) plan)->indexorderbyorig,
"Order By", planstate, ancestors, es);
/*
* YB: Distinct prefix during Distinct Index Scan.
* Shown after ORDER BY clause and before remote filters since
* that's currently the order of operations in DocDB.
*/
YbExplainDistinctPrefixLen(
((IndexScan *) plan)->yb_distinct_prefixlen, es);
show_scan_qual(((IndexScan *) plan)->yb_idx_pushdown.quals,
"Remote Index Filter", planstate, ancestors, es);
show_scan_qual(((IndexScan *) plan)->yb_rel_pushdown.quals,
Expand Down Expand Up @@ -1798,6 +1811,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
planstate, es);
show_scan_qual(((IndexOnlyScan *) plan)->indexorderby,
"Order By", planstate, ancestors, es);
/*
* YB: Distinct prefix during HybridScan.
* Shown after ORDER BY clause and before remote filters since
* that's currently the order of operations in DocDB.
*/
YbExplainDistinctPrefixLen(
((IndexOnlyScan *) plan)->yb_distinct_prefixlen, es);
/*
* Remote filter is applied first, so it is output first.
*/
Expand Down Expand Up @@ -3223,7 +3243,7 @@ show_yb_rpc_stats(PlanState *planstate, bool indexScan, ExplainState *es)
{
YbInstrumentation *yb_instr = &planstate->instrument->yb_instr;
double nloops = planstate->instrument->nloops;

/* Read stats */
double table_reads = yb_instr->tbl_reads.count / nloops;
double table_read_wait = yb_instr->tbl_reads.wait_time / nloops;
Expand Down Expand Up @@ -4259,3 +4279,22 @@ YbAggregateExplainableRPCRequestStat(ExplainState *es,
es->yb_stats.flush.count += yb_instr->write_flushes.count;
es->yb_stats.flush.wait_time += yb_instr->write_flushes.wait_time;
}

/*
* YB:
* Explain Output
* --------------
* Distinct Index Scan
* ...
* Distinct Prefix: <prefix length>
* ...
*
* Adds Distinct Prefix to explain info
*/
static void
YbExplainDistinctPrefixLen(int yb_distinct_prefixlen, ExplainState *es)
{
if (yb_distinct_prefixlen > 0)
ExplainPropertyInteger(
"Distinct Prefix", NULL, yb_distinct_prefixlen, es);
}
12 changes: 0 additions & 12 deletions src/postgres/src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1795,18 +1795,6 @@ ExecAgg(PlanState *pstate)
if (IsYugaByteEnabled())
{
pstate->state->yb_exec_params.limit_use_default = true;

// Currently, postgres employs an "optimization" where it requests the
// complete heap tuple from the executor whenever possible so as to
// avoid unnecessary copies
// See the comment in create_scan_plan (create_plan.c) for more info
//
// However, this "optimization" is not always in effect and here we guard
// against any undesirable prefix based filtering in the presence of
// aggregate targets. More importantly, the current behavior to
// retrieve the complete tuple is not necessarily optimal for
// remote storage such as DocDB and this may change in the future
pstate->state->yb_exec_params.yb_can_pushdown_distinct = false;
}

/* Dispatch based on strategy */
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/backend/executor/nodeIndexonlyscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)
indexstate->ioss_RuntimeKeysReady = false;
indexstate->ioss_RuntimeKeys = NULL;
indexstate->ioss_NumRuntimeKeys = 0;
/* YB: Prefix length parameter passed to DocDB. */
estate->yb_exec_params.yb_distinct_prefixlen = node->yb_distinct_prefixlen;

/*
* build the index scan keys from the index qualification
Expand Down
8 changes: 5 additions & 3 deletions src/postgres/src/backend/executor/nodeIndexscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,8 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
indexstate->iss_RuntimeKeysReady = false;
indexstate->iss_RuntimeKeys = NULL;
indexstate->iss_NumRuntimeKeys = 0;
/* YB: Prefix length parameter passed to DocDB. */
estate->yb_exec_params.yb_distinct_prefixlen = node->yb_distinct_prefixlen;

/*
* build the index scan keys from the index qualification
Expand Down Expand Up @@ -1239,7 +1241,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
* For now, these cases are generated for batched nested loop joins in
* yb_zip_batched_exprs() in restrictinfo.c during indexscan
* plan node generation.
*
*
*
* 5. NullTest ("indexkey IS NULL/IS NOT NULL"). We just fill in the
* ScanKey properly.
Expand Down Expand Up @@ -1586,7 +1588,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
this_scan_key->sk_argument = PointerGetDatum(first_sub_key);
}
else if (IsA(clause, ScalarArrayOpExpr) &&
(!IsYugaByteEnabled() ||
(!IsYugaByteEnabled() ||
!IsA(yb_get_saop_left_op(clause), RowExpr)))
{
Assert(!IsYugaByteEnabled() ||
Expand Down Expand Up @@ -1766,7 +1768,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
this_key = &first_sub_key[n_sub_key];
op_strategy = BTEqualStrategyNumber;
op_righttype = InvalidOid;

if (varattno < 1 || varattno > indnkeyatts)
elog(ERROR, "bogus index qualification");

Expand Down
9 changes: 0 additions & 9 deletions src/postgres/src/backend/executor/nodeUnique.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ ExecUnique(PlanState *pstate)

CHECK_FOR_INTERRUPTS();

/*
* SELECT DISTINCT is only enabled for an index scan. Specifically, for a scan on hash columns,
* the index scan will not be used.
*
* `yb_can_pushdown_distinct` controls whether or not the DISTINCT operation is pushed down
*/
if (IsYugaByteEnabled())
pstate->state->yb_exec_params.yb_can_pushdown_distinct = yb_enable_distinct_pushdown;

/*
* get information from the node
*/
Expand Down
6 changes: 4 additions & 2 deletions src/postgres/src/backend/nodes/copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ _copyIndexScan(const IndexScan *from)
COPY_NODE_FIELD(yb_idx_pushdown.colrefs);
COPY_NODE_FIELD(yb_rel_pushdown.quals);
COPY_NODE_FIELD(yb_rel_pushdown.colrefs);
COPY_SCALAR_FIELD(yb_distinct_prefixlen);

return newnode;
}
Expand Down Expand Up @@ -550,6 +551,7 @@ _copyIndexOnlyScan(const IndexOnlyScan *from)
COPY_NODE_FIELD(yb_pushdown.quals);
COPY_NODE_FIELD(yb_pushdown.colrefs);
COPY_NODE_FIELD(yb_indexqual_for_recheck);
COPY_SCALAR_FIELD(yb_distinct_prefixlen);

return newnode;
}
Expand Down Expand Up @@ -908,7 +910,7 @@ _copyYbBatchedNestLoop(const YbBatchedNestLoop *from)
COPY_POINTER_FIELD(
hashClauseInfos,
from->num_hashClauseInfos * sizeof(YbBNLHashClauseInfo));

for (int i = 0; i < from->num_hashClauseInfos; i++)
{
newnode->hashClauseInfos[i].outerParamExpr =
Expand Down Expand Up @@ -5870,7 +5872,7 @@ copyObjectImpl(const void *from)
case T_YbExprColrefDesc:
retval = _copyYbExprColrefDesc(from);
break;

case T_YbBatchedExpr:
retval = _copyYbBatchedExpr(from);
break;
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/backend/nodes/outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ _outIndexScan(StringInfo str, const IndexScan *node)
WRITE_NODE_FIELD(yb_idx_pushdown.colrefs);
WRITE_NODE_FIELD(yb_rel_pushdown.quals);
WRITE_NODE_FIELD(yb_rel_pushdown.colrefs);
WRITE_INT_FIELD(yb_distinct_prefixlen);
}

static void
Expand All @@ -609,6 +610,7 @@ _outIndexOnlyScan(StringInfo str, const IndexOnlyScan *node)
WRITE_ENUM_FIELD(indexorderdir, ScanDirection);
WRITE_NODE_FIELD(yb_pushdown.quals);
WRITE_NODE_FIELD(yb_pushdown.colrefs);
WRITE_INT_FIELD(yb_distinct_prefixlen);
}

static void
Expand Down
6 changes: 4 additions & 2 deletions src/postgres/src/backend/nodes/readfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,7 @@ _readIndexScan(void)
READ_NODE_FIELD(yb_idx_pushdown.colrefs);
READ_NODE_FIELD(yb_rel_pushdown.quals);
READ_NODE_FIELD(yb_rel_pushdown.colrefs);
READ_INT_FIELD(yb_distinct_prefixlen);

READ_DONE();
}
Expand All @@ -1819,6 +1820,7 @@ _readIndexOnlyScan(void)
READ_ENUM_FIELD(indexorderdir, ScanDirection);
READ_NODE_FIELD(yb_pushdown.quals);
READ_NODE_FIELD(yb_pushdown.colrefs);
READ_INT_FIELD(yb_distinct_prefixlen);

READ_DONE();
}
Expand Down Expand Up @@ -2101,12 +2103,12 @@ _readYbBatchedNestLoop(void)

YbBNLHashClauseInfo *current_hinfo = local_node->hashClauseInfos;
for (int i = 0; i < num_hashClauseInfos; i++)
{
{
char *tok = pg_strtok(&length);
(void) tok;
tok = pg_strtok(&length);
current_hinfo->hashOp = atoi(tok);

tok = pg_strtok(&length);
(void) tok;
tok = pg_strtok(&length);
Expand Down
Loading

0 comments on commit 5d84faa

Please sign in to comment.