Skip to content

Commit

Permalink
Implement indexlookupjoin for non-point selection (ydb-platform#2298)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike committed Jun 6, 2024
1 parent ec1169f commit eca0972
Show file tree
Hide file tree
Showing 18 changed files with 967 additions and 255 deletions.
2 changes: 2 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode();
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams();
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.OldLookupJoinBehaviour = serviceConfig.GetOldLookupJoinBehaviour();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto indexAutoChooser = TableServiceConfig.GetIndexAutoChooseMode();

ui64 rangesLimit = TableServiceConfig.GetExtractPredicateRangesLimit();
ui64 idxLookupPointsLimit = TableServiceConfig.GetIdxLookupJoinPointsLimit();
bool oldLookupJoinBehaviour = TableServiceConfig.GetOldLookupJoinBehaviour();

bool enableSequences = TableServiceConfig.GetEnableSequences();
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
Expand Down Expand Up @@ -503,8 +505,10 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableSequences() != enableSequences ||
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetOldLookupJoinBehaviour() != oldLookupJoinBehaviour ||
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit) {
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit) {

QueryCache.Clear();

Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@
"Base": "TKqlReadTableRangesBase",
"Match": {"Type": "Callable", "Name": "KqlReadTableRanges"},
"Children": [
{"Index": 5, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true},
{"Index": 6, "Name": "PredicateExpr", "Type": "TCoLambda", "Optional": true},
{"Index": 7, "Name": "PredicateUsedColumns", "Type": "TCoAtomList", "Optional": true}
]
},
{
Expand All @@ -131,7 +133,9 @@
"Match": {"Type": "Callable", "Name": "TKqlReadTableIndexRanges"},
"Children": [
{"Index": 5, "Name": "Index", "Type": "TCoAtom"},
{"Index": 6, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true}
{"Index": 6, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true},
{"Index": 7, "Name": "PredicateExpr", "Type": "TCoLambda", "Optional": true},
{"Index": 8, "Name": "PredicateUsedColumns", "Type": "TCoAtomList", "Optional": true}
]
},
{
Expand Down
28 changes: 27 additions & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
size_t argCount = (olapTable || index) ? 6 : 5;

// prefix
if (!EnsureMinArgsCount(*node, argCount, ctx) && EnsureMaxArgsCount(*node, argCount + 1, ctx)) {
if (!EnsureMinArgsCount(*node, argCount, ctx) && EnsureMaxArgsCount(*node, argCount + 3, ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -375,8 +375,34 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
}

if (TKqlReadTableRanges::Match(node.Get())) {
if (node->ChildrenSize() > TKqlReadTableRanges::idx_PredicateExpr) {
auto& lambda = node->ChildRef(TKqlReadTableRanges::idx_PredicateExpr);
auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, node->Pos(), withSystemColumns);
if (!rowType) {
return TStatus::Error;
}
if (!UpdateLambdaAllArgumentsTypes(lambda, {rowType}, ctx)) {
return IGraphTransformer::TStatus::Error;
}
if (!lambda->GetTypeAnn()) {
return IGraphTransformer::TStatus::Repeat;
}
}
node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
} else if (TKqlReadTableIndexRanges::Match(node.Get())) {
if (node->ChildrenSize() > TKqlReadTableIndexRanges::idx_PredicateExpr) {
auto& lambda = node->ChildRef(TKqlReadTableIndexRanges::idx_PredicateExpr);
auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, node->Pos(), withSystemColumns);
if (!rowType) {
return TStatus::Error;
}
if (!UpdateLambdaAllArgumentsTypes(lambda, {rowType}, ctx)) {
return IGraphTransformer::TStatus::Error;
}
if (!lambda->GetTypeAnn()) {
return IGraphTransformer::TStatus::Repeat;
}
}
node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
} else if (TKqpReadTableRanges::Match(node.Get())) {
node->SetTypeAnn(ctx.MakeType<TFlowExprType>(rowType));
Expand Down
21 changes: 20 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ using namespace NYql::NNodes;

namespace {

TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
if (!range.From().Maybe<TKqlKeyInc>() || !range.To().Maybe<TKqlKeyInc>()) {
return {};
}
auto rangeFrom = range.From().Cast<TKqlKeyInc>();
auto rangeTo = range.To().Cast<TKqlKeyInc>();

if (rangeFrom.ArgCount() != rangeTo.ArgCount()) {
return {};
}
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) {
return {};
}
}

return rangeFrom;
}

/**
* KQP specific rule to check if a LookupJoin is applicable
*/
Expand Down Expand Up @@ -163,4 +182,4 @@ double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStat
}


}
}
8 changes: 4 additions & 4 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,20 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> PushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx) {
TMaybeNode<TExprBase> PushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
if (!KqpCtx.Config->PredicateExtract20) {
return node;
}
TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx);
TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx, *getParents());
DumpAppliedRule("PushExtractedPredicateToReadTable", node.Ptr(), output.Ptr(), ctx);
return output;
}

TMaybeNode<TExprBase> LatePushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx) {
TMaybeNode<TExprBase> LatePushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
if (KqpCtx.Config->PredicateExtract20) {
return node;
}
TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx);
TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx, *getParents());
DumpAppliedRule("PushExtractedPredicateToReadTable", node.Ptr(), output.Ptr(), ctx);
return output;
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
.ExplainPrompt(read.ExplainPrompt())
.Index(index.Index().Cast())
.PrefixPointsExpr(index.PrefixPointsExpr())
.PredicateExpr(index.PredicateExpr())
.PredicateUsedColumns(index.PredicateUsedColumns())
.Done();
}

Expand All @@ -132,6 +134,8 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.PrefixPointsExpr(readRange.PrefixPointsExpr())
.PredicateExpr(readRange.PredicateExpr())
.PredicateUsedColumns(readRange.PredicateUsedColumns())
.Done();
}

Expand Down
Loading

0 comments on commit eca0972

Please sign in to comment.