diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 97f89dc978a2..7d112c064043 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -173,6 +173,7 @@ struct TKikimrEvents : TEvents { ES_GRAPH, ES_REPLICATION_SERVICE, ES_CHANGE_EXCHANGE, + ES_LIMITER }; }; diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index ae040faa9e2c..9a113c5d5f3d 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -84,6 +84,7 @@ union TBasicKikimrServicesMask { bool EnableDatabaseMetadataCache:1; bool EnableGraphService:1; + bool EnableCompDiskLimiter:1; }; struct { diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 0dae3013e17b..ca9a991cb5a0 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -173,9 +173,12 @@ #include #include -#include #include +#include #include +#include +#include +#include #include #include @@ -2080,6 +2083,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu } } +TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NLimiter::TConfig serviceConfig; + Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetCompDiskLimiterConfig())); + + if (serviceConfig.IsEnabled()) { + TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets"); + TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_COMP_DISK_LIMITER"); + + auto service = NLimiter::TCompDiskOperator::CreateService(serviceConfig, countersGroup); + + setup->LocalServices.push_back(std::make_pair( + NLimiter::TCompDiskOperator::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig) : IKikimrServicesInitializer(runConfig) { } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 2b6cd37f7c6d..7a541f380817 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -391,6 +391,12 @@ class TKqpServiceInitializer : public IKikimrServicesInitializer { IGlobalObjectStorage& GlobalObjects; }; +class TCompDiskLimiterInitializer: public IKikimrServicesInitializer { +public: + TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TCompConveyorInitializer: public IKikimrServicesInitializer { public: TCompConveyorInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index bfd467224554..3f11f2d088e7 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1512,6 +1512,10 @@ TIntrusivePtr TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig)); } + if (serviceMask.EnableCompDiskLimiter) { + sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig)); + } + if (serviceMask.EnableScanConveyor) { sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig)); } diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 63b1a7f4cc5c..175eb324bcc1 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -112,6 +112,7 @@ PEERDIR( ydb/core/tx/columnshard ydb/core/tx/coordinator ydb/core/tx/conveyor/service + ydb/core/tx/limiter/service ydb/core/tx/datashard ydb/core/tx/long_tx_service ydb/core/tx/long_tx_service/public diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index fce658e4d891..4bb4a80120ec 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -621,6 +621,12 @@ message TConveyorConfig { optional double WorkersCountDouble = 5; } +message TLimiterConfig { + optional bool Enabled = 1 [default = true]; + optional uint64 Limit = 2; + optional uint64 PeriodMilliSeconds = 3 [default = 1000]; +} + message TExternalIndexConfig { optional bool Enabled = 1 [default = true]; optional TInternalRequestConfig RequestConfig = 2; @@ -1802,6 +1808,7 @@ message TAppConfig { optional bool AllowEditYamlInUi = 75; optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76; optional TBackgroundCleaningConfig BackgroundCleaningConfig = 77; + optional TLimiterConfig CompDiskLimiterConfig = 79; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index b98537ddbea7..4400227e1954 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -6,10 +6,34 @@ #include "engines/changes/abstract/abstract.h" #include "engines/writer/compacted_blob_constructor.h" +#include +#include + #include namespace NKikimr::NColumnShard { +class TDiskResourcesRequest: public NLimiter::IResourceRequest { +private: + using TBase = NLimiter::IResourceRequest; + std::shared_ptr WriteController; + const ui64 TabletId; + +private: + virtual void DoOnResourceAllocated() override { + NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max())); + } + +public: + TDiskResourcesRequest(const std::shared_ptr& writeController, const ui64 tabletId) + : TBase(writeController->GetWriteVolume()) + , WriteController(writeController) + , TabletId(tabletId) + { + + } +}; + void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { auto putStatus = ev->Get()->GetPutStatus(); @@ -32,7 +56,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte if (*needDraftTransaction) { Execute(new TTxWriteDraft(this, writeController)); } else { - ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max())); + NLimiter::TCompDiskOperator::AskResource(std::make_shared(writeController, TabletID())); } } } else { diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp index dfac05657e2c..4f48bc21c245 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp @@ -22,6 +22,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T for (auto&& b : portionWithBlobs.GetBlobs()) { auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId()))); b.RegisterBlobId(portionWithBlobs, task.GetBlobId()); + WriteVolume += b.GetSize(); } } } diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h index eb8ef262bb97..99eb6f29961d 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h @@ -14,11 +14,17 @@ class TCompactedWriteController : public NColumnShard::IWriteController { private: TAutoPtr WriteIndexEv; TActorId DstActor; + ui64 WriteVolume = 0; + protected: void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; virtual void DoAbort(const TString& reason) override; + public: const TBlobsAction& GetBlobsAction(); + ui64 GetWriteVolume() const { + return WriteVolume; + } TCompactedWriteController(const TActorId& dstActor, TAutoPtr writeEv); ~TCompactedWriteController(); diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp index c51b7e2b64e9..89966da43ae8 100644 --- a/ydb/core/tx/conveyor/service/service.cpp +++ b/ydb/core/tx/conveyor/service/service.cpp @@ -30,11 +30,13 @@ void TDistributor::Bootstrap() { } void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) { + const auto now = TMonotonic::Now(); + const TDuration dExecution = now - ev->Get()->GetStartInstant(); Counters.SolutionsRate->Inc(); - Counters.ExecuteHistogram->Collect((TMonotonic::Now() - ev->Get()->GetStartInstant()).MilliSeconds()); + Counters.ExecuteHistogram->Collect(dExecution.MilliSeconds()); if (Waiting.size()) { auto task = Waiting.pop(); - Counters.WaitingHistogram->Collect((TMonotonic::Now() - task.GetCreateInstant()).MilliSeconds()); + Counters.WaitingHistogram->Collect((ev->Get()->GetStartInstant() - task.GetCreateInstant()).MilliSeconds()); task.OnBeforeStart(); Send(ev->Sender, new TEvInternal::TEvNewTask(task)); } else { diff --git a/ydb/core/tx/limiter/service/service.cpp b/ydb/core/tx/limiter/service/service.cpp new file mode 100644 index 000000000000..edbce04b7256 --- /dev/null +++ b/ydb/core/tx/limiter/service/service.cpp @@ -0,0 +1,63 @@ +#include "service.h" + +namespace NKikimr::NLimiter { + +TLimiterActor::TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters) + : LimiterName(limiterName) + , Config(config) + , Counters(LimiterName, baseCounters) +{ + Counters.InProgressLimit->Set(Config.GetLimit()); +} + +void TLimiterActor::HandleMain(TEvExternal::TEvAskResource::TPtr& ev) { + const auto now = TMonotonic::Now(); + if (RequestsInFlight.empty() || VolumeInFlight + ev->Get()->GetRequest()->GetVolume() <= Config.GetLimit()) { + VolumeInFlight += ev->Get()->GetRequest()->GetVolume(); + RequestsInFlight.emplace_back(now, ev->Get()->GetRequest()->GetVolume()); + if (RequestsInFlight.size() == 1) { + Schedule(now + Config.GetPeriod(), new NActors::TEvents::TEvWakeup()); + } + ev->Get()->GetRequest()->OnResourceAllocated(); + Counters.InProgressStart->Inc(); + } else { + RequestsQueue.emplace_back(now, ev->Get()->GetRequest()); + VolumeInWaiting += ev->Get()->GetRequest()->GetVolume(); + } + Counters.InProgressCount->Set(RequestsInFlight.size()); + Counters.InProgressVolume->Set(VolumeInFlight); + Counters.WaitingQueueCount->Set(RequestsQueue.size()); + Counters.WaitingQueueVolume->Set(VolumeInWaiting); +} + +void TLimiterActor::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) { + const auto now = TMonotonic::Now(); + AFL_VERIFY(RequestsInFlight.size()); + while (RequestsInFlight.size() && RequestsInFlight.front().GetInstant() + Config.GetPeriod() <= now) { + AFL_VERIFY(RequestsInFlight.front().GetVolume() <= VolumeInFlight); + VolumeInFlight = VolumeInFlight - RequestsInFlight.front().GetVolume(); + RequestsInFlight.pop_front(); + } + if (RequestsInFlight.empty()) { + AFL_VERIFY(!VolumeInFlight); + } + while (RequestsQueue.size() && (RequestsInFlight.empty() || VolumeInFlight + RequestsQueue.front().GetRequest()->GetVolume() <= Config.GetLimit())) { + Counters.WaitingHistogram->Collect((i64)(now - RequestsQueue.front().GetInstant()).MilliSeconds(), 1); + VolumeInFlight += RequestsQueue.front().GetRequest()->GetVolume(); + RequestsInFlight.emplace_back(now, RequestsQueue.front().GetRequest()->GetVolume()); + RequestsQueue.front().GetRequest()->OnResourceAllocated(); + AFL_VERIFY(VolumeInWaiting >= RequestsQueue.front().GetRequest()->GetVolume()); + VolumeInWaiting -= RequestsQueue.front().GetRequest()->GetVolume(); + RequestsQueue.pop_front(); + Counters.InProgressStart->Inc(); + } + if (RequestsInFlight.size()) { + Schedule(RequestsInFlight.front().GetInstant() + Config.GetPeriod(), new NActors::TEvents::TEvWakeup()); + } + Counters.InProgressCount->Set(RequestsInFlight.size()); + Counters.InProgressVolume->Set(VolumeInFlight); + Counters.WaitingQueueCount->Set(RequestsQueue.size()); + Counters.WaitingQueueVolume->Set(VolumeInWaiting); +} + +} diff --git a/ydb/core/tx/limiter/service/service.h b/ydb/core/tx/limiter/service/service.h new file mode 100644 index 000000000000..32e9c45faca5 --- /dev/null +++ b/ydb/core/tx/limiter/service/service.h @@ -0,0 +1,101 @@ +#pragma once +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +namespace NKikimr::NLimiter { + +class TCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; +public: + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueCount; + const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueVolume; + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressLimit; + + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressCount; + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressVolume; + + const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressStart; + + const ::NMonitoring::THistogramPtr WaitingHistogram; + + TCounters(const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals) + : TBase("Limiter/" + limiterName, baseSignals) + , WaitingQueueCount(TBase::GetValue("WaitingQueue/Count")) + , WaitingQueueVolume(TBase::GetValue("WaitingQueue/Volume")) + , InProgressLimit(TBase::GetValue("InProgress/Limit/Volume")) + , InProgressCount(TBase::GetValue("InProgress/Count")) + , InProgressVolume(TBase::GetValue("InProgress/Volume")) + , InProgressStart(TBase::GetDeriviative("InProgress")) + , WaitingHistogram(TBase::GetHistogram("Waiting", NMonitoring::ExponentialHistogram(20, 2))) { + } +}; + +class TLimiterActor: public NActors::TActorBootstrapped { +private: + const TString LimiterName; + const TConfig Config; + TCounters Counters; + class TResourceRequest { + private: + YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero()); + YDB_READONLY_DEF(std::shared_ptr, Request); + public: + TResourceRequest(const TMonotonic instant, const std::shared_ptr& req) + : Instant(instant) + , Request(req) { + + } + }; + + class TResourceRequestInFlight { + private: + YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero()); + YDB_READONLY(ui64, Volume, 0); + public: + TResourceRequestInFlight(const TMonotonic instant, const ui64 volume) + : Instant(instant) + , Volume(volume) { + + } + }; + + ui64 VolumeInFlight = 0; + ui64 VolumeInWaiting = 0; + std::deque RequestsQueue; + std::deque RequestsInFlight; + + void HandleMain(TEvExternal::TEvAskResource::TPtr& ev); + void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev); + +public: + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternal::TEvAskResource, HandleMain); + hFunc(NActors::TEvents::TEvWakeup, HandleMain); + default: + AFL_ERROR(NKikimrServices::TX_LIMITER)("limiter", LimiterName)("problem", "unexpected event")("type", ev->GetTypeRewrite()); + AFL_VERIFY_DEBUG(false)("type", ev->GetTypeRewrite()); + break; + } + } + + TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters); + + void Bootstrap() { + Become(&TLimiterActor::StateMain); + } +}; + +} diff --git a/ydb/core/tx/limiter/service/ya.make b/ydb/core/tx/limiter/service/ya.make new file mode 100644 index 000000000000..5560098a6b5a --- /dev/null +++ b/ydb/core/tx/limiter/service/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + service.cpp +) + +PEERDIR( + ydb/core/tx/limiter/usage + ydb/core/protos +) + +END() diff --git a/ydb/core/tx/limiter/usage/abstract.cpp b/ydb/core/tx/limiter/usage/abstract.cpp new file mode 100644 index 000000000000..724f95db7f2f --- /dev/null +++ b/ydb/core/tx/limiter/usage/abstract.cpp @@ -0,0 +1,5 @@ +#include "abstract.h" + +namespace NKikimr::NLimiter { + +} diff --git a/ydb/core/tx/limiter/usage/abstract.h b/ydb/core/tx/limiter/usage/abstract.h new file mode 100644 index 000000000000..ff3e460a771c --- /dev/null +++ b/ydb/core/tx/limiter/usage/abstract.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace NKikimr::NLimiter { +class IResourceRequest { +private: + YDB_READONLY(ui64, Volume, 0); + virtual void DoOnResourceAllocated() = 0; +public: + void OnResourceAllocated() { + return DoOnResourceAllocated(); + } + + virtual ~IResourceRequest() = default; + + IResourceRequest(const ui64 volume) + : Volume(volume) + { + + } +}; + +} diff --git a/ydb/core/tx/limiter/usage/config.cpp b/ydb/core/tx/limiter/usage/config.cpp new file mode 100644 index 000000000000..a4448ea10320 --- /dev/null +++ b/ydb/core/tx/limiter/usage/config.cpp @@ -0,0 +1,12 @@ +#include "config.h" +#include + +namespace NKikimr::NLimiter { + +TString TConfig::DebugString() const { + TStringBuilder sb; + sb << "Period=" << Period << ";Limit=" << Limit << ";Enabled=" << EnabledFlag << ";"; + return sb; +} + +} diff --git a/ydb/core/tx/limiter/usage/config.h b/ydb/core/tx/limiter/usage/config.h new file mode 100644 index 000000000000..434f66f2d485 --- /dev/null +++ b/ydb/core/tx/limiter/usage/config.h @@ -0,0 +1,37 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NLimiter { + +class TConfig { +private: + YDB_READONLY(TDuration, Period, TDuration::Seconds(1)); + YDB_READONLY(ui64, Limit, 0); + YDB_READONLY_FLAG(Enabled, true); +public: + template + bool DeserializeFromProto(const NKikimrConfig::TLimiterConfig& config) { + if (config.HasPeriodMilliSeconds()) { + Period = TDuration::MilliSeconds(config.GetPeriodMilliSeconds()); + } else { + Period = TPolicy::DefaultPeriod; + } + if (config.HasLimit()) { + Limit = config.GetLimit(); + } else { + Limit = TPolicy::DefaultLimit; + } + if (config.HasEnabled()) { + EnabledFlag = config.GetEnabled(); + } else { + EnabledFlag = TPolicy::DefaultEnabled; + } + return true; + } + + TString DebugString() const; +}; + +} diff --git a/ydb/core/tx/limiter/usage/events.cpp b/ydb/core/tx/limiter/usage/events.cpp new file mode 100644 index 000000000000..bf9582f2b302 --- /dev/null +++ b/ydb/core/tx/limiter/usage/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NLimiter { + +} diff --git a/ydb/core/tx/limiter/usage/events.h b/ydb/core/tx/limiter/usage/events.h new file mode 100644 index 000000000000..8b5b3a248e65 --- /dev/null +++ b/ydb/core/tx/limiter/usage/events.h @@ -0,0 +1,30 @@ +#pragma once +#include "abstract.h" +#include +#include +#include +#include + +namespace NKikimr::NLimiter { + +struct TEvExternal { + enum EEv { + EvAskResource = EventSpaceBegin(TKikimrEvents::ES_LIMITER), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_LIMITER), "expected EvEnd < EventSpaceEnd"); + + class TEvAskResource: public NActors::TEventLocal { + private: + YDB_READONLY_DEF(std::shared_ptr, Request); + public: + TEvAskResource() = default; + + explicit TEvAskResource(const std::shared_ptr& request) + : Request(request) { + } + }; +}; + +} diff --git a/ydb/core/tx/limiter/usage/service.cpp b/ydb/core/tx/limiter/usage/service.cpp new file mode 100644 index 000000000000..c4e435d65f40 --- /dev/null +++ b/ydb/core/tx/limiter/usage/service.cpp @@ -0,0 +1,5 @@ +#include "service.h" + +namespace NKikimr::NConveyor { + +} diff --git a/ydb/core/tx/limiter/usage/service.h b/ydb/core/tx/limiter/usage/service.h new file mode 100644 index 000000000000..b0b435fa802f --- /dev/null +++ b/ydb/core/tx/limiter/usage/service.h @@ -0,0 +1,58 @@ +#pragma once +#include "config.h" +#include +#include +#include +#include + +namespace NKikimr::NLimiter { + +template +class TServiceOperatorImpl { +private: + using TSelf = TServiceOperatorImpl; + std::atomic IsEnabledFlag = false; + static void Register(const TConfig& serviceConfig) { + Singleton()->IsEnabledFlag = serviceConfig.IsEnabled() && serviceConfig.GetLimit(); + } + static const TString& GetLimiterName() { + Y_ABORT_UNLESS(TLimiterPolicy::Name.size() == 4); + return TLimiterPolicy::Name; + } +public: + static bool AskResource(const std::shared_ptr& request) { + AFL_VERIFY(!!request); + auto& context = NActors::TActorContext::AsActorContext(); + const NActors::TActorId& selfId = context.SelfID; + if (TSelf::IsEnabled()) { + context.Send(MakeServiceId(selfId.NodeId()), new TEvExternal::TEvAskResource(request)); + return true; + } else { + request->OnResourceAllocated(); + return false; + } + } + static bool IsEnabled() { + return Singleton()->IsEnabledFlag; + } + static NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcLimt" + GetLimiterName()); + } + static NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals) { + Register(config); + return new TLimiterActor(config, GetLimiterName(), baseSignals); + } + +}; + +class TCompDiskLimiterPolicy { +public: + static const inline TString Name = "CMPD"; + static const inline TDuration DefaultPeriod = TDuration::Seconds(1); + static const inline ui64 DefaultLimit = (ui64)256 * 1024 * 1024; + static const inline bool DefaultEnabled = true; +}; + +using TCompDiskOperator = TServiceOperatorImpl; + +} diff --git a/ydb/core/tx/limiter/usage/ya.make b/ydb/core/tx/limiter/usage/ya.make new file mode 100644 index 000000000000..28d0c6f8eead --- /dev/null +++ b/ydb/core/tx/limiter/usage/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + events.cpp + config.cpp + abstract.cpp + service.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/services/metadata/request +) + +END() diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 0519bab8b387..50f491020c0d 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -376,6 +376,7 @@ enum EServiceKikimr { EXT_INDEX = 1900; TX_CONVEYOR = 2000; + TX_LIMITER=2001; ARROW_HELPER = 2100; KAFKA_PROXY = 2200;