Skip to content

Commit

Permalink
Merge e6bd719 into b24ad78
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 9, 2024
2 parents b24ad78 + e6bd719 commit aea4e37
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 359 deletions.
26 changes: 26 additions & 0 deletions ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,32 @@
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
{"Index": 1, "Name": "Name", "Type": "TCoAtom"}
]
},
{
"Name": "TDqReadWrapBase",
"Base": "TExprBase",
"Match": {"Type": "CallableBase"},
"Builder": {"Generate": "None"},
"Children": [
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
{"Index": 1, "Name": "Flags", "Type": "TCoAtomList"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
]
},
{
"Name": "TDqReadWrap",
"Base": "TDqReadWrapBase",
"Match": {"Type": "Callable", "Name": "DqReadWrap"}
},
{
"Name": "TDqReadWideWrap",
"Base": "TDqReadWrapBase",
"Match": {"Type": "Callable", "Name": "DqReadWideWrap"}
},
{
"Name": "TDqReadBlockWideWrap",
"Base": "TDqReadWrapBase",
"Match": {"Type": "Callable", "Name": "DqReadBlockWideWrap"}
}
]
}
318 changes: 318 additions & 0 deletions ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>

using namespace NYql::NNodes;

Expand Down Expand Up @@ -374,4 +375,321 @@ TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, TTypeAnnotat
return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx));
}

IDqOptimization* GetDqOptCallback(const TExprBase& providerRead, TTypeAnnotationContext& typeAnnCtx) {
if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) {
auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content();
auto datasource = typeAnnCtx.DataSourceMap.FindPtr(dataSourceName);
YQL_ENSURE(datasource);
return (*datasource)->GetDqOptimization();
}
return nullptr;
}

TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) {
const auto unordered = node.Cast<TCoUnorderedBase>();
if (const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input()) {
if (enableDqReplicate) {
const TParentsMap* parentsMap = getParents();
auto parentsIt = parentsMap->find(unordered.Input().Raw());
YQL_ENSURE(parentsIt != parentsMap->cend());
if (parentsIt->second.size() > 1) {
return node;
}
}
auto providerRead = maybeRead.Cast();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx);
if (!updatedRead) {
return {};
}
if (updatedRead != providerRead.Ptr()) {
return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
}
}
}

return node;
}

TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) {
auto extract = node.Cast<TCoExtractMembers>();
if (const auto maybeRead = extract.Input().Maybe<TDqReadWrap>().Input()) {
if (enableDqReplicate) {
const TParentsMap* parentsMap = getParents();
auto parentsIt = parentsMap->find(extract.Input().Raw());
YQL_ENSURE(parentsIt != parentsMap->cend());
if (parentsIt->second.size() > 1) {
return node;
}
}
auto providerRead = maybeRead.Cast();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx);
if (!updatedRead) {
return {};
}
if (updatedRead != providerRead.Ptr()) {
return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrap::idx_Input, std::move(updatedRead)));
}
}
}

return node;
}

TMaybeNode<TExprBase> TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
auto countBase = node.Cast<TCoCountBase>();

// TODO: support via precomputes
if (!TCoIntegralCtor::Match(countBase.Count().Raw())) {
return node;
}

if (const auto maybeRead = countBase.Input().Maybe<TDqReadWrapBase>().Input()) {
auto providerRead = maybeRead.Cast();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
auto updatedRead = dqOpt->ApplyTakeOrSkip(providerRead.Ptr(), countBase.Ptr(), ctx);
if (!updatedRead) {
return {};
}
if (updatedRead != providerRead.Ptr()) {
return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
}
}
}

return node;
}

TMaybeNode<TExprBase> ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
auto extend = node.Cast<TCoExtendBase>();
const bool ordered = node.Maybe<TCoOrderedExtend>().IsValid();
const TExprNode* flags = nullptr;
const TExprNode* token = nullptr;
bool first = true;
std::unordered_map<IDqOptimization*, std::vector<std::pair<size_t, TExprNode::TPtr>>> readers;
IDqOptimization* prevDqOpt = nullptr;
for (size_t i = 0; i < extend.ArgCount(); ++i) {
const auto child = extend.Arg(i);
if (!TDqReadWrapBase::Match(child.Raw())) {
prevDqOpt = nullptr;
continue;
}
auto dqReadWrap = child.Cast<TDqReadWrapBase>();

if (first) {
flags = dqReadWrap.Flags().Raw();
token = dqReadWrap.Token().Raw();
first = false;
} else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw()) {
prevDqOpt = nullptr;
continue;
}
IDqOptimization* dqOpt = GetDqOptCallback(dqReadWrap.Input(), typeAnnCtx);
if (!dqOpt) {
prevDqOpt = nullptr;
continue;
}
if (ordered && prevDqOpt != dqOpt) {
readers[dqOpt].assign(1, std::make_pair(i, dqReadWrap.Input().Ptr()));
} else {
readers[dqOpt].emplace_back(i, dqReadWrap.Input().Ptr());
}
prevDqOpt = dqOpt;
}

if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) {
return node;
}

TExprNode::TListType newChildren = extend.Ref().ChildrenList();
for (auto& [dqOpt, list]: readers) {
if (list.size() > 1) {
TExprNode::TListType inReaders;
std::transform(list.begin(), list.end(), std::back_inserter(inReaders), [](const auto& item) { return item.second; });
TExprNode::TListType outReaders = dqOpt->ApplyExtend(inReaders, ordered, ctx);
if (outReaders.empty()) {
return {};
}
if (inReaders == outReaders) {
return node;
}
YQL_ENSURE(outReaders.size() <= inReaders.size());
size_t i = 0;
for (; i < outReaders.size(); ++i) {
newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i]));
}
for (; i < list.size(); ++i) {
newChildren[list[i].first] = nullptr;
}
}
}
newChildren.erase(std::remove(newChildren.begin(), newChildren.end(), TExprNode::TPtr{}), newChildren.end());
YQL_ENSURE(!newChildren.empty());
if (newChildren.size() > 1) {
return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(newChildren)));
} else {
return TExprBase(newChildren.front());
}
}

TMaybeNode<TExprBase> DqReadWideWrapFieldSubset(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx) {
auto map = node.Cast<TCoMapBase>();

if (const auto maybeRead = map.Input().Maybe<TDqReadWideWrap>().Input()) {
const TParentsMap* parentsMap = getParents();
auto parentsIt = parentsMap->find(map.Input().Raw());
YQL_ENSURE(parentsIt != parentsMap->cend());
if (parentsIt->second.size() > 1) {
return node;
}

TDynBitMap unusedArgs;
for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) {
if (auto parentsIt = parentsMap->find(map.Lambda().Args().Arg(i).Raw()); parentsIt == parentsMap->cend() || parentsIt->second.empty()) {
unusedArgs.Set(i);
}
}
if (unusedArgs.Empty()) {
return node;
}

auto providerRead = maybeRead.Cast();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {

auto structType = GetSeqItemType(*providerRead.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]).Cast<TStructExprType>();
TExprNode::TListType newMembers;
for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) {
if (!unusedArgs.Get(i)) {
newMembers.push_back(ctx.NewAtom(providerRead.Pos(), structType->GetItems().at(i)->GetName()));
}
}

auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), ctx.NewList(providerRead.Pos(), std::move(newMembers)), ctx);
if (!updatedRead) {
return {};
}
if (updatedRead == providerRead.Ptr()) {
return node;
}

TExprNode::TListType newArgs;
TNodeOnNodeOwnedMap replaces;
for (ui32 i = 0; i < map.Lambda().Args().Size(); ++i) {
if (!unusedArgs.Get(i)) {
auto newArg = ctx.NewArgument(map.Lambda().Args().Arg(i).Pos(), map.Lambda().Args().Arg(i).Name());
newArgs.push_back(newArg);
replaces.emplace(map.Lambda().Args().Arg(i).Raw(), std::move(newArg));
}
}

auto newLambda = ctx.NewLambda(
map.Lambda().Pos(),
ctx.NewArguments(map.Lambda().Args().Pos(), std::move(newArgs)),
ctx.ReplaceNodes(GetLambdaBody(map.Lambda().Ref()), replaces));

return Build<TCoMapBase>(ctx, map.Pos())
.CallableName(map.CallableName())
.Input<TDqReadWideWrap>()
.InitFrom(map.Input().Cast<TDqReadWideWrap>())
.Input(updatedRead)
.Build()
.Lambda(newLambda)
.Done();
}
}
return node;
}

TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx) {
auto providerRead = node.Cast<TDqReadWrapBase>().Input();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
auto updatedRead = dqOpt->RewriteRead(providerRead.Ptr(), ctx);
if (!updatedRead) {
return {};
}
if (updatedRead != providerRead.Ptr()) {
return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
}
}
return node;
}

TMaybeNode<TExprBase> ExtractMembersOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx) {
auto providerRead = node.Cast<TDqReadWrap>().Input();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
TNodeOnNodeOwnedMap toOptimize;
TExprNode::TPtr res;
bool error = false;
OptimizeSubsetFieldsForNodeWithMultiUsage(node.Ptr(), *getParents(), toOptimize, ctx,
[&] (const TExprNode::TPtr& input, const TExprNode::TPtr& members, const TParentsMap&, TExprContext& ctx) -> TExprNode::TPtr {
auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), members, ctx);
if (!updatedRead) {
error = true;
return {};
}
if (updatedRead != providerRead.Ptr()) {
res = ctx.ChangeChild(node.Ref(), TDqReadWrap::idx_Input, std::move(updatedRead));
return res;
}

return input;
}
);
if (error) {
return {};
}
if (!toOptimize.empty()) {
for (auto& [s, d]: toOptimize) {
optCtx.RemapNode(*s, d);
}
return TExprBase(res);
}
}

return node;
}

TMaybeNode<TExprBase> UnorderedOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx) {
auto providerRead = node.Cast<TDqReadWrapBase>().Input();
if (auto dqOpt = GetDqOptCallback(providerRead, typeAnnCtx)) {
auto parentsMap = getParents();
auto it = parentsMap->find(node.Raw());
if (it == parentsMap->cend() || it->second.size() <= 1) {
return node;
}

bool hasUnordered = false;
for (auto parent: it->second) {
if (TCoUnorderedBase::Match(parent)) {
hasUnordered = true;
} else if (!TCoAggregateBase::Match(parent) && !TCoFlatMap::Match(parent)) {
return node;
}
}

if (!hasUnordered) {
return node;
}

auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx);
if (!updatedRead) {
return {};
}
if (updatedRead != providerRead.Ptr()) {
auto newDqReadWrap = ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead));
for (auto parent: it->second) {
if (TCoUnorderedBase::Match(parent)) {
optCtx.RemapNode(*parent, newDqReadWrap);
} else if (TCoAggregateBase::Match(parent) || TCoFlatMap::Match(parent)) {
optCtx.RemapNode(*parent, ctx.ChangeChild(*parent, 0, TExprNode::TPtr(newDqReadWrap)));
}
}

return TExprBase(newDqReadWrap);
}
}
return node;
}

}
17 changes: 17 additions & 0 deletions ydb/library/yql/dq/opt/dq_opt_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <functional>

namespace NYql {
class IOptimizationContext;
struct TTypeAnnotationContext;
struct TDqSettings;
struct IProviderContext;
Expand Down Expand Up @@ -58,4 +59,20 @@ NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& c

IOptimizer* MakeNativeOptimizer(const IOptimizer::TInput& input, const std::function<void(const TString&)>& log);

NNodes::TMaybeNode<NNodes::TExprBase> UnorderedOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> ExtractMembersOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> TakeOrSkipOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> ExtendOverDqReadWrap(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> DqReadWideWrapFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> DqReadWrapByProvider(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> ExtractMembersOverDqReadWrapMultiUsage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx);

NNodes::TMaybeNode<NNodes::TExprBase> UnorderedOverDqReadWrapMultiUsage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const std::function<const TParentsMap*()>& getParents, TTypeAnnotationContext& typeAnnCtx);

} // namespace NYql::NDq
Loading

0 comments on commit aea4e37

Please sign in to comment.