Skip to content

Commit

Permalink
Merge 1e0bec3 into 3caf512
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka authored Apr 24, 2024
2 parents 3caf512 + 1e0bec3 commit 27887b8
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 21 deletions.
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
: UserToken(userToken)
, Uid(uid)
, Query(query)
Expand All @@ -89,6 +89,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, QueryAst(queryAst)
{
}

Expand All @@ -107,6 +108,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::

TKqpTempTablesState::TConstPtr TempTablesState;
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

TMaybe<TQueryAst> QueryAst;
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,13 +731,17 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");

TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
ev->Get()->UserRequestContext,
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState));

if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) {
return CompileByAst(*request.QueryAst, compileRequest, ctx);
}

if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);

Expand Down
12 changes: 9 additions & 3 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,15 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, CompileResult->Query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState);
TMaybe<TQueryAst> statementAst;
if (!Statements.empty()) {
YQL_ENSURE(CurrentStatementId < Statements.size());
statementAst = Statements[CurrentStatementId];
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
statementAst);
}

std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildSplitRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) {
Expand Down Expand Up @@ -295,7 +302,6 @@ bool TKqpQueryState::PrepareNextStatementPart() {
QueryData = {};
PreparedQuery = {};
CompileResult = {};
TxCtx = {};
CurrentTx = 0;
TableVersions = {};
MaxReadType = ETableReadType::Other;
Expand Down
7 changes: 2 additions & 5 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class TKqpQueryState : public TNonCopyable {
TTxId TxId; // User tx
bool Commit = false;
bool Commited = false;
bool Begin = false;

NTopic::TTopicOperations TopicOperations;
TDuration CpuTime;
Expand Down Expand Up @@ -408,15 +409,12 @@ class TKqpQueryState : public TNonCopyable {
}

void PrepareCurrentStatement() {
QueryData = {};
QueryData = std::make_shared<TQueryData>(TxCtx->TxAlloc);
PreparedQuery = {};
CompileResult = {};
TxCtx = {};
CurrentTx = 0;
TableVersions = {};
MaxReadType = ETableReadType::Other;
Commit = false;
Commited = false;
TopicOperations = {};
ReplayMessage = {};
}
Expand All @@ -435,7 +433,6 @@ class TKqpQueryState : public TNonCopyable {
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
break;
default:
Commit = true;
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
}
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

void BeginTx(const Ydb::Table::TransactionSettings& settings) {
QueryState->Begin = true;
QueryState->TxId = UlidGen.Next();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);
Expand Down Expand Up @@ -731,7 +732,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
if (hasTxControl || QueryState->HasImpliedTx()) {
const auto& txControl = hasTxControl ? QueryState->GetTxControl() : GetImpliedTxControl();

QueryState->Commit = txControl.commit_tx();
QueryState->Commit = txControl.commit_tx() && QueryState->ProcessingLastStatement();
switch (txControl.tx_selector_case()) {
case Ydb::Table::TransactionControl::kTxId: {
auto txId = TTxId::FromString(txControl.tx_id());
Expand All @@ -746,15 +747,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
break;
}
case Ydb::Table::TransactionControl::kBeginTx: {
BeginTx(txControl.begin_tx());
if (!QueryState->Begin) {
BeginTx(txControl.begin_tx());
}
break;
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
<< "wrong TxControl: tx_selector must be set";
break;
}
} else {
} else if (QueryState->CurrentStatementId == 0) {
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
Expand Down
105 changes: 99 additions & 6 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2159,6 +2159,77 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetQueryClient();

{
// DDl + DML with explicit transaction
auto result = db.ExecuteQuery(R"(
CREATE TABLE TestDdlDml1 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
UPSERT INTO TestDdlDml1 (Key, Value1, Value2) VALUES (1, "1", "2");
SELECT * FROM TestDdlDml1;
ALTER TABLE TestDdlDml1 DROP COLUMN Value2;
UPSERT INTO TestDdlDml1 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Queries with mixed data and scheme operations are not supported."));
}

{
// DDl + DML with implicit transaction
auto result = db.ExecuteQuery(R"(
CREATE TABLE TestDdlDml2 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "2");
SELECT * FROM TestDdlDml2;
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml2;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
CompareYson(R"([[[1u];["1"];["2"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdlDml2;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(0)));

result = db.ExecuteQuery(R"(
CREATE TABLE TestDdlDml4 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
UPSERT INTO TestDdlDml4 (Key, Value1, Value2) VALUES (1, "1", "2");
SELECT * FROM TestDdlDml4;
ALTER TABLE TestDdlDml4 DROP COLUMN Value2;
UPSERT INTO TestDdlDml4 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml5;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0);

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdlDml4;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
}

{
// Base test with ddl and dml statements
auto result = db.ExecuteQuery(R"(
Expand Down Expand Up @@ -2228,13 +2299,21 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
{
// Test with query with error
auto result = db.ExecuteQuery(R"(
UPSERT INTO TestDdl2 (Key, Value) VALUES (1, "One");
SELECT * FROM TestDdl2;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four");
CREATE TABLE TestDdl3 (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
UPSERT INTO TestDdl2 (Key, Value) VALUES (4, "Four");
UPSERT INTO TestDdl2 (Key, Value) VALUES (5, "Five");
CREATE TABLE TestDdl2 (
Key Uint64,
Value String,
Expand All @@ -2245,18 +2324,32 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
Value String,
PRIMARY KEY (Key)
);
UPSERT INTO TestDdl1 (Key, Value) VALUES (3, "Three");
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0);
UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Check failed: path: '/Root/TestDdl2', error: path exist"));

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdl2;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]];[[4u];["Four"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["One"]];[[2u];["Two"]];[[3u];["Three"]]])", FormatResultSetYson(result.GetResultSet(0)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdl3;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdl4;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
UNIT_ASSERT(result.GetIssues().ToOneLineString().Contains("Cannot find table 'db.[/Root/TestDdl4]'"));
}

{
Expand Down Expand Up @@ -2329,7 +2422,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];[1u]];[[2u];[2u]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0)));

result = db.ExecuteQuery(R"(
CREATE TABLE TestDdl5 (
Expand All @@ -2348,7 +2441,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];[1u]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Expand Down

0 comments on commit 27887b8

Please sign in to comment.