diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index f5e4848d0228..062148adf752 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal& query, bool isQueryActionPrepare, TInstant deadline, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe& applicationName, std::shared_ptr> intrestedInResult, const TIntrusivePtr& userRequestContext, - NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) + NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe queryAst = Nothing()) : UserToken(userToken) , Uid(uid) , Query(query) @@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal> IntrestedInResult; + + TMaybe QueryAst; }; struct TEvCompileResponse: public TEventLocal { diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index de8852aa0df2..01f1615cc466 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -759,13 +759,17 @@ class TKqpCompileService : public TActorBootstrapped { NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); - TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, + TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query, compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName, ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState)); + if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) { + return CompileByAst(*request.QueryAst, compileRequest, ctx); + } + if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -1138,7 +1142,7 @@ class TKqpCompileService : public TActorBootstrapped { << ", message: " << e.what()); } - void Reply(const TActorId& sender, const TVector astStatements, const TKqpQueryId query, + void Reply(const TActorId& sender, const TVector& astStatements, const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReply, @@ -1157,7 +1161,7 @@ class TKqpCompileService : public TActorBootstrapped { ctx.Send(sender, responseEv.Release(), 0, cookie); } - void ReplyQueryStatements(const TActorId& sender, const TVector astStatements, + void ReplyQueryStatements(const TActorId& sender, const TVector& astStatements, const TKqpQueryId query, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyStatements, orbit); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 80d1cc9ee093..008d036808c3 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -14,6 +14,31 @@ using namespace NSchemeCache; #define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) #define LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) + +TKqpQueryState::TQueryTxId::TQueryTxId(const TQueryTxId& other) { + YQL_ENSURE(!Id); + Id = other.Id; +} + +TKqpQueryState::TQueryTxId& TKqpQueryState::TQueryTxId::operator=(const TQueryTxId& id) { + YQL_ENSURE(!Id); + Id = id.Id; + return *this; +} + +void TKqpQueryState::TQueryTxId::SetValue(const TTxId& id) { + YQL_ENSURE(!Id); + Id = id.Id; +} + +TTxId TKqpQueryState::TQueryTxId::GetValue() { + return Id ? *Id : TTxId(); +} + +void TKqpQueryState::TQueryTxId::Reset() { + Id = TTxId(); +} + bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response) { Y_ENSURE(response.Request); const auto& navigate = *response.Request; @@ -244,8 +269,15 @@ std::unique_ptr TKqpQueryState::BuildReCompileReque compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt); } - return std::make_unique(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare, - compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState); + TMaybe statementAst; + if (!Statements.empty()) { + YQL_ENSURE(CurrentStatementId < Statements.size()); + statementAst = Statements[CurrentStatementId]; + } + + return std::make_unique(UserToken, CompileResult->Uid, query, isQueryActionPrepare, + compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState, + statementAst); } std::unique_ptr TKqpQueryState::BuildSplitRequest(std::shared_ptr> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) { @@ -295,7 +327,6 @@ bool TKqpQueryState::PrepareNextStatementPart() { QueryData = {}; PreparedQuery = {}; CompileResult = {}; - TxCtx = {}; CurrentTx = 0; TableVersions = {}; MaxReadType = ETableReadType::Other; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 05b89f21185d..23ec65e7d2ef 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -33,6 +33,21 @@ namespace NKikimr::NKqp { // common case). class TKqpQueryState : public TNonCopyable { public: + class TQueryTxId { + public: + TQueryTxId() = default; + TQueryTxId(const TQueryTxId& other); + TQueryTxId& operator=(const TQueryTxId& id); + + void SetValue(const TTxId& id); + TTxId GetValue(); + + void Reset(); + + private: + TMaybe Id; + }; + TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TMaybe& applicationName, const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TString& sessionId, TMonotonic startedAt) @@ -110,7 +125,7 @@ class TKqpQueryState : public TNonCopyable { NWilson::TSpan KqpSessionSpan; ETableReadType MaxReadType = ETableReadType::Other; - TTxId TxId; // User tx + TQueryTxId TxId; // User tx bool Commit = false; bool Commited = false; @@ -132,6 +147,7 @@ class TKqpQueryState : public TNonCopyable { NYql::TIssues Issues; TVector Statements; + TMaybe ImplicitTxId = {}; // Implicit tx for all statements ui32 CurrentStatementId = 0; ui32 StatementResultIndex = 0; ui32 StatementResultSize = 0; @@ -417,15 +433,12 @@ class TKqpQueryState : public TNonCopyable { } void PrepareCurrentStatement() { - QueryData = {}; + QueryData = std::make_shared(TxCtx->TxAlloc); PreparedQuery = {}; CompileResult = {}; - TxCtx = {}; CurrentTx = 0; TableVersions = {}; MaxReadType = ETableReadType::Other; - Commit = false; - Commited = false; TopicOperations = {}; ReplayMessage = {}; } @@ -444,7 +457,6 @@ class TKqpQueryState : public TNonCopyable { TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; break; default: - Commit = true; TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index fb8ceebb1ca1..f8db4f14ce88 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -284,7 +284,7 @@ class TKqpSessionActor : public TActorBootstrapped { } QueryState->TxCtx = std::move(txCtx); QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); - QueryState->TxId = txId; + QueryState->TxId.SetValue(txId); if (!CheckTransactionLocks(/*tx*/ nullptr)) { return; } @@ -540,6 +540,10 @@ class TKqpSessionActor : public TActorBootstrapped { void Handle(TEvKqp::TEvParseResponse::TPtr& ev) { QueryState->SaveAndCheckParseResult(std::move(*ev->Get())); + Ydb::Table::TransactionSettings settings; + settings.mutable_serializable_read_write(); + BeginTx(settings); + QueryState->ImplicitTxId = QueryState->TxId; CompileStatement(); } @@ -680,7 +684,7 @@ class TKqpSessionActor : public TActorBootstrapped { } void BeginTx(const Ydb::Table::TransactionSettings& settings) { - QueryState->TxId = UlidGen.Next(); + QueryState->TxId.SetValue(UlidGen.Next()); QueryState->TxCtx = MakeIntrusive(false, AppData()->FunctionRegistry, AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects); @@ -704,7 +708,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->TxCtx->SetIsolationLevel(settings); QueryState->TxCtx->OnBeginQuery(); - if (!Transactions.CreateNew(QueryState->TxId, QueryState->TxCtx)) { + if (!Transactions.CreateNew(QueryState->TxId.GetValue(), QueryState->TxCtx)) { std::vector issues{ YqlIssue({}, TIssuesIds::KIKIMR_TOO_MANY_TRANSACTIONS)}; ythrow TRequestFail(Ydb::StatusIds::BAD_SESSION, @@ -717,14 +721,14 @@ class TKqpSessionActor : public TActorBootstrapped { Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); } - static const Ydb::Table::TransactionControl& GetImpliedTxControl() { - auto create = []() -> Ydb::Table::TransactionControl { - Ydb::Table::TransactionControl control; + Ydb::Table::TransactionControl GetImpliedTxControl() { + Ydb::Table::TransactionControl control; + control.set_commit_tx(QueryState->ProcessingLastStatement()); + if (QueryState->ImplicitTxId) { + control.set_tx_id(QueryState->ImplicitTxId->GetValue().GetHumanStr()); + } else { control.mutable_begin_tx()->mutable_serializable_read_write(); - control.set_commit_tx(true); - return control; - }; - static const Ydb::Table::TransactionControl control = create(); + } return control; } @@ -744,14 +748,17 @@ class TKqpSessionActor : public TActorBootstrapped { } QueryState->TxCtx = txCtx; QueryState->QueryData = std::make_shared(QueryState->TxCtx->TxAlloc); - QueryState->TxId = txId; + if (hasTxControl) { + LOG_E("AAA"); + QueryState->TxId.SetValue(txId); + } break; } case Ydb::Table::TransactionControl::kBeginTx: { BeginTx(txControl.begin_tx()); break; - } - case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: + } + case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << "wrong TxControl: tx_selector must be set"; break; @@ -1539,7 +1546,7 @@ class TKqpSessionActor : public TActorBootstrapped { void FillTxInfo(NKikimrKqp::TQueryResponse* response) { YQL_ENSURE(QueryState); - response->MutableTxMeta()->set_id(QueryState->TxId.GetHumanStr()); + response->MutableTxMeta()->set_id(QueryState->TxId.GetValue().GetHumanStr()); if (QueryState->TxCtx) { auto txInfo = QueryState->TxCtx->GetInfo(); @@ -1612,8 +1619,8 @@ class TKqpSessionActor : public TActorBootstrapped { if (QueryState->Commit) { ResetTxState(); - Transactions.ReleaseTransaction(QueryState->TxId); - QueryState->TxId = TTxId(); + Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); + QueryState->TxId.Reset(); } FillTxInfo(response); @@ -1958,7 +1965,7 @@ class TKqpSessionActor : public TActorBootstrapped { auto& txCtx = QueryState->TxCtx; if (txCtx->IsInvalidated()) { Transactions.AddToBeAborted(txCtx); - Transactions.ReleaseTransaction(QueryState->TxId); + Transactions.ReleaseTransaction(QueryState->TxId.GetValue()); } DiscardPersistentSnapshot(txCtx->SnapshotHandle); } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 3658ba62b80d..4ecea856e88b 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2159,6 +2159,77 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetQueryClient(); + { + // DDl + DML with explicit transaction + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml1 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml1 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml1; + ALTER TABLE TestDdlDml1 DROP COLUMN Value2; + UPSERT INTO TestDdlDml1 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml1; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Queries with mixed data and scheme operations are not supported.")); + } + + { + // DDl + DML with implicit transaction + auto result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml2 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1"); + SELECT * FROM TestDdlDml2; + ALTER TABLE TestDdlDml2 DROP COLUMN Value2; + UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2); + CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdlDml2; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + + result = db.ExecuteQuery(R"( + CREATE TABLE TestDdlDml4 ( + Key Uint64, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + UPSERT INTO TestDdlDml4 (Key, Value1, Value2) VALUES (1, "1", "2"); + SELECT * FROM TestDdlDml4; + ALTER TABLE TestDdlDml4 DROP COLUMN Value2; + UPSERT INTO TestDdlDml4 (Key, Value1) VALUES (2, "2"); + SELECT * FROM TestDdlDml5; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdlDml4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } + { // Base test with ddl and dml statements auto result = db.ExecuteQuery(R"( @@ -2228,13 +2299,21 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { // Test with query with error auto result = db.ExecuteQuery(R"( - UPSERT INTO TestDdl2 (Key, Value) VALUES (1, "One"); + SELECT * FROM TestDdl2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); CREATE TABLE TestDdl3 ( Key Uint64, Value String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four"); + UPSERT INTO TestDdl2 (Key, Value) VALUES (5, "Five"); CREATE TABLE TestDdl2 ( Key Uint64, Value String, @@ -2245,18 +2324,32 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { Value String, PRIMARY KEY (Key) ); - UPSERT INTO TestDdl1 (Key, Value) VALUES (3, "Three"); )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Check failed: path: '/Root/TestDdl2', error: path exist")); result = db.ExecuteQuery(R"( SELECT * FROM TestDdl2; )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]];[[4u];["Four"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0))); + UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl3; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + SELECT * FROM TestDdl4; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Cannot find table 'db.[/Root/TestDdl4]'")); } { @@ -2329,7 +2422,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];[1u]];[[2u];[2u]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); result = db.ExecuteQuery(R"( CREATE TABLE TestDdl5 ( @@ -2348,7 +2441,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); - CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); } }