From 35d12faa8442e72f50d87f8d3db370cd8d1ad657 Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Mon, 15 Apr 2024 07:46:25 +0000 Subject: [PATCH 1/2] implicit transaction in per statement mode implicit transaction in per statement mode --- ydb/core/kqp/common/compilation/events.h | 5 +- .../compile_service/kqp_compile_service.cpp | 6 +- .../kqp/session_actor/kqp_query_state.cpp | 12 +- ydb/core/kqp/session_actor/kqp_query_state.h | 7 +- .../kqp/session_actor/kqp_session_actor.cpp | 24 +++- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 105 +++++++++++++++++- 6 files changed, 137 insertions(+), 22 deletions(-) diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index f5e4848d0228..062148adf752 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal& query, bool isQueryActionPrepare, TInstant deadline, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe& applicationName, std::shared_ptr> intrestedInResult, const TIntrusivePtr& userRequestContext, - NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) + NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe queryAst = Nothing()) : UserToken(userToken) , Uid(uid) , Query(query) @@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal> IntrestedInResult; + + TMaybe QueryAst; }; struct TEvCompileResponse: public TEventLocal { diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index a892b5724d10..297dd69c7156 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -731,13 +731,17 @@ class TKqpCompileService : public TActorBootstrapped { NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); - TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, + TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query, compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName, ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState)); + if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) { + return CompileByAst(*request.QueryAst, compileRequest, ctx); + } + if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 80d1cc9ee093..c51f3106ae9b 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -244,8 +244,15 @@ std::unique_ptr TKqpQueryState::BuildReCompileReque compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt); } - return std::make_unique(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare, - compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState); + TMaybe statementAst; + if (!Statements.empty()) { + YQL_ENSURE(CurrentStatementId < Statements.size()); + statementAst = Statements[CurrentStatementId]; + } + + return std::make_unique(UserToken, CompileResult->Uid, query, isQueryActionPrepare, + compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState, + statementAst); } std::unique_ptr TKqpQueryState::BuildSplitRequest(std::shared_ptr> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) { @@ -295,7 +302,6 @@ bool TKqpQueryState::PrepareNextStatementPart() { QueryData = {}; PreparedQuery = {}; CompileResult = {}; - TxCtx = {}; CurrentTx = 0; TableVersions = {}; MaxReadType = ETableReadType::Other; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 1acd5ab35007..06a1f7efe0b4 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -113,6 +113,7 @@ class TKqpQueryState : public TNonCopyable { TTxId TxId; // User tx bool Commit = false; bool Commited = false; + bool Begin = false; NTopic::TTopicOperations TopicOperations; TDuration CpuTime; @@ -408,15 +409,12 @@ class TKqpQueryState : public TNonCopyable { } void PrepareCurrentStatement() { - QueryData = {}; + QueryData = std::make_shared(TxCtx->TxAlloc); PreparedQuery = {}; CompileResult = {}; - TxCtx = {}; CurrentTx = 0; TableVersions = {}; MaxReadType = ETableReadType::Other; - Commit = false; - Commited = false; TopicOperations = {}; ReplayMessage = {}; } @@ -435,7 +433,6 @@ class TKqpQueryState : public TNonCopyable { TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; break; default: - Commit = true; TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 01a5d3c1d136..54f94edc8b87 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -575,6 +575,13 @@ class TKqpSessionActor : public TActorBootstrapped { } void OnSuccessCompileRequest() { + if (QueryState->HasTxControl()) { + const auto& txControl = QueryState->GetTxControl(); + if (txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId) { + LOG_E("TTTTTXXXXXX IIIIIDDDDD " << txControl.tx_id()); + } + } + if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE || QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXPLAIN) { @@ -678,6 +685,7 @@ class TKqpSessionActor : public TActorBootstrapped { } void BeginTx(const Ydb::Table::TransactionSettings& settings) { + QueryState->Begin = true; QueryState->TxId = UlidGen.Next(); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); @@ -731,7 +739,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (hasTxControl || QueryState->HasImpliedTx()) { const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl(); - QueryState->Commit = txControl.commit_tx(); + QueryState->Commit = txControl.commit_tx() && QueryState->ProcessingLastStatement(); switch (txControl.tx_selector_case()) { case Ydb::Table::TransactionControl::kTxId: { auto txId = TTxId::FromString(txControl.tx_id()); @@ -746,15 +754,17 @@ class TKqpSessionActor : public TActorBootstrapped { break; } case Ydb::Table::TransactionControl::kBeginTx: { - BeginTx(txControl.begin_tx()); + if (!QueryState->Begin) { + BeginTx(txControl.begin_tx()); + } break; - } - case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: + } + case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "wrong TxControl: tx_selector must be set"; break; } - } else { + } else if (QueryState->CurrentStatementId == 0) { QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2057,7 +2067,9 @@ class TKqpSessionActor : public TActorBootstrapped { } else { CleanupCtx.reset(); bool doNotKeepSession = QueryState && !QueryState->KeepSession; - QueryState.reset(); + //if (!QueryState || QueryState->ProcessingLastStatement()) { + QueryState.reset(); + //} if (doNotKeepSession) { // TEvCloseSessionRequest was received in final=false CleanupState, so actor should rerun Cleanup with final=true CleanupAndPassAway(); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 470c70bc1146..54000552eb02 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2159,6 +2159,77 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetQueryClient(); + { + // DDl + DML with explicit transaction + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml1 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml1 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml1; + ALTER TABLE TestDdlDml1 DROP COLUMN Value2; + UPSERT INTO TestDdlDml1 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml1; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Queries with mixed data and scheme operations are not supported.")); + } + + { + // DDl + DML with implicit transaction + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml2 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml2; + ALTER TABLE TestDdlDml2 DROP COLUMN Value2; + UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2); + CompareYson(R"([[[1u];["1"];["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdlDml2; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + + result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml4 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml4 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml4; + ALTER TABLE TestDdlDml4 DROP COLUMN Value2; + UPSERT INTO TestDdlDml4 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml5; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdlDml4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } + { // Base test with ddl and dml statements auto result = db.ExecuteQuery(R"( @@ -2228,13 +2299,21 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { // Test with query with error auto result = db.ExecuteQuery(R"( - UPSERT INTO TestDdl2 (Key, Value) VALUES (1, "One"); + SELECT * FROM TestDdl2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); CREATE TABLE TestDdl3 ( Key Uint64, Value String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); + UPSERT INTO TestDdl2 (Key, Value) VALUES (5, "Five"); CREATE TABLE TestDdl2 ( Key Uint64, Value String, @@ -2245,18 +2324,32 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { Value String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdl1 (Key, Value) VALUES (3, "Three"); )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Check failed: path: '/Root/TestDdl2', error: path exist")); result = db.ExecuteQuery(R"( SELECT * FROM TestDdl2; )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]];[[4u];["Four"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl3; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Cannot find table 'db.[/Root/TestDdl4]'")); } { @@ -2329,7 +2422,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];[1u]];[[2u];[2u]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); result = db.ExecuteQuery(R"( CREATE TABLE TestDdl5 ( @@ -2348,7 +2441,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); } } From 1e0bec32d26ec2092b1feff7ef3d95d2f534b4a6 Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Wed, 24 Apr 2024 11:13:29 +0000 Subject: [PATCH 2/2] delete extra --- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 54f94edc8b87..e239438f894d 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -575,13 +575,6 @@ class TKqpSessionActor : public TActorBootstrapped { } void OnSuccessCompileRequest() { - if (QueryState->HasTxControl()) { - const auto& txControl = QueryState->GetTxControl(); - if (txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId) { - LOG_E("TTTTTXXXXXX IIIIIDDDDD " << txControl.tx_id()); - } - } - if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE || QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXPLAIN) { @@ -2067,9 +2060,7 @@ class TKqpSessionActor : public TActorBootstrapped { } else { CleanupCtx.reset(); bool doNotKeepSession = QueryState && !QueryState->KeepSession; - //if (!QueryState || QueryState->ProcessingLastStatement()) { - QueryState.reset(); - //} + QueryState.reset(); if (doNotKeepSession) { // TEvCloseSessionRequest was received in final=false CleanupState, so actor should rerun Cleanup with final=true CleanupAndPassAway();