From 8c71b1e07a8d4018d233d4fbb23780075e5bd26b Mon Sep 17 00:00:00 2001 From: Iuliia Sidorina Date: Thu, 16 May 2024 15:34:19 +0200 Subject: [PATCH] fix(kqp): use safe key cast for stream lookup join (#4326) --- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 27 +- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 15 +- .../kqp/ut/join/kqp_index_lookup_join_ut.cpp | 305 +++++++++++++----- 3 files changed, 250 insertions(+), 97 deletions(-) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 44ac4d96eea3..91118fda904c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -598,6 +598,11 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext leftJoinKeys.emplace(leftKey); } + const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) + && kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin + && supportedStreamJoinKinds.contains(join.JoinType().Value()) + && !indexName; + auto leftRowArg = Build(ctx, join.Pos()) .Name("leftRowArg") .Done(); @@ -677,10 +682,19 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext } if (canCast) { DBG("------ cast " << leftDataType->GetName() << " to " << rightDataType->GetName()); - member = Build(ctx, join.Pos()) - .Input(member) - .Type().Build(rightDataType->GetName()) - .Done().Ptr(); + + if (useStreamIndexLookupJoin) { + // For stream lookup join we should cast keys before join + member = Build(ctx, join.Pos()) + .Value(member) + .Type(ExpandType(join.Pos(), *rightType, ctx)) + .Done().Ptr(); + } else { + member = Build(ctx, join.Pos()) + .Input(member) + .Type().Build(rightDataType->GetName()) + .Done().Ptr(); + } } else { DBG("------ can not cast " << leftDataType->GetName() << " to " << rightDataType->GetName()); return {}; @@ -704,11 +718,6 @@ TMaybeNode KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext return {}; } - const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) - && kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin - && supportedStreamJoinKinds.contains(join.JoinType().Value()) - && !indexName; - bool needPrecomputeLeft = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && !join.LeftInput().Maybe() && !IsParameterToListOfStructsRepack(join.LeftInput()) diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 854c178780a2..62c89de56a71 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -601,9 +601,18 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { break; } - UnprocessedRows.pop_front(); + auto hasNulls = [](const TOwnedCellVec& cellVec) { + for (const auto& cell : cellVec) { + if (cell.IsNull()) { + return true; + } + } - if (!joinKey.data()->IsNull()) { // don't use nulls as lookup keys, because null != null + return false; + }; + + UnprocessedRows.pop_front(); + if (!hasNulls(joinKey)) { // don't use nulls as lookup keys, because null != null std::vector > partitions; if (joinKey.size() < KeyColumns.size()) { // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) @@ -730,7 +739,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) { auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name); YQL_ENSURE(it != ReadColumns.end()); - joinKeyCells[joinKeyIdx] = row[std::distance(ReadColumns.begin(), it)]; + joinKeyCells[LookupKeyColumns[joinKeyIdx]->KeyOrder] = row[std::distance(ReadColumns.begin(), it)]; } auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 5c01a04a4ce6..51937df2ee43 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -9,6 +9,7 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; +using namespace fmt::literals; void PrepareTables(TSession session) { UNIT_ASSERT(session.ExecuteSchemeQuery(R"( @@ -536,24 +537,21 @@ Y_UNIT_TEST_TWIN(LeftJoinSkipNullFilter, StreamLookup) { ])", 4, StreamLookup); } -void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) { - using namespace fmt::literals; - +void CreateSimpleTableWithKeyType(TSession session, const TString& tableName, const TString& columnType) { const TString query = fmt::format(R"( - CREATE TABLE `/Root/Table{columnType}` ( + CREATE TABLE `/Root/{tableName}` ( Key {columnType}, Value String, PRIMARY KEY (Key) ) )", + "tableName"_a = tableName, "columnType"_a = columnType ); UNIT_ASSERT(session.ExecuteSchemeQuery(query).GetValueSync().IsSuccess()); } TString GetQuery(const TString& joinType, const TString& leftTable, const TString& rightTable) { - using namespace fmt::literals; - TString selectColumns; TString sortColumns; if (joinType == "RIGHT SEMI") { @@ -572,8 +570,8 @@ TString GetQuery(const TString& joinType, const TString& leftTable, const TStrin return fmt::format(R"( SELECT {selectColumns} - FROM `/Root/Table{leftTable}` AS l - {joinType} JOIN `/Root/Table{rightTable}` AS r + FROM `/Root/{leftTable}` AS l + {joinType} JOIN `/Root/{rightTable}` AS r ON l.Key = r.Key ORDER BY {sortColumns} )", "selectColumns"_a = selectColumns, @@ -652,7 +650,7 @@ Y_UNIT_TEST(CheckAllKeyTypesCast) { }; for (const auto& columnType : columnTypes) { - CreateSimpleTableWithKeyType(session, columnType); + CreateSimpleTableWithKeyType(session, columnType, columnType); } for (const auto& leftColumnType : columnTypes) { @@ -672,7 +670,7 @@ Y_UNIT_TEST(CheckAllKeyTypesCast) { } } -void TestKeyCast(TSession session, const TString& joinType, const TString& leftTable, const TString& rightTable, +void TestKeyCast(const TKikimrSettings& settings, TSession session, const TString& joinType, const TString& leftTable, const TString& rightTable, TString answer, size_t rightTableReads) { TExecDataQuerySettings execSettings; execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); @@ -680,145 +678,282 @@ void TestKeyCast(TSession session, const TString& joinType, const TString& leftT const TString query = GetQuery(joinType, leftTable, rightTable); auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); - TKikimrSettings settings; - ui32 index = (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup() ? 1 : 2); + ui32 index = settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin() ? 0 + : (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup() ? 1 : 2); CompareYson(answer, FormatResultSetYson(result.GetResultSet(0))); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), rightTableReads); + for (const auto& tableStats : stats.query_phases(index).table_access()) { + if (tableStats.name() == rightTable) { + UNIT_ASSERT_VALUES_EQUAL(tableStats.reads().rows(), rightTableReads); + } + } } -Y_UNIT_TEST(CheckCastInt32ToInt16) { - TKikimrSettings settings; +Y_UNIT_TEST_QUAD(CheckCastInt32ToInt16, StreamLookupJoin, NotNull) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSimpleTableWithKeyType(session, "Int32"); - CreateSimpleTableWithKeyType(session, "Int16"); + const TString leftKeyColumnType = "Int32"; + const TString rightKeyColumnType = "Int16"; + const TString rightTableName = rightKeyColumnType + (NotNull ? "NotNull" : ""); + const TString rightType = rightKeyColumnType + (NotNull ? " NOT NULL" : ""); + + CreateSimpleTableWithKeyType(session, leftKeyColumnType, leftKeyColumnType); + CreateSimpleTableWithKeyType(session, rightTableName, rightType); + + TString query = fmt::format( + R"( + REPLACE INTO `/Root/{leftTable}` (Key, Value) VALUES + (1, "Value11"), + (-32769, "Value12"); + REPLACE INTO `/Root/{rightTable}` (Key, Value) VALUES + (1, "Value21"), + (32767, "Value22"); + )", + "leftTable"_a = leftKeyColumnType, + "rightTable"_a = rightTableName + ); - TString query = R"( - REPLACE INTO `/Root/TableInt32` (Key, Value) VALUES - (1, "Value11"), - (-32769, "Value12"); - REPLACE INTO `/Root/TableInt16` (Key, Value) VALUES - (1, "Value21"), - (32767, "Value22"); - )"; UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); - TString answer = R"([ + const TString answer = R"([ [[-32769];["Value12"];#;#]; [[1];["Value11"];[1];["Value21"]] ])"; - TestKeyCast(session, "LEFT", "Int32", "Int16", answer, 2); + + TestKeyCast(settings, session, "LEFT", leftKeyColumnType, rightTableName, answer, StreamLookupJoin ? 1 : 2); } -Y_UNIT_TEST(CheckCastUint32ToUint16) { - TKikimrSettings settings; +Y_UNIT_TEST_QUAD(CheckCastUint32ToUint16, StreamLookupJoin, NotNull) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSimpleTableWithKeyType(session, "Uint32"); - CreateSimpleTableWithKeyType(session, "Uint16"); + const TString leftKeyColumnType = "Uint32"; + const TString rightKeyColumnType = "Uint16"; + const TString rightTableName = rightKeyColumnType + (NotNull ? "NotNull" : ""); + const TString rightType = rightKeyColumnType + (NotNull ? " NOT NULL" : ""); + + CreateSimpleTableWithKeyType(session, leftKeyColumnType, leftKeyColumnType); + CreateSimpleTableWithKeyType(session, rightTableName, rightType); + + TString query = fmt::format( + R"( + REPLACE INTO `/Root/{leftTable}` (Key, Value) VALUES + (1, "Value11"), + (4294967295, "Value12"); + REPLACE INTO `/Root/{rightTable}` (Key, Value) VALUES + (1, "Value21"), + (65535, "Value22"); + )", + "leftTable"_a = leftKeyColumnType, + "rightTable"_a = rightTableName + ); - TString query = R"( - REPLACE INTO `/Root/TableUint32` (Key, Value) VALUES - (1, "Value11"), - (4294967295, "Value12"); - REPLACE INTO `/Root/TableUint16` (Key, Value) VALUES - (1, "Value21"), - (65535, "Value22"); - )"; UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); - TString answer = R"([ + const TString answer = R"([ [[1u];["Value11"];[1u];["Value21"]]; [[4294967295u];["Value12"];#;#] ])"; - TestKeyCast(session, "LEFT", "Uint32", "Uint16", answer, 2); + + TestKeyCast(settings, session, "LEFT", leftKeyColumnType, rightTableName, answer, StreamLookupJoin ? 1 : 2); } -Y_UNIT_TEST(CheckCastUint64ToInt64) { - TKikimrSettings settings; +Y_UNIT_TEST_QUAD(CheckCastUint64ToInt64, StreamLookupJoin, NotNull) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSimpleTableWithKeyType(session, "Uint64"); - CreateSimpleTableWithKeyType(session, "Int64"); - - TString query = R"( - REPLACE INTO `/Root/TableUint64` (Key, Value) VALUES - (18446744073709551615, "Value11"), - (1, "Value12"), - (32768, "Value13"); - REPLACE INTO `/Root/TableInt64` (Key, Value) VALUES - (1, "Value21"), - (-1, "Value22"); - )"; + const TString leftKeyColumnType = "Uint64"; + const TString rightKeyColumnType = "Int64"; + const TString rightTableName = rightKeyColumnType + (NotNull ? "NotNull" : ""); + const TString rightType = rightKeyColumnType + (NotNull ? " NOT NULL" : ""); + + CreateSimpleTableWithKeyType(session, leftKeyColumnType, leftKeyColumnType); + CreateSimpleTableWithKeyType(session, rightTableName, rightType); + + TString query = fmt::format( + R"( + REPLACE INTO `/Root/{leftTable}` (Key, Value) VALUES + (18446744073709551615, "Value11"), + (1, "Value12"), + (32768, "Value13"); + REPLACE INTO `/Root/{rightTable}` (Key, Value) VALUES + (1, "Value21"), + (-1, "Value22"); + )", + "leftTable"_a = leftKeyColumnType, + "rightTable"_a = rightTableName + ); + UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); - TString answer = R"([ + const TString answer = R"([ [[1u];["Value12"];[1];["Value21"]]; [[32768u];["Value13"];#;#]; [[18446744073709551615u];["Value11"];#;#] ])"; - TestKeyCast(session, "LEFT", "Uint64", "Int64", answer, 2); + + TestKeyCast(settings, session, "LEFT", leftKeyColumnType, rightTableName, answer, StreamLookupJoin ? 1 : 2); } -Y_UNIT_TEST(CheckCastInt64ToUint64) { - TKikimrSettings settings; +Y_UNIT_TEST_QUAD(CheckCastInt64ToUint64, StreamLookupJoin, NotNull) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSimpleTableWithKeyType(session, "Int64"); - CreateSimpleTableWithKeyType(session, "Uint64"); + const TString leftKeyColumnType = "Int64"; + const TString rightKeyColumnType = "Uint64"; + const TString rightTableName = rightKeyColumnType + (NotNull ? "NotNull" : ""); + const TString rightType = rightKeyColumnType + (NotNull ? " NOT NULL" : ""); + + CreateSimpleTableWithKeyType(session, leftKeyColumnType, leftKeyColumnType); + CreateSimpleTableWithKeyType(session, rightTableName, rightType); + + TString query = fmt::format( + R"( + REPLACE INTO `/Root/{leftTable}` (Key, Value) VALUES + (1, "Value11"), + (-1, "Value12"); + REPLACE INTO `/Root/{rightTable}` (Key, Value) VALUES + (18446744073709551615, "Value21"), + (1, "Value22"); + )", + "leftTable"_a = leftKeyColumnType, + "rightTable"_a = rightTableName + ); - TString query = R"( - REPLACE INTO `/Root/TableInt64` (Key, Value) VALUES - (1, "Value11"), - (-1, "Value12"); - REPLACE INTO `/Root/TableUint64` (Key, Value) VALUES - (18446744073709551615, "Value21"), - (1, "Value22"); - )"; UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); - TString answer = R"([ + const TString answer = R"([ [[-1];["Value12"];#;#]; [[1];["Value11"];[1u];["Value22"]] ])"; - TestKeyCast(session, "LEFT", "Int64", "Uint64", answer, 2); + + TestKeyCast(settings, session, "LEFT", leftKeyColumnType, rightTableName, answer, StreamLookupJoin ? 1 : 2); } -Y_UNIT_TEST(CheckCastUtf8ToString) { - TKikimrSettings settings; +Y_UNIT_TEST_QUAD(CheckCastUtf8ToString, StreamLookupJoin, NotNull) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSimpleTableWithKeyType(session, "Utf8"); - CreateSimpleTableWithKeyType(session, "String"); + const TString leftKeyColumnType = "Utf8"; + const TString rightKeyColumnType = "String"; + const TString rightTableName = rightKeyColumnType + (NotNull ? "NotNull" : ""); + const TString rightType = rightKeyColumnType + (NotNull ? " NOT NULL" : ""); + + CreateSimpleTableWithKeyType(session, leftKeyColumnType, leftKeyColumnType); + CreateSimpleTableWithKeyType(session, rightTableName, rightType); + + TString query = fmt::format( + R"( + REPLACE INTO `/Root/{leftTable}` (Key, Value) VALUES + (Utf8("six"), "Value11"), + (Utf8("seven"), "Value12"); + REPLACE INTO `/Root/{rightTable}` (Key, Value) VALUES + ("six", "Value21"), + ("eight", "Value22"); + )", + "leftTable"_a = leftKeyColumnType, + "rightTable"_a = rightTableName + ); - TString query = R"( - REPLACE INTO `/Root/TableUtf8` (Key, Value) VALUES - (Utf8("six"), "Value11"), - (Utf8("seven"), "Value12"); - REPLACE INTO `/Root/TableString` (Key, Value) VALUES - ("six", "Value21"), - ("eight", "Value22"); - )"; UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); - TString answer = R"([ + const TString answer = R"([ [["seven"];["Value12"];#;#]; [["six"];["Value11"];["six"];["Value21"]] ])"; - TestKeyCast(session, "LEFT", "Utf8", "String", answer, 1); + + TestKeyCast(settings, session, "LEFT", leftKeyColumnType, rightTableName, answer, 1); +} + +Y_UNIT_TEST_TWIN(JoinByComplexKeyWithNullComponents, StreamLookupJoin) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin); + auto settings = TKikimrSettings().SetAppConfig(appConfig); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { // create tables + const TString query = R"( + CREATE TABLE `/Root/Left` ( + Key1 Int64, + Key2 String, + Value String, + PRIMARY KEY (Key1, Key2) + ); + + CREATE TABLE `/Root/Right` ( + Key1 Int64, + Key2 String, + Value String, + PRIMARY KEY (Key1, Key2) + ); + )"; + UNIT_ASSERT(session.ExecuteSchemeQuery(query).GetValueSync().IsSuccess()); + } + + { // fill tables + const TString query = R"( + REPLACE INTO `/Root/Left` (Key1, Key2, Value) VALUES + (1, "one", "value1"), + (2, NULL, "value2"), + (NULL, "three", "value3"); + + REPLACE INTO `/Root/Right` (Key1, Key2, Value) VALUES + (1, "one", "value1"), + (2, NULL, "value2"), + (NULL, "three", "value3"); + )"; + UNIT_ASSERT(session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); + } + + { // execute join + TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + const TString query = R"( + SELECT l.Key1, l.Key2, l.Value, r.Key1, r.Key2, r.Value + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Key1 = r.Key1 AND l.Key2 = r.Key2 ORDER BY l.Key1, l.Key2, l.Value + )"; + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + CompareYson(R"([ + [[1];["one"];["value1"];[1];["one"];["value1"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + const ui32 index = (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin() ? 0 : 1); + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + for (const auto& tableStats : stats.query_phases(index).table_access()) { + if (tableStats.name() == "/Root/Right") { + UNIT_ASSERT_VALUES_EQUAL(tableStats.reads().rows(), 1); + } + } + } } Y_UNIT_TEST_TWIN(JoinWithComplexCondition, StreamLookupJoin) {