From eca09727323b2276564bc411b145e44f1d110674 Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Sat, 9 Mar 2024 20:16:07 +0300 Subject: [PATCH] Implement indexlookupjoin for non-point selection (#2298) --- .../kqp/compile_service/kqp_compile_actor.cpp | 2 + .../compile_service/kqp_compile_service.cpp | 6 +- ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 8 +- ydb/core/kqp/host/kqp_type_ann.cpp | 28 +- ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp | 21 +- ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 8 +- .../kqp/opt/logical/kqp_opt_log_extract.cpp | 4 + .../kqp/opt/logical/kqp_opt_log_helpers.cpp | 404 ++++++++++++++++++ ydb/core/kqp/opt/logical/kqp_opt_log_impl.h | 23 + ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 341 +++++++-------- .../logical/kqp_opt_log_ranges_predext.cpp | 30 +- ydb/core/kqp/opt/logical/kqp_opt_log_rules.h | 2 +- .../kqp/opt/logical/kqp_opt_log_sqlin.cpp | 91 +--- .../kqp/provider/yql_kikimr_provider_impl.h | 3 + ydb/core/kqp/provider/yql_kikimr_settings.h | 2 + ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 14 + ydb/core/kqp/ut/join/kqp_join_ut.cpp | 232 ++++++++++ ydb/core/protos/table_service_config.proto | 3 + 18 files changed, 967 insertions(+), 255 deletions(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 283c6a2cde88..2d1daba6b006 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -506,6 +506,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode(); kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams(); kqpConfig.ExtractPredicateRangesLimit = serviceConfig.GetExtractPredicateRangesLimit(); + kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit(); + kqpConfig.OldLookupJoinBehaviour = serviceConfig.GetOldLookupJoinBehaviour(); if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) { kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U)); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index b0da00f2e888..2b228f338074 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -474,6 +474,8 @@ class TKqpCompileService : public TActorBootstrapped { auto indexAutoChooser = TableServiceConfig.GetIndexAutoChooseMode(); ui64 rangesLimit = TableServiceConfig.GetExtractPredicateRangesLimit(); + ui64 idxLookupPointsLimit = TableServiceConfig.GetIdxLookupJoinPointsLimit(); + bool oldLookupJoinBehaviour = TableServiceConfig.GetOldLookupJoinBehaviour(); bool enableSequences = TableServiceConfig.GetEnableSequences(); bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault(); @@ -503,8 +505,10 @@ class TKqpCompileService : public TActorBootstrapped { TableServiceConfig.GetEnableSequences() != enableSequences || TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault || TableServiceConfig.GetEnableOlapSink() != enableOlapSink || + TableServiceConfig.GetOldLookupJoinBehaviour() != oldLookupJoinBehaviour || TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit || - TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit) { + TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit || + TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit) { QueryCache.Clear(); diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index e25e12ced530..0cf0a1611e7c 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -104,7 +104,9 @@ "Base": "TKqlReadTableRangesBase", "Match": {"Type": "Callable", "Name": "KqlReadTableRanges"}, "Children": [ - {"Index": 5, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true} + {"Index": 5, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true}, + {"Index": 6, "Name": "PredicateExpr", "Type": "TCoLambda", "Optional": true}, + {"Index": 7, "Name": "PredicateUsedColumns", "Type": "TCoAtomList", "Optional": true} ] }, { @@ -131,7 +133,9 @@ "Match": {"Type": "Callable", "Name": "TKqlReadTableIndexRanges"}, "Children": [ {"Index": 5, "Name": "Index", "Type": "TCoAtom"}, - {"Index": 6, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true} + {"Index": 6, "Name": "PrefixPointsExpr", "Type": "TExprBase", "Optional": true}, + {"Index": 7, "Name": "PredicateExpr", "Type": "TCoLambda", "Optional": true}, + {"Index": 8, "Name": "PredicateUsedColumns", "Type": "TCoAtomList", "Optional": true} ] }, { diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 40f2aef9b065..bed9f6024340 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -340,7 +340,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, size_t argCount = (olapTable || index) ? 6 : 5; // prefix - if (!EnsureMinArgsCount(*node, argCount, ctx) && EnsureMaxArgsCount(*node, argCount + 1, ctx)) { + if (!EnsureMinArgsCount(*node, argCount, ctx) && EnsureMaxArgsCount(*node, argCount + 3, ctx)) { return TStatus::Error; } @@ -375,8 +375,34 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, } if (TKqlReadTableRanges::Match(node.Get())) { + if (node->ChildrenSize() > TKqlReadTableRanges::idx_PredicateExpr) { + auto& lambda = node->ChildRef(TKqlReadTableRanges::idx_PredicateExpr); + auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, node->Pos(), withSystemColumns); + if (!rowType) { + return TStatus::Error; + } + if (!UpdateLambdaAllArgumentsTypes(lambda, {rowType}, ctx)) { + return IGraphTransformer::TStatus::Error; + } + if (!lambda->GetTypeAnn()) { + return IGraphTransformer::TStatus::Repeat; + } + } node->SetTypeAnn(ctx.MakeType(rowType)); } else if (TKqlReadTableIndexRanges::Match(node.Get())) { + if (node->ChildrenSize() > TKqlReadTableIndexRanges::idx_PredicateExpr) { + auto& lambda = node->ChildRef(TKqlReadTableIndexRanges::idx_PredicateExpr); + auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, node->Pos(), withSystemColumns); + if (!rowType) { + return TStatus::Error; + } + if (!UpdateLambdaAllArgumentsTypes(lambda, {rowType}, ctx)) { + return IGraphTransformer::TStatus::Error; + } + if (!lambda->GetTypeAnn()) { + return IGraphTransformer::TStatus::Repeat; + } + } node->SetTypeAnn(ctx.MakeType(rowType)); } else if (TKqpReadTableRanges::Match(node.Get())) { node->SetTypeAnn(ctx.MakeType(rowType)); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp b/ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp index fed03ea383db..8c30e3db9777 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp @@ -14,6 +14,25 @@ using namespace NYql::NNodes; namespace { +TMaybeNode GetRightTableKeyPrefix(const TKqlKeyRange& range) { + if (!range.From().Maybe() || !range.To().Maybe()) { + return {}; + } + auto rangeFrom = range.From().Cast(); + auto rangeTo = range.To().Cast(); + + if (rangeFrom.ArgCount() != rangeTo.ArgCount()) { + return {}; + } + for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) { + if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) { + return {}; + } + } + + return rangeFrom; +} + /** * KQP specific rule to check if a LookupJoin is applicable */ @@ -163,4 +182,4 @@ double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStat } -} \ No newline at end of file +} diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 7ec98aa7f569..e53cc909468c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -86,20 +86,20 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { return output; } - TMaybeNode PushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx) { + TMaybeNode PushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { if (!KqpCtx.Config->PredicateExtract20) { return node; } - TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx); + TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx, *getParents()); DumpAppliedRule("PushExtractedPredicateToReadTable", node.Ptr(), output.Ptr(), ctx); return output; } - TMaybeNode LatePushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx) { + TMaybeNode LatePushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { if (KqpCtx.Config->PredicateExtract20) { return node; } - TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx); + TExprBase output = KqpPushExtractedPredicateToReadTable(node, ctx, KqpCtx, TypesCtx, *getParents()); DumpAppliedRule("PushExtractedPredicateToReadTable", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index 72fe19fd8a11..22fd070e2194 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -121,6 +121,8 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext& .ExplainPrompt(read.ExplainPrompt()) .Index(index.Index().Cast()) .PrefixPointsExpr(index.PrefixPointsExpr()) + .PredicateExpr(index.PredicateExpr()) + .PredicateUsedColumns(index.PredicateUsedColumns()) .Done(); } @@ -132,6 +134,8 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext& .Settings(read.Settings()) .ExplainPrompt(read.ExplainPrompt()) .PrefixPointsExpr(readRange.PrefixPointsExpr()) + .PredicateExpr(readRange.PredicateExpr()) + .PredicateUsedColumns(readRange.PredicateUsedColumns()) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_helpers.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_helpers.cpp index 17d577c2f330..9ce094d511bb 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_helpers.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_helpers.cpp @@ -7,6 +7,397 @@ namespace NKikimr::NKqp::NOpt { using namespace NYql; using namespace NYql::NNodes; +namespace { + +TExprBase MakeLT(TExprBase left, TExprBase right, TExprContext& ctx, TPositionHandle pos) { + return Build(ctx, pos) + .Add() + .Add().Value().Optional(left).Build().Build() + .Add().Optional(right).Build() + .Build() + .Add() + .Left(left) + .Right(right) + .Build() + .Done(); +} + +TExprBase MakeEQ(TExprBase left, TExprBase right, TExprContext& ctx, TPositionHandle pos) { + return Build(ctx, pos) + .Add() + .Add().Value().Optional(left).Build().Build() + .Add().Value().Optional(right).Build().Build() + .Build() + .Add() + .Left(left) + .Right(right) + .Build() + .Done(); +} + +TExprBase MakeLE(TExprBase left, TExprBase right, TExprContext& ctx, TPositionHandle pos) { + return Build(ctx, pos) + .Add().Value().Optional(left).Build().Build() + .Add() + .Left(left) + .Right(right) + .Build() + .Done(); +} + +TCoAtomList MakeAllColumnsList(const NYql::TKikimrTableDescription & tableDesc, TExprContext& ctx, TPositionHandle pos) { + TVector columns; + for (auto& [column, _] : tableDesc.Metadata->Columns) { + columns.push_back(Build(ctx, pos).Value(column).Done()); + } + return Build(ctx, pos).Add(columns).Done(); +}; + +TMaybe RewriteReadToPrefixLookup(TKqlReadTableBase read, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + TString lookupTable; + TString indexName; + + TMaybeNode lookupColumns; + size_t prefixSize; + TMaybeNode prefixExpr; + TMaybeNode extraFilter; + TMaybe> usedColumns; + + if (!read.template Maybe() && !read.template Maybe()) { + return {}; + } + + if (!read.Table().SysView().Value().empty()) { + // Can't lookup in system views + return {}; + } + + if (auto indexRead = read.template Maybe()) { + indexName = indexRead.Cast().Index().StringValue(); + lookupTable = GetIndexMetadata(indexRead.Cast(), *kqpCtx.Tables, kqpCtx.Cluster)->Name; + } else { + lookupTable = read.Table().Path().StringValue(); + } + const auto& rightTableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookupTable); + const auto& mainTableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path().StringValue()); + + auto from = read.Range().From(); + auto to = read.Range().To(); + + usedColumns.ConstructInPlace(); + prefixSize = 0; + while (prefixSize < from.ArgCount() && prefixSize < to.ArgCount()) { + if (from.Arg(prefixSize).Raw() != to.Arg(prefixSize).Raw()) { + break; + } + usedColumns->insert(rightTableDesc.Metadata->KeyColumnNames[prefixSize]); + ++prefixSize; + } + + lookupColumns = read.Columns(); + + // we don't need to make filter for point selection + if (!(prefixSize == from.ArgCount() && + prefixSize == to.ArgCount() && + from.template Maybe() && + to.template Maybe())) + { + extraFilter = MakeFilterForRange(read.Range(), ctx, read.Range().Pos(), rightTableDesc.Metadata->KeyColumnNames); + lookupColumns = MakeAllColumnsList(mainTableDesc, ctx, read.Pos()); + } + + TVector columns; + for (size_t i = 0; i < prefixSize; ++i) { + columns.push_back(TExprBase(from.Arg(i))); + } + + prefixExpr = Build(ctx, read.Pos()) + .Add() + .Add(columns) + .Build() + .Done(); + + Y_ENSURE(prefixExpr.IsValid()); + + return NKikimr::NKqp::NOpt::TPrefixLookup { + .LookupColumns = lookupColumns.Cast(), + .ResultColumns = read.Columns(), + + .Filter = extraFilter, + .FilterUsedColumnsHint = usedColumns, + + .PrefixSize = prefixSize, + .PrefixExpr = prefixExpr.Cast(), + + .LookupTableName = lookupTable, + .MainTable = read.Table(), + .IndexName = indexName, + }; +} + +TMaybe RewriteReadToPrefixLookup(TKqlReadTableRangesBase read, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, TMaybe maxKeys) { + TString lookupTable; + TString indexName; + + TMaybeNode lookupColumns; + size_t prefixSize; + TMaybeNode prefixExpr; + TMaybeNode extraFilter; + TMaybe> usedColumns; + + if (!read.template Maybe() && !read.template Maybe()) { + return {}; + } + + if (!read.Table().SysView().Value().empty()) { + // Can't lookup in system views + return {}; + } + + lookupColumns = read.Columns(); + + if (auto indexRead = read.template Maybe()) { + const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, read.Table().Path()); + const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(indexRead.Index().Cast().StringValue()); + lookupTable = indexMeta->Name; + indexName = indexRead.Cast().Index().StringValue(); + } else { + lookupTable = read.Table().Path().StringValue(); + } + + if (TCoVoid::Match(read.Ranges().Raw())) { + prefixSize = 0; + prefixExpr = Build(ctx, read.Pos()) + .Input().Build() + .Done(); + } else { + auto prompt = TKqpReadTableExplainPrompt::Parse(read); + + prefixSize = prompt.PointPrefixLen; + + const auto& rightTableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookupTable); + const auto& mainTableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path().StringValue()); + + TMaybeNode rowsExpr; + TMaybeNode filter; + TMaybeNode usedColumnsList; + if (read.Maybe()) { + rowsExpr = read.Cast().PrefixPointsExpr(); + filter = read.Cast().PredicateExpr(); + usedColumnsList = read.Cast().PredicateUsedColumns(); + } + if (read.Maybe()) { + rowsExpr = read.Cast().PrefixPointsExpr(); + filter = read.Cast().PredicateExpr(); + usedColumnsList = read.Cast().PredicateUsedColumns(); + } + + if (!rowsExpr.IsValid()) { + return {}; + } + + if (maxKeys && (!prompt.ExpectedMaxRanges || *prompt.ExpectedMaxRanges > *maxKeys)) { + return {}; + } + + // we don't need to make filter for point selection + if (prompt.PointPrefixLen != prompt.UsedKeyColumns.size()) { + if (!filter.IsValid() || !usedColumnsList.IsValid()) { + return {}; + } + usedColumns.ConstructInPlace(); + for (auto&& column : usedColumnsList.Cast()) { + usedColumns->insert(column.StringValue()); + } + extraFilter = filter; + lookupColumns = MakeAllColumnsList(mainTableDesc, ctx, read.Pos()); + } + + size_t prefixLen = prompt.PointPrefixLen; + TVector keyColumns; + for (size_t i = 0; i < prefixLen; ++i) { + YQL_ENSURE(i < rightTableDesc.Metadata->KeyColumnNames.size()); + keyColumns.push_back(rightTableDesc.Metadata->KeyColumnNames[i]); + } + + + auto rowArg = Build(ctx, read.Pos()) + .Name("rowArg") + .Done(); + + TVector components; + for (auto column : keyColumns) { + TCoAtom columnAtom(ctx.NewAtom(read.Ranges().Pos(), column)); + components.push_back( + Build(ctx, read.Ranges().Pos()) + .Struct(rowArg) + .Name(columnAtom) + .Done()); + } + + prefixExpr = Build(ctx, read.Pos()) + .Input(rowsExpr.Cast()) + .Lambda() + .Args({rowArg}) + .Body() + .Add(components) + .Build() + .Build() + .Done(); + } + + Y_ENSURE(prefixExpr.IsValid()); + + return TPrefixLookup{ + .LookupColumns = lookupColumns.Cast(), + .ResultColumns = read.Columns(), + + .Filter = extraFilter, + .FilterUsedColumnsHint = usedColumns, + + .PrefixSize = prefixSize, + .PrefixExpr = prefixExpr.Cast(), + + .LookupTableName = lookupTable, + .MainTable = read.Table(), + .IndexName = indexName, + }; +} + +} // namespace + +TCoLambda MakeFilterForRange(TKqlKeyRange range, TExprContext& ctx, TPositionHandle pos, TVector keyColumns) { + size_t prefix = 0; + auto arg = Build(ctx, pos).Name("_row_arg").Done(); + TVector conds; + while (prefix < range.From().ArgCount() && prefix < range.To().ArgCount()) { + auto column = Build(ctx, pos).Struct(arg).Name().Build(keyColumns[prefix]).Done(); + if (range.From().Arg(prefix).Raw() == range.To().Arg(prefix).Raw()) { + if (prefix + 1 == range.From().ArgCount() && range.From().Maybe()) { + break; + } + if (prefix + 1 == range.To().ArgCount() && range.To().Maybe()) { + break; + } + } else { + break; + } + conds.push_back(MakeEQ(column, range.From().Arg(prefix), ctx, pos)); + prefix += 1; + } + + { + TMaybeNode tupleComparison; + for (ssize_t i = static_cast(range.From().ArgCount()) - 1; i >= static_cast(prefix); --i) { + auto column = Build(ctx, pos).Struct(arg).Name().Build(keyColumns[i]).Done(); + if (tupleComparison.IsValid()) { + tupleComparison = Build(ctx, pos) + .Add(MakeLT(range.From().Arg(i), column, ctx, pos)) + .Add() + .Add(MakeEQ(range.From().Arg(i), column, ctx, pos)) + .Add(tupleComparison.Cast()) + .Build() + .Done(); + } else { + if (range.From().Maybe()) { + tupleComparison = MakeLE(range.From().Arg(i), column, ctx, pos); + } else { + tupleComparison = MakeLT(range.From().Arg(i), column, ctx, pos); + } + } + } + + if (tupleComparison.IsValid()) { + conds.push_back(tupleComparison.Cast()); + } + } + + { + TMaybeNode tupleComparison; + for (ssize_t i = static_cast(range.To().ArgCount()) - 1; i >= static_cast(prefix); --i) { + auto column = Build(ctx, pos).Struct(arg).Name().Build(keyColumns[i]).Done(); + if (tupleComparison.IsValid()) { + tupleComparison = Build(ctx, pos) + .Add(MakeLT(column, range.To().Arg(i), ctx, pos)) + .Add() + .Add(MakeEQ(column, range.To().Arg(i), ctx, pos)) + .Add(tupleComparison.Cast()) + .Build() + .Done(); + } else { + if (range.To().Maybe()) { + tupleComparison = MakeLE(column, range.To().Arg(i), ctx, pos); + } else { + tupleComparison = MakeLT(column, range.To().Arg(i), ctx, pos); + } + } + } + + if (tupleComparison.IsValid()) { + conds.push_back(tupleComparison.Cast()); + } + } + + return Build(ctx, pos) + .Args({arg}) + .Body() + .Predicate() + .Predicate() + .Add(conds) + .Build() + .Value() + .Literal().Build("false") + .Build() + .Build() + .Value(arg) + .Build() + .Done(); +} + +bool ExtractUsedFields(const TExprNode::TPtr& start, const TExprNode& arg, TSet& usedFields, const TParentsMap& parentsMap, bool allowDependsOn) { + const TTypeAnnotationNode* argType = RemoveOptionalType(arg.GetTypeAnn()); + if (argType->GetKind() != ETypeAnnotationKind::Struct) { + return false; + } + + if (&arg == start.Get()) { + return true; + } + + const auto inputStructType = argType->Cast(); + if (!IsDepended(*start, arg)) { + return true; + } + + TNodeSet nodes; + VisitExpr(start, [&](const TExprNode::TPtr& node) { + nodes.insert(node.Get()); + return true; + }); + + const auto parents = parentsMap.find(&arg); + YQL_ENSURE(parents != parentsMap.cend()); + for (const auto& parent : parents->second) { + if (nodes.cend() == nodes.find(parent)) { + continue; + } + + if (parent->IsCallable("Member")) { + usedFields.emplace(parent->Tail().Content()); + } else if (allowDependsOn && parent->IsCallable("DependsOn")) { + continue; + } else { + // unknown node + for (auto&& item : inputStructType->GetItems()) { + usedFields.emplace(item->GetName()); + } + return true; + } + } + + return true; +} + TExprBase TKqpMatchReadResult::BuildProcessNodes(TExprBase input, TExprContext& ctx) const { auto expr = input; @@ -81,4 +472,17 @@ TMaybe MatchRead(TExprBase node, std::function RewriteReadToPrefixLookup(TExprBase read, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, TMaybe maxKeys) { + if (maxKeys == TMaybe(0)) { + return {}; + } + if (auto readTable = read.Maybe()) { + return RewriteReadToPrefixLookup(readTable.Cast(), ctx, kqpCtx); + } else { + auto readRanges = read.Maybe(); + YQL_ENSURE(readRanges); + return RewriteReadToPrefixLookup(readRanges.Cast(), ctx, kqpCtx, maxKeys); + } +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_impl.h b/ydb/core/kqp/opt/logical/kqp_opt_log_impl.h index 5ff32edc6eeb..701994659753 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_impl.h +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_impl.h @@ -24,6 +24,29 @@ TMaybe MatchRead(NYql::NNodes::TExprBase node) { NYql::NNodes::TMaybeNode GetRightTableKeyPrefix(const NYql::NNodes::TKqlKeyRange& range); +NYql::NNodes::TCoLambda MakeFilterForRange(NYql::NNodes::TKqlKeyRange range, NYql::TExprContext& ctx, NYql::TPositionHandle pos, TVector keyColumns); + +bool ExtractUsedFields(const NYql::TExprNode::TPtr& start, const NYql::TExprNode& arg, TSet& usedFields, const NYql::TParentsMap& parentsMap, bool allowDependsOn); + +struct TPrefixLookup { + NYql::NNodes::TCoAtomList LookupColumns; + NYql::NNodes::TCoAtomList ResultColumns; + + NYql::NNodes::TMaybeNode Filter; + TMaybe> FilterUsedColumnsHint; + + size_t PrefixSize; + NYql::NNodes::TExprBase PrefixExpr; + + TString LookupTableName; + + NYql::NNodes::TKqpTable MainTable; + TString IndexName; +}; + +// Try to rewrite arbitrary table read to (ExtractMembers (Filter (Lookup LookupColumns) ResultColumns) +TMaybe RewriteReadToPrefixLookup(NYql::NNodes::TExprBase read, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, TMaybe maxKeys); + } // NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 47affa3e19bc..874afc44c38b 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -320,22 +320,101 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) { //#define DBG(...) YQL_CLOG(DEBUG, ProviderKqp) << __VA_ARGS__ #define DBG(...) -template -TMaybeNode BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, const TKqpMatchReadResult& rightReadMatch, TExprContext& ctx) { +TMaybeNode BuildKqpStreamIndexLookupJoin( + const TDqJoin& join, + TExprBase leftInput, + const TPrefixLookup& rightLookup, + + const TKqpMatchReadResult& rightReadMatch, + TExprContext& ctx) +{ TString leftLabel = join.LeftLabel().Maybe() ? TString(join.LeftLabel().Cast().Value()) : ""; TString rightLabel = join.RightLabel().Maybe() ? TString(join.RightLabel().Cast().Value()) : ""; - auto rightRead = rightReadMatch.Read.template Cast(); + + TMaybeNode lookupColumns; + if (auto read = rightReadMatch.Read.Maybe()) { + lookupColumns = read.Columns().Cast(); + } else { + auto readRanges = rightReadMatch.Read.Maybe(); + lookupColumns = readRanges.Columns().Cast(); + } + + TMaybeNode extraRightFilter = rightLookup.Filter; + + if (extraRightFilter.IsValid()) { + const TSet& usedColumns = *rightLookup.FilterUsedColumnsHint; + if (rightLookup.FilterUsedColumnsHint) { + TSet lookupColumnsSet; + for (auto&& column : lookupColumns.Cast()) { + lookupColumnsSet.insert(column.StringValue()); + } + bool rebuildColumns = false; + for (auto& column : usedColumns) { + if (!lookupColumnsSet.contains(column)) { + lookupColumnsSet.insert(column); + rebuildColumns = true; + } + } + // we should expand list of read columns + // narrow it immediately after filter + if (rebuildColumns) { + TVector newColumns; + auto pos = extraRightFilter.Cast().Pos(); + for (auto& column : lookupColumnsSet) { + newColumns.push_back(Build(ctx, pos).Value(column).Done()); + } + auto arg = Build(ctx, pos).Name("_extract_members_arg").Done(); + extraRightFilter = Build(ctx, pos) + .Args({arg}) + .Body() + .Members(lookupColumns.Cast()) + .Input() + .Lambda(ctx.DeepCopyLambda(extraRightFilter.Cast().Ref())) + .Input().Input(arg).Build() + .Build() + .Build() + .Done(); + lookupColumns = Build(ctx, pos) + .Add(newColumns) + .Done(); + } + } else { + return {}; + } + } TExprBase lookupJoin = Build(ctx, join.Pos()) - .Table(rightRead.Table()) + .Table(rightLookup.MainTable) .LookupKeys(leftInput) - .Columns(rightRead.Columns()) + .Columns(lookupColumns.Cast()) .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName) .Done(); // Stream lookup join output: stream>> // so we should apply filters to second element of tuple for each row + if (extraRightFilter.IsValid()) { + lookupJoin = Build(ctx, join.Pos()) + .Input(lookupJoin) + .Lambda() + .Args({"tuple"}) + .Body() + .Add() + .Tuple("tuple") + .Index().Value("0").Build() + .Build() + .Add() + .Input() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Lambda(ctx.DeepCopyLambda(extraRightFilter.Cast().Ref())) + .Build() + .Build() + .Build() + .Done(); + } + if (rightReadMatch.ExtractMembers) { lookupJoin = Build(ctx, join.Pos()) .Input(lookupJoin) @@ -432,10 +511,8 @@ TMaybeNode BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBa .Done(); } -template -TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { - static_assert(std::is_same_v || std::is_same_v, "unsupported read type"); +TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!join.RightLabel().Maybe()) { // Lookup only in tables return {}; @@ -450,136 +527,41 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext TString lookupTable; TString indexName; - auto rightReadMatch = MatchRead(join.RightInput()); + auto rightReadMatch = MatchRead(join.RightInput(), [](TExprBase node) { + return node.Maybe() || node.Maybe(); + }); + if (!rightReadMatch || rightReadMatch->FlatMap && !IsPassthroughFlatMap(rightReadMatch->FlatMap.Cast(), nullptr)) { return {}; } - auto rightRead = rightReadMatch->Read.template Cast(); - + TMaybeNode rightColumns; TMaybeNode lookupColumns; size_t rightPrefixSize; TMaybeNode rightPrefixExpr; - if constexpr (std::is_same_v) { - Y_ENSURE(rightRead.template Maybe() || rightRead.template Maybe()); - const TKqlReadTableBase read = rightRead; - if (!read.Table().SysView().Value().empty()) { - // Can't lookup in system views - return {}; - } - - auto maybeRightTableKeyPrefix = GetRightTableKeyPrefix(read.Range()); - if (!maybeRightTableKeyPrefix) { - return {}; - } - - lookupColumns = read.Columns(); - - rightPrefixSize = maybeRightTableKeyPrefix.Cast().ArgCount(); - TVector columns; - for (auto& column : maybeRightTableKeyPrefix.Cast().Args()) { - columns.push_back(TExprBase(column)); - } - - rightPrefixExpr = Build(ctx, join.Pos()) - .Add() - .Add(columns) - .Build() - .Done(); - - if (auto indexRead = rightRead.template Maybe()) { - indexName = indexRead.Cast().Index().StringValue(); - lookupTable = GetIndexMetadata(indexRead.Cast(), *kqpCtx.Tables, kqpCtx.Cluster)->Name; - } else { - lookupTable = read.Table().Path().StringValue(); - } - } else if constexpr (std::is_same_v){ - auto read = rightReadMatch->Read.template Cast(); - if (!read.Table().SysView().Value().empty()) { - // Can't lookup in system views - return {}; - } - - lookupColumns = read.Columns(); - - if (auto indexRead = read.template Maybe()) { - const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, read.Table().Path()); - const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(indexRead.Index().Cast().StringValue()); - lookupTable = indexMeta->Name; - indexName = indexRead.Cast().Index().StringValue(); - } else { - lookupTable = read.Table().Path().StringValue(); - } - - if (TCoVoid::Match(read.Ranges().Raw())) { - rightPrefixSize = 0; - rightPrefixExpr = Build(ctx, join.Pos()) - .Input().Build() - .Done(); - } else { - auto prompt = TKqpReadTableExplainPrompt::Parse(read); - - if (prompt.PointPrefixLen != prompt.UsedKeyColumns.size()) { - return {}; - } - - if (prompt.ExpectedMaxRanges != TMaybe(1)) { - return {}; - } - rightPrefixSize = prompt.PointPrefixLen; - - const auto& rightTableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookupTable); - - TMaybeNode rowsExpr; - if (read.template Maybe()) { - rowsExpr = read.template Cast().PrefixPointsExpr(); - } - if (read.template Maybe()) { - rowsExpr = read.template Cast().PrefixPointsExpr(); - } - - size_t prefixLen = prompt.PointPrefixLen; - TVector keyColumns; - for (size_t i = 0; i < prefixLen; ++i) { - YQL_ENSURE(i < rightTableDesc.Metadata->KeyColumnNames.size()); - keyColumns.push_back(rightTableDesc.Metadata->KeyColumnNames[i]); - } - + auto prefixLookup = RewriteReadToPrefixLookup(rightReadMatch->Read, ctx, kqpCtx, kqpCtx.Config->IdxLookupJoinsPrefixPointLimit); + if (prefixLookup) { + lookupTable = prefixLookup->LookupTableName; + indexName = prefixLookup->IndexName; + lookupColumns = prefixLookup->LookupColumns; + rightColumns = prefixLookup->ResultColumns; - auto rowArg = Build(ctx, join.Pos()) - .Name("rowArg") - .Done(); - - TVector components; - for (auto column : keyColumns) { - TCoAtom columnAtom(ctx.NewAtom(read.Ranges().Pos(), column)); - components.push_back( - Build(ctx, read.Ranges().Pos()) - .Struct(rowArg) - .Name(columnAtom) - .Done()); - } - - rightPrefixExpr = Build(ctx, join.Pos()) - .Input(rowsExpr.Cast()) - .Lambda() - .Args({rowArg}) - .Body() - .Add(components) - .Build() - .Build() - .Done(); - } + rightPrefixSize = prefixLookup->PrefixSize; + rightPrefixExpr = prefixLookup->PrefixExpr; + } else { + return {}; } - Y_ENSURE(rightPrefixExpr.IsValid()); - const auto& rightTableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookupTable); if (rightTableDesc.Metadata->Kind == NYql::EKikimrTableKind::Olap) { return {}; } + if ((!kqpCtx.Config->PredicateExtract20 || kqpCtx.Config->OldLookupJoinBehaviour) && prefixLookup->Filter.IsValid()) { + return {}; + } + TMap rightJoinKeyToLeft; TVector rightKeyColumns; rightKeyColumns.reserve(join.JoinKeys().Size()); @@ -615,6 +597,7 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext TVector skipNullColumns; ui32 fixedPrefix = 0; TSet deduplicateLeftColumns; + TVector prefixFilters; for (auto& rightColumnName : rightTableDesc.Metadata->KeyColumnNames) { TExprNode::TPtr member; @@ -622,7 +605,21 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext if (fixedPrefix < rightPrefixSize) { if (leftColumn) { - return {}; + prefixFilters.push_back( + Build(ctx, join.Pos()) + .Left() + .Tuple(prefixRowArg) + .Index().Value(ToString(fixedPrefix)).Build() + .Build() + .Right() + .Struct(leftRowArg) + .Name().Build(*leftColumn) + .Build() + .Done()); + deduplicateLeftColumns.insert(*leftColumn); + if ((!kqpCtx.Config->PredicateExtract20 || kqpCtx.Config->OldLookupJoinBehaviour)) { + return {}; + } } member = Build(ctx, prefixRowArg.Pos()) @@ -696,7 +693,8 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin - && supportedStreamJoinKinds.contains(join.JoinType().Value()); + && supportedStreamJoinKinds.contains(join.JoinType().Value()) + && !indexName; bool needPrecomputeLeft = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && !join.LeftInput().Maybe() @@ -749,27 +747,48 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Done(); } + auto wrapWithPrefixFilters = [&](TExprBase body) -> TExprBase { + if (prefixFilters.empty()) { + return Build(ctx, body.Pos()) + .Input(body) + .Done(); + } else { + return Build(ctx, body.Pos()) + .Predicate() + .Predicate() + .Add(prefixFilters) + .Build() + .Value() + .Literal().Build("false") + .Build() + .Build() + .Value(body) + .Done(); + } + }; + if (useStreamIndexLookupJoin) { auto leftInput = Build(ctx, join.Pos()) .Input(leftData) .Lambda() .Args({leftRowArg}) - .Body() + .Body() .Input(rightPrefixExpr.Cast()) .Lambda() .Args({prefixRowArg}) - .Body() - .Add() - .Add(lookupMembers) - .Build() - .Add(leftRowArg) - .Build() + .Body(wrapWithPrefixFilters( + Build(ctx, join.Pos()) + .Add() + .Add(lookupMembers) + .Build() + .Add(leftRowArg) + .Done())) .Build() .Build() .Build() .Done(); - return BuildKqpStreamIndexLookupJoin(join, leftInput, *rightReadMatch, ctx); + return BuildKqpStreamIndexLookupJoin(join, leftInput, *prefixLookup, *rightReadMatch, ctx); } auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos()); @@ -777,21 +796,33 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Input(leftDataDeduplicated) .Lambda() .Args({leftRowArg}) - .Body() + .Body() .Input(rightPrefixExpr.Cast()) .Lambda() .Args({prefixRowArg}) - .Body() - .Add(lookupMembers) - .Build() + .Body(wrapWithPrefixFilters(Build(ctx, join.Pos()).Add(lookupMembers).Done())) .Build() .Build() .Build() .Done(); TExprBase lookup = indexName - ? BuildLookupIndex(ctx, join.Pos(), rightRead.Table(), rightRead.Columns(), keysToLookup, skipNullColumns, indexName, kqpCtx) - : BuildLookupTable(ctx, join.Pos(), rightRead.Table(), rightRead.Columns(), keysToLookup, skipNullColumns, kqpCtx); + ? BuildLookupIndex(ctx, join.Pos(), prefixLookup->MainTable, lookupColumns.Cast(), keysToLookup, skipNullColumns, indexName, kqpCtx) + : BuildLookupTable(ctx, join.Pos(), prefixLookup->MainTable, lookupColumns.Cast(), keysToLookup, skipNullColumns, kqpCtx); + + if (prefixLookup->Filter.IsValid()) { + lookup = Build(ctx, join.Pos()) + .Input(lookup) + .Lambda(ctx.DeepCopyLambda(prefixLookup->Filter.Cast().Ref())) + .Done(); + } + + if (prefixLookup->LookupColumns.Raw() != prefixLookup->ResultColumns.Raw()) { + lookup = Build(ctx, join.Pos()) + .Input(lookup) + .Members(prefixLookup->ResultColumns) + .Done(); + } // Skip null keys in lookup part as for equijoin semantics null != null, // so we can't have nulls in lookup part @@ -803,7 +834,7 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Done(); if (rightReadMatch->ExtractMembers) { - lookupColumns = rightReadMatch->ExtractMembers.Cast().Members(); + rightColumns = rightReadMatch->ExtractMembers.Cast().Members(); } lookup = rightReadMatch->BuildProcessNodes(lookup, ctx); @@ -812,7 +843,7 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext auto arg = TCoArgument(ctx.NewArgument(join.Pos(), "row")); auto rightLabel = join.RightLabel().Cast().Value(); - TVector renames = CreateRenames(rightReadMatch->FlatMap, lookupColumns.Cast(), arg, rightLabel, + TVector renames = CreateRenames(rightReadMatch->FlatMap, rightColumns.Cast(), arg, rightLabel, join.Pos(), ctx); lookup = Build(ctx, join.Pos()) @@ -840,27 +871,7 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext } // anonymous namespace -TMaybeNode GetRightTableKeyPrefix(const TKqlKeyRange& range) { - if (!range.From().Maybe() || !range.To().Maybe()) { - return {}; - } - auto rangeFrom = range.From().Cast(); - auto rangeTo = range.To().Cast(); - - if (rangeFrom.ArgCount() != rangeTo.ArgCount()) { - return {}; - } - for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) { - if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) { - return {}; - } - } - - return rangeFrom; -} - -TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) -{ +TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin) || !node.Maybe()) { return node; } @@ -873,16 +884,12 @@ TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const T auto flipJoin = FlipLeftSemiJoin(join, ctx); DBG("-- Flip join"); - if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(flipJoin, ctx, kqpCtx)) { - return indexLookupJoin.Cast(); - } else if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(flipJoin, ctx, kqpCtx)) { + if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(flipJoin, ctx, kqpCtx)) { return indexLookupJoin.Cast(); } } - if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(join, ctx, kqpCtx)) { - return indexLookupJoin.Cast(); - } else if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(join, ctx, kqpCtx)) { + if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(join, ctx, kqpCtx)) { return indexLookupJoin.Cast(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index c53270ac283d..3bdd83516c9b 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -179,7 +179,7 @@ TMaybeNode TryBuildTrivialReadTable(TCoFlatMap& flatmap, TKqlReadTabl } // namespace TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, - TTypeAnnotationContext& typesCtx) + TTypeAnnotationContext& typesCtx, const NYql::TParentsMap& parentsMap) { if (!node.Maybe()) { return node; @@ -471,10 +471,33 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx if (!input) { TMaybeNode prefix; + TMaybeNode predicateExpr; + TMaybeNode usedColumnsList; if (kqpCtx.Config->PredicateExtract20) { prefix = prefixPointsExpr; + if (prefix) { + predicateExpr = ctx.DeepCopyLambda(flatmap.Lambda().Ref()); + TSet usedColumns; + if (!ExtractUsedFields( + flatmap.Lambda().Body().Ptr(), + flatmap.Lambda().Args().Arg(0).Ref(), + usedColumns, + parentsMap, + true)) + { + prefix = {}; + predicateExpr = {}; + } else { + TVector columnAtoms; + for (auto&& column : usedColumns) { + columnAtoms.push_back(Build(ctx, read.Pos()).Value(column).Done()); + } + usedColumnsList = Build(ctx, read.Pos()).Add(columnAtoms).Done(); + } + } } + if (indexName) { input = Build(ctx, read.Pos()) .Table(read.Table()) @@ -484,6 +507,8 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx .ExplainPrompt(prompt.BuildNode(ctx, read.Pos())) .Index(indexName.Cast()) .PrefixPointsExpr(prefix) + .PredicateExpr(predicateExpr) + .PredicateUsedColumns(usedColumnsList) .Done(); } else { input = Build(ctx, read.Pos()) @@ -493,12 +518,13 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx .Settings(read.Settings()) .ExplainPrompt(prompt.BuildNode(ctx, read.Pos())) .PrefixPointsExpr(prefix) + .PredicateExpr(predicateExpr) + .PredicateUsedColumns(usedColumnsList) .Done(); } } *input = readMatch->BuildProcessNodes(*input, ctx); - if (node.Maybe()) { return Build(ctx, node.Pos()) .Input(*input) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h index 9f4852eff0c9..d311b41be980 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h @@ -22,7 +22,7 @@ NYql::NNodes::TExprBase KqpPushPredicateToReadTable(NYql::NNodes::TExprBase node const TKqpOptimizeContext &kqpCtx); NYql::NNodes::TExprBase KqpPushExtractedPredicateToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, - const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx); + const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx, const NYql::TParentsMap& parentsMap); NYql::NNodes::TExprBase KqpJoinToIndexLookup(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp index 00d430ae7b40..ecd2e664d49a 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp @@ -76,88 +76,27 @@ TExprBase KqpRewriteSqlInToEquiJoin(const TExprBase& node, TExprContext& ctx, co const NYql::TKikimrTableDescription* tableDesc; - auto readMatch = MatchRead(flatMap.Input()); - auto rangesMatch = MatchRead(flatMap.Input()); - ui64 fixedPrefixLen; - if (readMatch) { - TString lookupTable; - - if (readMatch->FlatMap) { - return node; - } - - auto readTable = readMatch->Read.Cast(); - - static const std::set supportedReads { - TKqlReadTable::CallableName(), - TKqlReadTableIndex::CallableName(), - }; - - if (!supportedReads.contains(readTable.CallableName())) { - return node; - } - - if (!readTable.Table().SysView().Value().empty()) { - return node; - } - - if (auto indexRead = readTable.Maybe()) { - lookupTable = GetIndexMetadata(indexRead.Cast(), *kqpCtx.Tables, kqpCtx.Cluster)->Name; - } else if (!lookupTable) { - lookupTable = readTable.Table().Path().StringValue(); - } - - tableDesc = &kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookupTable); - const auto& rangeFrom = readTable.Range().From(); - const auto& rangeTo = readTable.Range().To(); - - if (!rangeFrom.Maybe() || !rangeTo.Maybe()) { - return node; - } - if (rangeFrom.Raw() != rangeTo.Raw()) { - // not point selection - return node; - } - - fixedPrefixLen = rangeFrom.ArgCount(); - } else if (rangesMatch) { - if (rangesMatch->FlatMap) { - return node; - } - - auto read = rangesMatch->Read.template Cast(); - - if (!read.Table().SysView().Value().empty()) { - return node; - } - - auto prompt = TKqpReadTableExplainPrompt::Parse(read); - if (prompt.PointPrefixLen != prompt.UsedKeyColumns.size()) { - return node; - } - - if (!TCoVoid::Match(read.Ranges().Raw()) && prompt.ExpectedMaxRanges != TMaybe(1)) { - return node; - } + auto readMatch = MatchRead(flatMap.Input(), [](TExprBase node) { + return node.Maybe() || node.Maybe(); + }); - TString lookupTable; - TString indexName; - if (auto indexRead = read.template Maybe()) { - const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, read.Table().Path()); - const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(indexRead.Index().Cast().StringValue()); - lookupTable = indexMeta->Name; - indexName = indexRead.Cast().Index().StringValue(); - } else { - lookupTable = read.Table().Path().StringValue(); - } + if (!readMatch) { + return node; + } - tableDesc = &kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookupTable); + ui64 fixedPrefixLen; + auto pointSelection = RewriteReadToPrefixLookup(readMatch->Read, ctx, kqpCtx, kqpCtx.Config->IdxLookupJoinsPrefixPointLimit); + if (!pointSelection) { + return node; + } - fixedPrefixLen = prompt.PointPrefixLen; - } else { + if ((!kqpCtx.Config->PredicateExtract20 || kqpCtx.Config->OldLookupJoinBehaviour) && pointSelection->Filter.IsValid()) { return node; } + fixedPrefixLen = pointSelection->PrefixSize; + tableDesc = &kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, pointSelection->LookupTableName); + i64 keySuffixLen = (i64) tableDesc->Metadata->KeyColumnNames.size() - (i64) fixedPrefixLen; if (keySuffixLen <= 0) { return node; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 4ff9fe205c73..9e6f4f945d80 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -214,6 +214,9 @@ NNodes::TCoAtomList BuildColumnsList(const TKikimrTableDescription& table, TPosi const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData, const TString& cluster, const TString& table, NNodes::TCoAtomList select, bool withSystemColumns = false); +const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData, + const TString& cluster, const TString& table, TPositionHandle pos, bool withSystemColumns); + TYdbOperation GetTableOp(const NNodes::TKiWriteTable& write); TVector TableOperationsToProto(const NNodes::TCoNameValueTupleList& operations, TExprContext& ctx); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index f6fb2beb1e2e..e8ef355c7bc4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -162,6 +162,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableAstCache = false; bool EnablePgConstsToParams = false; ui64 ExtractPredicateRangesLimit = 0; + ui64 IdxLookupJoinsPrefixPointLimit = 1; + bool OldLookupJoinBehaviour = true; }; } diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index d5d5b95754bd..a41a9eb9f81c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -1870,6 +1870,20 @@ TAutoPtr CreateKiSinkTypeAnnotationTransformer(TIntrusivePtr< return new TKiSinkTypeAnnotationTransformer(gateway, sessionCtx, types); } +const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData, + const TString& cluster, const TString& table, TPositionHandle pos, bool withSystemColumns) +{ + auto tableDesc = tablesData.EnsureTableExists(cluster, table, pos, ctx); + if (!tableDesc) { + return nullptr; + } + TVector columns; + for (auto&& [column, _] : tableDesc->Metadata->Columns) { + columns.push_back(Build(ctx, pos).Value(column).Done()); + } + return GetReadTableRowType(ctx, tablesData, cluster, table, Build(ctx, pos).Add(columns).Done(), withSystemColumns); +} + const TTypeAnnotationNode* GetReadTableRowType(TExprContext& ctx, const TKikimrTablesData& tablesData, const TString& cluster, const TString& table, TCoAtomList select, bool withSystemColumns) { diff --git a/ydb/core/kqp/ut/join/kqp_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_ut.cpp index fedaa77073ea..606302afd88f 100644 --- a/ydb/core/kqp/ut/join/kqp_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_ut.cpp @@ -1303,6 +1303,238 @@ Y_UNIT_TEST_SUITE(KqpJoin) { UNIT_ASSERT(explain.GetAst().Contains("GraceJoinCore")); } + Y_UNIT_TEST(FullOuterJoinNotNullJoinKey) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { // init tables + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE left + ( + Key Int64 NOT NULL, + Value Int64, + PRIMARY KEY (Key) + ); + + CREATE TABLE right + ( + Key Int64 NOT NULL, + Value Int64, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + REPLACE INTO left (Key, Value) VALUES + (1, 10), + (2, 20), + (3, 30); + + REPLACE INTO right (Key, Value) VALUES + (1, 10), + (2, 200), + (3, 300), + (4, 40); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Key, l.Value, r.Key, r.Value FROM left as l FULL JOIN right as r + ON (l.Value = r.Value AND l.Key = r.Key) + ORDER BY l.Key, r.Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#;#;[2];[200]]; + [#;#;[3];[300]]; + [#;#;[4];[40]]; + [[1];[10];[1];[10]]; + [[2];[20];#;#]; + [[3];[30];#;#] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(FullOuterJoinNotNullJoinKey) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { // init tables + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE left + ( + Key Int64 NOT NULL, + Value Int64, + PRIMARY KEY (Key) + ); + + CREATE TABLE right + ( + Key Int64 NOT NULL, + Value Int64, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + REPLACE INTO left (Key, Value) VALUES + (1, 10), + (2, 20), + (3, 30); + + REPLACE INTO right (Key, Value) VALUES + (1, 10), + (2, 200), + (3, 300), + (4, 40); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Key, l.Value, r.Key, r.Value FROM left as l FULL JOIN right as r + ON (l.Value = r.Value AND l.Key = r.Key) + ORDER BY l.Key, r.Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#;#;[2];[200]]; + [#;#;[3];[300]]; + [#;#;[4];[40]]; + [[1];[10];[1];[10]]; + [[2];[20];#;#]; + [[3];[30];#;#] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST_TWIN(AllowJoinsForComplexPredicates, StreamLookup) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup); + appConfig.MutableTableServiceConfig()->SetOldLookupJoinBehaviour(false); + appConfig.MutableTableServiceConfig()->SetIdxLookupJoinPointsLimit(10); + //appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + + auto appsettings = TKikimrSettings().SetAppConfig(appConfig); + + TKikimrRunner kikimr(appsettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + NYdb::NTable::TExecDataQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Fk21, l.Fk22, r.Key1, r.Key2, r.Name FROM Join1 as l JOIN Join2 as r + ON (l.Fk21 = r.Key1 AND l.Fk22 = r.Key2) + WHERE r.Key1 > 0 and r.Name > "" + ORDER BY l.Fk21 ASC, l.Fk22 ASC + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[101u];["One"];[101u];["One"];["Name1"]]; + [[101u];["Two"];[101u];["Two"];["Name1"]]; + [[103u];["One"];[103u];["One"];["Name1"]]; + [[105u];["One"];[105u];["One"];["Name2"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + AssertTableReads(result, "/Root/Join2", 5); + UNIT_ASSERT(result.GetQueryPlan().Contains("Lookup")); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Fk21, l.Fk22, r.Key1, r.Key2, r.Name FROM Join1 as l JOIN Join2 as r + ON (l.Fk21 = r.Key1 AND l.Fk22 = r.Key2) + WHERE r.Key1 = 101u and r.Key2 >= "One" and r.Key2 <= "Two" + ORDER BY l.Fk21 ASC, l.Fk22 ASC + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[101u];["One"];[101u];["One"];["Name1"]]; + [[101u];["Two"];[101u];["Two"];["Name1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + AssertTableReads(result, "/Root/Join2", 2); + UNIT_ASSERT(result.GetQueryPlan().Contains("Lookup")); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Fk21, l.Fk22, r.Key1, r.Key2, r.Name FROM Join1 as l JOIN Join2 as r + ON (l.Fk21 = r.Key1 AND l.Fk22 = r.Key2) + WHERE r.Key1 >= 101u and r.Key1 <= 103u and r.Key2 >= "One" and r.Key2 <= "Two" + ORDER BY l.Fk21 ASC, l.Fk22 ASC + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[101u];["One"];[101u];["One"];["Name1"]]; + [[101u];["Two"];[101u];["Two"];["Name1"]]; + [[103u];["One"];[103u];["One"];["Name1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + AssertTableReads(result, "/Root/Join2", 3); + UNIT_ASSERT(result.GetQueryPlan().Contains("Lookup")); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Fk21, l.Fk22, r.Key1, r.Key2, r.Name FROM Join1 as l JOIN Join2 as r + ON (l.Fk21 = r.Key1 AND l.Fk22 = r.Key2) + WHERE r.Key1 = 101u or r.Key1 = 105u + ORDER BY l.Fk21 ASC, l.Fk22 ASC + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[101u];["One"];[101u];["One"];["Name1"]]; + [[101u];["Two"];[101u];["Two"];["Name1"]]; + [[105u];["One"];[105u];["One"];["Name2"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + AssertTableReads(result, "/Root/Join2", 3); + UNIT_ASSERT(result.GetQueryPlan().Contains("Lookup")); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT l.Fk21, l.Fk22, r.Key1, r.Key2, r.Name FROM Join1 as l JOIN Join2 as r + ON (l.Fk21 = r.Key1 AND l.Fk22 = r.Key2) + WHERE (r.Key1 = 101u AND r.Key2 = "One") OR r.Key1 = 105u + ORDER BY l.Fk21 ASC, l.Fk22 ASC + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[101u];["One"];[101u];["One"];["Name1"]]; + [[105u];["One"];[105u];["One"];["Name2"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + AssertTableReads(result, "/Root/Join2", 2); + UNIT_ASSERT(result.GetQueryPlan().Contains("Lookup")); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 92fd3d3b1681..5207845e4028 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -273,4 +273,7 @@ message TTableServiceConfig { optional uint64 ExtractPredicateRangesLimit = 54 [default = 10000]; optional bool EnableOlapSink = 55 [default = false]; + + optional uint64 IdxLookupJoinPointsLimit = 58 [default = 1]; + optional bool OldLookupJoinBehaviour = 59 [default = true]; };