Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Jan 19, 2024
1 parent 6ac905f commit dba1ef6
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 52 deletions.
35 changes: 26 additions & 9 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache;

bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) == nullptr;
bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState, true) == nullptr;

try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
Expand Down Expand Up @@ -815,18 +815,35 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

TKqpCompileResult::TConstPtr WithCache(
TKqpCompileResult::TConstPtr cacheResult, TKqpTempTablesState::TConstPtr tempTablesState) {
if (!cacheResult) {
TKqpCompileResult::TConstPtr compileResult,
TKqpTempTablesState::TConstPtr tempTablesState, bool forInsert = false) {
if (!compileResult) {
return nullptr;
}
if (!cacheResult->PreparedQuery) {
return cacheResult;
if (!compileResult->PreparedQuery) {
return compileResult;
}
auto hasTempTables = cacheResult->PreparedQuery->HasTempTables(tempTablesState);
if (hasTempTables) {
return nullptr;
if (forInsert) {
auto hasTempTables = compileResult->PreparedQuery->HasTempTables(tempTablesState);
if (hasTempTables) {
return nullptr;
}
return compileResult;
}
if (!tempTablesState) {
return compileResult;
}
auto tables = compileResult->PreparedQuery->GetQueryTables();
auto tempTables = THashSet<TString>();
for (const auto& [path, info] : tempTablesState->TempTables) {
tempTables.insert(path.second);
}
for (const auto& path: tables) {
if (tempTables.contains(path)) {
return nullptr;
}
}
return cacheResult;
return compileResult;
}

void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) {
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

case NKqpProto::TKqpSchemeOperation::kDropTable: {
auto modifyScheme = schemeOp.GetDropTable();
if (Temporary) {
auto* dropTable = modifyScheme.MutableDrop();
dropTable->SetName(dropTable->GetName() + SessionId);
}
const auto& modifyScheme = schemeOp.GetDropTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/kqp/query_data/kqp_prepared_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ bool TKqpPhyTxHolder::IsLiteralTx() const {
return LiteralTx;
}

std::optional<std::pair<std::pair<TString, TString>, bool>>
std::optional<std::pair<bool, std::pair<TString, TString>>>
TKqpPhyTxHolder::GetSchemeOpTempTablePath() const {
if (GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) {
return std::nullopt;
Expand All @@ -131,8 +131,7 @@ TKqpPhyTxHolder::GetSchemeOpTempTablePath() const {
}
if (tableDesc->HasTemporary()) {
if (tableDesc->GetTemporary()) {
return {{{modifyScheme.GetWorkingDir(), tableDesc->GetName()},
true}};
return {{true, {modifyScheme.GetWorkingDir(), tableDesc->GetName()}}};
}
}
break;
Expand All @@ -141,8 +140,7 @@ TKqpPhyTxHolder::GetSchemeOpTempTablePath() const {
auto modifyScheme = schemeOperation.GetDropTable();
auto* dropTable = modifyScheme.MutableDrop();

return {{{modifyScheme.GetWorkingDir(), dropTable->GetName()},
false}};
return {{false, {modifyScheme.GetWorkingDir(), dropTable->GetName()}}};
}
default:
return std::nullopt;
Expand Down Expand Up @@ -274,6 +272,7 @@ bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTabl
if (!tempTablesState) {
return false;
}
YQL_ENSURE(tempTablesState->SessionId);
auto tempTables = THashSet<TString>();
for (const auto& [path, info] : tempTablesState->TempTables) {
tempTables.insert(path.second + *tempTablesState->SessionId);
Expand All @@ -289,7 +288,7 @@ bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTabl
if (!optPath) {
continue;
} else {
const auto& [path, isCreate] = *optPath;
const auto& [isCreate, path] = *optPath;
if (isCreate) {
return true;
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/query_data/kqp_prepared_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TKqpPhyTxHolder {

bool IsLiteralTx() const;

std::optional<std::pair<std::pair<TString, TString>, bool>>
std::optional<std::pair<bool, std::pair<TString, TString>>>
GetSchemeOpTempTablePath() const;
};

Expand Down
63 changes: 32 additions & 31 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,39 +1162,34 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
}

std::optional<TKqpTempTablesState::TTempTableInfo> GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) {
std::optional<std::pair<bool, TKqpTempTablesState::TTempTableInfo>>
GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) {
if (!tx) {
return std::nullopt;
}
const auto& schemeOperation = tx->GetSchemeOperation();
switch (schemeOperation.GetOperationCase()) {
case NKqpProto::TKqpSchemeOperation::kCreateTable: {
const auto& modifyScheme = schemeOperation.GetCreateTable();
const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
switch (modifyScheme.GetOperationType()) {
case NKikimrSchemeOp::ESchemeOpCreateTable: {
tableDesc = &modifyScheme.GetCreateTable();
break;
}
case NKikimrSchemeOp::ESchemeOpCreateIndexedTable: {
tableDesc = &modifyScheme.GetCreateIndexedTable().GetTableDescription();
break;
}
default:
YQL_ENSURE(false, "Unexpected operation type");
}
auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>();
if (tableDesc->HasTemporary()) {
if (tableDesc->GetTemporary()) {
return {{tableDesc->GetName(), modifyScheme.GetWorkingDir(), Settings.Cluster, userToken, Settings.Database}};
}
}
break;
}
default:
return std::nullopt;
auto optPath = tx->GetSchemeOpTempTablePath();
if (!optPath) {
return std::nullopt;
}
return std::nullopt;
const auto& [isCreate, path] = *optPath;
if (isCreate) {
auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>();
return {{true, {path.second, path.first, Settings.Cluster, userToken, Settings.Database}}};
}

TString name = path.second;
auto pos = name.find(*TempTablesState.SessionId);

if (pos == TString::npos) {
return std::nullopt;
}
name.erase(pos, name.size());

auto it = TempTablesState.TempTables.find(std::make_pair(Settings.Cluster, JoinPath({path.first, name})));
if (it == TempTablesState.TempTables.end()) {
return std::nullopt;
}
return {{false, it->second}};
}

void UpdateTempTablesState() {
Expand All @@ -1205,8 +1200,14 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
if (!tx) {
return;
}
if (auto tempTableInfo = GetTemporaryTableInfo(tx)) {
TempTablesState.TempTables[std::make_pair(tempTableInfo->Database, JoinPath({tempTableInfo->WorkingDir, tempTableInfo->Name}))] = std::move(*tempTableInfo);
auto optInfo = GetTemporaryTableInfo(tx);
if (optInfo) {
auto [isCreate, tempTableInfo] = *optInfo;
if (isCreate) {
TempTablesState.TempTables[std::make_pair(tempTableInfo.Database, JoinPath({tempTableInfo.WorkingDir, tempTableInfo.Name}))] = tempTableInfo;
} else {
TempTablesState.TempTables.erase(std::make_pair(tempTableInfo.Database, JoinPath({tempTableInfo.WorkingDir, tempTableInfo.Name})));
}
QueryState->UpdateTempTablesState(TempTablesState);
}
}
Expand Down
Loading

0 comments on commit dba1ef6

Please sign in to comment.