Skip to content

Commit

Permalink
Support temp tables in yql (#1589)
Browse files Browse the repository at this point in the history
* Initial commit

* Fixes
  • Loading branch information
shnikd authored Feb 8, 2024
1 parent 8adf99d commit 3ce0953
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 3 deletions.
131 changes: 131 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,137 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(CreateTempTable) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});
TKikimrRunner kikimr(
serverSettings.SetWithSampleTables(false).SetEnableTempTables(true));
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
auto client = kikimr.GetQueryClient();
{
auto session = client.GetSession().GetValueSync().GetSession();
auto id = session.GetId();

const auto queryCreate = Q_(R"(
--!syntax_v1
CREATE TEMP TABLE Temp (
Key Uint64 NOT NULL,
Value String,
PRIMARY KEY (Key)
);)");

auto resultCreate = session.ExecuteQuery(queryCreate, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());

const auto querySelect = Q_(R"(
--!syntax_v1
SELECT * FROM Temp;
)");

auto resultSelect = session.ExecuteQuery(
querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);

UNIT_ASSERT(allDoneOk);
}

{
const auto querySelect = Q_(R"(
--!syntax_v1
SELECT * FROM Temp;
)");

auto resultSelect = client.ExecuteQuery(
querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!resultSelect.IsSuccess());
}
}

Y_UNIT_TEST(TempTablesDrop) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});
TKikimrRunner kikimr(
serverSettings.SetWithSampleTables(false).SetEnableTempTables(true));
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
auto client = kikimr.GetQueryClient();

auto session = client.GetSession().GetValueSync().GetSession();
auto id = session.GetId();

const auto queryCreate = Q_(R"(
--!syntax_v1
CREATE TEMPORARY TABLE Temp (
Key Uint64 NOT NULL,
Value String,
PRIMARY KEY (Key)
);)");

auto resultCreate = session.ExecuteQuery(queryCreate, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());

{
const auto querySelect = Q_(R"(
--!syntax_v1
SELECT * FROM Temp;
)");

auto resultSelect = session.ExecuteQuery(
querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
}

const auto queryDrop = Q_(R"(
--!syntax_v1
DROP TABLE Temp;
)");

auto resultDrop = session.ExecuteQuery(
queryDrop, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(resultDrop.IsSuccess(), resultDrop.GetIssues().ToString());

{
const auto querySelect = Q_(R"(
--!syntax_v1
SELECT * FROM Temp;
)");

auto resultSelect = session.ExecuteQuery(
querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!resultSelect.IsSuccess());
}

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);

UNIT_ASSERT(allDoneOk);

auto sessionAnother = client.GetSession().GetValueSync().GetSession();
auto idAnother = sessionAnother.GetId();
UNIT_ASSERT(id != idAnother);

{
const auto querySelect = Q_(R"(
--!syntax_v1
SELECT * FROM Temp;
)");

auto resultSelect = sessionAnother.ExecuteQuery(
querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!resultSelect.IsSuccess());
}
}

Y_UNIT_TEST(DdlGroup) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/sql/v1/SQLv1.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ object_features: object_feature | LPAREN object_feature (COMMA object_feature)*

object_type_ref: an_id_or_type;

create_table_stmt: CREATE (OR REPLACE)? (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
create_table_stmt: CREATE (OR REPLACE)? (TABLE | TABLESTORE | EXTERNAL TABLE | TEMP TABLE | TEMPORARY TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
table_inherits?
table_partition_by?
with_table_settings?
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/sql/v1/format/sql_format_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
")\n"
"PARTITION BY HASH (a, b, hash)\n"
"WITH (tiering = 'some');\n"},
{"create table if not exists user(user int32)", "CREATE TABLE IF NOT EXISTS user (\n\tuser int32\n);\n"}
{"create table if not exists user(user int32)", "CREATE TABLE IF NOT EXISTS user (\n\tuser int32\n);\n"},
{"create temp table user(user int32)", "CREATE TEMP TABLE user (\n\tuser int32\n);\n"},
{"create temporary table user(user int32)", "CREATE TEMPORARY TABLE user (\n\tuser int32\n);\n"}
};

TSetup setup;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/sql/v1/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ namespace NSQLTranslationV1 {
TVector<TChangefeedDescription> Changefeeds;
TTableSettings TableSettings;
ETableType TableType = ETableType::Table;
bool Temporary = false;
};

struct TAlterTableParameters {
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/sql/v1/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,10 @@ class TCreateTableNode final: public TAstListNode {
break;
}

if (Params.Temporary) {
opts = L(opts, Q(Y(Q("temporary"))));
}

Add("block", Q(Y(
Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, Table.Service), Scoped->WrapCluster(Table.Cluster, ctx))),
Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(opts))),
Expand Down
6 changes: 5 additions & 1 deletion ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,14 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&

const auto& block = rule.GetBlock3();
ETableType tableType = ETableType::Table;
bool temporary = false;
if (block.HasAlt2() && block.GetAlt2().GetToken1().GetId() == SQLv1LexerTokens::TOKEN_TABLESTORE) {
tableType = ETableType::TableStore;
} else if (block.HasAlt3() && block.GetAlt3().GetToken1().GetId() == SQLv1LexerTokens::TOKEN_EXTERNAL) {
tableType = ETableType::ExternalTable;
} else if (block.HasAlt4() && block.GetAlt4().GetToken1().GetId() == SQLv1LexerTokens::TOKEN_TEMP ||
block.HasAlt5() && block.GetAlt5().GetToken1().GetId() == SQLv1LexerTokens::TOKEN_TEMPORARY) {
temporary = true;
}

bool existingOk = false;
Expand All @@ -198,7 +202,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
return false;
}

TCreateTableParameters params{.TableType=tableType};
TCreateTableParameters params{.TableType=tableType, .Temporary=temporary};
if (!CreateTableEntry(rule.GetRule_create_table_entry7(), params)) {
return false;
}
Expand Down
34 changes: 34 additions & 0 deletions ydb/library/yql/sql/v1/sql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,40 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write!"]);
}

Y_UNIT_TEST(CreateTempTable) {
NYql::TAstParseResult res = SqlToYql("USE plato; CREATE TEMP TABLE t (a int32, primary key(a));");
UNIT_ASSERT(res.Root);

TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write!") {
UNIT_ASSERT_VALUES_UNEQUAL_C(TString::npos,
line.find(R"__((Write! world sink (Key '('tablescheme (String '"t"))) (Void) '('('mode 'create) '('columns '('('"a" (AsOptionalType (DataType 'Int32)) '('columnConstrains '()) '()))) '('primarykey '('"a")) '('temporary))))__"), line);
}
};

TWordCountHive elementStat = {{TString("Write!"), 0}};
VerifyProgram(res, elementStat, verifyLine);

UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write!"]);
}

Y_UNIT_TEST(CreateTemporaryTable) {
NYql::TAstParseResult res = SqlToYql("USE plato; CREATE TEMPORARY TABLE t (a int32, primary key(a));");
UNIT_ASSERT(res.Root);

TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write!") {
UNIT_ASSERT_VALUES_UNEQUAL_C(TString::npos,
line.find(R"__((Write! world sink (Key '('tablescheme (String '"t"))) (Void) '('('mode 'create) '('columns '('('"a" (AsOptionalType (DataType 'Int32)) '('columnConstrains '()) '()))) '('primarykey '('"a")) '('temporary))))__"), line);
}
};

TWordCountHive elementStat = {{TString("Write!"), 0}};
VerifyProgram(res, elementStat, verifyLine);

UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write!"]);
}

Y_UNIT_TEST(CreateTableDuplicatedPkColumnsFail) {
NYql::TAstParseResult res = SqlToYql("USE plato; CREATE TABLE t (a int32 not null, primary key(a, a));");
UNIT_ASSERT(!res.Root);
Expand Down

0 comments on commit 3ce0953

Please sign in to comment.