diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
index f752bef3160e..33cec012a0f5 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
@@ -1,6 +1,8 @@
 #include
 #include
 
+#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>
+
 #include
 #include
@@ -24,17 +26,9 @@ namespace NFq {
 class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped {
     struct TCounters {
         ::NMonitoring::TDynamicCounterPtr Counters;
-        struct TCommonMetrics {
-            ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
-            ::NMonitoring::TDynamicCounters::TCounterPtr Error;
-            ::NMonitoring::THistogramPtr LatencyMs;
-        };
-
-        TCommonMetrics CpuLoadRequest;
-        ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
-        ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
-        ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
-        ::NMonitoring::TDynamicCounters::TCounterPtr AvailableLoadPercentage;
+        ::NMonitoring::TDynamicCounterPtr SubComponent;
+
+        ::NMonitoring::THistogramPtr CpuLoadRequestLatencyMs;
         ::NMonitoring::TDynamicCounters::TCounterPtr TargetLoadPercentage;
         ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
         ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
@@ -48,21 +42,11 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped
             auto component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring");
-            auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
-            RegisterCommonMetrics(CpuLoadRequest, subComponent);
-            InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false);
-            AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false);
-            QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false);
-            AvailableLoadPercentage = subComponent->GetCounter("AvailableLoadPercentage", false);
-            TargetLoadPercentage = subComponent->GetCounter("TargetLoadPercentage", false);
-            PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false);
-            PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true);
-        }
-
-        void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
-            metrics.Ok = subComponent->GetCounter("Ok", true);
-            metrics.Error = subComponent->GetCounter("Error", true);
-            metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+            SubComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
+            CpuLoadRequestLatencyMs = SubComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+            TargetLoadPercentage = SubComponent->GetCounter("TargetLoadPercentage", false);
+            PendingQueueSize = SubComponent->GetCounter("PendingQueueSize", false);
+            PendingQueueOverload = SubComponent->GetCounter("PendingQueueOverload", true);
         }
 
         static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
@@ -75,15 +59,19 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped
-        , AverageLoadInterval(std::max(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)))
         , MaxClusterLoad(std::min(config.GetMaxClusterLoadPercentage(), 100) / 100.0)
-        , DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1)
         , PendingQueueSize(config.GetPendingQueueSize())
         , Strict(config.GetStrict())
-        , CpuNumber(config.GetCpuNumber())
+        , CpuQuotaManager(
+            GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)),
+            std::max(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)),
+            TDuration::Zero(),
+            config.GetDefaultQueryLoadPercentage() ? std::min(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1,
+            config.GetStrict(),
+            config.GetCpuNumber(),
+            Counters.SubComponent
+        )
     {
-        *Counters.AvailableLoadPercentage = 100;
         *Counters.TargetLoadPercentage = static_cast(MaxClusterLoad * 100);
     }
@@ -105,8 +93,8 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped
     void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
-        auto response = std::make_unique(InstantLoad, AverageLoad);
-        if (!Ready) {
+        auto response = std::make_unique(CpuQuotaManager.GetInstantLoad(), CpuQuotaManager.GetAverageLoad());
+        if (!CpuQuotaManager.CheckLoadIsOutdated()) {
             response->Issues.AddIssue("CPU Load is unavailable");
         }
         Send(ev->Sender, response.release(), 0, ev->Cookie);
@@ -114,45 +102,20 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped
     void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
         const auto& response = *ev.Get()->Get();
 
-        auto now = TInstant::Now();
-        if (!response.Issues) {
-            auto delta = now - LastCpuLoad;
-            LastCpuLoad = now;
-
-            if (response.CpuNumber) {
-                CpuNumber = response.CpuNumber;
-            }
-
-            InstantLoad = response.InstantLoad;
-            // exponential moving average
-            if (!Ready || delta >= AverageLoadInterval) {
-                AverageLoad = InstantLoad;
-                QuotedLoad = InstantLoad;
-            } else {
-                auto ratio = static_cast(delta.GetValue()) / AverageLoadInterval.GetValue();
-                AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
-                QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
-            }
-            Ready = true;
-            Counters.CpuLoadRequest.Ok->Inc();
-            *Counters.InstantLoadPercentage = static_cast(InstantLoad * 100);
-            *Counters.AverageLoadPercentage = static_cast(AverageLoad * 100);
-            CheckPendingQueue();
-            *Counters.QuotedLoadPercentage = static_cast(QuotedLoad * 100);
-        } else {
+        if (response.Issues) {
             LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString());
-            Counters.CpuLoadRequest.Error->Inc();
-            CheckLoadIsOutdated();
         }
-        Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds());
+        Counters.CpuLoadRequestLatencyMs->Collect((TInstant::Now() - StartCpuLoad).MilliSeconds());
+
+        CpuQuotaManager.UpdateCpuLoad(response.InstantLoad, response.CpuNumber, !response.Issues);
+        CheckPendingQueue();
 
         // TODO: make load pulling reactive
         // 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
         // 2. Active pulling when busy
-        if (MonitoringRequestDelay) {
-            Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup());
+        if (auto delay = CpuQuotaManager.GetMonitoringRequestDelay()) {
+            Schedule(delay, new NActors::TEvents::TEvWakeup());
         } else {
             SendCpuLoadRequest();
         }
@@ -164,48 +127,24 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped
     void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) {
         auto& request = *ev.Get()->Get();
 
         if (request.Quota > 1.0) {
             Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie);
         } else {
-            if (!request.Quota) {
-                request.Quota = DefaultQueryLoad;
-            }
-            CheckLoadIsOutdated();
-            if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
-                if (PendingQueue.size() >= PendingQueueSize) {
-                    Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{
-                        NYql::TIssue{TStringBuilder{}
-                            << "Cluster is overloaded, current quoted load " << static_cast(QuotedLoad * 100)
-                            << "%, average load " << static_cast(AverageLoad * 100) << "%"
-                    }}), 0, ev->Cookie);
-                    Counters.PendingQueueOverload->Inc();
-                } else {
-                    PendingQueue.push(ev);
-                    Counters.PendingQueueSize->Inc();
-                }
-            } else {
-                QuotedLoad += request.Quota;
-                *Counters.QuotedLoadPercentage = static_cast(QuotedLoad * 100);
-                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
+            auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
+            CheckPendingQueue();
+            if (response.Status == NYdb::EStatus::OVERLOADED && PendingQueue.size() < PendingQueueSize) {
+                PendingQueue.push(ev);
+                Counters.PendingQueueSize->Inc();
+            } else {
+                if (response.Status == NYdb::EStatus::OVERLOADED) {
+                    Counters.PendingQueueOverload->Inc();
+                }
+                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
             }
         }
     }
 
     void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
-        if (CpuNumber) {
-            auto& request = *ev.Get()->Get();
-            if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0) {
-                auto load = (request.CpuSecondsConsumed * 1000 / request.Duration.MilliSeconds()) / CpuNumber;
-                auto quota = request.Quota ? request.Quota : DefaultQueryLoad;
-                if (quota > load) {
-                    auto adjustment = (quota - load) / 2;
-                    if (QuotedLoad > adjustment) {
-                        QuotedLoad -= adjustment;
-                    } else {
-                        QuotedLoad = 0.0;
-                    }
-                    CheckPendingQueue();
-                    *Counters.QuotedLoadPercentage = static_cast(QuotedLoad * 100);
-                }
-            }
-        }
+        auto& request = *ev.Get()->Get();
+        CpuQuotaManager.AdjustCpuQuota(request.Quota, request.Duration, request.CpuSecondsConsumed);
+        CheckPendingQueue();
     }
 
     void SendCpuLoadRequest() {
@@ -215,57 +154,51 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped
     void CheckLoadIsOutdated() {
-        if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
-            Ready = false;
-            QuotedLoad = 0.0;
-            if (Strict) {
-                while (PendingQueue.size()) {
-                    auto& ev = PendingQueue.front();
-                    Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
-                    PendingQueue.pop();
-                    Counters.PendingQueueSize->Dec();
-                }
+        if (Strict && !CpuQuotaManager.CheckLoadIsOutdated()) {
+            while (PendingQueue.size()) {
+                auto& ev = PendingQueue.front();
+                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
+                PendingQueue.pop();
+                Counters.PendingQueueSize->Dec();
             }
         }
    }
 
     void CheckPendingQueue() {
+        CheckLoadIsOutdated();
+
         auto now = TInstant::Now();
-        while (QuotedLoad < MaxClusterLoad && PendingQueue.size()) {
+        while (PendingQueue.size()) {
             auto& ev = PendingQueue.front();
             auto& request = *ev.Get()->Get();
             if (request.Deadline && now >= request.Deadline) {
                 Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::CANCELLED, NYql::TIssues{
                     NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie);
             } else {
-                QuotedLoad += request.Quota;
-                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
+                auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
+                if (response.Status == NYdb::EStatus::OVERLOADED) {
+                    break;
+                }
+
+                Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
             }
+
             PendingQueue.pop();
             Counters.PendingQueueSize->Dec();
         }
     }
 
 private:
-    TInstant StartCpuLoad;
-    TInstant LastCpuLoad;
     TActorId MonitoringClientActorId;
     TCounters Counters;
-
-    double InstantLoad = 0.0;
-    double AverageLoad = 0.0;
-    double QuotedLoad = 0.0;
-    bool Ready = false;
-
-    const TDuration MonitoringRequestDelay;
-    const TDuration AverageLoadInterval;
     const double MaxClusterLoad;
-    const double DefaultQueryLoad;
     const ui32 PendingQueueSize;
     const bool Strict;
-    ui32 CpuNumber = 0;
+    NKikimr::NKqp::NWorkload::TCpuQuotaManager CpuQuotaManager;
 
     TQueue PendingQueue;
+
+    TInstant StartCpuLoad;
 };
 
 std::unique_ptr CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
index a77f4292a98a..fcfb13f095ef 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
     ydb/core/fq/libs/compute/ydb/synchronization_service
     ydb/core/fq/libs/control_plane_storage/proto
     ydb/core/fq/libs/quota_manager/proto
+    ydb/core/kqp/workload_service/common
     ydb/core/protos
    ydb/library/db_pool/protos
     ydb/library/yql/public/issue
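Context for the refactoring above: the inline smoothing the actor used to perform (and that TCpuQuotaManager now encapsulates per this PR) is an exponential moving average whose mixing ratio is the time since the previous sample divided by the averaging window. A minimal self-contained sketch of that rule, reusing the names from the removed code; the struct and function here are illustrative, not the new class's API:

    // EMA update as in the removed Handle(TEvCpuLoadResponse) code above.
    struct TLoadState {
        double AverageLoad = 0.0;
        bool Ready = false;
    };

    // deltaSec: seconds since the previous sample; intervalSec: averaging window.
    void UpdateAverage(TLoadState& state, double instantLoad, double deltaSec, double intervalSec) {
        if (!state.Ready || deltaSec >= intervalSec) {
            state.AverageLoad = instantLoad;        // first or stale sample: reset
        } else {
            double ratio = deltaSec / intervalSec;  // weight of the new sample
            state.AverageLoad = (1 - ratio) * state.AverageLoad + ratio * instantLoad;
        }
        state.Ready = true;
    }

With the defaults in this diff (10 s window, 1 s request delay) each new sample carries weight about 0.1, so a load spike decays smoothly instead of toggling the quota on and off.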
diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h
index bdde6089d1ef..6ef1bfe6cf45 100644
--- a/ydb/core/kqp/common/events/workload_service.h
+++ b/ydb/core/kqp/common/events/workload_service.h
@@ -41,15 +41,19 @@ struct TEvContinueRequest : public NActors::TEventLocal {
 
 struct TEvCleanupRequest : public NActors::TEventLocal {
-    TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId)
+    TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
         : Database(database)
         , SessionId(sessionId)
         , PoolId(poolId)
+        , Duration(duration)
+        , CpuConsumed(cpuConsumed)
     {}
 
     const TString Database;
     const TString SessionId;
     const TString PoolId;
+    const TDuration Duration;
+    const TDuration CpuConsumed;
 };
 
 struct TEvCleanupResponse : public NActors::TEventLocal {
diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
index 42c4687a19c6..d3196d1f2f85 100644
--- a/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
+++ b/ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
@@ -138,8 +138,14 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
     }
 
     if (settings.GetObjectId() == NResourcePool::DEFAULT_POOL_ID) {
-        if (properties.contains("concurrent_query_limit")) {
-            ythrow yexception() << "Can not change property concurrent_query_limit for default pool";
+        std::vector<TString> forbiddenProperties = {
+            "concurrent_query_limit",
+            "database_load_cpu_threshold"
+        };
+        for (const TString& property : forbiddenProperties) {
+            if (properties.contains(property)) {
+                ythrow yexception() << "Can not change property " << property << " for default pool";
+            }
         }
     }
 }
@@ -186,19 +192,19 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::DoModify(const NYql::TO
 TResourcePoolManager::TAsyncStatus TResourcePoolManager::CreateResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
     NKqpProto::TKqpSchemeOperation schemeOperation;
     PrepareCreateResourcePool(schemeOperation, settings, context);
-    return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId);
+    return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kCreateResourcePool);
 }
 
 TResourcePoolManager::TAsyncStatus TResourcePoolManager::AlterResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
     NKqpProto::TKqpSchemeOperation schemeOperation;
     PrepareAlterResourcePool(schemeOperation, settings, context);
-    return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId);
+    return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kAlterResourcePool);
 }
 
 TResourcePoolManager::TAsyncStatus TResourcePoolManager::DropResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
     NKqpProto::TKqpSchemeOperation schemeOperation;
     PrepareDropResourcePool(schemeOperation, settings, context);
-    return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId);
+    return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kDropResourcePool);
 }
 
 //// Deferred modification
@@ -265,11 +271,11 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecutePrepared(const N
     try {
         switch (schemeOperation.GetOperationCase()) {
             case NKqpProto::TKqpSchemeOperation::kCreateResourcePool:
-                return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId);
+                return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
             case NKqpProto::TKqpSchemeOperation::kAlterResourcePool:
-                return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId);
+                return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
             case NKqpProto::TKqpSchemeOperation::kDropResourcePool:
-                return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId);
+                return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
             default:
                 return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast(schemeOperation.GetOperationCase())));
         }
@@ -288,8 +294,13 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ChainFeatures(TAsyncSta
     });
 }
 
-TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const {
-    auto validationFuture = CheckFeatureFlag(context, nodeId);
+TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const {
+    TAsyncStatus validationFuture = NThreading::MakeFuture(TYqlConclusionStatus::Success());
+    if (operationCase != NKqpProto::TKqpSchemeOperation::kDropResourcePool) {
+        validationFuture = ChainFeatures(validationFuture, [context, nodeId] {
+            return CheckFeatureFlag(context, nodeId);
+        });
+    }
     return ChainFeatures(validationFuture, [schemeTx, context] {
         return SendSchemeRequest(schemeTx, context);
     });
diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool/manager.h b/ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
index edab893dbe6c..64406f78e328 100644
--- a/ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
+++ b/ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
@@ -31,7 +31,7 @@ class TResourcePoolManager : public NMetadata::NModifications::IOperationsManage
     void PrepareDropResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const;
 
     TAsyncStatus ChainFeatures(TAsyncStatus lastFeature, std::function callback) const;
-    TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const;
+    TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const;
 };
 
 } // namespace NKikimr::NKqp
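Note on the ExecuteSchemeRequest change above: validation now starts from an already-resolved success future, and the feature-flag check is chained only for create/alter, so dropping a pool keeps working after the flag has been switched off. A minimal sketch of this conditional-chaining pattern with library/cpp/threading/future; TString stands in for TYqlConclusionStatus and the step bodies are placeholders, not the manager's real code:

    #include <library/cpp/threading/future/future.h>
    #include <functional>

    using TStatus = TString;  // stand-in for TYqlConclusionStatus
    using TAsyncStatus = NThreading::TFuture<TStatus>;

    // Run `next` only when the previous step succeeded, else propagate the failure.
    TAsyncStatus Chain(TAsyncStatus last, std::function<TAsyncStatus()> next) {
        return last.Apply([next](const TAsyncStatus& f) {
            return f.GetValue() == "SUCCESS" ? next() : f;
        });
    }

    TAsyncStatus Execute(bool needValidation) {
        TAsyncStatus validation = NThreading::MakeFuture<TStatus>("SUCCESS");  // resolved no-op
        if (needValidation) {
            validation = Chain(validation, [] { return NThreading::MakeFuture<TStatus>("SUCCESS"); });
        }
        return Chain(validation, [] { return NThreading::MakeFuture<TStatus>("SENT"); });
    }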
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 10259a4cfca8..f763fb6b976e 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -2076,8 +2076,15 @@ class TKqpSessionActor : public TActorBootstrapped {
         }
         CleanupCtx->Final = isFinal;
         CleanupCtx->IsWaitingForWorkloadServiceCleanup = true;
+
+        const auto& stats = QueryState->QueryStats;
+        auto event = std::make_unique<NWorkload::TEvCleanupRequest>(
+            QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId,
+            TDuration::MicroSeconds(stats.DurationUs), TDuration::MicroSeconds(stats.WorkerCpuTimeUs)
+        );
+
         auto forwardId = MakeKqpWorkloadServiceId(SelfId().NodeId());
-        Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), new NWorkload::TEvCleanupRequest(QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
+        Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), event.release(), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
         QueryState->PoolHandlerActor = Nothing();
     }
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 4db4ac5dd9fb..b2b60631c079 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -6060,11 +6060,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
         auto db = kikimr.GetTableClient();
         auto session = db.CreateSession().GetValueSync().GetSession();
 
-        auto checkDisabled = [&session](const TString& query) {
+        auto checkQuery = [&session](const TString& query, EStatus status, const TString& error) {
             Cerr << "Check query:\n" << query << "\n";
             auto result = session.ExecuteSchemeQuery(query).GetValueSync();
-            UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::UNSUPPORTED);
-            UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
+            UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
+            UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), error);
+        };
+
+        auto checkDisabled = [checkQuery](const TString& query) {
+            checkQuery(query, EStatus::UNSUPPORTED, "Resource pools are disabled. Please contact your system administrator to enable it");
         };
 
         // CREATE RESOURCE POOL
@@ -6083,7 +6087,9 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
         )");
 
         // DROP RESOURCE POOL
-        checkDisabled("DROP RESOURCE POOL MyResourcePool;");
+        checkQuery("DROP RESOURCE POOL MyResourcePool;",
+            EStatus::SCHEME_ERROR,
+            "Path does not exist");
     }
 
     Y_UNIT_TEST(ResourcePoolsValidation) {
diff --git a/ydb/core/kqp/workload_service/actors/actors.h b/ydb/core/kqp/workload_service/actors/actors.h
index 7bae3b142656..770867a58f1c 100644
--- a/ydb/core/kqp/workload_service/actors/actors.h
+++ b/ydb/core/kqp/workload_service/actors/actors.h
@@ -9,10 +9,13 @@ namespace NKikimr::NKqp::NWorkload {
 
 NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);
 
 // Fetch pool and create default pool if needed
-NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);
+NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
 
 // Fetch and create pool in scheme shard
-NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken);
+NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless);
 
 NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl);
 
+// Cpu load fetcher actor
+NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);
+
 } // NKikimr::NKqp::NWorkload
diff --git a/ydb/core/kqp/workload_service/actors/cpu_load_actors.cpp b/ydb/core/kqp/workload_service/actors/cpu_load_actors.cpp
new file mode 100644
index 000000000000..425c8cfbd30e
--- /dev/null
+++ b/ydb/core/kqp/workload_service/actors/cpu_load_actors.cpp
@@ -0,0 +1,77 @@
+#include "actors.h"
+
+#include
+
+#include
+
+
+namespace NKikimr::NKqp::NWorkload {
+
+namespace {
+
+class TCpuLoadFetcherActor : public NKikimr::TQueryBase {
+    using TBase = NKikimr::TQueryBase;
+
+public:
+    TCpuLoadFetcherActor()
+        : TBase(NKikimrServices::KQP_WORKLOAD_SERVICE)
+    {
+        SetOperationInfo(__func__, "");
+    }
+
+    void OnRunQuery() override {
+        TString sql = TStringBuilder() << R"(
+            -- TCpuLoadFetcherActor::OnRunQuery
+
+            SELECT
+                SUM(CpuThreads) AS ThreadsCount,
+                SUM(CpuThreads * (1.0 - CpuIdle)) AS TotalLoad
+            FROM `.sys/nodes`;
+        )";
+
+        RunDataQuery(sql);
+    }
+
+    void OnQueryResult() override {
+        if (ResultSets.size() != 1) {
+            Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+            return;
+        }
+
+        NYdb::TResultSetParser result(ResultSets[0]);
+        if (!result.TryNextRow()) {
+            Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+            return;
+        }
+
+        ThreadsCount = result.ColumnParser("ThreadsCount").GetOptionalUint64().GetOrElse(0);
+        TotalLoad = result.ColumnParser("TotalLoad").GetOptionalDouble().GetOrElse(0.0);
+
+        if (!ThreadsCount) {
+            Finish(Ydb::StatusIds::NOT_FOUND, "Cpu info not found");
+            return;
+        }
+
+        Finish();
+    }
+
+    void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+        if (status == Ydb::StatusIds::SUCCESS) {
+            Send(Owner, new TEvPrivate::TEvCpuLoadResponse(Ydb::StatusIds::SUCCESS, TotalLoad / ThreadsCount, ThreadsCount, std::move(issues)));
+        } else {
+            Send(Owner, new TEvPrivate::TEvCpuLoadResponse(status, 0.0, 0, std::move(issues)));
+        }
+    }
+
+private:
+    double TotalLoad = 0.0;
+    ui64 ThreadsCount = 0;
+};
+
+} // anonymous namespace
+
+IActor* CreateCpuLoadFetcherActor(const TActorId& replyActorId) {
+    return new TQueryRetryActor<TCpuLoadFetcherActor>(replyActorId);
+}
+
+} // NKikimr::NKqp::NWorkload
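The fetcher above collapses `.sys/nodes` rows into a single cluster-load fraction: each node exposes CpuThreads execution threads and an idle share CpuIdle, so CpuThreads * (1 - CpuIdle) is its busy-thread count, and OnFinish reports TotalLoad / ThreadsCount. A small worked example of the arithmetic, with made-up numbers:

    #include <cstdint>

    // node1: CpuThreads = 64, CpuIdle = 0.75 -> 64 * (1 - 0.75) = 16 busy threads
    // node2: CpuThreads = 32, CpuIdle = 0.50 -> 32 * (1 - 0.50) = 16 busy threads
    // ThreadsCount = 96, TotalLoad = 32, reported load = 32.0 / 96 ~= 0.33
    double ClusterLoad(uint64_t threadsCount, double totalLoad) {
        return threadsCount ? totalLoad / threadsCount : 0.0;  // same ratio the fetcher sends
    }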
diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
index 76b72ceee20b..f0f11628a068 100644
--- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
+++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
@@ -22,6 +22,74 @@ template
 class TPoolHandlerActorBase : public TActor {
     using TBase = TActor;
 
+    struct TCommonCounters {
+        const NMonitoring::TDynamicCounterPtr CountersRoot;
+        const NMonitoring::TDynamicCounterPtr CountersSubgroup;
+
+        // Workload service counters
+        NMonitoring::TDynamicCounters::TCounterPtr ActivePoolHandlers;
+
+        // Pool counters
+        NMonitoring::TDynamicCounters::TCounterPtr LocalInFly;
+        NMonitoring::TDynamicCounters::TCounterPtr LocalDelayedRequests;
+        NMonitoring::TDynamicCounters::TCounterPtr ContinueOk;
+        NMonitoring::TDynamicCounters::TCounterPtr ContinueOverloaded;
+        NMonitoring::TDynamicCounters::TCounterPtr ContinueError;
+        NMonitoring::TDynamicCounters::TCounterPtr CleanupOk;
+        NMonitoring::TDynamicCounters::TCounterPtr CleanupError;
+        NMonitoring::TDynamicCounters::TCounterPtr Cancelled;
+        NMonitoring::THistogramPtr DelayedTimeMs;
+        NMonitoring::THistogramPtr RequestsLatencyMs;
+
+        // Config counters
+        NMonitoring::TDynamicCounters::TCounterPtr InFlightLimit;
+        NMonitoring::TDynamicCounters::TCounterPtr QueueSizeLimit;
+        NMonitoring::TDynamicCounters::TCounterPtr LoadCpuThreshold;
+
+        TCommonCounters(NMonitoring::TDynamicCounterPtr counters, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig)
+            : CountersRoot(counters)
+            , CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId)))
+        {
+            Register();
+            UpdateConfigCounters(poolConfig);
+        }
+
+        void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
+            InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
+            QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
+            LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0));
+        }
+
+        void OnCleanup() {
+            ActivePoolHandlers->Dec();
+
+            InFlightLimit->Set(0);
+            QueueSizeLimit->Set(0);
+            LoadCpuThreshold->Set(0);
+        }
+
+    private:
+        void Register() {
+            ActivePoolHandlers = CountersRoot->GetCounter("ActivePoolHandlers", false);
+            ActivePoolHandlers->Inc();
+
+            LocalInFly = CountersSubgroup->GetCounter("LocalInFly", false);
+            LocalDelayedRequests = CountersSubgroup->GetCounter("LocalDelayedRequests", false);
+            ContinueOk = CountersSubgroup->GetCounter("ContinueOk", true);
+            ContinueOverloaded = CountersSubgroup->GetCounter("ContinueOverloaded", true);
+            ContinueError = CountersSubgroup->GetCounter("ContinueError", true);
+            CleanupOk = CountersSubgroup->GetCounter("CleanupOk", true);
+            CleanupError = CountersSubgroup->GetCounter("CleanupError", true);
+            Cancelled = CountersSubgroup->GetCounter("Cancelled", true);
+            DelayedTimeMs = CountersSubgroup->GetHistogram("DelayedTimeMs", NMonitoring::ExponentialHistogram(20, 2, 4));
+            RequestsLatencyMs = CountersSubgroup->GetHistogram("RequestsLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 4));
+
+            InFlightLimit = CountersSubgroup->GetCounter("InFlightLimit", false);
+            QueueSizeLimit = CountersSubgroup->GetCounter("QueueSizeLimit", false);
+            LoadCpuThreshold = CountersSubgroup->GetCounter("LoadCpuThreshold", false);
+        }
+    };
+
 protected:
     struct TRequest {
         enum class EState {
@@ -42,22 +110,21 @@ class TPoolHandlerActorBase : public TActor {
         EState State = EState::Pending;
         bool Started = false;  // after TEvContinueRequest success
         bool CleanupRequired = false;
+        bool UsedCpuQuota = false;
+        TDuration Duration;
+        TDuration CpuConsumed;
     };
 
 public:
     TPoolHandlerActorBase(void (TDerived::* requestFunc)(TAutoPtr& ev), const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
         : TBase(requestFunc)
-        , CountersRoot(counters)
-        , CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId)))
+        , Counters(counters, database, poolId, poolConfig)
         , Database(database)
         , PoolId(poolId)
         , QueueSizeLimit(GetMaxQueueSize(poolConfig))
         , InFlightLimit(GetMaxInFlight(poolConfig))
         , PoolConfig(poolConfig)
-        , CancelAfter(poolConfig.QueryCancelAfter)
-    {
-        RegisterCounters();
-    }
+    {}
 
     STRICT_STFUNC(StateFuncBase,
         // Workload service events
@@ -83,7 +150,7 @@ class TPoolHandlerActorBase : public TActor {
             this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0));
         }
 
-        ActivePoolHandlers->Dec();
+        Counters.OnCleanup();
 
         TBase::PassAway();
     }
@@ -118,12 +185,12 @@ class TPoolHandlerActorBase : public TActor {
         }
 
         LOG_D("Received new request, worker id: " << workerActorId << ", session id: " << sessionId);
-        if (CancelAfter) {
-            this->Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(sessionId));
+        if (auto cancelAfter = PoolConfig.QueryCancelAfter) {
+            this->Schedule(cancelAfter, new TEvPrivate::TEvCancelRequest(sessionId));
         }
 
         TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second;
-        LocalDelayedRequests->Inc();
+        Counters.LocalDelayedRequests->Inc();
 
         UpdatePoolConfig(ev->Get()->PoolConfig);
         UpdateSchemeboardSubscription(ev->Get()->PathId);
@@ -146,8 +213,10 @@ class TPoolHandlerActorBase : public TActor {
             return;
         }
         request->State = TRequest::EState::Finishing;
+        request->Duration = ev->Get()->Duration;
+        request->CpuConsumed = ev->Get()->CpuConsumed;
 
-        LOG_D("Received cleanup request, worker id: " << workerActorId << ", session id: " << sessionId);
+        LOG_D("Received cleanup request, worker id: " << workerActorId << ", session id: " << sessionId << ", duration: " << request->Duration << ", cpu consumed: " << request->CpuConsumed);
         OnCleanupRequest(request);
     }
@@ -198,25 +267,25 @@ class TPoolHandlerActorBase : public TActor {
         if (status == Ydb::StatusIds::SUCCESS) {
             LocalInFlight++;
             request->Started = true;
-            LocalInFly->Inc();
-            ContinueOk->Inc();
-            DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
+            Counters.LocalInFly->Inc();
+            Counters.ContinueOk->Inc();
+            Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
             LOG_D("Reply continue success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
         } else {
             if (status == Ydb::StatusIds::OVERLOADED) {
-                ContinueOverloaded->Inc();
+                Counters.ContinueOverloaded->Inc();
                 LOG_I("Reply overloaded to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
             } else if (status == Ydb::StatusIds::CANCELLED) {
-                Cancelled->Inc();
+                Counters.Cancelled->Inc();
                 LOG_I("Reply cancelled to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
             } else {
-                ContinueError->Inc();
+                Counters.ContinueError->Inc();
                 LOG_W("Reply continue error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
             }
-            RemoveRequest(request->SessionId);
+            RemoveRequest(request);
         }
 
-        LocalDelayedRequests->Dec();
+        Counters.LocalDelayedRequests->Dec();
     }
 
     void FinalReply(TRequest* request, Ydb::StatusIds::StatusCode status, const TString& message) {
@@ -235,9 +304,9 @@ class TPoolHandlerActorBase : public TActor {
 
         if (request->Started) {
             LocalInFlight--;
-            LocalInFly->Dec();
+            Counters.LocalInFly->Dec();
         } else {
-            LocalDelayedRequests->Dec();
+            Counters.LocalDelayedRequests->Dec();
         }
 
         if (request->State == TRequest::EState::Canceling) {
@@ -246,7 +315,7 @@ class TPoolHandlerActorBase : public TActor {
             ReplyCleanup(request, status, issues);
         }
 
-        RemoveRequest(request->SessionId);
+        RemoveRequest(request);
     }
 
 protected:
@@ -273,9 +342,13 @@ class TPoolHandlerActorBase : public TActor {
         return nullptr;
     }
 
-    void RemoveRequest(const TString& sessionId) {
-        LocalSessions.erase(sessionId);
-        this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvFinishRequestInPool(Database, PoolId));
+    void RemoveRequest(TRequest* request) {
+        auto event = std::make_unique<TEvPrivate::TEvFinishRequestInPool>(
+            Database, PoolId, request->Duration, request->CpuConsumed, request->UsedCpuQuota
+        );
+        this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), event.release());
+
+        LocalSessions.erase(request->SessionId);
         if (StopHandler && LocalSessions.empty()) {
             LOG_I("All requests finished, stop handler");
             PassAway();
@@ -291,10 +364,17 @@ class TPoolHandlerActorBase : public TActor {
     }
 
     TMaybe<TInstant> GetWaitDeadline(TInstant startTime) const {
-        if (!CancelAfter) {
+        if (auto cancelAfter = PoolConfig.QueryCancelAfter) {
+            return startTime + cancelAfter;
+        }
+        return Nothing();
+    }
+
+    TMaybe<double> GetLoadCpuThreshold() const {
+        if (PoolConfig.DatabaseLoadCpuThreshold < 0.0) {
             return Nothing();
         }
-        return startTime + CancelAfter;
+        return PoolConfig.DatabaseLoadCpuThreshold;
     }
 
     TString LogPrefix() const {
@@ -306,11 +386,11 @@ class TPoolHandlerActorBase : public TActor {
         this->Send(request->WorkerActorId, new TEvCleanupResponse(status, issues));
 
         if (status == Ydb::StatusIds::SUCCESS) {
-            CleanupOk->Inc();
-            RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
+            Counters.CleanupOk->Inc();
+            Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
             LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
         } else {
-            CleanupError->Inc();
+            Counters.CleanupError->Inc();
             LOG_W("Reply cleanup error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
         }
     }
@@ -320,8 +400,8 @@ class TPoolHandlerActorBase : public TActor {
         ev->Record.MutableRequest()->SetSessionId(request->SessionId);
         this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());
 
-        Cancelled->Inc();
-        RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
+        Counters.Cancelled->Inc();
+        Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
         LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
     }
@@ -349,13 +429,13 @@ class TPoolHandlerActorBase : public TActor {
         LOG_D("Pool config has changed, queue size: " << poolConfig.QueueSize << ", in flight limit: " << poolConfig.ConcurrentQueryLimit);
 
         PoolConfig = poolConfig;
-        CancelAfter = poolConfig.QueryCancelAfter;
         QueueSizeLimit = GetMaxQueueSize(poolConfig);
         InFlightLimit = GetMaxInFlight(poolConfig);
+        Counters.UpdateConfigCounters(poolConfig);
         RefreshState(true);
 
         if (ShouldResign()) {
-            const TActorId& newHandler = this->RegisterWithSameMailbox(CreatePoolHandlerActor(Database, PoolId, poolConfig, CountersRoot));
+            const TActorId& newHandler = this->RegisterWithSameMailbox(CreatePoolHandlerActor(Database, PoolId, poolConfig, Counters.CountersRoot));
             this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvResignPoolHandler(Database, PoolId, newHandler));
         }
     }
@@ -370,25 +450,8 @@ class TPoolHandlerActorBase : public TActor {
         return concurrentQueryLimit == -1 ? std::numeric_limits::max() : static_cast(concurrentQueryLimit);
     }
 
-    void RegisterCounters() {
-        ActivePoolHandlers = CountersRoot->GetCounter("ActivePoolHandlers", false);
-        ActivePoolHandlers->Inc();
-
-        LocalInFly = CountersSubgroup->GetCounter("LocalInFly", false);
-        LocalDelayedRequests = CountersSubgroup->GetCounter("LocalDelayedRequests", false);
-        ContinueOk = CountersSubgroup->GetCounter("ContinueOk", true);
-        ContinueOverloaded = CountersSubgroup->GetCounter("ContinueOverloaded", true);
-        ContinueError = CountersSubgroup->GetCounter("ContinueError", true);
-        CleanupOk = CountersSubgroup->GetCounter("CleanupOk", true);
-        CleanupError = CountersSubgroup->GetCounter("CleanupError", true);
-        Cancelled = CountersSubgroup->GetCounter("Cancelled", true);
-        DelayedTimeMs = CountersSubgroup->GetHistogram("DelayedTimeMs", NMonitoring::ExponentialHistogram(20, 2, 4));
-        RequestsLatencyMs = CountersSubgroup->GetHistogram("RequestsLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 4));
-    }
-
 protected:
-    NMonitoring::TDynamicCounterPtr CountersRoot;
-    NMonitoring::TDynamicCounterPtr CountersSubgroup;
+    TCommonCounters Counters;
 
     // Configuration
     const TString Database;
@@ -398,7 +461,6 @@ class TPoolHandlerActorBase : public TActor {
 
 private:
     NResourcePool::TPoolSettings PoolConfig;
-    TDuration CancelAfter;
 
     // Scheme board settings
     std::unique_ptr WatchPathId;
@@ -408,19 +470,6 @@ class TPoolHandlerActorBase : public TActor {
     ui64 LocalInFlight = 0;
     std::unordered_map LocalSessions;
     bool StopHandler = false;  // Stop when all requests finished
-
-    // Counters
-    NMonitoring::TDynamicCounters::TCounterPtr ActivePoolHandlers;
-    NMonitoring::TDynamicCounters::TCounterPtr LocalInFly;
-    NMonitoring::TDynamicCounters::TCounterPtr LocalDelayedRequests;
-    NMonitoring::TDynamicCounters::TCounterPtr ContinueOk;
-    NMonitoring::TDynamicCounters::TCounterPtr ContinueOverloaded;
-    NMonitoring::TDynamicCounters::TCounterPtr ContinueError;
-    NMonitoring::TDynamicCounters::TCounterPtr CleanupOk;
-    NMonitoring::TDynamicCounters::TCounterPtr CleanupError;
-    NMonitoring::TDynamicCounters::TCounterPtr Cancelled;
-    NMonitoring::THistogramPtr DelayedTimeMs;
-    NMonitoring::THistogramPtr RequestsLatencyMs;
 };
@@ -436,7 +485,7 @@ class TUnlimitedPoolHandlerActor : public TPoolHandlerActorBase {
 
 protected:
     bool ShouldResign() const override {
-        return 0 < InFlightLimit && InFlightLimit < std::numeric_limits::max();
+        return 0 < InFlightLimit && (InFlightLimit < std::numeric_limits::max() || GetLoadCpuThreshold());
    }
 
     void OnScheduleRequest(TRequest* request) override {
@@ -452,20 +501,59 @@ class TUnlimitedPoolHandlerActor : public TPoolHandlerActorBase {
 
 class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
     using TBase = TPoolHandlerActorBase;
 
+    struct TCounters {
+        // Fifo pool counters
+        NMonitoring::TDynamicCounters::TCounterPtr PendingRequestsCount;
+        NMonitoring::TDynamicCounters::TCounterPtr FinishingRequestsCount;
+        NMonitoring::TDynamicCounters::TCounterPtr GlobalInFly;
+        NMonitoring::TDynamicCounters::TCounterPtr GlobalDelayedRequests;
+        NMonitoring::THistogramPtr PoolStateUpdatesBacklogMs;
+
+        TCounters(NMonitoring::TDynamicCounterPtr countersSubgroup) {
+            Register(countersSubgroup);
+        }
+
+        void UpdateGlobalState(const TPoolStateDescription& description) {
+            GlobalInFly->Set(description.RunningRequests);
+            GlobalDelayedRequests->Set(description.DelayedRequests);
+        }
+
+        void OnCleanup() {
+            GlobalInFly->Set(0);
+            GlobalDelayedRequests->Set(0);
+        }
+
+    private:
+        void Register(NMonitoring::TDynamicCounterPtr countersSubgroup) {
+            PendingRequestsCount = countersSubgroup->GetCounter("PendingRequestsCount", false);
+            FinishingRequestsCount = countersSubgroup->GetCounter("FinishingRequestsCount", false);
+            GlobalInFly = countersSubgroup->GetCounter("GlobalInFly", false);
+            GlobalDelayedRequests = countersSubgroup->GetCounter("GlobalDelayedRequests", false);
+            PoolStateUpdatesBacklogMs = countersSubgroup->GetHistogram("PoolStateUpdatesBacklogMs", NMonitoring::LinearHistogram(20, 0, 3 * LEASE_DURATION.MillisecondsFloat() / 40));
+        }
+    };
+
+    enum class EStartRequestCase {
+        Pending,
+        Delayed
+    };
+
     static constexpr ui64 MAX_PENDING_REQUESTS = 1000;
 
 public:
     TFifoPoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
         : TBase(&TFifoPoolHandlerActor::StateFunc, database, poolId, poolConfig, counters)
+        , FifoCounters(Counters.CountersSubgroup)
     {
         Y_ENSURE(!ShouldResign());
-        RegisterCounters();
     }
 
     STFUNC(StateFunc) {
         switch (ev->GetTypeRewrite()) {
             sFunc(TEvents::TEvWakeup, HandleRefreshState);
             sFunc(TEvPrivate::TEvRefreshPoolState, HandleExternalRefreshState);
+            hFunc(TEvPrivate::TEvCpuQuotaResponse, Handle);
+            hFunc(TEvPrivate::TEvNodesInfoResponse, Handle);
 
             hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
             hFunc(TEvPrivate::TEvRefreshPoolStateResponse, Handle);
@@ -478,25 +566,24 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
     }
 
     void PassAway() override {
-        GlobalInFly->Set(0);
-        GlobalDelayedRequests->Set(0);
+        FifoCounters.OnCleanup();
 
         TBase::PassAway();
     }
 
 protected:
     bool ShouldResign() const override {
-        return InFlightLimit == 0 || InFlightLimit == std::numeric_limits::max();
+        return InFlightLimit == 0 || (InFlightLimit == std::numeric_limits::max() && !GetLoadCpuThreshold());
     }
 
     void OnScheduleRequest(TRequest* request) override {
-        if (PendingRequests.size() >= MAX_PENDING_REQUESTS || GetLocalSessionsCount() - GetLocalInFlight() > QueueSizeLimit + 1) {
+        if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
             ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
             return;
         }
 
         PendingRequests.emplace_back(request->SessionId);
-        PendingRequestsCount->Inc();
+        FifoCounters.PendingRequestsCount->Inc();
 
         if (!PreparingFinished) {
            this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPrepareTablesRequest(Database, PoolId));
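The queue check above replaces raw unsigned arithmetic with SaturationSub: with ui64 operands, plain subtraction would wrap around to a huge value whenever the subtrahend exceeds the minuend. A sketch of the helper's assumed semantics (the real one lives in the workload service's common code):

    // Clamp at zero instead of wrapping on unsigned underflow (assumed semantics
    // of the SaturationSub helper used above).
    template <typename T>
    T SaturationSub(T a, T b) {
        return a > b ? a - b : T(0);
    }

    // sessions = 5, inFlight = 2, inFlightLimit = 10:
    // naive (5 - 2) - 10 on ui64 wraps to ~2^64; SaturationSub(3, 10) == 0,
    // so the pool is not falsely treated as overloaded.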
@@ -515,6 +602,11 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
     }
 
     void RefreshState(bool refreshRequired = false) override {
+        if (!WaitingNodesInfo && TInstant::Now() - LastNodesInfoRefreshTime > LEASE_DURATION) {
+            WaitingNodesInfo = true;
+            this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvNodesInfoRequest());
+        }
+
         RefreshRequired |= refreshRequired;
         if (!PreparingFinished) {
             return;
@@ -532,7 +624,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
         if (!RunningOperation) {
-            this->Register(CreateRefreshPoolStateActor(this->SelfId(), Database, PoolId, LEASE_DURATION, CountersSubgroup));
+            this->Register(CreateRefreshPoolStateActor(this->SelfId(), Database, PoolId, LEASE_DURATION, Counters.CountersSubgroup));
         }
     }
@@ -552,6 +644,14 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
     }
 
+    void Handle(TEvPrivate::TEvNodesInfoResponse::TPtr& ev) {
+        WaitingNodesInfo = false;
+        LastNodesInfoRefreshTime = TInstant::Now();
+        NodeCount = ev->Get()->NodeCount;
+
+        LOG_T("Updated node info, node count: " << NodeCount);
+    }
+
     void Handle(TEvPrivate::TEvTablesCreationFinished::TPtr& ev) {
         if (ev->Get()->Success) {
             PreparingFinished = true;
@@ -565,7 +665,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
         }
 
-        PendingRequestsCount->Set(0);
+        FifoCounters.PendingRequestsCount->Set(0);
     }
 
     void Handle(TEvPrivate::TEvRefreshPoolStateResponse::TPtr& ev) {
@@ -579,7 +679,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
-            PoolStateUpdatesBacklogMs->Collect((TInstant::Now() - LastRefreshTime).MilliSeconds());
+            FifoCounters.PoolStateUpdatesBacklogMs->Collect((TInstant::Now() - LastRefreshTime).MilliSeconds());
         }
 
         LastRefreshTime = TInstant::Now();
@@ -587,20 +687,19 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
         GlobalState = ev->Get()->PoolState;
-        GlobalInFly->Set(GlobalState.RunningRequests);
-        GlobalDelayedRequests->Set(GlobalState.DelayedRequests);
+        FifoCounters.UpdateGlobalState(GlobalState);
         LOG_T("successfully refreshed pool state, in flight: " << GlobalState.RunningRequests << ", delayed: " << GlobalState.DelayedRequests);
 
         RemoveFinishedRequests();
 
         size_t delayedRequestsCount = DelayedRequests.size();
-        DoStartPendingRequest();
+        DoStartPendingRequest(GetLoadCpuThreshold());
 
-        if (GlobalState.DelayedRequests + PendingRequests.size() > QueueSizeLimit) {
-            RemoveBackRequests(PendingRequests, std::min(GlobalState.DelayedRequests + PendingRequests.size() - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
+        if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) {
+            RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
                 ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
             });
-            PendingRequestsCount->Set(PendingRequests.size());
+            FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
         }
 
         if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) {
@@ -611,7 +710,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
-        GlobalDelayedRequests->Inc();
+        FifoCounters.GlobalDelayedRequests->Inc();
         LOG_D("successfully delayed request, session id: " << ev->Get()->SessionId);
 
-        DoStartDelayedRequest();
+        DoStartDelayedRequest(GetLoadCpuThreshold());
         RefreshState();
     };
 
+    void Handle(TEvPrivate::TEvCpuQuotaResponse::TPtr& ev) {
+        RunningOperation = false;
+
+        if (!ev->Get()->QuotaAccepted) {
+            LOG_D("Skipped request start due to load cpu threshold");
+            if (static_cast<EStartRequestCase>(ev->Cookie) == EStartRequestCase::Pending) {
+                ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this](TRequest* request) {
+                    AddFinishedRequest(request->SessionId);
+                    ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
+                });
+            }
+            RefreshState();
+            return;
+        }
+
+        RemoveFinishedRequests();
+        switch (static_cast<EStartRequestCase>(ev->Cookie)) {
+            case EStartRequestCase::Pending:
+                if (!RunningOperation && !DelayedRequests.empty()) {
+                    RunningOperation = true;
+                    const TString& sessionId = DelayedRequests.front();
+                    this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup));
+                    GetRequest(sessionId)->CleanupRequired = true;
+                }
+                break;
+
+            case EStartRequestCase::Delayed:
+                DoStartDelayedRequest(Nothing());
+                break;
+        }
+
+        RefreshState();
+    }
+
     void Handle(TEvPrivate::TEvStartRequestResponse::TPtr& ev) {
         RunningOperation = false;
@@ -668,14 +801,15 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
             if (request->SessionId == sessionId) {
+                request->UsedCpuQuota = !!GetLoadCpuThreshold();
                 requestFound = true;
                 GlobalState.RunningRequests++;
-                GlobalInFly->Inc();
+                FifoCounters.GlobalInFly->Inc();
                 ReplyContinue(request);
             } else {
                 // Request was dropped due to lease expiration
                 PendingRequests.emplace_front(request->SessionId);
-                PendingRequestsCount->Inc();
+                FifoCounters.PendingRequestsCount->Inc();
             }
         });
         DelayedRequests.pop_front();
@@ -706,22 +840,28 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
-    void DoStartPendingRequest() {
+    void DoStartPendingRequest(TMaybe<double> loadCpuThreshold) {
         RemoveFinishedRequests();
         if (RunningOperation) {
             return;
         }
 
-        if (!PendingRequests.empty() && QueueSizeLimit == 0 && GlobalState.RunningRequests < InFlightLimit) {
+        bool canStartRequest = QueueSizeLimit == 0 && GlobalState.RunningRequests < InFlightLimit;
+        canStartRequest |= !GetLoadCpuThreshold() && DelayedRequests.size() + GlobalState.DelayedRequests == 0 && NodeCount && GlobalState.RunningRequests + NodeCount < InFlightLimit;
+        if (!PendingRequests.empty() && canStartRequest) {
             RunningOperation = true;
             const TString& sessionId = PopPendingRequest();
-            this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup));
             DelayedRequests.emplace_front(sessionId);
-            GetRequest(sessionId)->CleanupRequired = true;
+            if (loadCpuThreshold) {
+                RequestCpuQuota(*loadCpuThreshold, EStartRequestCase::Pending);
+            } else {
+                this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup));
+                GetRequest(sessionId)->CleanupRequired = true;
+            }
         }
     }
 
-    void DoStartDelayedRequest() {
+    void DoStartDelayedRequest(TMaybe<double> loadCpuThreshold) {
         RemoveFinishedRequests();
         if (RunningOperation) {
             return;
@@ -729,7 +869,11 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
-            this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, CountersSubgroup));
+            if (loadCpuThreshold) {
+                RequestCpuQuota(*loadCpuThreshold, EStartRequestCase::Delayed);
+            } else {
+                this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, Counters.CountersSubgroup));
+            }
         }
     }
@@ -743,7 +887,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
         }
 
-        this->Register(CreateDelayRequestActor(this->SelfId(), Database, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, CountersSubgroup));
+        this->Register(CreateDelayRequestActor(this->SelfId(), Database, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters.CountersSubgroup));
         DelayedRequests.emplace_back(sessionId);
         request->CleanupRequired = true;
     }
@@ -756,9 +900,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
         if (!FinishedRequests.empty()) {
-            this->Register(CreateCleanupRequestsActor(this->SelfId(), Database, PoolId, FinishedRequests, CountersSubgroup));
+            this->Register(CreateCleanupRequestsActor(this->SelfId(), Database, PoolId, FinishedRequests, Counters.CountersSubgroup));
             FinishedRequests.clear();
-            FinishingRequestsCount->Set(0);
+            FifoCounters.FinishingRequestsCount->Set(0);
         }
     }
@@ -770,6 +914,10 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
         this->Schedule(LEASE_DURATION / 2, new TEvents::TEvWakeup());
     }
 
+    void RequestCpuQuota(double loadCpuThreshold, EStartRequestCase requestCase) const {
+        this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvCpuQuotaRequest(loadCpuThreshold / 100.0), 0, static_cast<ui64>(requestCase));
+    }
+
 private:
     void RemoveFinishedRequests() {
         if (RunningOperation) {
@@ -778,7 +926,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
-        PendingRequestsCount->Set(PendingRequests.size());
+        FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
     }
 
     void RemoveFinishedRequests(std::deque<TString>& requests) {
@@ -819,24 +967,18 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
         const TString sessionId = PendingRequests.front();
         PendingRequests.pop_front();
-        PendingRequestsCount->Dec();
+        FifoCounters.PendingRequestsCount->Dec();
         return sessionId;
     }
 
     void AddFinishedRequest(const TString& sessionId) {
         FinishedRequests.emplace_back(sessionId);
-        FinishingRequestsCount->Inc();
-    }
-
-    void RegisterCounters() {
-        PendingRequestsCount = CountersSubgroup->GetCounter("PendingRequestsCount", false);
-        FinishingRequestsCount = CountersSubgroup->GetCounter("FinishingRequestsCount", false);
-        GlobalInFly = CountersSubgroup->GetCounter("GlobalInFly", false);
-        GlobalDelayedRequests = CountersSubgroup->GetCounter("GlobalDelayedRequests", false);
-        PoolStateUpdatesBacklogMs = CountersSubgroup->GetHistogram("PoolStateUpdatesBacklogMs", NMonitoring::LinearHistogram(20, 0, 3 * LEASE_DURATION.MillisecondsFloat() / 40));
+        FifoCounters.FinishingRequestsCount->Inc();
     }
 
 private:
+    TCounters FifoCounters;
+
     bool PreparingFinished = false;
     bool RefreshRequired = false;
     bool RunningOperation = false;
@@ -849,17 +991,15 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase {
 
     TPoolStateDescription GlobalState;
-
-    // Counters
-    NMonitoring::TDynamicCounters::TCounterPtr PendingRequestsCount;
-    NMonitoring::TDynamicCounters::TCounterPtr FinishingRequestsCount;
-    NMonitoring::TDynamicCounters::TCounterPtr GlobalInFly;
-    NMonitoring::TDynamicCounters::TCounterPtr GlobalDelayedRequests;
-    NMonitoring::THistogramPtr PoolStateUpdatesBacklogMs;
+    bool WaitingNodesInfo = false;
+    TInstant LastNodesInfoRefreshTime = TInstant::Zero();
+    ui64 NodeCount = 0;
 };
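RequestCpuQuota above threads the call site through the event cookie: the EStartRequestCase value is cast to ui64 on send, and Handle(TEvPrivate::TEvCpuQuotaResponse) casts it back to decide which start path to resume. A tiny stand-alone illustration of the round trip (actor plumbing omitted; names mirror the diff):

    #include <cassert>
    #include <cstdint>

    enum class EStartRequestCase : uint64_t { Pending, Delayed };

    // On Send(...): the case travels as the opaque ui64 event cookie.
    uint64_t ToCookie(EStartRequestCase c) { return static_cast<uint64_t>(c); }

    // In the response handler: recover the case and branch on it.
    EStartRequestCase FromCookie(uint64_t cookie) { return static_cast<EStartRequestCase>(cookie); }

    int main() {
        assert(FromCookie(ToCookie(EStartRequestCase::Delayed)) == EStartRequestCase::Delayed);
        return 0;
    }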
AppData()->DefaultUserSIDs) { + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID); + } + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers); + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); auto token = MakeIntrusive(BUILTIN_ACL_METADATA, TVector{}); Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); @@ -107,6 +116,7 @@ class TPoolResolverActor : public TActorBootstrapped { private: TEvPlaceRequestIntoPool::TPtr Event; + const bool EnableOnServerless; bool CanCreatePool = false; bool DefaultPoolCreated = false; }; @@ -114,11 +124,12 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) + TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) : ReplyActorId(replyActorId) , Database(database) , PoolId(poolId) , UserToken(userToken) + , EnableOnServerless(enableOnServerless) {} void DoBootstrap() { @@ -133,6 +144,11 @@ class TPoolFetcherActor : public TSchemeActorBase { } const auto& result = results[0]; + if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) { + Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it"); + return; + } + switch (result.Status) { case EStatus::Unknown: case EStatus::PathNotTable: @@ -222,6 +238,7 @@ class TPoolFetcherActor : public TSchemeActorBase { const TString Database; const TString PoolId; const TIntrusiveConstPtr UserToken; + const bool EnableOnServerless; NResourcePool::TPoolSettings PoolConfig; NKikimrProto::TPathID PathId; @@ -246,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase { } void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) { - const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus(); - switch (ev->Get()->Status()) { + const auto& response = ev->Get()->Record; + const auto ssStatus = response.GetSchemeShardStatus(); + const auto status = ev->Get()->Status(); + switch (status) { case NTxProxy::TResultStatus::ExecComplete: case NTxProxy::TResultStatus::ExecAlready: if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) { Reply(Ydb::StatusIds::SUCCESS); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecError: - if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) { - ScheduleRetry(ssStatus, "Retry execution error", true); + if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) { + SubscribeOnTransactionOrRetry(status, response); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecInProgress: - 
ScheduleRetry(ssStatus, "Retry execution in progress error", true); + SubscribeOnTransactionOrRetry(status, response); return; case NTxProxy::TResultStatus::ProxyShardNotAvailable: - ScheduleRetry(ssStatus, "Retry shard unavailable error"); + ScheduleRetry(response, "Retry shard unavailable error"); return; default: - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus))); return; } } + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status == NKikimrProto::OK) { + LOG_T("Tablet to pipe successfully connected"); + return; + } + + ClosePipeClient(); + ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status)); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { + const TActorId clientId = ev->Get()->ClientId; + if (!ClosedSchemePipeActors.contains(clientId)) { + ClosePipeClient(); + ScheduleRetry("Tablet to pipe destroyed"); + } + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { + ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking"); + } + STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle) + hFunc(TEvTabletPipe::TEvClientConnected, Handle) + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle) + hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle) + IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered) + default: StateFuncBase(ev); } @@ -292,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase { schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"})); schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool); schemeTx.SetInternal(true); - schemeTx.SetAllowAccessToPrivatePaths(true); BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool()); BuildModifyAclRequest(*schemeTx.MutableModifyACL()); if (UserToken) { - event->Record.SetUserToken(UserToken->GetSerializedToken()); + event->Record.SetUserToken(UserToken->SerializeAsString()); } Send(MakeTxProxyID(), std::move(event)); @@ -313,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase { } private: - void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) { - auto ssStatus = static_cast(status); - if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) { - Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus); + void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) { + const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? 
response.GetTxId() : response.GetPathCreateTxId(); + if (txId == 0) { + ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true); + return; + } + + SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId())); + + auto request = MakeHolder(); + request->Record.SetTxId(txId); + NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request)); + LOG_D("Subscribe on create pool tx: " << txId); + } + + void ClosePipeClient() { + if (SchemePipeActorId) { + ClosedSchemePipeActors.insert(SchemePipeActorId); + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + SchemePipeActorId = {}; + } + } + + void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) { + ClosePipeClient(); + + auto ssStatus = static_cast(response.GetSchemeShardStatus()); + if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus)); + } + } + + void ScheduleRetry(const TString& message, bool longDelay = false) { + ClosePipeClient(); + if (!TBase::ScheduleRetry(message, longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message); } } @@ -349,11 +426,19 @@ class TPoolCreatorActor : public TSchemeActorBase { LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString()); } + ClosePipeClient(); + Issues.AddIssues(std::move(issues)); Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues))); PassAway(); } + static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) { + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetIssues(), issues); + return GroupIssues(issues, message); + } + private: const TActorId ReplyActorId; const TString Database; @@ -361,16 +446,19 @@ class TPoolCreatorActor : public TSchemeActorBase { const TIntrusiveConstPtr UserToken; const NACLibProto::TDiffACL DiffAcl; NResourcePool::TPoolSettings PoolConfig; + + std::unordered_set ClosedSchemePipeActors; + TActorId SchemePipeActorId; }; } // anonymous namespace -IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) { - return new TPoolResolverActor(std::move(event), defaultPoolExists); +IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) { + return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless); } -IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) { - return new TPoolFetcherActor(replyActorId, database, poolId, userToken); +IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) { + return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless); } IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) { diff --git a/ydb/core/kqp/workload_service/actors/ya.make b/ydb/core/kqp/workload_service/actors/ya.make index 
01d4e7c9d663..774488a83d6a 100644 --- a/ydb/core/kqp/workload_service/actors/ya.make +++ b/ydb/core/kqp/workload_service/actors/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + cpu_load_actors.cpp pool_handlers_acors.cpp scheme_actors.cpp ) diff --git a/ydb/core/kqp/workload_service/common/cpu_quota_manager.cpp b/ydb/core/kqp/workload_service/common/cpu_quota_manager.cpp new file mode 100644 index 000000000000..dd3a66183423 --- /dev/null +++ b/ydb/core/kqp/workload_service/common/cpu_quota_manager.cpp @@ -0,0 +1,156 @@ +#include "cpu_quota_manager.h" + +#include + + +namespace NKikimr::NKqp::NWorkload { + +//// TCpuQuotaManager::TCounters + +TCpuQuotaManager::TCounters::TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent) + : SubComponent(subComponent) +{ + Register(); +} + +void TCpuQuotaManager::TCounters::Register() { + RegisterCommonMetrics(CpuLoadRequest); + InstantLoadPercentage = SubComponent->GetCounter("InstantLoadPercentage", false); + AverageLoadPercentage = SubComponent->GetCounter("AverageLoadPercentage", false); + QuotedLoadPercentage = SubComponent->GetCounter("QuotedLoadPercentage", false); +} + +void TCpuQuotaManager::TCounters::RegisterCommonMetrics(TCommonMetrics& metrics) const { + metrics.Ok = SubComponent->GetCounter("Ok", true); + metrics.Error = SubComponent->GetCounter("Error", true); +} + +//// TCpuQuotaManager::TCpuQuotaResponse + +TCpuQuotaManager::TCpuQuotaResponse::TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status, NYql::TIssues issues) + : CurrentLoad(currentLoad) + , Status(status) + , Issues(std::move(issues)) +{} + +//// TCpuQuotaManager + +TCpuQuotaManager::TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui64 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent) + : Counters(subComponent) + , MonitoringRequestDelay(monitoringRequestDelay) + , AverageLoadInterval(averageLoadInterval) + , IdleTimeout(idleTimeout) + , DefaultQueryLoad(defaultQueryLoad) + , Strict(strict) + , CpuNumber(cpuNumber) +{} + +double TCpuQuotaManager::GetInstantLoad() const { + return InstantLoad; +} + +double TCpuQuotaManager::GetAverageLoad() const { + return AverageLoad; +} + +TDuration TCpuQuotaManager::GetMonitoringRequestDelay() const { + return GetMonitoringRequestTime() - TInstant::Now(); +} + +TInstant TCpuQuotaManager::GetMonitoringRequestTime() const { + TDuration delay = MonitoringRequestDelay; + if (IdleTimeout && TInstant::Now() - LastRequestCpuQuota > IdleTimeout) { + delay = AverageLoadInterval / 2; + } + + return LastUpdateCpuLoad ? 
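+ // Before the first UpdateCpuLoad() the request time collapses to "now", so the
+ // initial poll fires immediately; later polls are spaced by the (possibly
+ // idle-stretched) delay.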
LastUpdateCpuLoad + delay : TInstant::Now(); +} + +void TCpuQuotaManager::UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success) { + auto now = TInstant::Now(); + LastUpdateCpuLoad = now; + + if (!success) { + Counters.CpuLoadRequest.Error->Inc(); + CheckLoadIsOutdated(); + return; + } + + auto delta = now - LastCpuLoad; + LastCpuLoad = now; + + if (cpuNumber) { + CpuNumber = cpuNumber; + } + + InstantLoad = instantLoad; + // exponential moving average + if (!Ready || delta >= AverageLoadInterval) { + AverageLoad = InstantLoad; + QuotedLoad = InstantLoad; + } else { + auto ratio = static_cast(delta.GetValue()) / AverageLoadInterval.GetValue(); + AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad; + QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad; + } + Ready = true; + Counters.CpuLoadRequest.Ok->Inc(); + Counters.InstantLoadPercentage->Set(static_cast(InstantLoad * 100)); + Counters.AverageLoadPercentage->Set(static_cast(AverageLoad * 100)); + Counters.QuotedLoadPercentage->Set(static_cast(QuotedLoad * 100)); +} + +bool TCpuQuotaManager::CheckLoadIsOutdated() { + if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) { + Ready = false; + QuotedLoad = 0.0; + Counters.QuotedLoadPercentage->Set(0); + } + return Ready; +} + +bool TCpuQuotaManager::HasCpuQuota(double maxClusterLoad) { + LastRequestCpuQuota = TInstant::Now(); + return maxClusterLoad == 0.0 || ((Ready || !Strict) && QuotedLoad < maxClusterLoad); +} + +TCpuQuotaManager::TCpuQuotaResponse TCpuQuotaManager::RequestCpuQuota(double quota, double maxClusterLoad) { + if (quota < 0.0 || quota > 1.0) { + return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder() << "Incorrect quota value (exceeds 1.0 or less than 0.0) " << quota)}); + } + quota = quota ? quota : DefaultQueryLoad; + + CheckLoadIsOutdated(); + if (!HasCpuQuota(maxClusterLoad)) { + return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder() + << "Cluster is overloaded, current quoted load " << static_cast(QuotedLoad * 100) + << "%, average load " << static_cast(AverageLoad * 100) << "%" + )}); + } + + QuotedLoad += quota; + Counters.QuotedLoadPercentage->Set(static_cast(QuotedLoad * 100)); + return TCpuQuotaResponse(QuotedLoad * 100); +} + +void TCpuQuotaManager::AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed) { + if (!CpuNumber) { + return; + } + + if (duration && duration < AverageLoadInterval / 2 && quota <= 1.0) { + quota = quota ? 
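+ // For queries shorter than half the averaging window, estimate the share of
+ // cluster CPU actually consumed and give half of the over-granted quota back
+ // to QuotedLoad (clamped at zero below).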
quota : DefaultQueryLoad; + auto load = (cpuSecondsConsumed * 1000.0 / duration.MilliSeconds()) / CpuNumber; + if (quota > load) { + auto adjustment = (quota - load) / 2; + if (QuotedLoad > adjustment) { + QuotedLoad -= adjustment; + } else { + QuotedLoad = 0.0; + } + Counters.QuotedLoadPercentage->Set(static_cast(QuotedLoad * 100)); + } + } +} + +} // namespace NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/cpu_quota_manager.h b/ydb/core/kqp/workload_service/common/cpu_quota_manager.h new file mode 100644 index 000000000000..f0587e1d4418 --- /dev/null +++ b/ydb/core/kqp/workload_service/common/cpu_quota_manager.h @@ -0,0 +1,76 @@ +#pragma once + +#include + +#include + +#include + + +namespace NKikimr::NKqp::NWorkload { + +class TCpuQuotaManager { + struct TCounters { + const ::NMonitoring::TDynamicCounterPtr SubComponent; + struct TCommonMetrics { + ::NMonitoring::TDynamicCounters::TCounterPtr Ok; + ::NMonitoring::TDynamicCounters::TCounterPtr Error; + }; + + TCommonMetrics CpuLoadRequest; + ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage; + ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage; + ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage; + + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent); + + private: + void Register(); + void RegisterCommonMetrics(TCommonMetrics& metrics) const; + }; + +public: + struct TCpuQuotaResponse { + explicit TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status = NYdb::EStatus::SUCCESS, NYql::TIssues issues = {}); + + const int32_t CurrentLoad; + const NYdb::EStatus Status; + const NYql::TIssues Issues; + }; + +public: + TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui64 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent); + + double GetInstantLoad() const; + double GetAverageLoad() const; + TDuration GetMonitoringRequestDelay() const; + TInstant GetMonitoringRequestTime() const; + + void UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success); + bool CheckLoadIsOutdated(); + + bool HasCpuQuota(double maxClusterLoad); + TCpuQuotaResponse RequestCpuQuota(double quota, double maxClusterLoad); + void AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed); + +private: + TCounters Counters; + + const TDuration MonitoringRequestDelay; + const TDuration AverageLoadInterval; + const TDuration IdleTimeout; + const double DefaultQueryLoad; + const bool Strict; + ui64 CpuNumber = 0; + + TInstant LastCpuLoad; + TInstant LastUpdateCpuLoad; + TInstant LastRequestCpuQuota; + + double InstantLoad = 0.0; + double AverageLoad = 0.0; + double QuotedLoad = 0.0; + bool Ready = false; +}; + +} // namespace NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index 25018bbe6728..c32f4cd4f4d5 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -30,6 +30,12 @@ struct TEvPrivate { EvStopPoolHandler, EvCancelRequest, + EvCpuQuotaRequest, + EvCpuQuotaResponse, + EvCpuLoadResponse, + EvNodesInfoRequest, + EvNodesInfoResponse, + EvTablesCreationFinished, EvCleanupTableResponse, EvCleanupTablesFinished, @@ -110,13 +116,19 @@ struct TEvPrivate { }; struct TEvFinishRequestInPool : public NActors::TEventLocal { - TEvFinishRequestInPool(const TString& database, const TString& poolId) + 
TEvFinishRequestInPool(const TString& database, const TString& poolId, TDuration duration, TDuration cpuConsumed, bool adjustCpuQuota) : Database(database) , PoolId(poolId) + , Duration(duration) + , CpuConsumed(cpuConsumed) + , AdjustCpuQuota(adjustCpuQuota) {} const TString Database; const TString PoolId; + const TDuration Duration; + const TDuration CpuConsumed; + const bool AdjustCpuQuota; }; struct TEvResignPoolHandler : public NActors::TEventLocal { @@ -142,6 +154,48 @@ struct TEvPrivate { const TString SessionId; }; + // Cpu load requests + struct TEvCpuQuotaRequest : public NActors::TEventLocal { + explicit TEvCpuQuotaRequest(double maxClusterLoad) + : MaxClusterLoad(maxClusterLoad) + {} + + const double MaxClusterLoad; + }; + + struct TEvCpuQuotaResponse : public NActors::TEventLocal { + explicit TEvCpuQuotaResponse(bool quotaAccepted) + : QuotaAccepted(quotaAccepted) + {} + + const bool QuotaAccepted; + }; + + struct TEvCpuLoadResponse : public NActors::TEventLocal { + TEvCpuLoadResponse(Ydb::StatusIds::StatusCode status, double instantLoad, ui64 cpuNumber, NYql::TIssues issues) + : Status(status) + , InstantLoad(instantLoad) + , CpuNumber(cpuNumber) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status; + const double InstantLoad; + const ui64 CpuNumber; + const NYql::TIssues Issues; + }; + + struct TEvNodesInfoRequest : public NActors::TEventLocal { + }; + + struct TEvNodesInfoResponse : public NActors::TEventLocal { + explicit TEvNodesInfoResponse(ui32 nodeCount) + : NodeCount(nodeCount) + {} + + const ui32 NodeCount; + }; + // Tables queries events struct TEvTablesCreationFinished : public NActors::TEventLocal { TEvTablesCreationFinished(bool success, NYql::TIssues issues) diff --git a/ydb/core/kqp/workload_service/common/helpers.cpp b/ydb/core/kqp/workload_service/common/helpers.cpp index 3a5427a9de4a..f1893bf8b92f 100644 --- a/ydb/core/kqp/workload_service/common/helpers.cpp +++ b/ydb/core/kqp/workload_service/common/helpers.cpp @@ -20,4 +20,8 @@ void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& descript } } +ui64 SaturationSub(ui64 x, ui64 y) { + return (x > y) ? 
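+ // Saturating subtraction: clamps the result at zero instead of wrapping
+ // around on unsigned underflow.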
x - y : 0; +} + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index 85aff302d68f..163b2d765ed1 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped { virtual TString LogPrefix() const = 0; protected: - bool ScheduleRetry(const TString& message, bool longDelay = false) { + bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) { if (!RetryState) { RetryState = CreateRetryState(); } if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) { - Issues.AddIssue(message); + Issues.AddIssues(issues); this->Schedule(*delay, new TEvents::TEvWakeup()); - LOG_W("Scheduled retry for error: " << message); + LOG_W("Scheduled retry for error: " << issues.ToOneLineString()); return true; } return false; } + bool ScheduleRetry(const TString& message, bool longDelay = false) { + return ScheduleRetry({NYql::TIssue(message)}, longDelay); + } + private: static TRetryPolicy::IRetryState::TPtr CreateRetryState() { return TRetryPolicy::GetFixedIntervalPolicy( @@ -99,4 +103,6 @@ NYql::TIssues GroupIssues(const NYql::TIssues& issues, const TString& message); void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& description, NResourcePool::TPoolSettings& poolConfig); +ui64 SaturationSub(ui64 x, ui64 y); + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/ya.make b/ydb/core/kqp/workload_service/common/ya.make index 44cbd65ca22e..4026b389648b 100644 --- a/ydb/core/kqp/workload_service/common/ya.make +++ b/ydb/core/kqp/workload_service/common/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + cpu_quota_manager.cpp events.cpp helpers.cpp ) @@ -14,6 +15,8 @@ PEERDIR( ydb/library/actors/core + ydb/public/sdk/cpp/client/ydb_types + library/cpp/retry ) diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 94ad44dbe460..66a6aaaaf64b 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -1,4 +1,5 @@ #include "kqp_workload_service.h" +#include "kqp_workload_service_impl.h" #include #include @@ -8,12 +9,13 @@ #include #include -#include #include #include #include +#include + namespace NKikimr::NKqp { @@ -23,8 +25,6 @@ namespace { using namespace NActors; -constexpr TDuration IDLE_DURATION = TDuration::Seconds(15); - class TKqpWorkloadService : public TActorBootstrapped { enum class ETablesCreationStatus { @@ -34,45 +34,10 @@ class TKqpWorkloadService : public TActorBootstrapped { Finished, }; - struct TPoolState { - TActorId PoolHandler; - TActorContext ActorContext; - - std::queue PendingRequests = {}; - bool WaitingInitialization = false; - bool PlaceRequestRunning = false; - std::optional NewPoolHandler = std::nullopt; - - ui64 InFlightRequests = 0; - TInstant LastUpdateTime = TInstant::Now(); - - void UpdateHandler() { - if (PlaceRequestRunning || WaitingInitialization || !NewPoolHandler) { - return; - } - - ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler()); - PoolHandler = *NewPoolHandler; - NewPoolHandler = std::nullopt; - InFlightRequests = 0; - } - - void StartPlaceRequest() { - if (PlaceRequestRunning || PendingRequests.empty()) { - return; - } - - PlaceRequestRunning = true; - InFlightRequests++; - ActorContext.Send(PendingRequests.front()->Forward(PoolHandler)); 
- PendingRequests.pop(); - } - - void OnRequestFinished() { - Y_ENSURE(InFlightRequests); - InFlightRequests--; - LastUpdateTime = TInstant::Now(); - } + enum class EWakeUp { + IdleCheck, + StartCpuLoadRequest, + StartNodeInfoRequest }; public: @@ -90,7 +55,10 @@ class TKqpWorkloadService : public TActorBootstrapped { (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem }), IEventHandle::FlagTrackDelivery); + CpuQuotaManager = std::make_unique(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); + EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools(); + EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless(); if (EnabledResourcePools) { InitializeWorkloadService(); } @@ -117,6 +85,7 @@ class TKqpWorkloadService : public TActorBootstrapped { const auto& event = ev->Get()->Record; EnabledResourcePools = event.GetConfig().GetFeatureFlags().GetEnableResourcePools(); + EnabledResourcePoolsOnServerless = event.GetConfig().GetFeatureFlags().GetEnableResourcePoolsOnServerless(); if (EnabledResourcePools) { LOG_I("Resource pools were enabled"); InitializeWorkloadService(); @@ -128,6 +97,13 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(ev->Sender, responseEvent.release(), IEventHandle::FlagTrackDelivery, ev->Cookie); } + void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) { + NodeCount = ev->Get()->Nodes.size(); + ScheduleNodeInfoRequest(); + + LOG_T("Updated node info, node count: " << NodeCount); + } + void Handle(TEvents::TEvUndelivered::TPtr& ev) const { switch (ev->Get()->SourceType) { case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest: @@ -138,6 +114,11 @@ class TKqpWorkloadService : public TActorBootstrapped { LOG_E("Failed to deliver config notification response"); break; + case TEvInterconnect::EvListNodes: + LOG_W("Failed to deliver list nodes request"); + ScheduleNodeInfoRequest(); + break; + default: LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType); break; @@ -151,12 +132,9 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - // Add AllAuthenticatedUsers group SID into user token - ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken); - LOG_D("Received new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database)); - Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool)); + Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless)); } void Handle(TEvCleanupRequest::TPtr& ev) { @@ -172,24 +150,19 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(ev->Forward(poolState->PoolHandler)); } - void HandleWakeup() { - IdleChecksStarted = false; + void Handle(TEvents::TEvWakeup::TPtr& ev) { + switch (static_cast(ev->Get()->Tag)) { + case EWakeUp::IdleCheck: + RunIdleCheck(); + break; - std::vector poolsToDelete; - poolsToDelete.reserve(PoolIdToState.size()); - for (const auto& [poolKey, poolState] : PoolIdToState) { - if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) { - Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler()); - poolsToDelete.emplace_back(poolKey); - } - } - for (const auto& poolKey : poolsToDelete) { - PoolIdToState.erase(poolKey); - ActivePools->Dec(); - } + case EWakeUp::StartCpuLoadRequest: + 
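+ // The single TEvWakeup stream is multiplexed by tag: idle checks, CPU load
+ // polls and node info refreshes.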
RunCpuLoadRequest(); + break; - if (!PoolIdToState.empty()) { - StartIdleChecks(); + case EWakeUp::StartNodeInfoRequest: + RunNodeInfoRequest(); + break; } } @@ -197,19 +170,23 @@ class TKqpWorkloadService : public TActorBootstrapped { sFunc(TEvents::TEvPoison, HandlePoison); sFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleSetConfigSubscriptionResponse); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); + hFunc(TEvInterconnect::TEvNodesInfo, Handle); hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvPlaceRequestIntoPool, Handle); hFunc(TEvCleanupRequest, Handle); - sFunc(TEvents::TEvWakeup, HandleWakeup); + hFunc(TEvents::TEvWakeup, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); + hFunc(TEvPrivate::TEvNodesInfoRequest, Handle); hFunc(TEvPrivate::TEvRefreshPoolState, Handle); + hFunc(TEvPrivate::TEvCpuQuotaRequest, Handle); hFunc(TEvPrivate::TEvFinishRequestInPool, Handle); hFunc(TEvPrivate::TEvPrepareTablesRequest, Handle); hFunc(TEvPrivate::TEvCleanupTablesFinished, Handle); hFunc(TEvPrivate::TEvTablesCreationFinished, Handle); + hFunc(TEvPrivate::TEvCpuLoadResponse, Handle); hFunc(TEvPrivate::TEvResignPoolHandler, Handle); ) @@ -238,7 +215,7 @@ class TKqpWorkloadService : public TActorBootstrapped { poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; ActivePools->Inc(); - StartIdleChecks(); + ScheduleIdleCheck(); } poolState->PendingRequests.emplace(std::move(ev)); @@ -257,6 +234,10 @@ class TKqpWorkloadService : public TActorBootstrapped { } } + void Handle(TEvPrivate::TEvNodesInfoRequest::TPtr& ev) const { + Send(ev->Sender, new TEvPrivate::TEvNodesInfoResponse(NodeCount)); + } + void Handle(TEvPrivate::TEvRefreshPoolState::TPtr& ev) { const auto& event = ev->Get()->Record; const TString& database = event.GetDatabase(); @@ -268,14 +249,27 @@ class TKqpWorkloadService : public TActorBootstrapped { } } + void Handle(TEvPrivate::TEvCpuQuotaRequest::TPtr& ev) { + const TActorId& poolHandler = ev->Sender; + const double maxClusterLoad = ev->Get()->MaxClusterLoad; + LOG_T("Requested cpu quota from handler " << poolHandler << ", MaxClusterLoad: " << maxClusterLoad); + + CpuQuotaManager->RequestCpuQuota(poolHandler, maxClusterLoad, ev->Cookie); + ScheduleCpuLoadRequest(); + } + void Handle(TEvPrivate::TEvFinishRequestInPool::TPtr& ev) { const TString& database = ev->Get()->Database; const TString& poolId = ev->Get()->PoolId; - LOG_T("Request finished in pool, Database: " << database << ", PoolId: " << poolId); + LOG_T("Request finished in pool, Database: " << database << ", PoolId: " << poolId << ", Duration: " << ev->Get()->Duration << ", CpuConsumed: " << ev->Get()->CpuConsumed << ", AdjustCpuQuota: " << ev->Get()->AdjustCpuQuota); if (auto poolState = GetPoolState(database, poolId)) { poolState->OnRequestFinished(); } + if (ev->Get()->AdjustCpuQuota) { + CpuQuotaManager->AdjustCpuQuota(ev->Get()->Duration, ev->Get()->CpuConsumed.SecondsFloat()); + ScheduleCpuLoadRequest(); + } } void Handle(TEvPrivate::TEvPrepareTablesRequest::TPtr& ev) { @@ -327,6 +321,19 @@ class TKqpWorkloadService : public TActorBootstrapped { OnTabelsCreated(false, issues); } + void Handle(TEvPrivate::TEvCpuLoadResponse::TPtr& ev) { + const bool success = ev->Get()->Status == Ydb::StatusIds::SUCCESS; + if (!success) { + LOG_E("Failed to fetch cpu load " << ev->Get()->Status << ", issues: " << 
ev->Get()->Issues.ToOneLineString()); + } else { + LOG_T("Successfully fetched cpu load: " << 100.0 * ev->Get()->InstantLoad << "%, cpu number: " << ev->Get()->CpuNumber); + } + + CpuQuotaManager->CpuLoadRequestRunning = false; + CpuQuotaManager->UpdateCpuLoad(ev->Get()->InstantLoad, ev->Get()->CpuNumber, success); + ScheduleCpuLoadRequest(); + } + void Handle(TEvPrivate::TEvResignPoolHandler::TPtr& ev) { const TString& database = ev->Get()->Database; const TString& poolId = ev->Get()->PoolId; @@ -350,6 +357,7 @@ class TKqpWorkloadService : public TActorBootstrapped { LOG_I("Started workload service initialization"); Register(CreateCleanupTablesActor()); + RunNodeInfoRequest(); } void PrepareWorkloadServiceTables() { @@ -378,13 +386,71 @@ class TKqpWorkloadService : public TActorBootstrapped { PendingHandlers.clear(); } - void StartIdleChecks() { + void ScheduleIdleCheck() { if (IdleChecksStarted) { return; } IdleChecksStarted = true; - Schedule(IDLE_DURATION, new TEvents::TEvWakeup()); + Schedule(IDLE_DURATION / 2, new TEvents::TEvWakeup()); + } + + void RunIdleCheck() { + IdleChecksStarted = false; + + std::vector poolsToDelete; + poolsToDelete.reserve(PoolIdToState.size()); + for (const auto& [poolKey, poolState] : PoolIdToState) { + if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) { + CpuQuotaManager->CleanupHandler(poolState.PoolHandler); + Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler()); + poolsToDelete.emplace_back(poolKey); + } + } + for (const auto& poolKey : poolsToDelete) { + PoolIdToState.erase(poolKey); + ActivePools->Dec(); + } + + if (!PoolIdToState.empty()) { + ScheduleIdleCheck(); + } + } + + void ScheduleCpuLoadRequest() const { + auto delay = CpuQuotaManager->GetCpuLoadRequestDelay(); + if (!delay) { + return; + } + + if (*delay) { + Schedule(*delay, new TEvents::TEvWakeup(static_cast(EWakeUp::StartCpuLoadRequest))); + } else { + RunCpuLoadRequest(); + } + } + + void RunCpuLoadRequest() const { + if (CpuQuotaManager->CpuLoadRequestRunning) { + return; + } + + CpuQuotaManager->CpuLoadRequestTime = TInstant::Zero(); + if (CpuQuotaManager->CpuQuotaManager.GetMonitoringRequestDelay()) { + ScheduleCpuLoadRequest(); + return; + } + + CpuQuotaManager->CpuLoadRequestRunning = true; + Register(CreateCpuLoadFetcherActor(SelfId())); + } + + void ScheduleNodeInfoRequest() const { + Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast(EWakeUp::StartNodeInfoRequest))); + } + + void RunNodeInfoRequest() const { + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(), IEventHandle::FlagTrackDelivery); } private: @@ -406,25 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } - static TIntrusivePtr GetUserToken(TIntrusiveConstPtr userToken) { - auto token = MakeIntrusive(userToken ? 
userToken->GetUserSID() : NACLib::TSID(), TVector{}); - - bool hasAllAuthenticatedUsersSID = false; - const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers; - if (userToken) { - for (const auto& groupSID : userToken->GetGroupSIDs()) { - token->AddGroupSID(groupSID); - hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID; - } - } - - if (!hasAllAuthenticatedUsersSID) { - token->AddGroupSID(allAuthenticatedUsersSID); - } - - return token; - } - TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } @@ -453,6 +500,7 @@ class TKqpWorkloadService : public TActorBootstrapped { NMonitoring::TDynamicCounterPtr Counters; bool EnabledResourcePools = false; + bool EnabledResourcePoolsOnServerless = false; bool ServiceInitialized = false; bool IdleChecksStarted = false; ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup; @@ -460,6 +508,8 @@ class TKqpWorkloadService : public TActorBootstrapped { std::unordered_set DatabasesWithDefaultPool; std::unordered_map PoolIdToState; + std::unique_ptr CpuQuotaManager; + ui32 NodeCount = 0; NMonitoring::TDynamicCounters::TCounterPtr ActivePools; }; diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h new file mode 100644 index 000000000000..9ee91f077720 --- /dev/null +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -0,0 +1,132 @@ +#pragma once + +#include + +#include +#include + + +namespace NKikimr::NKqp::NWorkload { + +constexpr TDuration IDLE_DURATION = TDuration::Seconds(60); + +struct TPoolState { + NActors::TActorId PoolHandler; + NActors::TActorContext ActorContext; + + std::queue PendingRequests = {}; + bool WaitingInitialization = false; + bool PlaceRequestRunning = false; + std::optional NewPoolHandler = std::nullopt; + + ui64 InFlightRequests = 0; + TInstant LastUpdateTime = TInstant::Now(); + + void UpdateHandler() { + if (PlaceRequestRunning || WaitingInitialization || !NewPoolHandler) { + return; + } + + ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler()); + PoolHandler = *NewPoolHandler; + NewPoolHandler = std::nullopt; + InFlightRequests = 0; + } + + void StartPlaceRequest() { + if (PlaceRequestRunning || PendingRequests.empty()) { + return; + } + + PlaceRequestRunning = true; + InFlightRequests++; + ActorContext.Send(PendingRequests.front()->Forward(PoolHandler)); + PendingRequests.pop(); + } + + void OnRequestFinished() { + Y_ENSURE(InFlightRequests); + InFlightRequests--; + LastUpdateTime = TInstant::Now(); + } +}; + +struct TCpuQuotaManagerState { + TCpuQuotaManager CpuQuotaManager; + NActors::TActorContext ActorContext; + bool CpuLoadRequestRunning = false; + TInstant CpuLoadRequestTime = TInstant::Zero(); + + TCpuQuotaManagerState(NActors::TActorContext actorContext, NMonitoring::TDynamicCounterPtr subComponent) + : CpuQuotaManager(TDuration::Seconds(1), TDuration::Seconds(10), IDLE_DURATION, 0.1, true, 0, subComponent) + , ActorContext(actorContext) + {} + + void RequestCpuQuota(TActorId poolHandler, double maxClusterLoad, ui64 cookie) { + auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad); + + bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS; + ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted), 0, cookie); + + // Schedule notification + if (!quotaAccepted) { + if (auto it = 
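+ // Rejected handlers are parked in PendingHandlers, ordered by their load
+ // limit, with HandlersLimits as the reverse index, so CheckPendingQueue() can
+ // wake them up once the quoted load drops below their threshold.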
HandlersLimits.find(poolHandler); it != HandlersLimits.end()) { + PendingHandlers[it->second].erase(poolHandler); + } + HandlersLimits[poolHandler] = maxClusterLoad; + PendingHandlers[maxClusterLoad].insert(poolHandler); + } + } + + void UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success) { + CpuQuotaManager.UpdateCpuLoad(instantLoad, cpuNumber, success); + CheckPendingQueue(); + } + + void AdjustCpuQuota(TDuration duration, double cpuSecondsConsumed) { + CpuQuotaManager.AdjustCpuQuota(0.0, duration, cpuSecondsConsumed); + CheckPendingQueue(); + } + + std::optional GetCpuLoadRequestDelay() { + if (CpuLoadRequestRunning) { + return std::nullopt; + } + + auto requestTime = CpuQuotaManager.GetMonitoringRequestTime(); + if (!CpuLoadRequestTime || CpuLoadRequestTime > requestTime) { + CpuLoadRequestTime = requestTime; + return CpuLoadRequestTime - TInstant::Now(); + } + return std::nullopt; + } + + void CleanupHandler(TActorId poolHandler) { + if (auto it = HandlersLimits.find(poolHandler); it != HandlersLimits.end()) { + PendingHandlers[it->second].erase(poolHandler); + HandlersLimits.erase(it); + } + } + +private: + void CheckPendingQueue() { + while (!PendingHandlers.empty()) { + const auto& [maxClusterLoad, poolHandlers] = *PendingHandlers.begin(); + if (!CpuQuotaManager.HasCpuQuota(maxClusterLoad)) { + break; + } + + for (const TActorId& poolHandler : poolHandlers) { + ActorContext.Send(poolHandler, new TEvPrivate::TEvRefreshPoolState()); + HandlersLimits.erase(poolHandler); + } + PendingHandlers.erase(PendingHandlers.begin()); + } + } + +private: + std::map> PendingHandlers; + std::unordered_map HandlersLimits; +}; + +} // namespace NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 7edc17a78525..169f3d2afd2a 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -8,9 +8,10 @@ #include #include #include - #include +#include + namespace NKikimr::NKqp::NWorkload { @@ -287,6 +288,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { poolConfig.QueueSize = Settings_.QueueSize_; poolConfig.QueryCancelAfter = Settings_.QueryCancelAfter_; poolConfig.QueryMemoryLimitPercentPerNode = Settings_.QueryMemoryLimitPercentPerNode_; + poolConfig.DatabaseLoadCpuThreshold = Settings_.DatabaseLoadCpuThreshold_; TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); GetRuntime()->Register(CreatePoolCreatorActor(edgeActor, Settings_.DomainName_, Settings_.PoolId_, poolConfig, nullptr, {})); @@ -303,6 +305,41 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { CreateSamplePool(); } + // Cluster helpers + void UpdateNodeCpuInfo(double usage, ui32 threads, ui64 nodeIndex = 0) override { + TVector> pools; + pools.emplace_back("User", usage, threads); + + auto edgeActor = GetRuntime()->AllocateEdgeActor(nodeIndex); + GetRuntime()->Send( + NNodeWhiteboard::MakeNodeWhiteboardServiceId(GetRuntime()->GetNodeId(nodeIndex)), edgeActor, + new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools), nodeIndex + ); + + WaitFor(FUTURE_WAIT_TIMEOUT, "node cpu usage", [this, usage, threads, nodeIndex, edgeActor](TString& errorString) { + GetRuntime()->Send( + NNodeWhiteboard::MakeNodeWhiteboardServiceId(GetRuntime()->GetNodeId(nodeIndex)), edgeActor, + new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest(), nodeIndex + ); + auto response = 
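+ // The helper publishes synthetic pool stats to the node whiteboard and then
+ // polls them back until the update is visible, so tests can set the CPU load
+ // deterministically.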
GetRuntime()->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); + + if (!response->Get()->Record.SystemStateInfoSize()) { + errorString = "empty system state info"; + return false; + } + const auto& systemStateInfo = response->Get()->Record.GetSystemStateInfo()[0]; + + if (!systemStateInfo.PoolStatsSize()) { + errorString = "empty pool stats"; + return false; + } + const auto& poolStat = systemStateInfo.GetPoolStats()[0]; + + errorString = TStringBuilder() << "usage: " << poolStat.GetUsage() << ", threads: " << poolStat.GetThreads(); + return poolStat.GetUsage() == usage && threads == poolStat.GetThreads(); + }); + } + // Scheme queries helpers NYdb::NScheme::TSchemeClient GetSchemeClient() const override { return NYdb::NScheme::TSchemeClient(*YdbDriver_); @@ -323,21 +360,17 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { void WaitPoolAccess(const TString& userSID, ui32 access, const TString& poolId = "") const override { auto token = NACLib::TUserToken(userSID, {}); - TInstant start = TInstant::Now(); - while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) { - if (auto response = Navigate(TStringBuilder() << ".resource_pools/" << (poolId ? poolId : Settings_.PoolId_))) { - const auto& result = response->ResultSet.at(0); - bool resourcePool = result.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindResourcePool; - if (resourcePool && (!result.SecurityObject || result.SecurityObject->CheckAccess(access, token))) { - return; - } - Cerr << "WaitPoolAccess " << TInstant::Now() - start << ": " << (resourcePool ? TStringBuilder() << "access denied" : TStringBuilder() << "unexpected kind " << result.Kind) << "\n"; - } else { - Cerr << "WaitPoolAccess " << TInstant::Now() - start << ": empty response\n"; + WaitFor(FUTURE_WAIT_TIMEOUT, "pool acl", [this, token, access, poolId](TString& errorString) { + auto response = Navigate(TStringBuilder() << ".resource_pools/" << (poolId ? poolId : Settings_.PoolId_)); + if (!response) { + errorString = "empty response"; + return false; } - Sleep(TDuration::Seconds(1)); - } - UNIT_ASSERT_C(false, "Pool version waiting timeout"); + const auto& result = response->ResultSet.at(0); + bool resourcePool = result.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindResourcePool; + errorString = (resourcePool ? 
TStringBuilder() << "access denied" : TStringBuilder() << "unexpected kind " << result.Kind); + return resourcePool && (!result.SecurityObject || result.SecurityObject->CheckAccess(access, token)); + }); } // Generic query helpers @@ -390,17 +423,11 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { } void WaitPoolState(const TPoolStateDescription& state, const TString& poolId = "") const override { - TInstant start = TInstant::Now(); - while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) { + WaitFor(FUTURE_WAIT_TIMEOUT, "pool state", [this, state, poolId](TString& errorString) { auto description = GetPoolDescription(TDuration::Zero(), poolId); - if (description.DelayedRequests == state.DelayedRequests && description.RunningRequests == state.RunningRequests) { - return; - } - - Cerr << "WaitPoolState " << TInstant::Now() - start << ": delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests << "\n"; - Sleep(TDuration::Seconds(1)); - } - UNIT_ASSERT_C(false, "Pool state waiting timeout"); + errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests; + return description.DelayedRequests == state.DelayedRequests && description.RunningRequests == state.RunningRequests; + }); } void WaitPoolHandlersCount(i64 finalCount, std::optional initialCount = std::nullopt, TDuration timeout = FUTURE_WAIT_TIMEOUT) const override { @@ -410,16 +437,10 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { UNIT_ASSERT_VALUES_EQUAL_C(counter->Val(), *initialCount, "Unexpected pool handlers count"); } - TInstant start = TInstant::Now(); - while (TInstant::Now() - start < timeout) { - if (counter->Val() == finalCount) { - return; - } - - Cerr << "WaitPoolHandlersCount " << TInstant::Now() - start << ": number handlers = " << counter->Val() << "\n"; - Sleep(TDuration::Seconds(1)); - } - UNIT_ASSERT_C(false, "Pool handlers count wait timeout"); + WaitFor(timeout, "pool handlers", [counter, finalCount](TString& errorString) { + errorString = TStringBuilder() << "number handlers = " << counter->Val(); + return counter->Val() == finalCount; + }); } void StopWorkloadService(ui64 nodeIndex = 0) const override { @@ -432,13 +453,15 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { auto subgroup = GetWorkloadManagerCounters(nodeIndex) ->GetSubgroup("pool", CanonizePath(TStringBuilder() << Settings_.DomainName_ << "/" << (poolId ? 
poolId : Settings_.PoolId_))); - CheckCommonCounters(subgroup); + const TString description = TStringBuilder() << "Node id: " << GetRuntime()->GetNodeId(nodeIndex); + CheckCommonCounters(subgroup, description); if (checkTableCounters) { - CheckTableCounters(subgroup); + CheckTableCounters(subgroup, description); } } } + // Common helpers TTestActorRuntime* GetRuntime() const override { return Server_->GetRuntime(); } @@ -475,37 +498,30 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { ->GetSubgroup("subsystem", "workload_manager"); } - static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup) { - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalInFly", false)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueError", true)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("CleanupError", true)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("Cancelled", true)->Val(), 0); - - UNIT_ASSERT_GE(subgroup->GetCounter("ContinueOk", true)->Val(), 1); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOk", true)->Val(), subgroup->GetCounter("CleanupOk", true)->Val()); + static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup, const TString& description) { + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("LocalInFly", false)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueError", true)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("CleanupError", true)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("Cancelled", true)->Val(), 0, description); } - static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup) { - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0); + static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup, const TString& description) { + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0, description); - const std::vector> tableQueries = { - {"TCleanupTablesQuery", false}, - {"TRefreshPoolStateQuery", true}, - {"TDelayRequestQuery", true}, - {"TStartFirstDelayedRequestQuery", true}, - {"TStartRequestQuery", false}, - {"TCleanupRequestsQuery", true}, + const std::vector tableQueries = { + "TCleanupTablesQuery", + "TRefreshPoolStateQuery", + "TDelayRequestQuery", + "TStartFirstDelayedRequestQuery", + "TStartRequestQuery", + "TCleanupRequestsQuery", }; - for (const auto& [operation, runExpected] : tableQueries) { + for (const auto& operation : tableQueries) { auto operationSubgroup = subgroup->GetSubgroup("operation", operation); - - UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << "Unexpected vaule for operation " << operation); - if (runExpected) { - UNIT_ASSERT_GE_C(operationSubgroup->GetCounter("FinishOk", true)->Val(), 1, TStringBuilder() << "Unexpected vaule for 
operation " << operation); - } + UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << description << ", unexpected vaule for operation " << operation); } } @@ -564,6 +580,21 @@ TIntrusivePtr TYdbSetupSettings::Create() const { return MakeIntrusive(*this); } +//// IYdbSetup + +void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function callback) { + TInstant start = TInstant::Now(); + while (TInstant::Now() - start <= timeout) { + TString errorString; + if (callback(errorString)) { + return; + } + Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n"; + Sleep(TDuration::Seconds(1)); + } + UNIT_ASSERT_C(false, "Waiting " << description << " timeout. Spent time " << TInstant::Now() - start << " exceeds limit " << timeout); +} + //// TSampleQueriess void TSampleQueries::CompareYson(const TString& expected, const TString& actual) { diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h index f673f070c11b..35f1a1693140 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h @@ -74,12 +74,16 @@ struct TYdbSetupSettings { FLUENT_SETTING_DEFAULT(i32, QueueSize, -1); FLUENT_SETTING_DEFAULT(TDuration, QueryCancelAfter, FUTURE_WAIT_TIMEOUT); FLUENT_SETTING_DEFAULT(double, QueryMemoryLimitPercentPerNode, -1); + FLUENT_SETTING_DEFAULT(double, DatabaseLoadCpuThreshold, -1); TIntrusivePtr Create() const; }; class IYdbSetup : public TThrRefBase { public: + // Cluster helpers + virtual void UpdateNodeCpuInfo(double usage, ui32 threads, ui64 nodeIndex = 0) = 0; + // Scheme queries helpers virtual NYdb::NScheme::TSchemeClient GetSchemeClient() const = 0; virtual void ExecuteSchemeQuery(const TString& query, NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS, const TString& expectedMessage = "") const = 0; @@ -102,8 +106,10 @@ class IYdbSetup : public TThrRefBase { virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0; virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0; + // Coomon helpers virtual TTestActorRuntime* GetRuntime() const = 0; virtual const TYdbSetupSettings& GetSettings() const = 0; + static void WaitFor(TDuration timeout, TString description, std::function callback); }; // Test queries diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 8b9a8262609d..271d7accbbfd 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -16,10 +16,20 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr ydb, c auto runtime = ydb->GetRuntime(); const auto& edgeActor = runtime->AllocateEdgeActor(); - runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive(userSID, TVector{}))); + auto userToken = MakeIntrusive(userSID, TVector{}); + userToken->SaveSerializationInfo(); + runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? 
poolId : settings.PoolId_, userToken, true)); return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); } +TEvPrivate::TEvCpuLoadResponse::TPtr FetchCpuInfo(TIntrusivePtr ydb) { + auto runtime = ydb->GetRuntime(); + const auto& edgeActor = runtime->AllocateEdgeActor(); + + runtime->Register(CreateCpuLoadFetcherActor(edgeActor)); + return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); +} + } // anonymous namespace Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { @@ -100,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { // Check default pool access TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID))); - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(""))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT))); } Y_UNIT_TEST(TestDefaultPoolAdminPermissions) { @@ -129,6 +140,28 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { DROP RESOURCE POOL )" << NResourcePool::DEFAULT_POOL_ID << ";" , settings)); } + + Y_UNIT_TEST(TestCpuLoadActor) { + const ui32 nodeCount = 5; + auto ydb = TYdbSetupSettings() + .NodeCount(nodeCount) + .Create(); + + auto response = FetchCpuInfo(ydb); + UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::NOT_FOUND, response->Get()->Issues.ToOneLineString()); + UNIT_ASSERT_STRING_CONTAINS(response->Get()->Issues.ToString(), "Cpu info not found"); + + const double usage = 0.25; + const ui32 threads = 2; + for (size_t nodeIndex = 0; nodeIndex < nodeCount; ++nodeIndex) { + ydb->UpdateNodeCpuInfo(usage, threads, nodeIndex); + } + + response = FetchCpuInfo(ydb); + UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(response->Get()->CpuNumber, threads * nodeCount); + UNIT_ASSERT_DOUBLES_EQUAL(response->Get()->InstantLoad, usage, 0.01); + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp index 288f3b72edeb..4d37370a8599 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp @@ -133,22 +133,26 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) { Y_UNIT_TEST(TestLeaseExpiration) { auto ydb = TYdbSetupSettings() .ConcurrentQueryLimit(1) + .QueryCancelAfter(TDuration::Zero()) .Create(); // Create tables - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); + auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true)); + ydb->WaitQueryExecution(hangingRequest); - const TDuration leaseDuration = TDuration::Seconds(10); - StartRequest(ydb, "test_session", leaseDuration); - DelayRequest(ydb, "test_session", leaseDuration); - CheckPoolDescription(ydb, 1, 1, leaseDuration); + auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); + ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1}); ydb->StopWorkloadService(); ydb->WaitPoolHandlersCount(0); // Check that lease expired - 
Sleep(leaseDuration + TDuration::Seconds(5)); - CheckPoolDescription(ydb, 0, 0); + IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) { + auto description = ydb->GetPoolDescription(TDuration::Zero()); + + errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests; + return description.AmountRequests() == 0; + }); } Y_UNIT_TEST(TestLeaseUpdates) { diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 553cb5b17933..a750be18bd79 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -123,6 +123,36 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); } + Y_UNIT_TEST(TestZeroQueueSizeManyQueries) { + const i32 inFlight = 10; + auto ydb = TYdbSetupSettings() + .ConcurrentQueryLimit(inFlight) + .QueueSize(0) + .QueryCancelAfter(FUTURE_WAIT_TIMEOUT * inFlight) + .Create(); + + auto settings = TQueryRunnerSettings().HangUpDuringExecution(true); + + std::vector asyncResults; + for (size_t i = 0; i < inFlight; ++i) { + asyncResults.emplace_back(ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings)); + } + + for (const auto& asyncResult : asyncResults) { + ydb->WaitQueryExecution(asyncResult); + } + + TSampleQueries::CheckOverloaded( + ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)), + ydb->GetSettings().PoolId_ + ); + + for (const auto& asyncResult : asyncResults) { + ydb->ContinueQueryExecution(asyncResult); + TSampleQueries::TSelect42::CheckResult(asyncResult.GetResult()); + } + } + Y_UNIT_TEST(TestQueryCancelAfterUnlimitedPool) { auto ydb = TYdbSetupSettings() .QueryCancelAfter(TDuration::Seconds(10)) @@ -189,6 +219,38 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << ydb->GetSettings().PoolId_ << " was disabled due to zero concurrent query limit"); } + Y_UNIT_TEST(TestCpuLoadThreshold) { + auto ydb = TYdbSetupSettings() + .DatabaseLoadCpuThreshold(90) + .QueryCancelAfter(TDuration::Seconds(10)) + .Create(); + + // Simulate load + ydb->UpdateNodeCpuInfo(1.0, 1); + + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Delay deadline exceeded in pool " << ydb->GetSettings().PoolId_); + } + + Y_UNIT_TEST(TestCpuLoadThresholdRefresh) { + auto ydb = TYdbSetupSettings() + .DatabaseLoadCpuThreshold(90) + .Create(); + + // Simulate load + ydb->UpdateNodeCpuInfo(1.0, 1); + + // Delay request + auto result = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); + ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 0}); + + // Free load + ydb->ContinueQueryExecution(result); + ydb->UpdateNodeCpuInfo(0.0, 1); + TSampleQueries::TSelect42::CheckResult(result.GetResult(TDuration::Seconds(5))); + } + Y_UNIT_TEST(TestHandlerActorCleanup) { auto ydb = TYdbSetupSettings() .ConcurrentQueryLimit(1) @@ -197,7 +259,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { 
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(NResourcePool::DEFAULT_POOL_ID))); - ydb->WaitPoolHandlersCount(0, 2, TDuration::Seconds(35)); + ydb->WaitPoolHandlersCount(0, 2, TDuration::Seconds(95)); } } @@ -412,19 +474,16 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { DROP RESOURCE POOL )" << poolId << ";" ); - TInstant start = TInstant::Now(); - while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) { - if (ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown) { - auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found"); - return; - } - - Cerr << "WaitPoolDrop " << TInstant::Now() - start << "\n"; - Sleep(TDuration::Seconds(1)); - } - UNIT_ASSERT_C(false, "Pool drop waiting timeout"); + IYdbSetup::WaitFor(FUTURE_WAIT_TIMEOUT, "pool drop", [ydb, poolId](TString& errorString) { + auto kind = ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind; + + errorString = TStringBuilder() << "kind = " << kind; + return kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown; + }); + + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found"); } Y_UNIT_TEST(TestResourcePoolAcl) { diff --git a/ydb/core/kqp/workload_service/ya.make b/ydb/core/kqp/workload_service/ya.make index 40ee9196cdad..b8b5704044e2 100644 --- a/ydb/core/kqp/workload_service/ya.make +++ b/ydb/core/kqp/workload_service/ya.make @@ -7,7 +7,11 @@ SRCS( PEERDIR( ydb/core/cms/console + ydb/core/fq/libs/compute/common + ydb/core/kqp/workload_service/actors + + ydb/library/actors/interconnect ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 49d3096b7dd6..62fa1dd141f8 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -141,7 +141,8 @@ message TFeatureFlags { optional bool EnableExternalSourceSchemaInference = 126 [default = false]; optional bool EnableDbMetadataCache = 127 [default = false]; optional bool EnableTableDatetime64 = 128 [default = false]; - optional bool EnableResourcePools = 129 [default = false]; + optional bool EnableResourcePools = 129 [default = false]; optional bool EnableColumnStatistics = 130 [default = false]; optional bool EnableSingleCompositeActionGroup = 131 [default = false]; + optional bool EnableResourcePoolsOnServerless = 132 [default = false]; } diff --git a/ydb/core/resource_pools/resource_pool_settings.cpp b/ydb/core/resource_pools/resource_pool_settings.cpp index 1a5c39644a54..f477d334c625 100644 --- a/ydb/core/resource_pools/resource_pool_settings.cpp +++ b/ydb/core/resource_pools/resource_pool_settings.cpp @@ -7,7 +7,8 @@ std::unordered_map GetPropertiesMap(TPoolSettings& settings, std::unordered_map properties = { {"concurrent_query_limit", &settings.ConcurrentQueryLimit}, {"queue_size", &settings.QueueSize}, - 
{"query_memory_limit_percent_per_node", &settings.QueryMemoryLimitPercentPerNode} + {"query_memory_limit_percent_per_node", &settings.QueryMemoryLimitPercentPerNode}, + {"database_load_cpu_threshold", &settings.DatabaseLoadCpuThreshold} }; if (!restricted) { properties.insert({"query_cancel_after_seconds", &settings.QueryCancelAfter}); diff --git a/ydb/core/resource_pools/resource_pool_settings.h b/ydb/core/resource_pools/resource_pool_settings.h index c2dc319838e1..cecfd4eefb59 100644 --- a/ydb/core/resource_pools/resource_pool_settings.h +++ b/ydb/core/resource_pools/resource_pool_settings.h @@ -17,6 +17,8 @@ struct TPoolSettings { TPercent QueryMemoryLimitPercentPerNode = -1; // Percent from node memory capacity, -1 = disabled + TPercent DatabaseLoadCpuThreshold = -1; // -1 = disabled + bool operator==(const TPoolSettings& other) const = default; }; diff --git a/ydb/core/tx/replication/controller/util.h b/ydb/core/tx/replication/controller/util.h index 0963fa19d044..30b92d243965 100644 --- a/ydb/core/tx/replication/controller/util.h +++ b/ydb/core/tx/replication/controller/util.h @@ -32,6 +32,7 @@ inline TMaybe TryTargetKindFromEntryType(NYdb::NSchem case NYdb::NScheme::ESchemeEntryType::ExternalTable: case NYdb::NScheme::ESchemeEntryType::ExternalDataSource: case NYdb::NScheme::ESchemeEntryType::View: + case NYdb::NScheme::ESchemeEntryType::ResourcePool: return Nothing(); } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_resource_pool.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_resource_pool.cpp index 863533fe7aa2..25ce9ad0c08e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_resource_pool.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_resource_pool.cpp @@ -129,6 +129,13 @@ class TAlterResourcePool : public TSubOperation { static_cast(OperationId.GetTxId()), static_cast(context.SS->SelfTabletId())); + if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) { + if (!context.SS->EnableResourcePoolsOnServerless) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it"); + return result; + } + } + const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS); RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_resource_pool.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_resource_pool.cpp index 29c5aa399e72..a765216aa5b5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_resource_pool.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_resource_pool.cpp @@ -155,6 +155,13 @@ class TCreateResourcePool : public TSubOperation { static_cast(OperationId.GetTxId()), static_cast(context.SS->SelfTabletId())); + if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) { + if (!context.SS->EnableResourcePoolsOnServerless) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. 
+                return result;
+            }
+        }
+
         const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
         RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));

diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 73c90f5df789..4cb4ddd93d22 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -7012,6 +7012,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu
     EnableTempTables = featureFlags.GetEnableTempTables();
     EnableReplaceIfExistsForExternalEntities = featureFlags.GetEnableReplaceIfExistsForExternalEntities();
     EnableTableDatetime64 = featureFlags.GetEnableTableDatetime64();
+    EnableResourcePoolsOnServerless = featureFlags.GetEnableResourcePoolsOnServerless();
 }

 void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 02b10af80233..56e4bb9cfbe9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -329,6 +329,7 @@ class TSchemeShard
     bool EnableReplaceIfExistsForExternalEntities = false;
     bool EnableTempTables = false;
     bool EnableTableDatetime64 = false;
+    bool EnableResourcePoolsOnServerless = false;

     TShardDeleter ShardDeleter;
diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp
index 38dc0a914cb2..769946ce258a 100644
--- a/ydb/library/table_creator/table_creator.cpp
+++ b/ydb/library/table_creator/table_creator.cpp
@@ -392,7 +392,9 @@ THolder BuildSchemeCacheNavigateRequest(cons
     auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
     auto databasePath = SplitPath(database);
     request->DatabaseName = CanonizePath(databasePath);
-    request->UserToken = userToken;
+    if (userToken && !userToken->GetSerializedToken().empty()) {
+        request->UserToken = userToken;
+    }

     for (const auto& pathComponents : pathsComponents) {
         auto& entry = request->ResultSet.emplace_back();
diff --git a/ydb/mvp/core/core_ydb_impl.h b/ydb/mvp/core/core_ydb_impl.h
index 89691b2ca5dc..16682fb3da80 100644
--- a/ydb/mvp/core/core_ydb_impl.h
+++ b/ydb/mvp/core/core_ydb_impl.h
@@ -519,7 +519,8 @@ struct THandlerActorYdb {
             {"CoordinationNode", "coordination"},
             {"ColumnStore", "column-store"},
             {"ExternalTable", "external-table"},
-            {"ExternalDataSource", "external-data-source"}
+            {"ExternalDataSource", "external-data-source"},
+            {"ResourcePool", "resource-pool"}
         };
         if (const auto* mapping = specialCases.FindPtr(schemeEntry)) {
             return *mapping;
diff --git a/ydb/public/api/protos/ydb_scheme.proto b/ydb/public/api/protos/ydb_scheme.proto
index f111e7a5c89b..4ddcafc40a04 100644
--- a/ydb/public/api/protos/ydb_scheme.proto
+++ b/ydb/public/api/protos/ydb_scheme.proto
@@ -64,6 +64,7 @@ message Entry {
         EXTERNAL_TABLE = 18;
         EXTERNAL_DATA_SOURCE = 19;
         VIEW = 20;
+        RESOURCE_POOL = 21;
     }

     // Name of scheme entry (dir2 of /dir1/dir2)
diff --git a/ydb/public/lib/ydb_cli/common/print_utils.cpp b/ydb/public/lib/ydb_cli/common/print_utils.cpp
index c3c548588daf..68b49ea40eed 100644
--- a/ydb/public/lib/ydb_cli/common/print_utils.cpp
+++ b/ydb/public/lib/ydb_cli/common/print_utils.cpp
@@ -39,6 +39,9 @@ void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NCol
     case NScheme::ESchemeEntryType::ExternalDataSource:
         o << colors.LightWhite();
         break;
+    case NScheme::ESchemeEntryType::ResourcePool:
+        o << colors.LightWhite();
+        break;
     default:
         o << colors.RedColor();
     }
@@ -106,6 +109,8 @@ TString EntryTypeToString(NScheme::ESchemeEntryType entry) {
         return "view";
     case NScheme::ESchemeEntryType::Replication:
         return "replication";
+    case NScheme::ESchemeEntryType::ResourcePool:
+        return "resource-pool";
     case NScheme::ESchemeEntryType::Unknown:
     case NScheme::ESchemeEntryType::Sequence:
         return "unknown";
diff --git a/ydb/public/sdk/cpp/client/ydb_scheme/scheme.cpp b/ydb/public/sdk/cpp/client/ydb_scheme/scheme.cpp
index 0570d421b247..b62477cc3503 100644
--- a/ydb/public/sdk/cpp/client/ydb_scheme/scheme.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_scheme/scheme.cpp
@@ -95,6 +95,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry)
         return ESchemeEntryType::ExternalDataSource;
     case ::Ydb::Scheme::Entry::VIEW:
         return ESchemeEntryType::View;
+    case ::Ydb::Scheme::Entry::RESOURCE_POOL:
+        return ESchemeEntryType::ResourcePool;
     default:
         return ESchemeEntryType::Unknown;
     }
diff --git a/ydb/public/sdk/cpp/client/ydb_scheme/scheme.h b/ydb/public/sdk/cpp/client/ydb_scheme/scheme.h
index 90ecbea9ee4a..832deb172527 100644
--- a/ydb/public/sdk/cpp/client/ydb_scheme/scheme.h
+++ b/ydb/public/sdk/cpp/client/ydb_scheme/scheme.h
@@ -42,7 +42,8 @@ enum class ESchemeEntryType : i32 {
     Topic = 17,
     ExternalTable = 18,
     ExternalDataSource = 19,
-    View = 20
+    View = 20,
+    ResourcePool = 21
 };

 struct TVirtualTimestamp {