Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24-3: CTAS fixes #6857

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
bool split = false, NYql::TExprContext* splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
: UserToken(userToken)
, Uid(uid)
, Query(query)
Expand All @@ -90,6 +91,9 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, QueryAst(queryAst)
, Split(split)
, SplitCtx(splitCtx)
, SplitExpr(splitExpr)
{
}

Expand All @@ -110,6 +114,10 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

TMaybe<TQueryAst> QueryAst;
bool Split = false;

NYql::TExprContext* SplitCtx = nullptr;
NYql::TExprNode::TPtr SplitExpr = nullptr;
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
YQL_ENSURE(PerStatementResult);

const auto prepareSettings = PrepareCompilationSettings(ctx);

auto result = KqpHost->SplitQuery(QueryId.Text, prepareSettings);
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);

Become(&TKqpCompileActor::CompileState);
ReplySplitResult(ctx, std::move(result));
Expand Down
17 changes: 14 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", queryUid: " << (request.Uid ? *request.Uid : "<empty>")
<< ", queryText: \"" << (request.Query ? EscapeC(request.Query->Text) : "<empty>") << "\""
<< ", keepInCache: " << request.KeepInCache
<< ", split: " << request.Split
<< *request.UserRequestContext);

*Counters->CompileQueryCacheSize = QueryCache.Size();
Expand Down Expand Up @@ -697,7 +698,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
request.Deadline,
ev->Get()->Split
? ECompileActorAction::SPLIT
: TableServiceConfig.GetEnableAstCache()
: (TableServiceConfig.GetEnableAstCache() && !request.QueryAst)
? ECompileActorAction::PARSE
: ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
Expand Down Expand Up @@ -760,7 +761,16 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");

TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
TKqpCompileSettings compileSettings(
true,
request.IsQueryActionPrepare,
false,
request.Deadline,
ev->Get()->Split
? ECompileActorAction::SPLIT
: (TableServiceConfig.GetEnableAstCache() && !request.QueryAst)
? ECompileActorAction::PARSE
: ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
Expand Down Expand Up @@ -824,6 +834,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
if (compileResult->NeedToSplit) {
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
ProcessQueue(ctx);
return;
}

Expand Down Expand Up @@ -961,7 +972,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
compileRequest.Orbit,
compileRequest.Query.UserSid);

compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE;
compileRequest.QueryAst = std::move(queryAst);

if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Expand Down Expand Up @@ -994,6 +1004,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
return;
}

compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE;
CompileByAst(astStatements.front(), compileRequest, ctx);
}

Expand Down
55 changes: 55 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2566,6 +2566,61 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(SeveralCTAS) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
appConfig.MutableTableServiceConfig()->SetEnableAstCache(true);
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting})
.SetWithSampleTables(false)
.SetEnableTempTables(true);

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetQueryClient();

{
auto result = db.ExecuteQuery(R"(
CREATE TABLE Table1 (
PRIMARY KEY (Key)
) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
CREATE TABLE Table2 (
PRIMARY KEY (Key)
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
CREATE TABLE Table3 (
PRIMARY KEY (Key)
) AS SELECT * FROM Table2 UNION ALL SELECT * FROM Table1;
SELECT * FROM Table1 ORDER BY Key;
SELECT * FROM Table2 ORDER BY Key;
SELECT * FROM Table3 ORDER BY Key;
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3);
// Results are empty. Snapshot was taken before tables were created, so we don't see changes after snapshot.
// This will be fixed in future, for example, by implicit commit before/after each ddl statement.
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1)));
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2)));

result = db.ExecuteQuery(R"(
SELECT * FROM Table1 ORDER BY Key;
SELECT * FROM Table2 ORDER BY Key;
SELECT * FROM Table3 ORDER BY Key;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3);
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[2u];["2"];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
// Also empty now(
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2)));
}
}

Y_UNIT_TEST(TableSink_ReplaceFromSelectOlap) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
Expand Down
Loading