Skip to content

Commit

Permalink
Addressed Alexey's comments, added opt level 2
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelvelikhov committed Jan 30, 2024
1 parent 8d336d5 commit b3763f7
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 44 deletions.
6 changes: 4 additions & 2 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/core/yql_opt_proposed_by_data.h>

#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

#include <util/generic/is_in.h>

namespace NKikimr {
Expand Down Expand Up @@ -144,7 +146,7 @@ class TKqpRunner : public IKqpRunner {
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
, Pctx(TKqpProviderContext(*OptimizeCtx))
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)))
{
CreateGraphTransformer(typesCtx, sessionCtx, funcRegistry);
}
Expand Down Expand Up @@ -262,7 +264,7 @@ class TKqpRunner : public IKqpRunner {
.AddCommonOptimization()
.Add(CreateKqpConstantFoldingTransformer(OptimizeCtx, *typesCtx, Config), "ConstantFolding")
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config), "LogicalOptimize")
.Add(CreateKqpLogOptTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "LogicalOptimize")
.Add(CreateLogicalDataProposalsInspector(*typesCtx), "ProvidersLogicalOptimize")
.Add(CreateKqpPhyOptTransformer(OptimizeCtx, *typesCtx), "KqpPhysicalOptimize")
.Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize")
Expand Down
33 changes: 24 additions & 9 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,37 @@ bool IsLookupJoinApplicable(std::shared_ptr<IBaseOptimizerNode> left,
bool TKqpProviderContext::IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
EJoinImplType joinImpl) {
EJoinAlgoType joinAlgo) {

switch( joinImpl ) {
case EJoinImplType::LookupJoin:
switch( joinAlgo ) {
case EJoinAlgoType::LookupJoin:
if (OptLevel==2 && left->Stats->Nrows > 10e3) {
return false;
}
return IsLookupJoinApplicable(left, right, joinConditions, *this);
default:

case EJoinAlgoType::DictJoin:
return right->Stats->Nrows < 10e5;
case EJoinAlgoType::MapJoin:
return right->Stats->Nrows < 10e6;
case EJoinAlgoType::GraceJoin:
return true;
}
}

double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinImplType joinImpl) const {
double TKqpProviderContext::ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinAlgoType joinAlgo) const {

switch(joinImpl) {
case EJoinImplType::LookupJoin:
return -1;
default:
switch(joinAlgo) {
case EJoinAlgoType::LookupJoin:
if (OptLevel==1) {
return -1;
}
return leftStats.Nrows;
case EJoinAlgoType::DictJoin:
return leftStats.Nrows + 1.7 * rightStats.Nrows;
case EJoinAlgoType::MapJoin:
return leftStats.Nrows + 1.8 * rightStats.Nrows;
case EJoinAlgoType::GraceJoin:
return leftStats.Nrows + 2.0 * rightStats.Nrows;
}
}
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/opt/logical/kqp_opt_cbo.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,31 @@

namespace NKikimr::NKqp::NOpt {

/**
* KQP specific Rel node, includes a pointer to ExprNode
*/
struct TKqpRelOptimizerNode : public NYql::TRelOptimizerNode {
const NYql::TExprNode::TPtr Node;

TKqpRelOptimizerNode(TString label, std::shared_ptr<NYql::TOptimizerStatistics> stats, const NYql::TExprNode::TPtr node) :
TRelOptimizerNode(label, stats), Node(node) { }
};

/**
* KQP Specific cost function and join applicability cost function
*/
struct TKqpProviderContext : public NYql::IProviderContext {
TKqpProviderContext(const TKqpOptimizeContext& kqpCtx) : KqpCtx(kqpCtx) {}
TKqpProviderContext(const TKqpOptimizeContext& kqpCtx, const int optLevel) : KqpCtx(kqpCtx), OptLevel(optLevel) {}

virtual bool IsJoinApplicable(const std::shared_ptr<NYql::IBaseOptimizerNode>& left,
const std::shared_ptr<NYql::IBaseOptimizerNode>& right,
const std::set<std::pair<NYql::NDq::TJoinColumn, NYql::NDq::TJoinColumn>>& joinConditions,
NYql::EJoinImplType joinImpl);
NYql::EJoinAlgoType joinAlgo) override;

virtual double ComputeJoinCost(const NYql::TOptimizerStatistics& leftStats, const NYql::TOptimizerStatistics& rightStats, NYql::EJoinImplType joinImpl) const;
virtual double ComputeJoinCost(const NYql::TOptimizerStatistics& leftStats, const NYql::TOptimizerStatistics& rightStats, NYql::EJoinAlgoType joinAlgo) const override;

const TKqpOptimizeContext& KqpCtx;
int OptLevel;
};

}
12 changes: 7 additions & 5 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ using namespace NYql::NNodes;
class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
public:
TKqpLogicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
const TKikimrConfiguration::TPtr& config)
const TKikimrConfiguration::TPtr& config, TKqpProviderContext& pctx)
: TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
, TypesCtx(typesCtx)
, KqpCtx(*kqpCtx)
, Config(config)
, Pctx(pctx)
{
#define HNDL(name) "KqpLogical-"#name, Hndl(&TKqpLogicalOptTransformer::name)
AddHandler(0, &TCoFlatMap::Match, HNDL(PushPredicateToReadTable));
Expand Down Expand Up @@ -135,9 +136,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize);
TKqpProviderContext providerContext(KqpCtx);
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel),
maxDPccpDPTableSize, providerContext, [](auto& rels, auto label, auto node, auto stat) {
maxDPccpDPTableSize, Pctx, [](auto& rels, auto label, auto node, auto stat) {
rels.emplace_back(std::make_shared<TKqpRelOptimizerNode>(TString(label), stat, node));
});
DumpAppliedRule("OptimizeEquiJoinWithCosts", node.Ptr(), output.Ptr(), ctx);
Expand Down Expand Up @@ -274,12 +274,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
TTypeAnnotationContext& TypesCtx;
const TKqpOptimizeContext& KqpCtx;
const TKikimrConfiguration::TPtr& Config;
TKqpProviderContext& Pctx;
};

TAutoPtr<IGraphTransformer> CreateKqpLogOptTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config)
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config,
TKqpProviderContext& pctx)
{
return THolder<IGraphTransformer>(new TKqpLogicalOptTransformer(typesCtx, kqpCtx, config));
return THolder<IGraphTransformer>(new TKqpLogicalOptTransformer(typesCtx, kqpCtx, config, pctx));
}

} // namespace NKikimr::NKqp::NOpt
4 changes: 3 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include <ydb/core/kqp/opt/kqp_opt.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>

namespace NKikimr::NKqp::NOpt {

struct TKqpOptimizeContext;

TAutoPtr<NYql::IGraphTransformer> CreateKqpLogOptTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
NYql::TTypeAnnotationContext& typesCtx, const NYql::TKikimrConfiguration::TPtr& config);
NYql::TTypeAnnotationContext& typesCtx, const NYql::TKikimrConfiguration::TPtr& config,
NKikimr::NKqp::NOpt::TKqpProviderContext& pctx);

} // namespace NKikimr::NKqp::NOpt
5 changes: 3 additions & 2 deletions ydb/library/yql/core/cbo/cbo_optimizer_new.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ void TRelOptimizerNode::Print(std::stringstream& stream, int ntabs) {
}

TJoinOptimizerNode::TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const EJoinKind joinType, bool nonReorderable) :
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const EJoinKind joinType, const EJoinAlgoType joinAlgo, bool nonReorderable) :
IBaseOptimizerNode(JoinNodeType),
LeftArg(left),
RightArg(right),
JoinConditions(joinConditions),
JoinType(joinType) {
JoinType(joinType),
JoinAlgo(joinAlgo) {
IsReorderable = (JoinType==EJoinKind::InnerJoin) && (nonReorderable==false);
}

Expand Down
17 changes: 9 additions & 8 deletions ydb/library/yql/core/cbo/cbo_optimizer_new.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,36 +87,36 @@ TString ConvertToJoinString(const EJoinKind kind);
struct IProviderContext {
virtual ~IProviderContext() = default;

virtual double ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinImplType joinImpl) const = 0;
virtual double ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinAlgoType joinAlgol) const = 0;

virtual bool IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
EJoinImplType joinImpl) = 0;
EJoinAlgoType joinAlgo) = 0;

};

/**
* Temporary solution for default provider context
*/

struct TDummyProviderContext : IProviderContext {
struct TDummyProviderContext : public IProviderContext {
TDummyProviderContext() {}

double ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinImplType joinImpl) const {
Y_UNUSED(joinImpl);
double ComputeJoinCost(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats, EJoinAlgoType joinAlgo) const override {
Y_UNUSED(joinAlgo);
return leftStats.Nrows + 2.0 * rightStats.Nrows;
}

bool IsJoinApplicable(const std::shared_ptr<IBaseOptimizerNode>& left,
const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
EJoinImplType joinImpl) {
EJoinAlgoType joinAlgo) override {

Y_UNUSED(left);
Y_UNUSED(right);
Y_UNUSED(joinConditions);
Y_UNUSED(joinImpl);
Y_UNUSED(joinAlgo);

return true;
}
Expand All @@ -139,10 +139,11 @@ struct TJoinOptimizerNode : public IBaseOptimizerNode {
std::shared_ptr<IBaseOptimizerNode> RightArg;
std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>> JoinConditions;
EJoinKind JoinType;
EJoinAlgoType JoinAlgo;
bool IsReorderable;

TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, const EJoinKind joinType, bool nonReorderable=false);
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, const EJoinKind joinType, const EJoinAlgoType joinAlgo, bool nonReorderable=false);
virtual ~TJoinOptimizerNode() {}
virtual TVector<TString> Labels();
virtual void Print(std::stringstream& stream, int ntabs=0);
Expand Down
8 changes: 4 additions & 4 deletions ydb/library/yql/core/yql_cost_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ bool NDq::operator < (const NDq::TJoinColumn& c1, const NDq::TJoinColumn& c2) {
*/

TOptimizerStatistics NYql::ComputeJoinStats(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats,
const TVector<TString>& leftJoinKeys, const TVector<TString>& rightJoinKeys, EJoinImplType joinImpl, const IProviderContext& ctx) {
const TVector<TString>& leftJoinKeys, const TVector<TString>& rightJoinKeys, EJoinAlgoType joinAlgo, const IProviderContext& ctx) {

double newCard;
EStatisticsType outputType;
Expand Down Expand Up @@ -70,15 +70,15 @@ TOptimizerStatistics NYql::ComputeJoinStats(const TOptimizerStatistics& leftStat

int newNCols = leftStats.Ncols + rightStats.Ncols;

double cost = ctx.ComputeJoinCost(leftStats, rightStats, joinImpl)
double cost = ctx.ComputeJoinCost(leftStats, rightStats, joinAlgo)
+ newCard
+ leftStats.Cost + rightStats.Cost;

return TOptimizerStatistics(outputType, newCard, newNCols, cost, joinedTableKeys);
}

TOptimizerStatistics NYql::ComputeJoinStats(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, EJoinImplType joinImpl, const IProviderContext& ctx) {
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, EJoinAlgoType joinAlgo, const IProviderContext& ctx) {

TVector<TString> leftJoinKeys;
TVector<TString> rightJoinKeys;
Expand All @@ -88,5 +88,5 @@ TOptimizerStatistics NYql::ComputeJoinStats(const TOptimizerStatistics& leftStat
rightJoinKeys.emplace_back(c.second.AttributeName);
}

return ComputeJoinStats(leftStats, rightStats, leftJoinKeys, rightJoinKeys, joinImpl, ctx);
return ComputeJoinStats(leftStats, rightStats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx);
}
8 changes: 4 additions & 4 deletions ydb/library/yql/core/yql_cost_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,19 @@ bool operator < (const TJoinColumn& c1, const TJoinColumn& c2);

}

enum EJoinImplType {
enum EJoinAlgoType {
DictJoin,
MapJoin,
GraceJoin,
LookupJoin
};

static const EJoinImplType AllJoinTypes[] = { DictJoin, MapJoin, GraceJoin, LookupJoin };
static const EJoinAlgoType AllJoinTypes[] = { DictJoin, MapJoin, GraceJoin, LookupJoin };

TOptimizerStatistics ComputeJoinStats(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats,
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, EJoinImplType joinType, const IProviderContext& ctx);
const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, EJoinAlgoType joinAlgo, const IProviderContext& ctx);

TOptimizerStatistics ComputeJoinStats(const TOptimizerStatistics& leftStats, const TOptimizerStatistics& rightStats,
const TVector<TString>& leftJoinKeys, const TVector<TString>& rightJoinKeys, EJoinImplType joinType, const IProviderContext& ctx);
const TVector<TString>& leftJoinKeys, const TVector<TString>& rightJoinKeys, EJoinAlgoType joinAlgo, const IProviderContext& ctx);

}
19 changes: 13 additions & 6 deletions ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,18 @@ void ComputeJoinConditions(const TCoEquiJoinTuple& joinTuple,
std::shared_ptr<TJoinOptimizerNode> MakeJoin(std::shared_ptr<IBaseOptimizerNode> left,
std::shared_ptr<IBaseOptimizerNode> right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
EJoinImplType joinImpl,
EJoinAlgoType joinAlgo,
IProviderContext& ctx) {

auto res = std::make_shared<TJoinOptimizerNode>(left, right, joinConditions, EJoinKind::InnerJoin);
res->Stats = std::make_shared<TOptimizerStatistics>( ComputeJoinStats(*left->Stats, *right->Stats, joinConditions, joinImpl, ctx));
auto res = std::make_shared<TJoinOptimizerNode>(left, right, joinConditions, EJoinKind::InnerJoin, joinAlgo);
res->Stats = std::make_shared<TOptimizerStatistics>( ComputeJoinStats(*left->Stats, *right->Stats, joinConditions, joinAlgo, ctx));
return res;
}

/**
* Iterate over all join algorithms and pick the best join that is applicable.
* Also considers commuting joins
*/
std::shared_ptr<TJoinOptimizerNode> PickBestJoin(std::shared_ptr<IBaseOptimizerNode> left,
std::shared_ptr<IBaseOptimizerNode> right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& leftJoinConditions,
Expand Down Expand Up @@ -145,6 +149,9 @@ std::shared_ptr<TJoinOptimizerNode> PickBestJoin(std::shared_ptr<IBaseOptimizerN
return res;
}

/**
* Iterate over all join algorithms and pick the best join that is applicable
*/
std::shared_ptr<TJoinOptimizerNode> PickBestNonReorderabeJoin(std::shared_ptr<IBaseOptimizerNode> left,
std::shared_ptr<IBaseOptimizerNode> right,
const std::set<std::pair<TJoinColumn, TJoinColumn>>& leftJoinConditions,
Expand Down Expand Up @@ -927,7 +934,7 @@ std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& jo
TJoinColumn(rightScope, rightColumn)));
}

return std::make_shared<TJoinOptimizerNode>(left,right,joinConds,ConvertToJoinKind(joinTuple.Type().StringValue()));
return std::make_shared<TJoinOptimizerNode>(left, right, joinConds, ConvertToJoinKind(joinTuple.Type().StringValue()), EJoinAlgoType::DictJoin);
}

/**
Expand Down Expand Up @@ -992,7 +999,7 @@ void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join, IProvide
if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), ctx);
}
join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, join->JoinConditions, EJoinImplType::DictJoin, ctx));
join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, join->JoinConditions, EJoinAlgoType::DictJoin, ctx));
}

/**
Expand Down Expand Up @@ -1086,7 +1093,7 @@ class TOptimizerNativeNew: public IOptimizerNew {
if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), MaxDPccpDPTableSize, Pctx);
}
join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, join->JoinConditions, EJoinImplType::DictJoin, Pctx));
join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, join->JoinConditions, EJoinAlgoType::DictJoin, Pctx));
}

// Optimize the root
Expand Down

0 comments on commit b3763f7

Please sign in to comment.