Skip to content

Commit

Permalink
KIKIMR-20740: QueryCache with temp tables (#1053)
Browse files Browse the repository at this point in the history
* Initial commit

* Fixes

* Fixes

* Fixes

* Fixes

* Fixes

* Fixes

* Fixes

* Fixes

* Fixes
  • Loading branch information
shnikd authored Jan 25, 2024
1 parent dbf1e80 commit 2abbcfe
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 114 deletions.
17 changes: 17 additions & 0 deletions ydb/core/kqp/common/simple/temp_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,21 @@

namespace NKikimr::NKqp {

THashMap<TString, TKqpTempTablesState::TTempTableInfo>::const_iterator
TKqpTempTablesState::FindInfo(const std::string_view& path, bool withSessionId) const {
if (!withSessionId) {
return TempTables.find(path);
}

if (path.size() < SessionId.size()) {
return TempTables.end();
}
size_t pos = path.size() - SessionId.size();
if (path.substr(pos) != SessionId) {
return TempTables.end();
}

return TempTables.find(path.substr(0, pos));
}

} // namespace NKikimr::NKqp
10 changes: 6 additions & 4 deletions ydb/core/kqp/common/simple/temp_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/library/aclib/aclib.h>

#include <optional>
#include <string_view>

#include <util/generic/fwd.h>
#include <util/generic/hash.h>
Expand All @@ -14,14 +15,15 @@ struct TKqpTempTablesState {
struct TTempTableInfo {
TString Name;
TString WorkingDir;
TString Database;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString Cluster;
};
std::optional<TString> SessionId;
THashMap<std::pair<TString, TString>, TTempTableInfo> TempTables;
TString SessionId;
THashMap<TString, TTempTableInfo> TempTables;

using TConstPtr = std::shared_ptr<const TKqpTempTablesState>;

THashMap<TString, TTempTableInfo>::const_iterator
FindInfo(const std::string_view& path, bool withSessionId = false) const;
};

} // namespace NKikimr::NKqp
65 changes: 51 additions & 14 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,9 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
Counters->ReportCompileRequestGet(dbCounters);

auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
}
if (compileResult) {
Y_ENSURE(compileResult->Query);
if (compileResult->Query->UserSid == userSid) {
Expand Down Expand Up @@ -610,6 +613,10 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
}

if (compileResult) {
Counters->ReportQueryCacheHit(dbCounters, true);

Expand Down Expand Up @@ -672,7 +679,11 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto dbCounters = request.DbCounters;
Counters->ReportRecompileRequestGet(dbCounters);

auto compileResult = QueryCache.FindByUid(request.Uid, false);
TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
}

if (compileResult || request.Query) {
Counters->ReportCompileRequestCompile(dbCounters);

Expand Down Expand Up @@ -736,19 +747,12 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache;

bool hasTempTablesNameClashes = HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState, true);

try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
if (!hasTempTablesNameClashes) {
UpdateQueryCache(compileResult, keepInCache);
}

if (ev->Get()->ReplayMessage) {
Expand All @@ -762,8 +766,10 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
}
} else {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.EraseByUid(compileResult->Uid);
if (!hasTempTablesNameClashes) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.EraseByUid(compileResult->Uid);
}
}
}

Expand Down Expand Up @@ -814,12 +820,43 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
StartCheckQueriesTtlTimer();
}

bool HasTempTablesNameClashes(
TKqpCompileResult::TConstPtr compileResult,
TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) {
if (!compileResult) {
return false;
}
if (!compileResult->PreparedQuery) {
return false;
}

return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId);
}

void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
if (InsertPreparingQuery(compileResult, true)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
}
}

void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) {
auto& parseResult = ev->Get()->AstResult;
auto& query = ev->Get()->Query;
auto compileRequest = RequestsQueue.FinishActiveRequest(query);
if (parseResult && parseResult->Ast->IsOk()) {
auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache);
if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
compileResult = nullptr;
}
if (compileResult) {
Counters->ReportQueryCacheHit(compileRequest.DbCounters, true);

Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

case NKqpProto::TKqpSchemeOperation::kDropTable: {
auto modifyScheme = schemeOp.GetDropTable();
if (Temporary) {
auto* dropTable = modifyScheme.MutableDrop();
dropTable->SetName(dropTable->GetName() + SessionId);
}
const auto& modifyScheme = schemeOp.GetDropTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
Expand Down
18 changes: 8 additions & 10 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ struct NavigateEntryResult {
std::optional<TString> QueryName;
};

NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& path,
const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) {
NavigateEntryResult CreateNavigateEntry(const TString& path,
const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) {
TNavigate::TEntry entry;
TString currentPath = path;
std::optional<TString> queryName = std::nullopt;
if (tempTablesState) {
auto tempTablesIt = tempTablesState->TempTables.find(std::make_pair(cluster, currentPath));
if (tempTablesState->SessionId && tempTablesIt != tempTablesState->TempTables.end()) {
auto tempTablesInfoIt = tempTablesState->FindInfo(currentPath, false);
if (tempTablesInfoIt != tempTablesState->TempTables.end()) {
queryName = currentPath;
currentPath = currentPath + *tempTablesState->SessionId;
currentPath = currentPath + tempTablesState->SessionId;
}
}
entry.Path = SplitPath(currentPath);
Expand All @@ -50,10 +50,8 @@ NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& p
return {entry, currentPath, queryName};
}

NavigateEntryResult CreateNavigateEntry(const TString& cluster,
const std::pair<TIndexId, TString>& pair,
NavigateEntryResult CreateNavigateEntry(const std::pair<TIndexId, TString>& pair,
const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) {
Y_UNUSED(cluster);
Y_UNUSED(tempTablesState);

TNavigate::TEntry entry;
Expand Down Expand Up @@ -701,8 +699,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta

const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_);
Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only");
auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(cluster,
id, settings, TempTablesState);
auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id,
settings, TempTablesState);
const auto entry = resNavigate.Entry;
const auto queryName = resNavigate.QueryName;
const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<NavigateEntryResult>{} : externalEntryItem;
Expand Down
39 changes: 23 additions & 16 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@ struct TKikimrData {
const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TString& cluster,
const TString& table, TPositionHandle pos, TExprContext& ctx) const
{
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (tempTable) {
tablePath = *tempTable;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath));
Expand All @@ -141,11 +143,13 @@ const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TStrin
}

TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster, const TString& database, const TString& table, ETableType tableType) {
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (tempTable) {
tablePath = *tempTable;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

if (!Tables.FindPtr(std::make_pair(cluster, tablePath))) {
Expand All @@ -165,11 +169,13 @@ TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster
}

TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, const TString& table) {
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (tempTable) {
tablePath = *tempTable;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath));
Expand All @@ -181,12 +187,13 @@ TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, con
const TKikimrTableDescription& TKikimrTablesData::ExistingTable(const TStringBuf& cluster,
const TStringBuf& table) const
{
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTable) {
tablePath = *tempTable;
if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

auto desc = Tables.FindPtr(std::make_pair(TString(cluster), TString(tablePath)));
Expand Down
35 changes: 13 additions & 22 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,12 @@ class TKikimrTablesData : public TThrRefBase {
}

void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
if (tempTablesState) {
for (const auto& [path, info] : tempTablesState->TempTables) {
TempTables[path.second + *tempTablesState->SessionId] = path.second;
}
}
TempTablesState = std::move(tempTablesState);
}

private:
THashMap<std::pair<TString, TString>, TKikimrTableDescription> Tables;
THashMap<TString, TString> TempTables;
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
};

enum class TYdbOperation : ui32 {
Expand Down Expand Up @@ -288,11 +284,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
}

void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
if (tempTablesState) {
for (const auto& [path, info] : tempTablesState->TempTables) {
TempTables[path.second] = path.second + *tempTablesState->SessionId;
}
}
TempTablesState = std::move(tempTablesState);
}

template<class IterableKqpTableOps, class IterableKqpTableInfos>
Expand Down Expand Up @@ -330,17 +322,16 @@ class TKikimrTransactionContextBase : public TThrRefBase {
}

for (const auto& op : operations) {
const auto& table = [&]() -> const TString& {
const auto tempTable = TempTables.FindPtr(op.GetTable());
if (tempTable) {
return *tempTable;
} else {
return op.GetTable();
}
}();

const auto newOp = TYdbOperation(op.GetOperation());

auto table = op.GetTable();
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, false);
if (tempTableInfoIt != TempTablesState->TempTables.end()) {
table = tempTableInfoIt->first + TempTablesState->SessionId;
}
}

const auto info = tableInfoMap.FindPtr(table);
if (!info) {
TString message = TStringBuilder()
Expand Down Expand Up @@ -450,7 +441,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
THashMap<TString, TString> TempTables;
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
bool Readonly = false;
bool Invalidated = false;
bool Closed = false;
Expand Down Expand Up @@ -535,7 +526,7 @@ class TKikimrSessionContext : public TThrRefBase {
if (TxCtx) {
TxCtx->SetTempTables(tempTablesState);
}
TempTablesState = tempTablesState;
TempTablesState = std::move(tempTablesState);
}

const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const {
Expand Down
Loading

0 comments on commit 2abbcfe

Please sign in to comment.