From e6bd7194db997cbab85346340ed4ba4c680c8f13 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Fri, 9 Feb 2024 08:45:18 +0000 Subject: [PATCH] Moved optimizers from providers/dq to dq --- .../yql/dq/expr_nodes/dq_expr_nodes.json | 26 ++ ydb/library/yql/dq/opt/dq_opt_log.cpp | 318 ++++++++++++++++++ ydb/library/yql/dq/opt/dq_opt_log.h | 17 + ydb/library/yql/dq/opt/dq_opt_phy.cpp | 43 +++ ydb/library/yql/dq/opt/dq_opt_phy.h | 2 + .../provider/yql_clickhouse_logical_opt.cpp | 1 + .../provider/yql_clickhouse_physical_opt.cpp | 1 + .../dq/expr_nodes/dqs_expr_nodes.json | 26 -- .../yql/providers/dq/opt/logical_optimize.cpp | 301 +---------------- .../providers/dq/opt/physical_optimize.cpp | 41 +-- .../provider/yql_generic_logical_opt.cpp | 1 + .../provider/yql_generic_physical_opt.cpp | 1 + .../ydb/provider/yql_ydb_logical_opt.cpp | 1 + 13 files changed, 420 insertions(+), 359 deletions(-) diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index cfe5b3a054ca..0f65090e45f6 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -257,6 +257,32 @@ {"Index": 0, "Name": "Input", "Type": "TExprBase"}, {"Index": 1, "Name": "Name", "Type": "TCoAtom"} ] + }, + { + "Name": "TDqReadWrapBase", + "Base": "TExprBase", + "Match": {"Type": "CallableBase"}, + "Builder": {"Generate": "None"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"}, + {"Index": 1, "Name": "Flags", "Type": "TCoAtomList"}, + {"Index": 2, "Name": "Token", "Type": "TCoSecureParam", "Optional": true} + ] + }, + { + "Name": "TDqReadWrap", + "Base": "TDqReadWrapBase", + "Match": {"Type": "Callable", "Name": "DqReadWrap"} + }, + { + "Name": "TDqReadWideWrap", + "Base": "TDqReadWrapBase", + "Match": {"Type": "Callable", "Name": "DqReadWideWrap"} + }, + { + "Name": "TDqReadBlockWideWrap", + "Base": "TDqReadWrapBase", + "Match": {"Type": "Callable", "Name": "DqReadBlockWideWrap"} } ] } diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp index ea6746b5d77e..b86fc9da74d7 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp @@ -10,6 +10,7 @@ #include #include #include +#include using namespace NYql::NNodes; @@ -374,4 +375,321 @@ TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, TTypeAnnotat return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx)); } +IDqOptimization* GetDqOptCallback(const TExprBase& providerRead, TTypeAnnotationContext& typeAnnCtx) { + if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) { + auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content(); + auto datasource = typeAnnCtx.DataSourceMap.FindPtr(dataSourceName); + YQL_ENSURE(datasource); + return (*datasource)->GetDqOptimization(); + } + return nullptr; +} + +TMaybeNode UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) { + const auto unordered = node.Cast(); + if (const auto maybeRead = unordered.Input().Maybe().Input()) { + if (enableDqReplicate) { + const TParentsMap* parentsMap = getParents(); + auto parentsIt = parentsMap->find(unordered.Input().Raw()); + YQL_ENSURE(parentsIt != parentsMap->cend()); + if (parentsIt->second.size() > 1) { + return node; + } + } + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); + } + } + } + + return node; +} + +TMaybeNode ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) { + auto extract = node.Cast(); + if (const auto maybeRead = extract.Input().Maybe().Input()) { + if (enableDqReplicate) { + const TParentsMap* parentsMap = getParents(); + auto parentsIt = parentsMap->find(extract.Input().Raw()); + YQL_ENSURE(parentsIt != parentsMap->cend()); + if (parentsIt->second.size() > 1) { + return node; + } + } + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrap::idx_Input, std::move(updatedRead))); + } + } + } + + return node; +} + +TMaybeNode TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) { + auto countBase = node.Cast(); + + // TODO: support via precomputes + if (!TCoIntegralCtor::Match(countBase.Count().Raw())) { + return node; + } + + if (const auto maybeRead = countBase.Input().Maybe().Input()) { + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + auto updatedRead = dqOpt->ApplyTakeOrSkip(providerRead.Ptr(), countBase.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); + } + } + } + + return node; +} + +TMaybeNode ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) { + auto extend = node.Cast(); + const bool ordered = node.Maybe().IsValid(); + const TExprNode* flags = nullptr; + const TExprNode* token = nullptr; + bool first = true; + std::unordered_map>> readers; + IDqOptimization* prevDqOpt = nullptr; + for (size_t i = 0; i < extend.ArgCount(); ++i) { + const auto child = extend.Arg(i); + if (!TDqReadWrapBase::Match(child.Raw())) { + prevDqOpt = nullptr; + continue; + } + auto dqReadWrap = child.Cast(); + + if (first) { + flags = dqReadWrap.Flags().Raw(); + token = dqReadWrap.Token().Raw(); + first = false; + } else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw()) { + prevDqOpt = nullptr; + continue; + } + IDqOptimization* dqOpt = GetDqOptCallback(dqReadWrap.Input(), typeAnnCtx); + if (!dqOpt) { + prevDqOpt = nullptr; + continue; + } + if (ordered && prevDqOpt != dqOpt) { + readers[dqOpt].assign(1, std::make_pair(i, dqReadWrap.Input().Ptr())); + } else { + readers[dqOpt].emplace_back(i, dqReadWrap.Input().Ptr()); + } + prevDqOpt = dqOpt; + } + + if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) { + return node; + } + + TExprNode::TListType newChildren = extend.Ref().ChildrenList(); + for (auto& [dqOpt, list]: readers) { + if (list.size() > 1) { + TExprNode::TListType inReaders; + std::transform(list.begin(), list.end(), std::back_inserter(inReaders), [](const auto& item) { return item.second; }); + TExprNode::TListType outReaders = dqOpt->ApplyExtend(inReaders, ordered, ctx); + if (outReaders.empty()) { + return {}; + } + if (inReaders == outReaders) { + return node; + } + YQL_ENSURE(outReaders.size() <= inReaders.size()); + size_t i = 0; + for (; i < outReaders.size(); ++i) { + newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i])); + } + for (; i < list.size(); ++i) { + newChildren[list[i].first] = nullptr; + } + } + } + newChildren.erase(std::remove(newChildren.begin(), newChildren.end(), TExprNode::TPtr{}), newChildren.end()); + YQL_ENSURE(!newChildren.empty()); + if (newChildren.size() > 1) { + return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(newChildren))); + } else { + return TExprBase(newChildren.front()); + } +} + +TMaybeNode DqReadWideWrapFieldSubset(TExprBase node, TExprContext& ctx, const std::function& getParents, TTypeAnnotationContext& typeAnnCtx) { + auto map = node.Cast(); + + if (const auto maybeRead = map.Input().Maybe().Input()) { + const TParentsMap* parentsMap = getParents(); + auto parentsIt = parentsMap->find(map.Input().Raw()); + YQL_ENSURE(parentsIt != parentsMap->cend()); + if (parentsIt->second.size() > 1) { + return node; + } + + TDynBitMap unusedArgs; + for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) { + if (auto parentsIt = parentsMap->find(map.Lambda().Args().Arg(i).Raw()); parentsIt == parentsMap->cend() || parentsIt->second.empty()) { + unusedArgs.Set(i); + } + } + if (unusedArgs.Empty()) { + return node; + } + + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + + auto structType = GetSeqItemType(*providerRead.Ref().GetTypeAnn()->Cast()->GetItems()[1]).Cast(); + TExprNode::TListType newMembers; + for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) { + if (!unusedArgs.Get(i)) { + newMembers.push_back(ctx.NewAtom(providerRead.Pos(), structType->GetItems().at(i)->GetName())); + } + } + + auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), ctx.NewList(providerRead.Pos(), std::move(newMembers)), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead == providerRead.Ptr()) { + return node; + } + + TExprNode::TListType newArgs; + TNodeOnNodeOwnedMap replaces; + for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) { + if (!unusedArgs.Get(i)) { + auto newArg = ctx.NewArgument(map.Lambda().Args().Arg(i).Pos(), map.Lambda().Args().Arg(i).Name()); + newArgs.push_back(newArg); + replaces.emplace(map.Lambda().Args().Arg(i).Raw(), std::move(newArg)); + } + } + + auto newLambda = ctx.NewLambda( + map.Lambda().Pos(), + ctx.NewArguments(map.Lambda().Args().Pos(), std::move(newArgs)), + ctx.ReplaceNodes(GetLambdaBody(map.Lambda().Ref()), replaces)); + + return Build(ctx, map.Pos()) + .CallableName(map.CallableName()) + .Input() + .InitFrom(map.Input().Cast()) + .Input(updatedRead) + .Build() + .Lambda(newLambda) + .Done(); + } + } + return node; +} + +TMaybeNode DqReadWrapByProvider(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) { + auto providerRead = node.Cast().Input(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + auto updatedRead = dqOpt->RewriteRead(providerRead.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); + } + } + return node; +} + +TMaybeNode ExtractMembersOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function& getParents, TTypeAnnotationContext& typeAnnCtx) { + auto providerRead = node.Cast().Input(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + TNodeOnNodeOwnedMap toOptimize; + TExprNode::TPtr res; + bool error = false; + OptimizeSubsetFieldsForNodeWithMultiUsage(node.Ptr(), *getParents(), toOptimize, ctx, + [&] (const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) -> TExprNode::TPtr { + auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), members, ctx); + if (!updatedRead) { + error = true; + return {}; + } + if (updatedRead != providerRead.Ptr()) { + res = ctx.ChangeChild(node.Ref(), TDqReadWrap::idx_Input, std::move(updatedRead)); + return res; + } + + return input; + } + ); + if (error) { + return {}; + } + if (!toOptimize.empty()) { + for (auto& [s, d]: toOptimize) { + optCtx.RemapNode(*s, d); + } + return TExprBase(res); + } + } + + return node; +} + +TMaybeNode UnorderedOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function& getParents, TTypeAnnotationContext& typeAnnCtx) { + auto providerRead = node.Cast().Input(); + if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) { + auto parentsMap = getParents(); + auto it = parentsMap->find(node.Raw()); + if (it == parentsMap->cend() || it->second.size() <= 1) { + return node; + } + + bool hasUnordered = false; + for (auto parent: it->second) { + if (TCoUnorderedBase::Match(parent)) { + hasUnordered = true; + } else if (!TCoAggregateBase::Match(parent) && !TCoFlatMap::Match(parent)) { + return node; + } + } + + if (!hasUnordered) { + return node; + } + + auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + auto newDqReadWrap = ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)); + for (auto parent: it->second) { + if (TCoUnorderedBase::Match(parent)) { + optCtx.RemapNode(*parent, newDqReadWrap); + } else if (TCoAggregateBase::Match(parent) || TCoFlatMap::Match(parent)) { + optCtx.RemapNode(*parent, ctx.ChangeChild(*parent, 0, TExprNode::TPtr(newDqReadWrap))); + } + } + + return TExprBase(newDqReadWrap); + } + } + return node; +} + } diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h index 0c140b9d99d0..8903cd19ec02 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.h +++ b/ydb/library/yql/dq/opt/dq_opt_log.h @@ -9,6 +9,7 @@ #include namespace NYql { + class IOptimizationContext; struct TTypeAnnotationContext; struct TDqSettings; struct IProviderContext; @@ -58,4 +59,20 @@ NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& c IOptimizer* MakeNativeOptimizer(const IOptimizer::TInput& input, const std::function& log); +NNodes::TMaybeNode UnorderedOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, const std::function& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode ExtractMembersOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, const std::function& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode TakeOrSkipOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode ExtendOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode DqReadWideWrapFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const std::function& getParents, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode DqReadWrapByProvider(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode ExtractMembersOverDqReadWrapMultiUsage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function& getParents, TTypeAnnotationContext& typeAnnCtx); + +NNodes::TMaybeNode UnorderedOverDqReadWrapMultiUsage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function& getParents, TTypeAnnotationContext& typeAnnCtx); + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 7e918f8d345d..bc6e5a25cd4c 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -2924,4 +2924,47 @@ NNodes::TExprBase DqBuildStageWithSourceWrap(NNodes::TExprBase node, TExprContex .Build().Done(); } +NNodes::TExprBase DqBuildStageWithReadWrap(NNodes::TExprBase node, TExprContext& ctx) { + const auto wrap = node.Cast(); + const auto read = Build(ctx, node.Pos()) + .Input(wrap.Input()) + .Flags().Build() + .Token(wrap.Token()) + .Done(); + + const auto structType = GetSeqItemType(*wrap.Ref().GetTypeAnn()).Cast(); + auto narrow = ctx.Builder(node.Pos()) + .Lambda() + .Callable("NarrowMap") + .Add(0, read.Ptr()) + .Lambda(1) + .Params("fields", structType->GetSize()) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& item : structType->GetItems()) { + parent.List(i) + .Atom(0, item->GetName()) + .Arg(1, "fields", i) + .Seal(); + ++i; + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Seal().Build(); + + return Build(ctx, node.Pos()) + .Output() + .Stage() + .Inputs().Build() + .Program(narrow) + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Build() + .Index().Build("0") + .Build() .Done(); +} + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 20d2041e2ec6..d100e2ef2fb5 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -127,4 +127,6 @@ TVector PrepareArgumentsReplacement(const NYql::NNode NNodes::TExprBase DqBuildStageWithSourceWrap(NNodes::TExprBase node, TExprContext& ctx); +NNodes::TExprBase DqBuildStageWithReadWrap(NNodes::TExprBase node, TExprContext& ctx); + } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_logical_opt.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_logical_opt.cpp index 0796293a2c0b..013d68c30590 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_logical_opt.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_logical_opt.cpp @@ -1,5 +1,6 @@ #include "yql_clickhouse_provider_impl.h" +#include #include #include #include diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_physical_opt.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_physical_opt.cpp index 229c80091a3f..6b2427d98883 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_physical_opt.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_physical_opt.cpp @@ -1,5 +1,6 @@ #include "yql_clickhouse_provider_impl.h" +#include #include #include #include diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json index 2d3868424c1e..34a9e2af5f8d 100644 --- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json +++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json @@ -5,32 +5,6 @@ "FreeArgCallableBase": "TFreeArgCallable", "FreeArgBuilderBase": "TFreeArgCallableBuilderBase", "Nodes": [ - { - "Name": "TDqReadWrapBase", - "Base": "TExprBase", - "Match": {"Type": "CallableBase"}, - "Builder": {"Generate": "None"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "Flags", "Type": "TCoAtomList"}, - {"Index": 2, "Name": "Token", "Type": "TCoSecureParam", "Optional": true} - ] - }, - { - "Name": "TDqReadWrap", - "Base": "TDqReadWrapBase", - "Match": {"Type": "Callable", "Name": "DqReadWrap"} - }, - { - "Name": "TDqReadWideWrap", - "Base": "TDqReadWrapBase", - "Match": {"Type": "Callable", "Name": "DqReadWideWrap"} - }, - { - "Name": "TDqReadBlockWideWrap", - "Base": "TDqReadWrapBase", - "Match": {"Type": "Callable", "Name": "DqReadBlockWideWrap"} - }, { "Name": "TDqWrite", "Base": "TCallable", diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index efaf2822b95d..fe0a73fdbf32 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -99,219 +99,23 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { - const auto unordered = node.Cast(); - if (const auto maybeRead = unordered.Input().Maybe().Input()) { - if (Config->IsDqReplicateEnabled(TypesCtx)) { - const TParentsMap* parentsMap = getParents(); - auto parentsIt = parentsMap->find(unordered.Input().Raw()); - YQL_ENSURE(parentsIt != parentsMap->cend()); - if (parentsIt->second.size() > 1) { - return node; - } - } - auto providerRead = maybeRead.Cast(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx); - if (!updatedRead) { - return {}; - } - if (updatedRead != providerRead.Ptr()) { - return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); - } - } - } - - return node; + return NDq::UnorderedOverDqReadWrap(node, ctx, getParents, Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate), TypesCtx); } TMaybeNode ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - auto extract = node.Cast(); - if (const auto maybeRead = extract.Input().Maybe().Input()) { - if (Config->IsDqReplicateEnabled(TypesCtx)) { - const TParentsMap* parentsMap = getParents(); - auto parentsIt = parentsMap->find(extract.Input().Raw()); - YQL_ENSURE(parentsIt != parentsMap->cend()); - if (parentsIt->second.size() > 1) { - return node; - } - } - auto providerRead = maybeRead.Cast(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx); - if (!updatedRead) { - return {}; - } - if (updatedRead != providerRead.Ptr()) { - return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrap::idx_Input, std::move(updatedRead))); - } - } - } - - return node; + return NDq::ExtractMembersOverDqReadWrap(node, ctx, getParents, Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate), TypesCtx); } TMaybeNode TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx) { - auto countBase = node.Cast(); - - // TODO: support via precomputes - if (!TCoIntegralCtor::Match(countBase.Count().Raw())) { - return node; - } - - if (const auto maybeRead = countBase.Input().Maybe().Input()) { - auto providerRead = maybeRead.Cast(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - auto updatedRead = dqOpt->ApplyTakeOrSkip(providerRead.Ptr(), countBase.Ptr(), ctx); - if (!updatedRead) { - return {}; - } - if (updatedRead != providerRead.Ptr()) { - return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); - } - } - } - - return node; + return NDq::TakeOrSkipOverDqReadWrap(node, ctx, TypesCtx); } TMaybeNode ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx) const { - auto extend = node.Cast(); - const bool ordered = node.Maybe().IsValid(); - const TExprNode* flags = nullptr; - const TExprNode* token = nullptr; - bool first = true; - std::unordered_map>> readers; - IDqOptimization* prevDqOpt = nullptr; - for (size_t i = 0; i < extend.ArgCount(); ++i) { - const auto child = extend.Arg(i); - if (!TDqReadWrapBase::Match(child.Raw())) { - prevDqOpt = nullptr; - continue; - } - auto dqReadWrap = child.Cast(); - - if (first) { - flags = dqReadWrap.Flags().Raw(); - token = dqReadWrap.Token().Raw(); - first = false; - } else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw()) { - prevDqOpt = nullptr; - continue; - } - IDqOptimization* dqOpt = GetDqOptCallback(dqReadWrap.Input()); - if (!dqOpt) { - prevDqOpt = nullptr; - continue; - } - if (ordered && prevDqOpt != dqOpt) { - readers[dqOpt].assign(1, std::make_pair(i, dqReadWrap.Input().Ptr())); - } else { - readers[dqOpt].emplace_back(i, dqReadWrap.Input().Ptr()); - } - prevDqOpt = dqOpt; - } - - if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) { - return node; - } - - TExprNode::TListType newChildren = extend.Ref().ChildrenList(); - for (auto& [dqOpt, list]: readers) { - if (list.size() > 1) { - TExprNode::TListType inReaders; - std::transform(list.begin(), list.end(), std::back_inserter(inReaders), [](const auto& item) { return item.second; }); - TExprNode::TListType outReaders = dqOpt->ApplyExtend(inReaders, ordered, ctx); - if (outReaders.empty()) { - return {}; - } - if (inReaders == outReaders) { - return node; - } - YQL_ENSURE(outReaders.size() <= inReaders.size()); - size_t i = 0; - for (; i < outReaders.size(); ++i) { - newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i])); - } - for (; i < list.size(); ++i) { - newChildren[list[i].first] = nullptr; - } - } - } - newChildren.erase(std::remove(newChildren.begin(), newChildren.end(), TExprNode::TPtr{}), newChildren.end()); - YQL_ENSURE(!newChildren.empty()); - if (newChildren.size() > 1) { - return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(newChildren))); - } else { - return TExprBase(newChildren.front()); - } + return NDq::ExtendOverDqReadWrap(node, ctx, TypesCtx); } TMaybeNode DqReadWideWrapFieldSubset(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - auto map = node.Cast(); - - if (const auto maybeRead = map.Input().Maybe().Input()) { - const TParentsMap* parentsMap = getParents(); - auto parentsIt = parentsMap->find(map.Input().Raw()); - YQL_ENSURE(parentsIt != parentsMap->cend()); - if (parentsIt->second.size() > 1) { - return node; - } - - TDynBitMap unusedArgs; - for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) { - if (auto parentsIt = parentsMap->find(map.Lambda().Args().Arg(i).Raw()); parentsIt == parentsMap->cend() || parentsIt->second.empty()) { - unusedArgs.Set(i); - } - } - if (unusedArgs.Empty()) { - return node; - } - - auto providerRead = maybeRead.Cast(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - - auto structType = GetSeqItemType(*providerRead.Ref().GetTypeAnn()->Cast()->GetItems()[1]).Cast(); - TExprNode::TListType newMembers; - for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) { - if (!unusedArgs.Get(i)) { - newMembers.push_back(ctx.NewAtom(providerRead.Pos(), structType->GetItems().at(i)->GetName())); - } - } - - auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), ctx.NewList(providerRead.Pos(), std::move(newMembers)), ctx); - if (!updatedRead) { - return {}; - } - if (updatedRead == providerRead.Ptr()) { - return node; - } - - TExprNode::TListType newArgs; - TNodeOnNodeOwnedMap replaces; - for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) { - if (!unusedArgs.Get(i)) { - auto newArg = ctx.NewArgument(map.Lambda().Args().Arg(i).Pos(), map.Lambda().Args().Arg(i).Name()); - newArgs.push_back(newArg); - replaces.emplace(map.Lambda().Args().Arg(i).Raw(), std::move(newArg)); - } - } - - auto newLambda = ctx.NewLambda( - map.Lambda().Pos(), - ctx.NewArguments(map.Lambda().Args().Pos(), std::move(newArgs)), - ctx.ReplaceNodes(GetLambdaBody(map.Lambda().Ref()), replaces)); - - return Build(ctx, map.Pos()) - .CallableName(map.CallableName()) - .Input() - .InitFrom(map.Input().Cast()) - .Input(updatedRead) - .Build() - .Lambda(newLambda) - .Done(); - } - } - return node; + return NDq::DqReadWideWrapFieldSubset(node, ctx, getParents, TypesCtx); } TMaybeNode FlatMapOverExtend(TExprBase node, TExprContext& ctx) { @@ -417,94 +221,15 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode DqReadWrapByProvider(TExprBase node, TExprContext& ctx) const { - auto providerRead = node.Cast().Input(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - auto updatedRead = dqOpt->RewriteRead(providerRead.Ptr(), ctx); - if (!updatedRead) { - return {}; - } - if (updatedRead != providerRead.Ptr()) { - return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); - } - } - return node; + return NDq::DqReadWrapByProvider(node, ctx, TypesCtx); } TMaybeNode ExtractMembersOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - auto providerRead = node.Cast().Input(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - TNodeOnNodeOwnedMap toOptimize; - TExprNode::TPtr res; - bool error = false; - OptimizeSubsetFieldsForNodeWithMultiUsage(node.Ptr(), *getParents(), toOptimize, ctx, - [&] (const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) -> TExprNode::TPtr { - auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), members, ctx); - if (!updatedRead) { - error = true; - return {}; - } - if (updatedRead != providerRead.Ptr()) { - res = ctx.ChangeChild(node.Ref(), TDqReadWrap::idx_Input, std::move(updatedRead)); - return res; - } - - return input; - } - ); - if (error) { - return {}; - } - if (!toOptimize.empty()) { - for (auto& [s, d]: toOptimize) { - optCtx.RemapNode(*s, d); - } - return TExprBase(res); - } - } - - return node; + return NDq::ExtractMembersOverDqReadWrapMultiUsage(node, ctx, optCtx, getParents, TypesCtx); } TMaybeNode UnorderedOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - auto providerRead = node.Cast().Input(); - if (auto dqOpt = GetDqOptCallback(providerRead)) { - auto parentsMap = getParents(); - auto it = parentsMap->find(node.Raw()); - if (it == parentsMap->cend() || it->second.size() <= 1) { - return node; - } - - bool hasUnordered = false; - for (auto parent: it->second) { - if (TCoUnorderedBase::Match(parent)) { - hasUnordered = true; - } else if (!TCoAggregateBase::Match(parent) && !TCoFlatMap::Match(parent)) { - return node; - } - } - - if (!hasUnordered) { - return node; - } - - auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx); - if (!updatedRead) { - return {}; - } - if (updatedRead != providerRead.Ptr()) { - auto newDqReadWrap = ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)); - for (auto parent: it->second) { - if (TCoUnorderedBase::Match(parent)) { - optCtx.RemapNode(*parent, newDqReadWrap); - } else if (TCoAggregateBase::Match(parent) || TCoFlatMap::Match(parent)) { - optCtx.RemapNode(*parent, ctx.ChangeChild(*parent, 0, TExprNode::TPtr(newDqReadWrap))); - } - } - - return TExprBase(newDqReadWrap); - } - } - return node; + return NDq::UnorderedOverDqReadWrapMultiUsage(node, ctx, optCtx, getParents, TypesCtx); } private: @@ -1247,16 +972,6 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase { return enableWatermarks; } - IDqOptimization* GetDqOptCallback(const TExprBase& providerRead) const { - if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) { - auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content(); - auto datasource = TypesCtx.DataSourceMap.FindPtr(dataSourceName); - YQL_ENSURE(datasource); - return (*datasource)->GetDqOptimization(); - } - return nullptr; - } - private: TDqConfiguration::TPtr Config; TTypeAnnotationContext& TypesCtx; diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 43782cc697f8..101e4613c605 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -98,46 +98,7 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode BuildStageWithReadWrap(TExprBase node, TExprContext& ctx) { - const auto wrap = node.Cast(); - const auto read = Build(ctx, node.Pos()) - .Input(wrap.Input()) - .Flags().Build() - .Token(wrap.Token()) - .Done(); - - const auto structType = GetSeqItemType(*wrap.Ref().GetTypeAnn()).Cast(); - auto narrow = ctx.Builder(node.Pos()) - .Lambda() - .Callable("NarrowMap") - .Add(0, read.Ptr()) - .Lambda(1) - .Params("fields", structType->GetSize()) - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 i = 0U; - for (const auto& item : structType->GetItems()) { - parent.List(i) - .Atom(0, item->GetName()) - .Arg(1, "fields", i) - .Seal(); - ++i; - } - return parent; - }) - .Seal() - .Seal() - .Seal() - .Seal().Build(); - - return Build(ctx, node.Pos()) - .Output() - .Stage() - .Inputs().Build() - .Program(narrow) - .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) - .Build() - .Index().Build("0") - .Build() .Done(); + return DqBuildStageWithReadWrap(node, ctx); } template diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp index 52a08660081b..b52082c39168 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp index d55c02b88c45..f68261923e36 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_logical_opt.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_logical_opt.cpp index b9c18d1ec114..3a2ad4172c60 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_logical_opt.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_logical_opt.cpp @@ -1,5 +1,6 @@ #include "yql_ydb_provider_impl.h" +#include #include #include #include