Skip to content

Commit

Permalink
Merge 8d336d5 into f2ccd9c
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelvelikhov authored Jan 29, 2024
2 parents f2ccd9c + 8d336d5 commit b11d10a
Show file tree
Hide file tree
Showing 28 changed files with 872 additions and 125 deletions.
8 changes: 6 additions & 2 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>


#include <ydb/core/kqp/opt/physical/kqp_opt_phy.h>
Expand Down Expand Up @@ -143,6 +144,7 @@ class TKqpRunner : public IKqpRunner {
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
, Pctx(TKqpProviderContext(*OptimizeCtx))
{
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
}
Expand Down Expand Up @@ -259,7 +261,7 @@ class TKqpRunner : public IKqpRunner {
.AddPostTypeAnnotation(/* forSubgraph */ true)
.AddCommonOptimization()
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config), "Statistics")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
.Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "KqpPhysicalOptimize")
Expand Down Expand Up @@ -293,7 +295,7 @@ class TKqpRunner : public IKqpRunner {
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
.AddPostTypeAnnotation()
.Add(CreateKqpBuildPhysicalQueryTransformer(OptimizeCtx, BuildQueryCtx), "BuildPhysicalQuery")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config), "Statistics")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Build(false);

auto physicalPeepholeTransformer = TTransformationPipeline(typesCtx)
Expand Down Expand Up @@ -355,6 +357,8 @@ class TKqpRunner : public IKqpRunner {
TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx;
TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx;

TKqpProviderContext Pctx;

TAutoPtr<IGraphTransformer> Transformer;
};

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
#include <ydb/library/yql/utils/plan/plan_utils.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

#include <library/cpp/json/writer/json.h>
#include <library/cpp/json/json_reader.h>
Expand Down Expand Up @@ -1357,7 +1358,7 @@ class TxPlanSerializer {
}

void AddOptimizerEstimates(TOperator& op, const TExprBase& expr) {
if (!SerializerCtx.Config->HasOptEnableCostBasedOptimization()) {
if (SerializerCtx.Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)==0) {
return;
}

Expand Down
9 changes: 6 additions & 3 deletions ydb/core/kqp/opt/kqp_statistics_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
#include <ydb/library/yql/core/yql_cost_function.h>

#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>


#include <charconv>

using namespace NYql;
Expand Down Expand Up @@ -187,7 +190,7 @@ IGraphTransformer::TStatus TKqpStatisticsTransformer::DoTransform(TExprNode::TPt
TExprNode::TPtr& output, TExprContext& ctx) {

output = input;
if (!Config->HasOptEnableCostBasedOptimization()) {
if (Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel) == 0) {
return IGraphTransformer::TStatus::Ok;
}

Expand Down Expand Up @@ -238,6 +241,6 @@ bool TKqpStatisticsTransformer::AfterLambdasSpecific(const TExprNode::TPtr& inpu
}

TAutoPtr<IGraphTransformer> NKikimr::NKqp::CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) {
return THolder<IGraphTransformer>(new TKqpStatisticsTransformer(kqpCtx, typeCtx, config));
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx) {
return THolder<IGraphTransformer>(new TKqpStatisticsTransformer(kqpCtx, typeCtx, config, pctx));
}
7 changes: 4 additions & 3 deletions ydb/core/kqp/opt/kqp_statistics_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/yql/core/yql_statistics.h>

#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
Expand Down Expand Up @@ -33,8 +34,8 @@ class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase

public:
TKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx,
const TKikimrConfiguration::TPtr& config) :
TDqStatisticsTransformerBase(&typeCtx),
const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx) :
TDqStatisticsTransformerBase(&typeCtx, pctx),
Config(config),
KqpCtx(*kqpCtx) {}

Expand All @@ -47,6 +48,6 @@ class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase
};

TAutoPtr<IGraphTransformer> CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config);
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx);
}
}
149 changes: 149 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#include "kqp_opt_cbo.h"
#include "kqp_opt_log_impl.h"

#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>


namespace NKikimr::NKqp::NOpt {

using namespace NYql;
using namespace NYql::NCommon;
using namespace NYql::NDq;
using namespace NYql::NNodes;

namespace {

/**
* KQP specific rule to check if a LookupJoin is applicable
*/
bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNode>& node, const TVector<TString>& joinColumns, const TKqpProviderContext& ctx) {

auto rel = std::static_pointer_cast<TKqpRelOptimizerNode>(node);
auto expr = TExprBase(rel->Node);

if (ctx.KqpCtx.IsScanQuery() && !ctx.KqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin) {
return false;
}

if (find_if(joinColumns.begin(), joinColumns.end(), [&] (const TString& s) { return node->Stats->KeyColumns[0] == s;})) {
return true;
}

auto readMatch = MatchRead<TKqlReadTable>(expr);
TMaybeNode<TKqlKeyInc> maybeTablePrefix;
size_t prefixSize;

if (readMatch) {
if (readMatch->FlatMap && !IsPassthroughFlatMap(readMatch->FlatMap.Cast(), nullptr)){
return false;
}
auto read = readMatch->Read.Cast<TKqlReadTable>();
maybeTablePrefix = GetRightTableKeyPrefix(read.Range());

if (!maybeTablePrefix) {
return false;
}

prefixSize = maybeTablePrefix.Cast().ArgCount();

if (!prefixSize) {
return true;
}
}
else {
readMatch = MatchRead<TKqlReadTableRangesBase>(expr);
if (readMatch) {
if (readMatch->FlatMap && !IsPassthroughFlatMap(readMatch->FlatMap.Cast(), nullptr)){
return false;
}
auto read = readMatch->Read.Cast<TKqlReadTableRangesBase>();
if (TCoVoid::Match(read.Ranges().Raw())) {
return true;
} else {
auto prompt = TKqpReadTableExplainPrompt::Parse(read);

if (prompt.PointPrefixLen != prompt.UsedKeyColumns.size()) {
return false;
}

if (prompt.ExpectedMaxRanges != TMaybe<ui64>(1)) {
return false;
}
prefixSize = prompt.PointPrefixLen;
}
}
}
if (! readMatch) {
return false;
}

if (prefixSize < node->Stats->KeyColumns.size() && !(find_if(joinColumns.begin(), joinColumns.end(), [&] (const TString& s) {
return node->Stats->KeyColumns[prefixSize] == s;
}))){
return false;
}

return true;
}

bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,
std::shared_ptr<IBaseOptimizerNode> right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
TKqpProviderContext& ctx) {

Y_UNUSED(left);

auto rightStats = right->Stats;

if (rightStats->Type != EStatisticsType::BaseTable) {
return false;
}
if (joinConditions.size() > rightStats->KeyColumns.size()) {
return false;
}

for (auto [leftCol, rightCol] : joinConditions) {
if (! find_if(rightStats->KeyColumns.begin(), rightStats->KeyColumns.end(),
[rightCol] (const TString& s) {
return rightCol.AttributeName == s;
} )) {
return false;
}
}

TVector<TString> joinKeys;
for( auto [leftJc, rightJc] : joinConditions ) {
joinKeys.emplace_back( rightJc.AttributeName);
}

return IsLookupJoinApplicableDetailed(std::static_pointer_cast<TRelOptimizerNode>(right), joinKeys, ctx);
}

}

bool TKqpProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
EJoinImplType joinImpl) {

switch( joinImpl ) {
case EJoinImplType::LookupJoin:
return IsLookupJoinApplicable(left, right, joinConditions, *this);
default:
return true;
}
}

double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinImplType joinImpl) const {

switch(joinImpl) {
case EJoinImplType::LookupJoin:
return -1;
default:
return leftStats.Nrows + 2.0 * rightStats.Nrows;
}
}


}
30 changes: 30 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>

#include <ydb/core/kqp/opt/kqp_opt.h>

namespace NKikimr::NKqp::NOpt {

struct TKqpRelOptimizerNode : public NYql::TRelOptimizerNode {
const NYql::TExprNode::TPtr Node;

TKqpRelOptimizerNode(TString label, std::shared_ptr<NYql::TOptimizerStatistics> stats, const NYql::TExprNode::TPtr node) :
TRelOptimizerNode(label, stats), Node(node) { }
};

struct TKqpProviderContext : public NYql::IProviderContext {
TKqpProviderContext(const TKqpOptimizeContext& kqpCtx) : KqpCtx(kqpCtx) {}

virtual bool IsJoinApplicable(const std::shared_ptr<NYql::IBaseOptimizerNode>& left,
const std::shared_ptr<NYql::IBaseOptimizerNode>& right,
const std::set<std::pair<NYql::NDq::TJoinColumn, NYql::NDq::TJoinColumn>>& joinConditions,
NYql::EJoinImplType joinImpl);

virtual double ComputeJoinCost(const NYql::TOptimizerStatistics& leftStats, const NYql::TOptimizerStatistics& rightStats, NYql::EJoinImplType joinImpl) const;

const TKqpOptimizeContext& KqpCtx;
};

}
7 changes: 6 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "kqp_opt_log_rules.h"
#include "kqp_opt_cbo.h"

#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
Expand Down Expand Up @@ -134,7 +135,11 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize);
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization(), maxDPccpDPTableSize);
TKqpProviderContext providerContext(KqpCtx);
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel),
maxDPccpDPTableSize, providerContext, [](auto& rels, auto label, auto node, auto stat) {
rels.emplace_back(std::make_shared<TKqpRelOptimizerNode>(TString(label), stat, node));
});
DumpAppliedRule("OptimizeEquiJoinWithCosts", node.Ptr(), output.Ptr(), ctx);
return output;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ TMaybe<TKqpMatchReadResult> MatchRead(NYql::NNodes::TExprBase node) {
return MatchRead(node, [] (NYql::NNodes::TExprBase node) { return node.Maybe<TRead>().IsValid(); });
}

NYql::NNodes::TMaybeNode<NYql::NNodes::TKqlKeyInc> GetRightTableKeyPrefix(const NYql::NNodes::TKqlKeyRange& range);

} // NKikimr::NKqp::NOpt


Expand Down
38 changes: 19 additions & 19 deletions ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,6 @@ TDqJoin FlipLeftSemiJoin(const TDqJoin& join, TExprContext& ctx) {
.Done();
}

TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
if (!range.From().Maybe<TKqlKeyInc>() || !range.To().Maybe<TKqlKeyInc>()) {
return {};
}
auto rangeFrom = range.From().Cast<TKqlKeyInc>();
auto rangeTo = range.To().Cast<TKqlKeyInc>();

if (rangeFrom.ArgCount() != rangeTo.ArgCount()) {
return {};
}
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) {
return {};
}
}

return rangeFrom;
}

TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos,
const TKqpTable& table, const TCoAtomList& columns,
const TExprBase& keysToLookup, const TVector<TCoAtom>& skipNullColumns, const TString& indexName,
Expand Down Expand Up @@ -859,6 +840,25 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext

} // anonymous namespace

TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
if (!range.From().Maybe<TKqlKeyInc>() || !range.To().Maybe<TKqlKeyInc>()) {
return {};
}
auto rangeFrom = range.From().Cast<TKqlKeyInc>();
auto rangeTo = range.To().Cast<TKqlKeyInc>();

if (rangeFrom.ArgCount() != rangeTo.ArgCount()) {
return {};
}
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
if (rangeFrom.Arg(i).Raw() != rangeTo.Arg(i).Raw()) {
return {};
}
}

return rangeFrom;
}

TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx)
{
if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin) || !node.Maybe<TDqJoin>()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/logical/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ SRCS(
kqp_opt_log_sqlin.cpp
kqp_opt_log_sqlin_compact.cpp
kqp_opt_log.cpp
kqp_opt_cbo.cpp
)

PEERDIR(
Expand Down
Loading

0 comments on commit b11d10a

Please sign in to comment.