Skip to content

Commit

Permalink
fixes applied to sessions sysview (ydb-platform#8661)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Sep 3, 2024
1 parent 5bea49a commit be39b5a
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
13 changes: 11 additions & 2 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,11 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
KQP_PROXY_LOG_D("incoming list sessions request " << ev->Get()->Record.ShortUtf8DebugString());

auto result = std::make_unique<TEvKqp::TEvListSessionsResponse>();
auto startIt = LocalSessions->GetOrderedLowerBound(ev->Get()->Record.GetSessionIdStart());

const auto& tenant = ev->Get()->Record.GetTenantName();
bool checkTenant = (AppData()->TenantName != tenant);

auto startIt = LocalSessions->GetOrderedLowerBound(tenant, ev->Get()->Record.GetSessionIdStart());
auto endIt = LocalSessions->GetOrderedEnd();
i32 freeSpace = ev->Get()->Record.GetFreeSpace();

Expand All @@ -1743,6 +1747,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {

while(startIt != endIt && freeSpace > 0) {
auto* sessionInfo = startIt->second;
if (checkTenant && sessionInfo->Database != ev->Get()->Record.GetTenantName()) {
finished = true;
break;
}

if (!until.empty()) {
if (sessionInfo->SessionId > until) {
Expand Down Expand Up @@ -1770,7 +1778,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
if (finished) {
result->Record.SetFinished(true);
} else {
result->Record.SetContinuationToken(startIt->first);
Y_ABORT_UNLESS(startIt != endIt);
result->Record.SetContinuationToken(startIt->first.second);
result->Record.SetFinished(false);
}

Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ struct TKqpSessionInfo {

class TLocalSessionsRegistry {
THashMap<TString, TKqpSessionInfo> LocalSessions;
std::map<TString, TKqpSessionInfo*> OrderedSessions;
std::map<std::pair<TString, TString>, TKqpSessionInfo*> OrderedSessions;
THashMap<TActorId, TString> TargetIdIndex;
THashSet<TString> ShutdownInFlightSessions;
THashMap<TString, ui32> SessionsCountPerDatabase;
Expand Down Expand Up @@ -208,7 +208,7 @@ class TLocalSessionsRegistry {
auto result = LocalSessions.emplace(sessionId,
TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos),
sessionStartedAt + idleDuration, IdleSessions.end(), pgWire, startedAt));
OrderedSessions.emplace(sessionId, &result.first->second);
OrderedSessions.emplace(std::make_pair(database, sessionId), &result.first->second);
SessionsCountPerDatabase[database]++;
Y_ABORT_UNLESS(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
Expand Down Expand Up @@ -302,11 +302,11 @@ class TLocalSessionsRegistry {
return ShutdownInFlightSessions.size();
}

std::map<TString, TKqpSessionInfo*>::const_iterator GetOrderedLowerBound(const TString& continuation) const {
return OrderedSessions.lower_bound(continuation);
std::map<std::pair<TString, TString>, TKqpSessionInfo*>::const_iterator GetOrderedLowerBound(const TString& tenant, const TString& continuation) const {
return OrderedSessions.lower_bound(std::make_pair(tenant, continuation));
}

std::map<TString, TKqpSessionInfo*>::const_iterator GetOrderedEnd() const {
std::map<std::pair<TString, TString>, TKqpSessionInfo*>::const_iterator GetOrderedEnd() const {
return OrderedSessions.end();
}

Expand Down Expand Up @@ -339,7 +339,7 @@ class TLocalSessionsRegistry {
}
}

OrderedSessions.erase(sessionId);
OrderedSessions.erase(std::make_pair(it->second.Database, sessionId));
LocalSessions.erase(it);
}

Expand Down Expand Up @@ -452,7 +452,7 @@ class TResourcePoolsCache {
return true;
}

const auto databaseInfo = GetDatabaseInfo(database);
const auto databaseInfo = GetDatabaseInfo(database);
return !databaseInfo || !databaseInfo->Serverless;
}

Expand Down
9 changes: 8 additions & 1 deletion ydb/core/kqp/proxy_service/kqp_session_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace NKikimr::NKqp {

using VSessions = NKikimr::NSysView::Schema::QuerySessions;

constexpr size_t QUERY_TEXT_LIMIT = 10_KB;

void TKqpSessionInfo::SerializeTo(::NKikimrKqp::TSessionInfo* proto, const TFieldsMap& fieldsMap) const {
if (fieldsMap.NeedField(VSessions::SessionId::ColumnId)) { // 1
proto->SetSessionId(SessionId);
Expand All @@ -26,7 +28,12 @@ void TKqpSessionInfo::SerializeTo(::NKikimrKqp::TSessionInfo* proto, const TFiel

// last executed query or currently running query.
if (fieldsMap.NeedField(VSessions::Query::ColumnId)) { // 4
proto->SetQuery(QueryText);
if (QueryText.size() > QUERY_TEXT_LIMIT) {
TString truncatedText = QueryText.substr(0, QUERY_TEXT_LIMIT);
proto->SetQuery(QueryText);
} else {
proto->SetQuery(QueryText);
}
}

if (fieldsMap.NeedField(VSessions::QueryCount::ColumnId)) { // 5
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ message TEvListSessionsRequest {
repeated uint32 Columns = 5;
optional int64 FreeSpace = 6;
optional int64 Limit = 7;
optional string TenantName = 8;
}

message TEvListSessionsResponse {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/sys_view/sessions/sessions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class TSessionsScan : public NKikimr::NSysView::TScanActorBase<TSessionsScan> {
const auto& nodeId = PendingNodes.front();
auto kqpProxyId = NKqp::MakeKqpProxyID(nodeId);
auto req = std::make_unique<NKikimr::NKqp::TEvKqp::TEvListSessionsRequest>();
req->Record.SetTenantName(TenantName);
if (!ContinuationToken.empty()) {
req->Record.SetSessionIdStart(ContinuationToken);
req->Record.SetSessionIdStartInclusive(true);
Expand Down

0 comments on commit be39b5a

Please sign in to comment.