From ae027eb7ea9ff6bd8e020c143e59960ce904184e Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 26 May 2024 11:11:02 +0300 Subject: [PATCH] disk usage limit for CS compaction (in general slider limit) --- ydb/core/base/events.h | 3 +- .../run/kikimr_services_initializers.cpp | 31 +++++- .../run/kikimr_services_initializers.h | 6 ++ ydb/core/driver_lib/run/ya.make | 1 + ydb/core/protos/config.proto | 7 ++ .../columnshard/columnshard__write_index.cpp | 26 ++++- .../writer/compacted_blob_constructor.cpp | 1 + .../writer/compacted_blob_constructor.h | 6 ++ ydb/core/tx/limiter/service/service.cpp | 59 ++++++++++ ydb/core/tx/limiter/service/service.h | 101 ++++++++++++++++++ ydb/core/tx/limiter/service/ya.make | 12 +++ ydb/core/tx/limiter/usage/abstract.cpp | 5 + ydb/core/tx/limiter/usage/abstract.h | 24 +++++ ydb/core/tx/limiter/usage/config.cpp | 12 +++ ydb/core/tx/limiter/usage/config.h | 33 ++++++ ydb/core/tx/limiter/usage/events.cpp | 5 + ydb/core/tx/limiter/usage/events.h | 30 ++++++ ydb/core/tx/limiter/usage/service.cpp | 5 + ydb/core/tx/limiter/usage/service.h | 57 ++++++++++ ydb/core/tx/limiter/usage/ya.make | 15 +++ ydb/library/services/services.proto | 1 + 21 files changed, 434 insertions(+), 6 deletions(-) create mode 100644 ydb/core/tx/limiter/service/service.cpp create mode 100644 ydb/core/tx/limiter/service/service.h create mode 100644 ydb/core/tx/limiter/service/ya.make create mode 100644 ydb/core/tx/limiter/usage/abstract.cpp create mode 100644 ydb/core/tx/limiter/usage/abstract.h create mode 100644 ydb/core/tx/limiter/usage/config.cpp create mode 100644 ydb/core/tx/limiter/usage/config.h create mode 100644 ydb/core/tx/limiter/usage/events.cpp create mode 100644 ydb/core/tx/limiter/usage/events.h create mode 100644 ydb/core/tx/limiter/usage/service.cpp create mode 100644 ydb/core/tx/limiter/usage/service.h create mode 100644 ydb/core/tx/limiter/usage/ya.make diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 7a6f03b3cfe5..6ccf5f838254 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -178,7 +178,8 @@ struct TKikimrEvents : TEvents { ES_REPLICATION_SERVICE, ES_BACKUP_SERVICE, ES_TX_BACKGROUND, - ES_SS_BG_TASKS + ES_SS_BG_TASKS, + ES_LIMITER }; }; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index c6c91ff219d6..1db90ce4a433 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -176,9 +176,12 @@ #include #include -#include #include +#include #include +#include +#include +#include #include @@ -2169,6 +2172,28 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu } } +TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NLimiter::TConfig serviceConfig; + if (Config.HasCompDiskLimiterConfig()) { + Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetCompDiskLimiterConfig())); + } + + if (serviceConfig.IsEnabled()) { + TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); + TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_COMP_DISK_LIMITER"); + + auto service = NLimiter::TCompDiskOperator::CreateService(serviceConfig, countersGroup); + + setup->LocalServices.push_back(std::make_pair( + NLimiter::TCompDiskOperator::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig) : IKikimrServicesInitializer(runConfig) { } @@ -2178,9 +2203,7 @@ void TCompConveyorInitializer::InitializeServices(NActors::TActorSystemSetup* se if (Config.HasCompConveyorConfig()) { Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetCompConveyorConfig())); } - if (!serviceConfig.HasDefaultFractionOfThreadsCount()) { - serviceConfig.SetDefaultFractionOfThreadsCount(0.33); - } + serviceConfig.SetWorkersCount(2); if (serviceConfig.IsEnabled()) { TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 06238a6f08e4..44d450a35636 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -391,6 +391,12 @@ class TKqpServiceInitializer : public IKikimrServicesInitializer { IGlobalObjectStorage& GlobalObjects; }; +class TCompDiskLimiterInitializer: public IKikimrServicesInitializer { +public: + TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TCompConveyorInitializer: public IKikimrServicesInitializer { public: TCompConveyorInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 1bd7d39f8610..97ff4c836216 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -112,6 +112,7 @@ PEERDIR( ydb/core/tx/columnshard ydb/core/tx/coordinator ydb/core/tx/conveyor/service + ydb/core/tx/limiter/service ydb/core/tx/datashard ydb/core/tx/long_tx_service ydb/core/tx/long_tx_service/public diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2293d861341e..a4b5ee645567 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -593,6 +593,12 @@ message TConveyorConfig { optional double DefaultFractionOfThreadsCount = 4; } +message TLimiterConfig { + optional bool Enabled = 1 [default = true]; + optional uint64 Limit = 2; + optional uint64 PeriodMilliSeconds = 3 [default = 1000]; +} + message TExternalIndexConfig { optional bool Enabled = 1 [default = true]; optional TInternalRequestConfig RequestConfig = 2; @@ -1839,6 +1845,7 @@ message TAppConfig { optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76; optional TBackgroundCleaningConfig BackgroundCleaningConfig = 77; optional TBlobCacheConfig BlobCacheConfig = 78; + optional TLimiterConfig CompDiskLimiterConfig = 79; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index b98537ddbea7..4400227e1954 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -6,10 +6,34 @@ #include "engines/changes/abstract/abstract.h" #include "engines/writer/compacted_blob_constructor.h" +#include +#include + #include namespace NKikimr::NColumnShard { +class TDiskResourcesRequest: public NLimiter::IResourceRequest { +private: + using TBase = NLimiter::IResourceRequest; + std::shared_ptr WriteController; + const ui64 TabletId; + +private: + virtual void DoOnResourceAllocated() override { + NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max())); + } + +public: + TDiskResourcesRequest(const std::shared_ptr& writeController, const ui64 tabletId) + : TBase(writeController->GetWriteVolume()) + , WriteController(writeController) + , TabletId(tabletId) + { + + } +}; + void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { auto putStatus = ev->Get()->GetPutStatus(); @@ -32,7 +56,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte if (*needDraftTransaction) { Execute(new TTxWriteDraft(this, writeController)); } else { - ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max())); + NLimiter::TCompDiskOperator::AskResource(std::make_shared(writeController, TabletID())); } } } else { diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp index 62fab2ed57d8..54980e28d6ab 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp @@ -23,6 +23,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T for (auto&& b : portionWithBlobs.GetBlobs()) { auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId()))); b.RegisterBlobId(portionWithBlobs, task.GetBlobId()); + WriteVolume += b.GetSize(); } } } diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h index 6772599ee05f..e7341ee19245 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h @@ -13,11 +13,17 @@ class TCompactedWriteController : public NColumnShard::IWriteController { private: TAutoPtr WriteIndexEv; TActorId DstActor; + ui64 WriteVolume = 0; + protected: void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; virtual void DoAbort(const TString& reason) override; + public: const TBlobsAction& GetBlobsAction(); + ui64 GetWriteVolume() const { + return WriteVolume; + } TCompactedWriteController(const TActorId& dstActor, TAutoPtr writeEv); ~TCompactedWriteController(); diff --git a/ydb/core/tx/limiter/service/service.cpp b/ydb/core/tx/limiter/service/service.cpp new file mode 100644 index 000000000000..291e6ebe4cae --- /dev/null +++ b/ydb/core/tx/limiter/service/service.cpp @@ -0,0 +1,59 @@ +#include "service.h" + +namespace NKikimr::NLimiter { + +TLimiterActor::TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters) + : LimiterName(limiterName) + , Config(config) + , Counters(LimiterName, baseCounters) +{ + Counters.InProgressLimit->Set(Config.GetLimit()); +} + +void TLimiterActor::HandleMain(TEvExternal::TEvAskResource::TPtr& ev) { + const auto now = TMonotonic::Now(); + if (VolumeInFlight + ev->Get()->GetRequest()->GetVolume() <= Config.GetLimit()) { + VolumeInFlight += ev->Get()->GetRequest()->GetVolume(); + RequestsInFlight.emplace_back(now, ev->Get()->GetRequest()->GetVolume()); + if (RequestsInFlight.size() == 1) { + Schedule(now + Config.GetPeriod(), new NActors::TEvents::TEvWakeup()); + } + ev->Get()->GetRequest()->OnResourceAllocated(); + Counters.InProgressStart->Inc(); + } else { + RequestsQueue.emplace_back(now, ev->Get()->GetRequest()); + VolumeInWaiting += ev->Get()->GetRequest()->GetVolume(); + } + Counters.InProgressCount->Set(RequestsInFlight.size()); + Counters.InProgressVolume->Set(VolumeInFlight); + Counters.WaitingQueueCount->Set(RequestsQueue.size()); + Counters.WaitingQueueVolume->Set(VolumeInWaiting); +} + +void TLimiterActor::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) { + const auto now = TMonotonic::Now(); + AFL_VERIFY(RequestsInFlight.size()); + while (RequestsInFlight.size() && now + Config.GetPeriod() <= RequestsInFlight.front().GetInstant()) { + AFL_VERIFY(RequestsInFlight.front().GetVolume() <= VolumeInFlight); + VolumeInFlight = VolumeInFlight - RequestsInFlight.front().GetVolume(); + RequestsInFlight.pop_front(); + } + if (RequestsInFlight.size()) { + Schedule(RequestsInFlight.front().GetInstant() + Config.GetPeriod(), new NActors::TEvents::TEvWakeup()); + } + while (RequestsQueue.size() && (!VolumeInFlight || VolumeInFlight + RequestsQueue.front().GetRequest()->GetVolume() <= Config.GetLimit())) { + Counters.WaitingHistogram->Collect((i64)(now - RequestsQueue.front().GetInstant()).MilliSeconds(), 1); + VolumeInFlight += RequestsQueue.front().GetRequest()->GetVolume(); + RequestsInFlight.emplace_back(now, RequestsQueue.front().GetRequest()->GetVolume()); + RequestsQueue.front().GetRequest()->OnResourceAllocated(); + VolumeInWaiting -= RequestsQueue.front().GetRequest()->GetVolume(); + RequestsQueue.pop_front(); + Counters.InProgressStart->Inc(); + } + Counters.InProgressCount->Set(RequestsInFlight.size()); + Counters.InProgressVolume->Set(VolumeInFlight); + Counters.WaitingQueueCount->Set(RequestsQueue.size()); + Counters.WaitingQueueVolume->Set(VolumeInWaiting); +} + +} diff --git a/ydb/core/tx/limiter/service/service.h b/ydb/core/tx/limiter/service/service.h new file mode 100644 index 000000000000..32e9c45faca5 --- /dev/null +++ b/ydb/core/tx/limiter/service/service.h @@ -0,0 +1,101 @@ +#pragma once +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +namespace NKikimr::NLimiter { + +class TCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; +public: + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueCount; + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueVolume; + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressLimit; + + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressCount; + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressVolume; + + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressStart; + + const ::NMonitoring::THistogramPtr WaitingHistogram; + + TCounters(const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals) + : TBase("Limiter/" + limiterName, baseSignals) + , WaitingQueueCount(TBase::GetValue("WaitingQueue/Count")) + , WaitingQueueVolume(TBase::GetValue("WaitingQueue/Volume")) + , InProgressLimit(TBase::GetValue("InProgress/Limit/Volume")) + , InProgressCount(TBase::GetValue("InProgress/Count")) + , InProgressVolume(TBase::GetValue("InProgress/Volume")) + , InProgressStart(TBase::GetDeriviative("InProgress")) + , WaitingHistogram(TBase::GetHistogram("Waiting", NMonitoring::ExponentialHistogram(20, 2))) { + } +}; + +class TLimiterActor: public NActors::TActorBootstrapped { +private: + const TString LimiterName; + const TConfig Config; + TCounters Counters; + class TResourceRequest { + private: + YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero()); + YDB_READONLY_DEF(std::shared_ptr, Request); + public: + TResourceRequest(const TMonotonic instant, const std::shared_ptr& req) + : Instant(instant) + , Request(req) { + + } + }; + + class TResourceRequestInFlight { + private: + YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero()); + YDB_READONLY(ui64, Volume, 0); + public: + TResourceRequestInFlight(const TMonotonic instant, const ui64 volume) + : Instant(instant) + , Volume(volume) { + + } + }; + + ui64 VolumeInFlight = 0; + ui64 VolumeInWaiting = 0; + std::deque RequestsQueue; + std::deque RequestsInFlight; + + void HandleMain(TEvExternal::TEvAskResource::TPtr& ev); + void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev); + +public: + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternal::TEvAskResource, HandleMain); + hFunc(NActors::TEvents::TEvWakeup, HandleMain); + default: + AFL_ERROR(NKikimrServices::TX_LIMITER)("limiter", LimiterName)("problem", "unexpected event")("type", ev->GetTypeRewrite()); + AFL_VERIFY_DEBUG(false)("type", ev->GetTypeRewrite()); + break; + } + } + + TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters); + + void Bootstrap() { + Become(&TLimiterActor::StateMain); + } +}; + +} diff --git a/ydb/core/tx/limiter/service/ya.make b/ydb/core/tx/limiter/service/ya.make new file mode 100644 index 000000000000..5560098a6b5a --- /dev/null +++ b/ydb/core/tx/limiter/service/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + service.cpp +) + +PEERDIR( + ydb/core/tx/limiter/usage + ydb/core/protos +) + +END() diff --git a/ydb/core/tx/limiter/usage/abstract.cpp b/ydb/core/tx/limiter/usage/abstract.cpp new file mode 100644 index 000000000000..724f95db7f2f --- /dev/null +++ b/ydb/core/tx/limiter/usage/abstract.cpp @@ -0,0 +1,5 @@ +#include "abstract.h" + +namespace NKikimr::NLimiter { + +} diff --git a/ydb/core/tx/limiter/usage/abstract.h b/ydb/core/tx/limiter/usage/abstract.h new file mode 100644 index 000000000000..ff3e460a771c --- /dev/null +++ b/ydb/core/tx/limiter/usage/abstract.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace NKikimr::NLimiter { +class IResourceRequest { +private: + YDB_READONLY(ui64, Volume, 0); + virtual void DoOnResourceAllocated() = 0; +public: + void OnResourceAllocated() { + return DoOnResourceAllocated(); + } + + virtual ~IResourceRequest() = default; + + IResourceRequest(const ui64 volume) + : Volume(volume) + { + + } +}; + +} diff --git a/ydb/core/tx/limiter/usage/config.cpp b/ydb/core/tx/limiter/usage/config.cpp new file mode 100644 index 000000000000..a4448ea10320 --- /dev/null +++ b/ydb/core/tx/limiter/usage/config.cpp @@ -0,0 +1,12 @@ +#include "config.h" +#include + +namespace NKikimr::NLimiter { + +TString TConfig::DebugString() const { + TStringBuilder sb; + sb << "Period=" << Period << ";Limit=" << Limit << ";Enabled=" << EnabledFlag << ";"; + return sb; +} + +} diff --git a/ydb/core/tx/limiter/usage/config.h b/ydb/core/tx/limiter/usage/config.h new file mode 100644 index 000000000000..9aeec0ce50e5 --- /dev/null +++ b/ydb/core/tx/limiter/usage/config.h @@ -0,0 +1,33 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NLimiter { + +class TConfig { +private: + YDB_READONLY(TDuration, Period, TDuration::Seconds(1)); + YDB_READONLY(ui64, Limit, 0); + YDB_READONLY_FLAG(Enabled, false); +public: + template + bool DeserializeFromProto(const NKikimrConfig::TLimiterConfig& config) { + if (config.HasPeriodMilliSeconds()) { + Period = TDuration::MilliSeconds(config.GetPeriodMilliSeconds()); + } else { + Period = TPolicy::DefaultPeriod; + } + if (config.HasLimit()) { + Limit = config.GetLimit(); + } else { + Limit = TPolicy::DefaultLimit; + } + EnabledFlag = config.GetEnabled(); + return true; + } + + TString DebugString() const; +}; + +} diff --git a/ydb/core/tx/limiter/usage/events.cpp b/ydb/core/tx/limiter/usage/events.cpp new file mode 100644 index 000000000000..bf9582f2b302 --- /dev/null +++ b/ydb/core/tx/limiter/usage/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NLimiter { + +} diff --git a/ydb/core/tx/limiter/usage/events.h b/ydb/core/tx/limiter/usage/events.h new file mode 100644 index 000000000000..8b5b3a248e65 --- /dev/null +++ b/ydb/core/tx/limiter/usage/events.h @@ -0,0 +1,30 @@ +#pragma once +#include "abstract.h" +#include +#include +#include +#include + +namespace NKikimr::NLimiter { + +struct TEvExternal { + enum EEv { + EvAskResource = EventSpaceBegin(TKikimrEvents::ES_LIMITER), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_LIMITER), "expected EvEnd < EventSpaceEnd"); + + class TEvAskResource: public NActors::TEventLocal { + private: + YDB_READONLY_DEF(std::shared_ptr, Request); + public: + TEvAskResource() = default; + + explicit TEvAskResource(const std::shared_ptr& request) + : Request(request) { + } + }; +}; + +} diff --git a/ydb/core/tx/limiter/usage/service.cpp b/ydb/core/tx/limiter/usage/service.cpp new file mode 100644 index 000000000000..c4e435d65f40 --- /dev/null +++ b/ydb/core/tx/limiter/usage/service.cpp @@ -0,0 +1,5 @@ +#include "service.h" + +namespace NKikimr::NConveyor { + +} diff --git a/ydb/core/tx/limiter/usage/service.h b/ydb/core/tx/limiter/usage/service.h new file mode 100644 index 000000000000..55b4c455177b --- /dev/null +++ b/ydb/core/tx/limiter/usage/service.h @@ -0,0 +1,57 @@ +#pragma once +#include "config.h" +#include +#include +#include +#include + +namespace NKikimr::NLimiter { + +template +class TServiceOperatorImpl { +private: + using TSelf = TServiceOperatorImpl; + std::atomic IsEnabledFlag = false; + static void Register(const TConfig& serviceConfig) { + Singleton()->IsEnabledFlag = serviceConfig.IsEnabled() && serviceConfig.GetLimit(); + } + static const TString& GetLimiterName() { + Y_ABORT_UNLESS(TLimiterPolicy::Name.size() == 4); + return TLimiterPolicy::Name; + } +public: + static bool AskResource(const std::shared_ptr& request) { + AFL_VERIFY(!!request); + auto& context = NActors::TActorContext::AsActorContext(); + const NActors::TActorId& selfId = context.SelfID; + if (TSelf::IsEnabled()) { + context.Send(MakeServiceId(selfId.NodeId()), new TEvExternal::TEvAskResource(request)); + return true; + } else { + request->OnResourceAllocated(); + return false; + } + } + static bool IsEnabled() { + return Singleton()->IsEnabledFlag; + } + static NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcLimt" + GetLimiterName()); + } + static NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals) { + Register(config); + return new TLimiterActor(config, GetLimiterName(), baseSignals); + } + +}; + +class TCompDiskLimiterPolicy { +public: + static const inline TString Name = "CMPD"; + static const inline TDuration DefaultPeriod = TDuration::Seconds(1); + static const inline ui64 DefaultLimit = (ui64)256 * 1024 * 1024; +}; + +using TCompDiskOperator = TServiceOperatorImpl; + +} diff --git a/ydb/core/tx/limiter/usage/ya.make b/ydb/core/tx/limiter/usage/ya.make new file mode 100644 index 000000000000..28d0c6f8eead --- /dev/null +++ b/ydb/core/tx/limiter/usage/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + events.cpp + config.cpp + abstract.cpp + service.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/services/metadata/request +) + +END() diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 2a70e16707cf..32effb55cc6a 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -378,6 +378,7 @@ enum EServiceKikimr { EXT_INDEX = 1900; TX_CONVEYOR = 2000; + TX_LIMITER=2001; ARROW_HELPER = 2100; KAFKA_PROXY = 2200;