Skip to content

Commit

Permalink
PushLeftStage fix (#7465)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Aug 6, 2024
1 parent 20af1f5 commit b5c55a4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
{
// TODO: Allow push to left stage for data queries.
// It is now possible as we don't use datashard transactions for reads in data queries.
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
pushLeftStage, KqpCtx.Config->GetHashJoinMode()
);
Expand Down
65 changes: 65 additions & 0 deletions ydb/core/kqp/ut/join/kqp_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,71 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
}
}

Y_UNIT_TEST(TwoJoinsWithQueryService) {
NKikimrConfig::TAppConfig appConfig;
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(serverSettings);
auto client = kikimr.GetTableClient();
auto db = kikimr.GetQueryClient();
auto settings = NYdb::NQuery::TExecuteQuerySettings();

{
auto session = client.CreateSession().GetValueSync().GetSession();
const auto query = Q_(R"(
CREATE TABLE ta(
a Int64,
b Int64,
c Int64,
PRIMARY KEY(a)
);
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto session = client.CreateSession().GetValueSync().GetSession();
const auto query = Q_(R"(
CREATE TABLE tb(
b Int64,
bval Int64,
PRIMARY KEY(b)
);
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto session = client.CreateSession().GetValueSync().GetSession();
const auto query = Q_(R"(
CREATE TABLE tc(
c Int64,
cval Int64,
PRIMARY KEY(c)
);
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto result = db.ExecuteQuery(R"(
UPSERT INTO ta(a, b, c) VALUES (1, 1001, 2001), (2, 1002, 2002), (3, 1003, 2003);
UPSERT INTO tb(b, bval) VALUES (1001, 1001), (1002, 1002), (1003, 1003);
UPSERT INTO tc(c, cval) VALUES (2001, 2001), (2002, 2002), (2003, 2003);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
auto result = db.ExecuteQuery(R"(
SELECT ta.a, tb.bval, tc.cval FROM ta INNER JOIN tb ON ta.b = tb.b LEFT JOIN tc ON ta.c = tc.cval;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[[3];[1003];[2003]];[[2];[1002];[2002]];[[1];[1001];[2001]]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

// join on key prefix => index-lookup
Y_UNIT_TEST(RightSemiJoin_KeyPrefix) {
TKikimrRunner kikimr(SyntaxV1Settings());
Expand Down

0 comments on commit b5c55a4

Please sign in to comment.