Skip to content

Commit

Permalink
fix(kqp): add left filter for equal columns before stream join (ydb-p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored and MrLolthe1st committed May 28, 2024
1 parent 1291824 commit 810f05e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
7 changes: 7 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,13 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
};

if (useStreamIndexLookupJoin) {
if (filter.IsValid()) {
leftData = Build<TCoFilter>(ctx, join.Pos())
.Input(leftData)
.Lambda(filter.Cast())
.Done();
}

auto leftInput = Build<TCoFlatMap>(ctx, join.Pos())
.Input(leftData)
.Lambda()
Expand Down
102 changes: 102 additions & 0 deletions ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,108 @@ Y_UNIT_TEST(CheckCastUtf8ToString) {
TestKeyCast(session, "LEFT", "Utf8", "String", answer, 1);
}

Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
auto settings = TKikimrSettings().SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{ // create tables
const TString query = R"(
CREATE TABLE `/Root/Left` (
Key1 Int64,
Key2 Int64,
Fk Int64,
Value1 String,
Value2 String,
PRIMARY KEY (Key1, Key2)
);
CREATE TABLE `/Root/Right` (
Key1 Int64,
Key2 String,
Value String,
PRIMARY KEY (Key1, Key2)
);
)";
UNIT_ASSERT(session.ExecuteSchemeQuery(query).GetValueSync().IsSuccess());
}

{ // fill tables
const TString query = R"(
REPLACE INTO `/Root/Left` (Key1, Key2, Fk, Value1, Value2) VALUES
(1, 1, 1, "one", "value1"),
(2, 2, 20, "two", "two"),
(NULL, 3, NULL, "three", "value3");
REPLACE INTO `/Root/Right` (Key1, Key2, Value) VALUES
(1, "one", "value1"),
(2, "two", "value2"),
(NULL, "three", "value3");
)";
UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess());
}

{ // execute join with left filter before lookup join: l.Key1 = l.Key2 = l.Fk
TExecDataQuerySettings execSettings;
execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);

const TString query = R"(
SELECT l.Key1, l.Key2, l.Fk, r.Key1
FROM `/Root/Left` AS l
INNER JOIN `/Root/Right` AS r
ON l.Key1 = r.Key1
AND l.Key2 = r.Key1
AND l.Fk = r.Key1
ORDER BY l.Key1, l.Key2, l.Fk, r.Key1
)";

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
CompareYson(R"([
[[1];[1];[1];[1]]
])", FormatResultSetYson(result.GetResultSet(0)));

const ui32 index = (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin() ? 0 : 1);
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
for (const auto& tableStats : stats.query_phases(index).table_access()) {
if (tableStats.name() == "/Root/Right") {
UNIT_ASSERT_VALUES_EQUAL(tableStats.reads().rows(), 1);
}
}
}

{ // execute join with left filter before lookup join: l.Key1 = l.Key2 AND l.Value1 = l.Value2
TExecDataQuerySettings execSettings;
execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);

const TString query = R"(
SELECT l.Key1, l.Key2, r.Key1, l.Value1, l.Value2, r.Key2
FROM `/Root/Left` AS l
INNER JOIN `/Root/Right` AS r
ON l.Key1 = r.Key1
AND l.Key2 = r.Key1
AND l.Value1 = r.Key2
AND l.Value2 = r.Key2
ORDER BY l.Key1, l.Key2, r.Key1
)";

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
CompareYson(R"([
[[2];[2];[2];["two"];["two"];["two"]]
])", FormatResultSetYson(result.GetResultSet(0)));

const ui32 index = (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin() ? 0 : 1);
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
for (const auto& tableStats : stats.query_phases(index).table_access()) {
if (tableStats.name() == "/Root/Right") {
UNIT_ASSERT_VALUES_EQUAL(tableStats.reads().rows(), 1);
}
}
}
}

} // suite

} // namespace NKqp
Expand Down

0 comments on commit 810f05e

Please sign in to comment.