Skip to content

Commit

Permalink
allow joins for non-point predicates
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike committed Feb 28, 2024
1 parent bfd5754 commit b252df2
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 60 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams() && serviceConfig.GetEnableAstCache();
kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit();
kqpConfig.EnablePerStatementQueryExecution = serviceConfig.GetEnablePerStatementQueryExecution();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto indexAutoChooser = TableServiceConfig.GetIndexAutoChooseMode();

ui64 rangesLimit = TableServiceConfig.GetExtractPredicateRangesLimit();
ui64 idxLookupPointsLimit = TableServiceConfig.GetIdxLookupJoinPointsLimit();

bool enableSequences = TableServiceConfig.GetEnableSequences();
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
Expand Down Expand Up @@ -517,7 +518,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit) {
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit) {

QueryCache.Clear();

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@
"Base": "TKqlReadTableRangesBase",
"Match": {"Type": "Callable", "Name": "KqlReadTableRanges"},
"Children": [
{"Index": 5, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true},
{"Index": 6, "Name": "PredicateExpr", "Type": "TCoLambda", "Optional": true}
]
},
{
Expand All @@ -131,7 +132,8 @@
"Match": {"Type": "Callable", "Name": "TKqlReadTableIndexRanges"},
"Children": [
{"Index": 5, "Name": "Index", "Type": "TCoAtom"},
{"Index": 6, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true}
{"Index": 6, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true},
{"Index": 7, "Name": "PredicateExpr", "Type": "TCoLambda", "Optional": true}
]
},
{
Expand Down
20 changes: 19 additions & 1 deletion ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
size_t argCount = (olapTable || index) ? 6 : 5;

// prefix
if (!EnsureMinArgsCount(*node, argCount, ctx) && EnsureMaxArgsCount(*node, argCount + 1, ctx)) {
if (!EnsureMinArgsCount(*node, argCount, ctx) && EnsureMaxArgsCount(*node, argCount + 2, ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -375,8 +375,26 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
}

if (TKqlReadTableRanges::Match(node.Get())) {
if (node->ChildrenSize() > TKqlReadTableRanges::idx_PredicateExpr) {
auto& lambda = node->ChildRef(TKqlReadTableRanges::idx_PredicateExpr);
if (!UpdateLambdaAllArgumentsTypes(lambda, {rowType}, ctx)) {
return IGraphTransformer::TStatus::Error;
}
if (!lambda->GetTypeAnn()) {
return IGraphTransformer::TStatus::Repeat;
}
}
node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
} else if (TKqlReadTableIndexRanges::Match(node.Get())) {
if (node->ChildrenSize() > TKqlReadTableIndexRanges::idx_PredicateExpr) {
auto& lambda = node->ChildRef(TKqlReadTableIndexRanges::idx_PredicateExpr);
if (!UpdateLambdaAllArgumentsTypes(lambda, {rowType}, ctx)) {
return IGraphTransformer::TStatus::Error;
}
if (!lambda->GetTypeAnn()) {
return IGraphTransformer::TStatus::Repeat;
}
}
node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
} else if (TKqpReadTableRanges::Match(node.Get())) {
node->SetTypeAnn(ctx.MakeType<TFlowExprType>(rowType));
Expand Down
21 changes: 20 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ using namespace NYql::NNodes;

namespace {

TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
if (!range.From().Maybe<TKqlKeyInc>() || !range.To().Maybe<TKqlKeyInc>()) {
return {};
}
auto rangeFrom = range.From().Cast<TKqlKeyInc>();
auto rangeTo = range.To().Cast<TKqlKeyInc>();

if (rangeFrom.ArgCount() != rangeTo.ArgCount()) {
return {};
}
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) {
return {};
}
}

return rangeFrom;
}

/**
* KQP specific rule to check if a LookupJoin is applicable
*/
Expand Down Expand Up @@ -163,4 +182,4 @@ double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStat
}


}
}
2 changes: 2 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
.ExplainPrompt(read.ExplainPrompt())
.Index(index.Index().Cast())
.PrefixPointsExpr(index.PrefixPointsExpr())
.PredicateExpr(index.PredicateExpr())
.Done();
}

Expand All @@ -132,6 +133,7 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.PrefixPointsExpr(readRange.PrefixPointsExpr())
.PredicateExpr(readRange.PredicateExpr())
.Done();
}

Expand Down
Loading

0 comments on commit b252df2

Please sign in to comment.