From ba321da285a0a123d0822590d20d2b943aeb8b22 Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Mon, 15 Apr 2024 07:46:25 +0000 Subject: [PATCH 1/4] implicit transaction in per statement mode implicit transaction in per statement mode --- ydb/core/kqp/common/compilation/events.h | 5 +- .../compile_service/kqp_compile_service.cpp | 6 +- .../kqp/session_actor/kqp_query_state.cpp | 12 +- ydb/core/kqp/session_actor/kqp_query_state.h | 7 +- .../kqp/session_actor/kqp_session_actor.cpp | 24 +++- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 105 +++++++++++++++++- 6 files changed, 137 insertions(+), 22 deletions(-) diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index f5e4848d0228..062148adf752 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal& query, bool isQueryActionPrepare, TInstant deadline, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe& applicationName, std::shared_ptr> intrestedInResult, const TIntrusivePtr& userRequestContext, - NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) + NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe queryAst = Nothing()) : UserToken(userToken) , Uid(uid) , Query(query) @@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal> IntrestedInResult; + + TMaybe QueryAst; }; struct TEvCompileResponse: public TEventLocal { diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index df83f7d7a2fe..0538176fd25f 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -733,13 +733,17 @@ class TKqpCompileService : public TActorBootstrapped { NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); - TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, + TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query, compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName, ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState)); + if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) { + return CompileByAst(*request.QueryAst, compileRequest, ctx); + } + if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 80d1cc9ee093..c51f3106ae9b 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -244,8 +244,15 @@ std::unique_ptr TKqpQueryState::BuildReCompileReque compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt); } - return std::make_unique(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare, - compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState); + TMaybe statementAst; + if (!Statements.empty()) { + YQL_ENSURE(CurrentStatementId < Statements.size()); + statementAst = Statements[CurrentStatementId]; + } + + return std::make_unique(UserToken, CompileResult->Uid, query, isQueryActionPrepare, + compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState, + statementAst); } std::unique_ptr TKqpQueryState::BuildSplitRequest(std::shared_ptr> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) { @@ -295,7 +302,6 @@ bool TKqpQueryState::PrepareNextStatementPart() { QueryData = {}; PreparedQuery = {}; CompileResult = {}; - TxCtx = {}; CurrentTx = 0; TableVersions = {}; MaxReadType = ETableReadType::Other; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 05b89f21185d..3b8315bde5df 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -113,6 +113,7 @@ class TKqpQueryState : public TNonCopyable { TTxId TxId; // User tx bool Commit = false; bool Commited = false; + bool Begin = false; NTopic::TTopicOperations TopicOperations; TDuration CpuTime; @@ -417,15 +418,12 @@ class TKqpQueryState : public TNonCopyable { } void PrepareCurrentStatement() { - QueryData = {}; + QueryData = std::make_shared(TxCtx->TxAlloc); PreparedQuery = {}; CompileResult = {}; - TxCtx = {}; CurrentTx = 0; TableVersions = {}; MaxReadType = ETableReadType::Other; - Commit = false; - Commited = false; TopicOperations = {}; ReplayMessage = {}; } @@ -444,7 +442,6 @@ class TKqpQueryState : public TNonCopyable { TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; break; default: - Commit = true; TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index fb8ceebb1ca1..1e428ad8a291 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -577,6 +577,13 @@ class TKqpSessionActor : public TActorBootstrapped { } void OnSuccessCompileRequest() { + if (QueryState->HasTxControl()) { + const auto& txControl = QueryState->GetTxControl(); + if (txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId) { + LOG_E("TTTTTXXXXXX IIIIIDDDDD " << txControl.tx_id()); + } + } + if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE || QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXPLAIN) { @@ -680,6 +687,7 @@ class TKqpSessionActor : public TActorBootstrapped { } void BeginTx(const Ydb::Table::TransactionSettings& settings) { + QueryState->Begin = true; QueryState->TxId = UlidGen.Next(); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); @@ -733,7 +741,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (hasTxControl || QueryState->HasImpliedTx()) { const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl(); - QueryState->Commit = txControl.commit_tx(); + QueryState->Commit = txControl.commit_tx() && QueryState->ProcessingLastStatement(); switch (txControl.tx_selector_case()) { case Ydb::Table::TransactionControl::kTxId: { auto txId = TTxId::FromString(txControl.tx_id()); @@ -748,15 +756,17 @@ class TKqpSessionActor : public TActorBootstrapped { break; } case Ydb::Table::TransactionControl::kBeginTx: { - BeginTx(txControl.begin_tx()); + if (!QueryState->Begin) { + BeginTx(txControl.begin_tx()); + } break; - } - case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: + } + case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "wrong TxControl: tx_selector must be set"; break; } - } else { + } else if (QueryState->CurrentStatementId == 0) { QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -2068,7 +2078,9 @@ class TKqpSessionActor : public TActorBootstrapped { } else { CleanupCtx.reset(); bool doNotKeepSession = QueryState && !QueryState->KeepSession; - QueryState.reset(); + //if (!QueryState || QueryState->ProcessingLastStatement()) { + QueryState.reset(); + //} if (doNotKeepSession) { // TEvCloseSessionRequest was received in final=false CleanupState, so actor should rerun Cleanup with final=true CleanupAndPassAway(); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index abb948244198..5fce37c83d83 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2159,6 +2159,77 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetQueryClient(); + { + // DDl + DML with explicit transaction + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml1 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml1 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml1; + ALTER TABLE TestDdlDml1 DROP COLUMN Value2; + UPSERT INTO TestDdlDml1 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml1; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Queries with mixed data and scheme operations are not supported.")); + } + + { + // DDl + DML with implicit transaction + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml2 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml2; + ALTER TABLE TestDdlDml2 DROP COLUMN Value2; + UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2); + CompareYson(R"([[[1u];["1"];["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdlDml2; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + + result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml4 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml4 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml4; + ALTER TABLE TestDdlDml4 DROP COLUMN Value2; + UPSERT INTO TestDdlDml4 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml5; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdlDml4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } + { // Base test with ddl and dml statements auto result = db.ExecuteQuery(R"( @@ -2228,13 +2299,21 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { // Test with query with error auto result = db.ExecuteQuery(R"( - UPSERT INTO TestDdl2 (Key, Value) VALUES (1, "One"); + SELECT * FROM TestDdl2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); CREATE TABLE TestDdl3 ( Key Uint64, Value String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); + UPSERT INTO TestDdl2 (Key, Value) VALUES (5, "Five"); CREATE TABLE TestDdl2 ( Key Uint64, Value String, @@ -2245,18 +2324,32 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { Value String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdl1 (Key, Value) VALUES (3, "Three"); )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Check failed: path: '/Root/TestDdl2', error: path exist")); result = db.ExecuteQuery(R"( SELECT * FROM TestDdl2; )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]];[[4u];["Four"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl3; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Cannot find table 'db.[/Root/TestDdl4]'")); } { @@ -2329,7 +2422,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];[1u]];[[2u];[2u]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); result = db.ExecuteQuery(R"( CREATE TABLE TestDdl5 ( @@ -2348,7 +2441,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); } } From 3d595868458b46328a93a3578e20cb25f58f6dd4 Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Wed, 24 Apr 2024 11:13:29 +0000 Subject: [PATCH 2/4] delete extra --- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1e428ad8a291..c9c1b6562e9f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -577,13 +577,6 @@ class TKqpSessionActor : public TActorBootstrapped { } void OnSuccessCompileRequest() { - if (QueryState->HasTxControl()) { - const auto& txControl = QueryState->GetTxControl(); - if (txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId) { - LOG_E("TTTTTXXXXXX IIIIIDDDDD " << txControl.tx_id()); - } - } - if (QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE || QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXPLAIN) { @@ -2078,9 +2071,7 @@ class TKqpSessionActor : public TActorBootstrapped { } else { CleanupCtx.reset(); bool doNotKeepSession = QueryState && !QueryState->KeepSession; - //if (!QueryState || QueryState->ProcessingLastStatement()) { - QueryState.reset(); - //} + QueryState.reset(); if (doNotKeepSession) { // TEvCloseSessionRequest was received in final=false CleanupState, so actor should rerun Cleanup with final=true CleanupAndPassAway(); From 6a93b06f3fffe25e9b9d41fc78dfaf9839c6bcbc Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Tue, 21 May 2024 11:04:03 +0000 Subject: [PATCH 3/4] fix --- .../compile_service/kqp_compile_service.cpp | 4 +- .../kqp/session_actor/kqp_query_state.cpp | 28 ++++++++++++ ydb/core/kqp/session_actor/kqp_query_state.h | 20 ++++++++- .../kqp/session_actor/kqp_session_actor.cpp | 45 ++++++++++--------- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 4 +- 5 files changed, 74 insertions(+), 27 deletions(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 0538176fd25f..c66822444e9d 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -1111,7 +1111,7 @@ class TKqpCompileService : public TActorBootstrapped { << ", message: " << e.what()); } - void Reply(const TActorId& sender, const TVector astStatements, const TKqpQueryId query, + void Reply(const TActorId& sender, const TVector& astStatements, const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReply, @@ -1130,7 +1130,7 @@ class TKqpCompileService : public TActorBootstrapped { ctx.Send(sender, responseEv.Release(), 0, cookie); } - void ReplyQueryStatements(const TActorId& sender, const TVector astStatements, + void ReplyQueryStatements(const TActorId& sender, const TVector& astStatements, const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyStatements, orbit); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index c51f3106ae9b..b15ad8449b08 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -14,6 +14,34 @@ using namespace NSchemeCache; #define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) #define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) + +TKqpQueryState::TQueryTxId::TQueryTxId(const TQueryTxId& other) { + YQL_ENSURE(!IsValueSet); + Id = other.Id; + IsValueSet = true; +} + +TKqpQueryState::TQueryTxId& TKqpQueryState::TQueryTxId::operator=(const TQueryTxId& id) { + YQL_ENSURE(!IsValueSet); + Id = id.Id; + IsValueSet = true; + return *this; +} + +void TKqpQueryState::TQueryTxId::SetValue(const TTxId& id) { + YQL_ENSURE(!IsValueSet); + Id = id.Id; + IsValueSet = true; +} + +TTxId TKqpQueryState::TQueryTxId::GetValue() { + return Id; +} + +void TKqpQueryState::TQueryTxId::Reset() { + Id = TTxId(); +} + bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) { Y_ENSURE(response.Request); const auto& navigate = *response.Request; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 3b8315bde5df..ec414385cb24 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -33,6 +33,22 @@ namespace NKikimr::NKqp { // common case). class TKqpQueryState : public TNonCopyable { public: + class TQueryTxId { + public: + TQueryTxId() = default; + TQueryTxId(const TQueryTxId& other); + TQueryTxId& operator=(const TQueryTxId& id); + + void SetValue(const TTxId& id); + TTxId GetValue(); + + void Reset(); + + private: + TTxId Id; + bool IsValueSet = false; + }; + TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TMaybe& applicationName, const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TString& sessionId, TMonotonic startedAt) @@ -110,10 +126,9 @@ class TKqpQueryState : public TNonCopyable { NWilson::TSpan KqpSessionSpan; ETableReadType MaxReadType = ETableReadType::Other; - TTxId TxId; // User tx + TQueryTxId TxId; // User tx bool Commit = false; bool Commited = false; - bool Begin = false; NTopic::TTopicOperations TopicOperations; TDuration CpuTime; @@ -133,6 +148,7 @@ class TKqpQueryState : public TNonCopyable { NYql::TIssues Issues; TVector Statements; + TMaybe ImpliedTxId = {}; // Implied tx ui32 CurrentStatementId = 0; ui32 StatementResultIndex = 0; ui32 StatementResultSize = 0; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c9c1b6562e9f..b30957f22038 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -284,7 +284,7 @@ class TKqpSessionActor : public TActorBootstrapped { } QueryState->TxCtx = std::move(txCtx); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); - QueryState->TxId = txId; + QueryState->TxId.SetValue(txId); if (!CheckTransactionLocks(/*tx*/ nullptr)) { return; } @@ -540,6 +540,10 @@ class TKqpSessionActor : public TActorBootstrapped { void Handle(TEvKqp::TEvParseResponse::TPtr& ev) { QueryState->SaveAndCheckParseResult(std::move(*ev->Get())); + Ydb::Table::TransactionSettings settings; + settings.mutable_serializable_read_write(); + BeginTx(settings); + QueryState->ImpliedTxId = QueryState->TxId; CompileStatement(); } @@ -680,8 +684,7 @@ class TKqpSessionActor : public TActorBootstrapped { } void BeginTx(const Ydb::Table::TransactionSettings& settings) { - QueryState->Begin = true; - QueryState->TxId = UlidGen.Next(); + QueryState->TxId.SetValue(UlidGen.Next()); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); @@ -705,7 +708,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->SetIsolationLevel(settings); QueryState->TxCtx->OnBeginQuery(); - if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) { + if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) { std::vector issues{ YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)}; ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, @@ -718,14 +721,14 @@ class TKqpSessionActor : public TActorBootstrapped { Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); } - static const Ydb::Table::TransactionControl& GetImpliedTxControl() { - auto create = []() -> Ydb::Table::TransactionControl { - Ydb::Table::TransactionControl control; + Ydb::Table::TransactionControl GetImpliedTxControl() { + Ydb::Table::TransactionControl control; + control.set_commit_tx(QueryState->ProcessingLastStatement()); + if (QueryState->ImpliedTxId) { + control.set_tx_id(QueryState->ImpliedTxId->GetValue().GetHumanStr()); + } else { control.mutable_begin_tx()->mutable_serializable_read_write(); - control.set_commit_tx(true); - return control; - }; - static const Ydb::Table::TransactionControl control = create(); + } return control; } @@ -734,7 +737,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (hasTxControl || QueryState->HasImpliedTx()) { const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl(); - QueryState->Commit = txControl.commit_tx() && QueryState->ProcessingLastStatement(); + QueryState->Commit = txControl.commit_tx(); switch (txControl.tx_selector_case()) { case Ydb::Table::TransactionControl::kTxId: { auto txId = TTxId::FromString(txControl.tx_id()); @@ -745,13 +748,13 @@ class TKqpSessionActor : public TActorBootstrapped { } QueryState->TxCtx = txCtx; QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); - QueryState->TxId = txId; + if (QueryState->TxId.GetValue() != txId) { + QueryState->TxId.SetValue(txId); + } break; } case Ydb::Table::TransactionControl::kBeginTx: { - if (!QueryState->Begin) { - BeginTx(txControl.begin_tx()); - } + BeginTx(txControl.begin_tx()); break; } case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: @@ -759,7 +762,7 @@ class TKqpSessionActor : public TActorBootstrapped { << "wrong TxControl: tx_selector must be set"; break; } - } else if (QueryState->CurrentStatementId == 0) { + } else { QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); @@ -1542,7 +1545,7 @@ class TKqpSessionActor : public TActorBootstrapped { void FillTxInfo(NKikimrKqp::TQueryResponse* response) { YQL_ENSURE(QueryState); - response->MutableTxMeta()->set_id(QueryState->TxId.GetHumanStr()); + response->MutableTxMeta()->set_id(QueryState->TxId.GetValue().GetHumanStr()); if (QueryState->TxCtx) { auto txInfo = QueryState->TxCtx->GetInfo(); @@ -1615,8 +1618,8 @@ class TKqpSessionActor : public TActorBootstrapped { if (QueryState->Commit) { ResetTxState(); - Transactions.ReleaseTransaction(QueryState->TxId); - QueryState->TxId = TTxId(); + Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); + QueryState->TxId.Reset(); } FillTxInfo(response); @@ -1961,7 +1964,7 @@ class TKqpSessionActor : public TActorBootstrapped { auto& txCtx = QueryState->TxCtx; if (txCtx->IsInvalidated()) { Transactions.AddToBeAborted(txCtx); - Transactions.ReleaseTransaction(QueryState->TxId); + Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); } DiscardPersistentSnapshot(txCtx->SnapshotHandle); } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 5fce37c83d83..375e246ec075 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2187,7 +2187,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { Value2 String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "2"); + UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1"); SELECT * FROM TestDdlDml2; ALTER TABLE TestDdlDml2 DROP COLUMN Value2; UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2"); @@ -2195,7 +2195,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2); - CompareYson(R"([[[1u];["1"];["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0))); CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1))); UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); From c34380a55166f34e14017baec3fafb8cd85748ce Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Mon, 27 May 2024 12:33:23 +0000 Subject: [PATCH 4/4] fix name --- ydb/core/kqp/session_actor/kqp_query_state.cpp | 11 ++++------- ydb/core/kqp/session_actor/kqp_query_state.h | 5 ++--- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 9 +++++---- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index b15ad8449b08..008d036808c3 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -16,26 +16,23 @@ using namespace NSchemeCache; TKqpQueryState::TQueryTxId::TQueryTxId(const TQueryTxId& other) { - YQL_ENSURE(!IsValueSet); + YQL_ENSURE(!Id); Id = other.Id; - IsValueSet = true; } TKqpQueryState::TQueryTxId& TKqpQueryState::TQueryTxId::operator=(const TQueryTxId& id) { - YQL_ENSURE(!IsValueSet); + YQL_ENSURE(!Id); Id = id.Id; - IsValueSet = true; return *this; } void TKqpQueryState::TQueryTxId::SetValue(const TTxId& id) { - YQL_ENSURE(!IsValueSet); + YQL_ENSURE(!Id); Id = id.Id; - IsValueSet = true; } TTxId TKqpQueryState::TQueryTxId::GetValue() { - return Id; + return Id ? *Id : TTxId(); } void TKqpQueryState::TQueryTxId::Reset() { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index ec414385cb24..23ec65e7d2ef 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -45,8 +45,7 @@ class TKqpQueryState : public TNonCopyable { void Reset(); private: - TTxId Id; - bool IsValueSet = false; + TMaybe Id; }; TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TMaybe& applicationName, @@ -148,7 +147,7 @@ class TKqpQueryState : public TNonCopyable { NYql::TIssues Issues; TVector Statements; - TMaybe ImpliedTxId = {}; // Implied tx + TMaybe ImplicitTxId = {}; // Implicit tx for all statements ui32 CurrentStatementId = 0; ui32 StatementResultIndex = 0; ui32 StatementResultSize = 0; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b30957f22038..f8db4f14ce88 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -543,7 +543,7 @@ class TKqpSessionActor : public TActorBootstrapped { Ydb::Table::TransactionSettings settings; settings.mutable_serializable_read_write(); BeginTx(settings); - QueryState->ImpliedTxId = QueryState->TxId; + QueryState->ImplicitTxId = QueryState->TxId; CompileStatement(); } @@ -724,8 +724,8 @@ class TKqpSessionActor : public TActorBootstrapped { Ydb::Table::TransactionControl GetImpliedTxControl() { Ydb::Table::TransactionControl control; control.set_commit_tx(QueryState->ProcessingLastStatement()); - if (QueryState->ImpliedTxId) { - control.set_tx_id(QueryState->ImpliedTxId->GetValue().GetHumanStr()); + if (QueryState->ImplicitTxId) { + control.set_tx_id(QueryState->ImplicitTxId->GetValue().GetHumanStr()); } else { control.mutable_begin_tx()->mutable_serializable_read_write(); } @@ -748,7 +748,8 @@ class TKqpSessionActor : public TActorBootstrapped { } QueryState->TxCtx = txCtx; QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); - if (QueryState->TxId.GetValue() != txId) { + if (hasTxControl) { + LOG_E("AAA"); QueryState->TxId.SetValue(txId); } break;