Skip to content

Commit

Permalink
delete query from compile cache if it is existed during insert (#8506)
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka authored Sep 2, 2024
1 parent 6fca8c1 commit 30edde3
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
22 changes: 22 additions & 0 deletions ydb/core/kqp/common/simple/query_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <google/protobuf/util/message_differencer.h>

#include <util/generic/yexception.h>
#include <util/string/escape.h>

#include <memory>

Expand Down Expand Up @@ -74,4 +75,25 @@ bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
return true;
}

TString TKqpQueryId::SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "Cluster: " << Cluster << ", "
<< "Database: " << Database << ", "
<< "UserSid: " << UserSid << ", "
<< "Text: " << EscapeC(Text) << ", "
<< "Settings: " << Settings.SerializeToString() << ", ";
if (QueryParameterTypes) {
result << "QueryParameterTypes: [";
for (const auto& param : *QueryParameterTypes) {
result << "name: " << param.first << ", type: " << param.second.ShortDebugString();
}
result << "], ";
} else {
result << "QueryParameterTypes: <empty>, ";
}

result << "GUCSettings: " << GUCSettings.SerializeToString() << "}";
return result;
}

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/query_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct TKqpQueryId {
GUCSettings.GetHash());
return THash<decltype(tuple)>()(tuple);
}

TString SerializeToString() const;
};
} // namespace NKikimr::NKqp

Expand Down
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/simple/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/public/api/protos/ydb_query.pb.h>

#include <util/generic/string.h>
#include <util/str_stl.h>
#include <util/string/builder.h>

#include <tuple>

Expand Down Expand Up @@ -39,6 +41,14 @@ struct TKqpQuerySettings {
auto tuple = std::make_tuple(DocumentApiRestricted, IsInternalCall, QueryType, Syntax);
return THash<decltype(tuple)>()(tuple);
}

TString SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "DocumentApiRestricted: " << DocumentApiRestricted << ", "
<< "IsInternalCall: " << IsInternalCall << ", "
<< "QueryType: " << QueryType << "}";
return result;
}
};

} // namespace NKikimr::NKqp
Expand Down
21 changes: 17 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class TKqpQueryCache {
YQL_ENSURE(compileResult->PreparedQuery);

auto queryIt = QueryIndex.emplace(query, compileResult->Uid);
if (!queryIt.second) {
EraseByUid(compileResult->Uid);
}
Y_ENSURE(queryIt.second);
}

Expand Down Expand Up @@ -676,6 +679,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
Y_ENSURE(query.UserSid == userSid);
}

LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by queryId, queryId: " << query.SerializeToString());
auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
Expand Down Expand Up @@ -857,7 +861,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (!hasTempTablesNameClashes) {
UpdateQueryCache(compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
}

if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) {
Expand Down Expand Up @@ -939,15 +943,21 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId);
}

void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) {
void UpdateQueryCache(const TActorContext& ctx, TKqpCompileResult::TConstPtr compileResult, bool keepInCache, bool isQueryActionPrepare, bool isPerStatementExecution) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (compileResult->Query) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert query into compile cache, queryId: " << compileResult->Query->SerializeToString());
if (QueryCache.FindByQuery(*compileResult->Query, keepInCache)) {
LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Trying to insert query into compile cache when it is already there");
}
}
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution)) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && isQueryActionPrepare) {
if (InsertPreparingQuery(compileResult, true, isPerStatementExecution)) {
if (InsertPreparingQuery(ctx, compileResult, true, isPerStatementExecution)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
Expand All @@ -958,6 +968,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
YQL_ENSURE(queryAst.Ast);
YQL_ENSURE(queryAst.Ast->IsOk());
YQL_ENSURE(queryAst.Ast->Root);
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Try to find query by ast, queryId: " << compileRequest.Query.SerializeToString()
<< ", ast: " << queryAst.Ast->Root->ToString());
auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache);

if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
Expand Down Expand Up @@ -1026,7 +1038,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

private:
bool InsertPreparingQuery(const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache, bool isPerStatementExecution) {
bool InsertPreparingQuery(const TActorContext& ctx, const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache, bool isPerStatementExecution) {
YQL_ENSURE(compileResult->Query);
auto query = *compileResult->Query;

Expand All @@ -1051,6 +1063,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->Ast);
newCompileResult->AllowCache = compileResult->AllowCache;
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache(), isPerStatementExecution);
}

Expand Down

0 comments on commit 30edde3

Please sign in to comment.