Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix wrong isolation level #6568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,18 +492,6 @@ class TKqpQueryState : public TNonCopyable {
PrepareCurrentStatement();
}

void PrepareStatementTransaction(NKqpProto::TKqpPhyTx_EType txType) {
if (!HasTxControl()) {
switch (txType) {
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
break;
default:
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
}
}

// validate the compiled query response and ensure that all table versions are not
// changed since the last compilation.
bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response);
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Copy link
Collaborator

@UgnineSirdis UgnineSirdis Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that this will fix the problem. I've debugged this test and found out, that the error occurs when we are executing the last PhyTx: ExecutePhyTx(tx==nullptr, commit=true). In this case we don't enter the first if (tx) {} block and so don't execute QueryState->PrepareStatementTransaction(tx->GetType());. So IsolationLevel is leaved as it was during previous PhyTx. I tried to add PrepareStatementTransaction also to the call with commit=true variant and it fixed the problem.
I think that the following code would be more safe:

bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) {
    if (tx) {
        QueryState->PrepareStatementTransaction(tx->GetType());
        switch (tx->GetType()) {
            ...
        }
    } else {
        YQL_ENSURE(commit);
        QueryState->PrepareStatementTransaction(NKqpProto::TKqpPhyTx::TYPE_GENERIC); // not sure that this is the most suitable type, but in this call the main thing is that it is not equal to TYPE_SCHEME
    }
    ...
}

Original file line number Diff line number Diff line change
Expand Up @@ -1096,11 +1096,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) {
if (tx) {
QueryState->PrepareStatementTransaction(tx->GetType());
switch (tx->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
YQL_ENSURE(tx->StagesSize() == 0);
if (QueryState->HasTxControl() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
if (QueryState->HasTxControl() && !QueryState->HasImplicitTx() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Scheme operations cannot be executed inside transaction");
return true;
Expand Down
182 changes: 182 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2313,6 +2313,10 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml2;
CREATE TABLE TestDdlDml33 (
Key Uint64,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
Expand All @@ -2327,6 +2331,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(0)));

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdlDml33;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));

result = db.ExecuteQuery(R"(
CREATE TABLE TestDdlDml4 (
Key Uint64,
Expand Down Expand Up @@ -2621,6 +2632,177 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(CheckIsolationLevelFroPerStatementMode) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
appConfig.MutableTableServiceConfig()->SetEnableAstCache(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetQueryClient();
auto tableClient = kikimr.GetTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

{
// 1 ddl statement
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test1 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

NYdb::NTable::TDescribeTableResult describe = session.DescribeTable("/Root/Test1").GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
}

{
// 2 ddl statements
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test2 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
CREATE TABLE Test3 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

NYdb::NTable::TDescribeTableResult describe1 = session.DescribeTable("/Root/Test2").GetValueSync();
UNIT_ASSERT_EQUAL(describe1.GetStatus(), EStatus::SUCCESS);
NYdb::NTable::TDescribeTableResult describe2 = session.DescribeTable("/Root/Test3").GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetStatus(), EStatus::SUCCESS);
}

{
// 1 dml statement
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test1;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
}

{
// 2 dml statements
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test2;
SELECT * FROM Test3;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
}

{
// 1 ddl 1 dml statements
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test4 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test4;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe = session.DescribeTable("/Root/Test4").GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
}

{
// 1 dml 1 ddl statements
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test4;
CREATE TABLE Test5 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe = session.DescribeTable("/Root/Test5").GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
}

{
// 1 ddl 1 dml 1 ddl 1 dml statements
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test6 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test6;
CREATE TABLE Test7 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test7;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe1 = session.DescribeTable("/Root/Test6").GetValueSync();
UNIT_ASSERT_EQUAL(describe1.GetStatus(), EStatus::SUCCESS);
NYdb::NTable::TDescribeTableResult describe2 = session.DescribeTable("/Root/Test7").GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetStatus(), EStatus::SUCCESS);
}

{
// 1 dml 1 ddl 1 dml 1 ddl statements
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test7;
CREATE TABLE Test8 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test8;
CREATE TABLE Test9 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe1 = session.DescribeTable("/Root/Test8").GetValueSync();
UNIT_ASSERT_EQUAL(describe1.GetStatus(), EStatus::SUCCESS);
NYdb::NTable::TDescribeTableResult describe2 = session.DescribeTable("/Root/Test9").GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetStatus(), EStatus::SUCCESS);
}
}

Y_UNIT_TEST(TableSink_ReplaceFromSelectOlap) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
Expand Down
Loading