Skip to content

Commit

Permalink
Merge b3763f7 into 8472b26
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelvelikhov authored Jan 30, 2024
2 parents 8472b26 + b3763f7 commit a7db3b2
Show file tree
Hide file tree
Showing 30 changed files with 920 additions and 136 deletions.
12 changes: 9 additions & 3 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/kqp/opt/logical/kqp_opt_log.h>
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>


#include <ydb/core/kqp/opt/physical/kqp_opt_phy.h>
Expand All @@ -20,6 +21,8 @@
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/core/yql_opt_proposed_by_data.h>

#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

#include <util/generic/is_in.h>

namespace NKikimr {
Expand Down Expand Up @@ -143,6 +146,7 @@ class TKqpRunner : public IKqpRunner {
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)))
{
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
}
Expand Down Expand Up @@ -259,8 +263,8 @@ class TKqpRunner : public IKqpRunner {
.AddPostTypeAnnotation(/* forSubgraph */ true)
.AddCommonOptimization()
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config), "Statistics")
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "LogicalOptimize")
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
.Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "KqpPhysicalOptimize")
.Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize")
Expand Down Expand Up @@ -293,7 +297,7 @@ class TKqpRunner : public IKqpRunner {
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
.AddPostTypeAnnotation()
.Add(CreateKqpBuildPhysicalQueryTransformer(OptimizeCtx, BuildQueryCtx), "BuildPhysicalQuery")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config), "Statistics")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Build(false);

auto physicalPeepholeTransformer = TTransformationPipeline(typesCtx)
Expand Down Expand Up @@ -355,6 +359,8 @@ class TKqpRunner : public IKqpRunner {
TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx;
TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx;

TKqpProviderContext Pctx;

TAutoPtr<IGraphTransformer> Transformer;
};

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
#include <ydb/library/yql/utils/plan/plan_utils.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

#include <library/cpp/json/writer/json.h>
#include <library/cpp/json/json_reader.h>
Expand Down Expand Up @@ -1357,7 +1358,7 @@ class TxPlanSerializer {
}

void AddOptimizerEstimates(TOperator& op, const TExprBase& expr) {
if (!SerializerCtx.Config->HasOptEnableCostBasedOptimization()) {
if (SerializerCtx.Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)==0) {
return;
}

Expand Down
9 changes: 6 additions & 3 deletions ydb/core/kqp/opt/kqp_statistics_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
#include <ydb/library/yql/core/yql_cost_function.h>

#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>


#include <charconv>

using namespace NYql;
Expand Down Expand Up @@ -187,7 +190,7 @@ IGraphTransformer::TStatus TKqpStatisticsTransformer::DoTransform(TExprNode::TPt
TExprNode::TPtr& output, TExprContext& ctx) {

output = input;
if (!Config->HasOptEnableCostBasedOptimization()) {
if (Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel) == 0) {
return IGraphTransformer::TStatus::Ok;
}

Expand Down Expand Up @@ -238,6 +241,6 @@ bool TKqpStatisticsTransformer::AfterLambdasSpecific(const TExprNode::TPtr& inpu
}

TAutoPtr<IGraphTransformer> NKikimr::NKqp::CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) {
return THolder<IGraphTransformer>(new TKqpStatisticsTransformer(kqpCtx, typeCtx, config));
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx) {
return THolder<IGraphTransformer>(new TKqpStatisticsTransformer(kqpCtx, typeCtx, config, pctx));
}
7 changes: 4 additions & 3 deletions ydb/core/kqp/opt/kqp_statistics_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/yql/core/yql_statistics.h>

#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
Expand Down Expand Up @@ -33,8 +34,8 @@ class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase

public:
TKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx,
const TKikimrConfiguration::TPtr& config) :
TDqStatisticsTransformerBase(&typeCtx),
const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx) :
TDqStatisticsTransformerBase(&typeCtx, pctx),
Config(config),
KqpCtx(*kqpCtx) {}

Expand All @@ -47,6 +48,6 @@ class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase
};

TAutoPtr<IGraphTransformer> CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config);
TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, const TKqpProviderContext& pctx);
}
}
164 changes: 164 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#include "kqp_opt_cbo.h"
#include "kqp_opt_log_impl.h"

#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>


namespace NKikimr::NKqp::NOpt {

using namespace NYql;
using namespace NYql::NCommon;
using namespace NYql::NDq;
using namespace NYql::NNodes;

namespace {

/**
 * KQP specific rule to check if a LookupJoin is applicable.
 *
 * @param node         Optimizer node of the right (looked-up) join input. It is downcast to
 *                     TKqpRelOptimizerNode to reach the underlying expression node, so it must
 *                     have been created as one.
 * @param joinColumns  Names of the right-side columns used in the join condition.
 * @param ctx          Provider context; gives access to the KQP optimize context and its config.
 * @return true if a lookup join may be used for this input, false otherwise.
 */
bool IsLookupJoinApplicableDetailed(const std::shared_ptr<NYql::TRelOptimizerNode>& node, const TVector<TString>& joinColumns, const TKqpProviderContext& ctx) {

    auto rel = std::static_pointer_cast<TKqpRelOptimizerNode>(node);
    auto expr = TExprBase(rel->Node);

    // Scan queries may use lookup join only when the corresponding setting is enabled.
    if (ctx.KqpCtx.IsScanQuery() && !ctx.KqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin) {
        return false;
    }

    // True when the key column at position `idx` exists and is one of the join columns.
    // The search result is reduced to a real boolean here: an iterator returned by a
    // search algorithm must be compared against end(), it is not a truth value by itself.
    const auto isJoinColumn = [&](size_t idx) {
        const auto& keyColumns = node->Stats->KeyColumns;
        return std::any_of(joinColumns.begin(), joinColumns.end(), [&](const TString& s) {
            return idx < keyColumns.size() && keyColumns[idx] == s;
        });
    };

    // Joining on the first key column is always good enough for a lookup.
    if (isJoinColumn(0)) {
        return true;
    }

    auto readMatch = MatchRead<TKqlReadTable>(expr);
    TMaybeNode<TKqlKeyInc> maybeTablePrefix;
    size_t prefixSize = 0;

    if (readMatch) {
        // A non-passthrough FlatMap on top of the read rules the lookup out.
        if (readMatch->FlatMap && !IsPassthroughFlatMap(readMatch->FlatMap.Cast(), nullptr)) {
            return false;
        }

        auto read = readMatch->Read.Cast<TKqlReadTable>();
        maybeTablePrefix = GetRightTableKeyPrefix(read.Range());
        if (!maybeTablePrefix) {
            return false;
        }

        prefixSize = maybeTablePrefix.Cast().ArgCount();
        if (!prefixSize) {
            return true;
        }
    } else {
        readMatch = MatchRead<TKqlReadTableRangesBase>(expr);
        if (!readMatch) {
            // Neither kind of table read was recognized.
            return false;
        }

        if (readMatch->FlatMap && !IsPassthroughFlatMap(readMatch->FlatMap.Cast(), nullptr)) {
            return false;
        }

        auto read = readMatch->Read.Cast<TKqlReadTableRangesBase>();
        if (TCoVoid::Match(read.Ranges().Raw())) {
            return true;
        }

        auto prompt = TKqpReadTableExplainPrompt::Parse(read);

        // Only a single range whose used key columns are all covered by the point prefix is accepted.
        if (prompt.PointPrefixLen != prompt.UsedKeyColumns.size()) {
            return false;
        }
        if (prompt.ExpectedMaxRanges != TMaybe<ui64>(1)) {
            return false;
        }
        prefixSize = prompt.PointPrefixLen;
    }

    // The key column that directly follows the fixed prefix (if there is one)
    // has to participate in the join.
    if (prefixSize < node->Stats->KeyColumns.size() && !isJoinColumn(prefixSize)) {
        return false;
    }

    return true;
}

bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,
std::shared_ptr<IBaseOptimizerNode> right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
TKqpProviderContext& ctx) {

Y_UNUSED(left);

auto rightStats = right->Stats;

if (rightStats->Type != EStatisticsType::BaseTable) {
return false;
}
if (joinConditions.size() > rightStats->KeyColumns.size()) {
return false;
}

for (auto [leftCol, rightCol] : joinConditions) {
if (! find_if(rightStats->KeyColumns.begin(), rightStats->KeyColumns.end(),
[rightCol] (const TString& s) {
return rightCol.AttributeName == s;
} )) {
return false;
}
}

TVector<TString> joinKeys;
for( auto [leftJc, rightJc] : joinConditions ) {
joinKeys.emplace_back( rightJc.AttributeName);
}

return IsLookupJoinApplicableDetailed(std::static_pointer_cast<TRelOptimizerNode>(right), joinKeys, ctx);
}

}

/**
 * Decides whether the given join algorithm may be used for the given pair of inputs.
 *
 * - LookupJoin: rejected at optimization level 2 when the left input has more than
 *   10e3 (10,000) rows; otherwise decided by IsLookupJoinApplicable.
 * - DictJoin:   allowed when the right input has fewer than 10e5 (1,000,000) rows.
 * - MapJoin:    allowed when the right input has fewer than 10e6 (10,000,000) rows.
 * - GraceJoin:  always allowed.
 *
 * Any other algorithm value is reported as not applicable.
 */
bool TKqpProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
    const std::shared_ptr<IBaseOptimizerNode>& right,
    const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
    EJoinAlgoType joinAlgo) {

    switch (joinAlgo) {
        case EJoinAlgoType::LookupJoin:
            if (OptLevel == 2 && left->Stats->Nrows > 10e3) {
                return false;
            }
            return IsLookupJoinApplicable(left, right, joinConditions, *this);

        case EJoinAlgoType::DictJoin:
            return right->Stats->Nrows < 10e5;

        case EJoinAlgoType::MapJoin:
            return right->Stats->Nrows < 10e6;

        case EJoinAlgoType::GraceJoin:
            return true;
    }

    // Reached only for an algorithm value not handled above. Returning explicitly keeps
    // control from flowing off the end of a non-void function (undefined behaviour);
    // "not applicable" is the conservative answer for an algorithm this provider does not know.
    return false;
}

/**
 * Estimates the cost of joining two inputs with the given algorithm.
 *
 * The cost is the left row count plus the right row count scaled by an
 * algorithm-specific factor (1.7 for DictJoin, 1.8 for MapJoin, 2.0 for GraceJoin).
 * LookupJoin costs just the left row count, except at optimization level 1 where
 * it is reported as -1.
 * NOTE(review): the negative cost presumably makes LookupJoin win every comparison
 * at level 1 - confirm against the optimizer that consumes this value.
 */
double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinAlgoType joinAlgo) const {

    const double leftRows = leftStats.Nrows;
    const double rightRows = rightStats.Nrows;

    switch (joinAlgo) {
        case EJoinAlgoType::LookupJoin: {
            const bool forcedLookup = (OptLevel == 1);
            return forcedLookup ? -1 : leftRows;
        }
        case EJoinAlgoType::DictJoin:
            return leftRows + 1.7 * rightRows;
        case EJoinAlgoType::MapJoin:
            return leftRows + 1.8 * rightRows;
        case EJoinAlgoType::GraceJoin:
            return leftRows + 2.0 * rightRows;
    }
}


}
37 changes: 37 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>

#include <ydb/core/kqp/opt/kqp_opt.h>

namespace NKikimr::NKqp::NOpt {

/**
* KQP specific Rel node, includes a pointer to ExprNode
*/
struct TKqpRelOptimizerNode : public NYql::TRelOptimizerNode {
const NYql::TExprNode::TPtr Node;

TKqpRelOptimizerNode(TString label, std::shared_ptr<NYql::TOptimizerStatistics> stats, const NYql::TExprNode::TPtr node) :
TRelOptimizerNode(label, stats), Node(node) { }
};

/**
* KQP Specific cost function and join applicability cost function
*/
struct TKqpProviderContext : public NYql::IProviderContext {
TKqpProviderContext(const TKqpOptimizeContext& kqpCtx, const int optLevel) : KqpCtx(kqpCtx), OptLevel(optLevel) {}

virtual bool IsJoinApplicable(const std::shared_ptr<NYql::IBaseOptimizerNode>& left,
const std::shared_ptr<NYql::IBaseOptimizerNode>& right,
const std::set<std::pair<NYql::NDq::TJoinColumn, NYql::NDq::TJoinColumn>>& joinConditions,
NYql::EJoinAlgoType joinAlgo) override;

virtual double ComputeJoinCost(const NYql::TOptimizerStatistics& leftStats, const NYql::TOptimizerStatistics& rightStats, NYql::EJoinAlgoType joinAlgo) const override;

const TKqpOptimizeContext& KqpCtx;
int OptLevel;
};

}
15 changes: 11 additions & 4 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "kqp_opt_log_rules.h"
#include "kqp_opt_cbo.h"

#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
Expand All @@ -21,11 +22,12 @@ using namespace NYql::NNodes;
class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
public:
TKqpLogicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
const TKikimrConfiguration::TPtr& config)
const TKikimrConfiguration::TPtr& config, TKqpProviderContext& pctx)
: TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
, TypesCtx(typesCtx)
, KqpCtx(*kqpCtx)
, Config(config)
, Pctx(pctx)
{
#define HNDL(name) "KqpLogical-"#name, Hndl(&TKqpLogicalOptTransformer::name)
AddHandler(0, &TCoFlatMapBase::Match, HNDL(PushPredicateToReadTable));
Expand Down Expand Up @@ -134,7 +136,10 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize);
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization(), maxDPccpDPTableSize);
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel),
maxDPccpDPTableSize, Pctx, [](auto& rels, auto label, auto node, auto stat) {
rels.emplace_back(std::make_shared<TKqpRelOptimizerNode>(TString(label), stat, node));
});
DumpAppliedRule("OptimizeEquiJoinWithCosts", node.Ptr(), output.Ptr(), ctx);
return output;
}
Expand Down Expand Up @@ -269,12 +274,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
TTypeAnnotationContext& TypesCtx;
const TKqpOptimizeContext& KqpCtx;
const TKikimrConfiguration::TPtr& Config;
TKqpProviderContext& Pctx;
};

TAutoPtr<IGraphTransformer> CreateKqpLogOptTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config)
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config,
TKqpProviderContext& pctx)
{
return THolder<IGraphTransformer>(new TKqpLogicalOptTransformer(typesCtx, kqpCtx, config));
return THolder<IGraphTransformer>(new TKqpLogicalOptTransformer(typesCtx, kqpCtx, config, pctx));
}

} // namespace NKikimr::NKqp::NOpt
4 changes: 3 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include <ydb/core/kqp/opt/kqp_opt.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>

namespace NKikimr::NKqp::NOpt {

struct TKqpOptimizeContext;

TAutoPtr<NYql::IGraphTransformer> CreateKqpLogOptTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
NYql::TTypeAnnotationContext& typesCtx, const NYql::TKikimrConfiguration::TPtr& config);
NYql::TTypeAnnotationContext& typesCtx, const NYql::TKikimrConfiguration::TPtr& config,
NKikimr::NKqp::NOpt::TKqpProviderContext& pctx);

} // namespace NKikimr::NKqp::NOpt
Loading

0 comments on commit a7db3b2

Please sign in to comment.