Skip to content

Commit

Permalink
YQ-2068 flag to enable streamlookup join strategy (#3488)
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored Apr 5, 2024
1 parent 8addd9a commit 5bb9671
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 8 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {

TMaybeNode<TExprBase> RewriteEquiJoin(TExprBase node, TExprContext& ctx) {
bool useCBO = Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel) == 3;
TExprBase output = DqRewriteEquiJoin(node, KqpCtx.Config->GetHashJoinMode(), useCBO, ctx);
TExprBase output = DqRewriteEquiJoin(node, KqpCtx.Config->GetHashJoinMode(), useCBO, ctx, TypesCtx);
DumpAppliedRule("RewriteEquiJoin", node.Ptr(), output.Ptr(), ctx);
return output;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/cfg/tests/gateways.conf
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ YqlCore {
Flags {
Name: "_EnableMatchRecognize"
}
Flags {
Name: "_EnableStreamLookupJoin"
}
}

Dq {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_type_annotation.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
bool OrderedColumns = false;
TColumnOrderStorage::TPtr ColumnOrderStorage = new TColumnOrderStorage;
THashSet<TString> OptimizerFlags;
bool StreamLookupJoin = false;

TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const;
IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx);
Expand Down
11 changes: 6 additions & 5 deletions ydb/library/yql/dq/opt/dq_opt_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ TExprBase BuildDqJoinInput(TExprContext& ctx, TPositionHandle pos, const TExprBa
}

TMaybe<TJoinInputDesc> BuildDqJoin(const TCoEquiJoinTuple& joinTuple,
const THashMap<TStringBuf, TJoinInputDesc>& inputs, EHashJoinMode mode, bool useCBO, TExprContext& ctx)
const THashMap<TStringBuf, TJoinInputDesc>& inputs, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx)
{
auto options = joinTuple.Options();
auto linkSettings = GetEquiJoinLinkSettings(options.Ref());
YQL_ENSURE(linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin || typeCtx.StreamLookupJoin, "Unsupported join strategy: streamlookup");
bool leftAny = linkSettings.LeftHints.contains("any");
bool rightAny = linkSettings.RightHints.contains("any");

Expand All @@ -129,7 +130,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(const TCoEquiJoinTuple& joinTuple,
left = inputs.at(joinTuple.LeftScope().Cast<TCoAtom>().Value());
YQL_ENSURE(left, "unknown scope " << joinTuple.LeftScope().Cast<TCoAtom>().Value());
} else {
left = BuildDqJoin(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), inputs, mode, useCBO, ctx);
left = BuildDqJoin(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), inputs, mode, useCBO, ctx, typeCtx);
if (!left) {
return {};
}
Expand All @@ -140,7 +141,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(const TCoEquiJoinTuple& joinTuple,
right = inputs.at(joinTuple.RightScope().Cast<TCoAtom>().Value());
YQL_ENSURE(right, "unknown scope " << joinTuple.RightScope().Cast<TCoAtom>().Value());
} else {
right = BuildDqJoin(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), inputs, mode, useCBO, ctx);
right = BuildDqJoin(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), inputs, mode, useCBO, ctx, typeCtx);
if (!right) {
return {};
}
Expand Down Expand Up @@ -373,7 +374,7 @@ bool CheckJoinColumns(const TExprBase& node) {
* physical stages with join operators.
* Potentially this optimizer can also perform joins reorder given cardinality information.
*/
TExprBase DqRewriteEquiJoin(const TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx) {
TExprBase DqRewriteEquiJoin(const TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx) {
if (!node.Maybe<TCoEquiJoin>()) {
return node;
}
Expand All @@ -390,7 +391,7 @@ TExprBase DqRewriteEquiJoin(const TExprBase& node, EHashJoinMode mode, bool useC
}

auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>();
auto result = BuildDqJoin(joinTuple, inputs, mode, useCBO, ctx);
auto result = BuildDqJoin(joinTuple, inputs, mode, useCBO, ctx, typeCtx);
if (!result) {
return node;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/opt/dq_opt_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct TRelOptimizerNode;

namespace NDq {

NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx);
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx);

NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);

Expand Down
7 changes: 7 additions & 0 deletions ydb/library/yql/providers/config/yql_config_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,13 @@ namespace {
Types.OptimizerFlags.insert(to_lower(ToString(arg)));
}
}
else if (name == "_EnableStreamLookupJoin" || name == "DisableStreamLookupJoin") {
if (args.size() != 0) {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected no arguments, but got " << args.size()));
return false;
}
Types.StreamLookupJoin = name == "_EnableStreamLookupJoin";
}
else {
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
return false;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/opt/logical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
hasDqConnections |= !!list.Maybe<TDqConnection>();
}

return hasDqConnections ? DqRewriteEquiJoin(node, Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off), false, ctx) : node;
return hasDqConnections ? DqRewriteEquiJoin(node, Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off), false, ctx, TypesCtx) : node;
}

TMaybeNode<TExprBase> ExpandWindowFunctions(TExprBase node, TExprContext& ctx) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/yql/tools/dqrun/examples/gateways.conf
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ HttpGateway {
DownloadBufferBytesLimit: 131072
}

YqlCore {
Flags {
Name: "_EnableStreamLookupJoin"
}
}

SqlCore {
TranslationFlags: ["FlexibleTypes", "DisableAnsiOptionalAs", "EmitAggApply"]
}
Expand Down

0 comments on commit 5bb9671

Please sign in to comment.