Skip to content

Commit

Permalink
disk usage limit for CS compaction (in general slider limit) (ydb-platform#4864)
Browse files Browse the repository at this point in the history

Conflicts:
	ydb/core/base/events.h
	ydb/core/driver_lib/run/service_mask.h
	ydb/core/protos/config.proto
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jun 18, 2024
1 parent 0a7bc1e commit 19e4fbb
Show file tree
Hide file tree
Showing 24 changed files with 448 additions and 4 deletions.
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ struct TKikimrEvents : TEvents {
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
ES_LIMITER
};
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ union TBasicKikimrServicesMask {

bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
bool EnableCompDiskLimiter:1;
};

struct {
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,12 @@
#include <ydb/services/metadata/ds_table/service.h>
#include <ydb/services/metadata/service.h>

#include <ydb/core/tx/conveyor/usage/config.h>
#include <ydb/core/tx/conveyor/service/service.h>
#include <ydb/core/tx/conveyor/usage/config.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/service/service.h>
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/service.h>

#include <ydb/services/bg_tasks/ds_table/executor.h>
#include <ydb/services/bg_tasks/service.h>
Expand Down Expand Up @@ -2080,6 +2083,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}

// Builds the initializer for the compaction disk limiter service; the run
// configuration is forwarded unchanged to the common initializer base.
TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
    : IKikimrServicesInitializer(runConfig)
{
}

// Registers the compaction disk limiter as a local service of this node.
// Nothing is registered when the deserialized config reports the limiter as
// disabled.
void TCompDiskLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
    NLimiter::TConfig serviceConfig;
    // The process is aborted when the config cannot be deserialized.
    Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto<NLimiter::TCompDiskLimiterPolicy>(Config.GetCompDiskLimiterConfig()));

    if (!serviceConfig.IsEnabled()) {
        return;
    }

    // Limiter signals live under the "tablets" service counters, type=TX_COMP_DISK_LIMITER.
    TIntrusivePtr<::NMonitoring::TDynamicCounters> limiterCounters =
        GetServiceCounters(appData->Counters, "tablets")->GetSubgroup("type", "TX_COMP_DISK_LIMITER");

    auto limiterActor = NLimiter::TCompDiskOperator::CreateService(serviceConfig, limiterCounters);
    const auto serviceId = NLimiter::TCompDiskOperator::MakeServiceId(NodeId);

    setup->LocalServices.push_back(std::make_pair(
        serviceId,
        TActorSetupCmd(limiterActor, TMailboxType::HTSwap, appData->UserPoolId)));
}

// Forwards the run configuration to the common initializer base.
TCompConveyorInitializer::TCompConveyorInitializer(const TKikimrRunConfig& runConfig)
    : IKikimrServicesInitializer(runConfig)
{
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ class TKqpServiceInitializer : public IKikimrServicesInitializer {
IGlobalObjectStorage& GlobalObjects;
};

// Service initializer that sets up the compaction disk limiter service.
class TCompDiskLimiterInitializer : public IKikimrServicesInitializer {
public:
    TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig);

    // Adds the limiter service to the actor system setup.
    void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
}

if (serviceMask.EnableCompDiskLimiter) {
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
}

if (serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ PEERDIR(
ydb/core/tx/columnshard
ydb/core/tx/coordinator
ydb/core/tx/conveyor/service
ydb/core/tx/limiter/service
ydb/core/tx/datashard
ydb/core/tx/long_tx_service
ydb/core/tx/long_tx_service/public
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,12 @@ message TConveyorConfig {
optional double WorkersCountDouble = 5;
}

// Settings of a limiter service (referenced from TAppConfig as CompDiskLimiterConfig).
message TLimiterConfig {
// Switches the limiter on or off; on by default.
optional bool Enabled = 1 [default = true];
// Upper bound for the limited resource. Presumably the total volume that may be
// in flight within one period — TODO confirm units and the behavior when unset.
optional uint64 Limit = 2;
// Length of the accounting period in milliseconds; 1000 by default.
optional uint64 PeriodMilliSeconds = 3 [default = 1000];
}

message TExternalIndexConfig {
optional bool Enabled = 1 [default = true];
optional TInternalRequestConfig RequestConfig = 2;
Expand Down Expand Up @@ -1802,6 +1808,7 @@ message TAppConfig {
optional bool AllowEditYamlInUi = 75;
optional TS3ProxyResolverConfig S3ProxyResolverConfig = 76;
optional TBackgroundCleaningConfig BackgroundCleaningConfig = 77;
optional TLimiterConfig CompDiskLimiterConfig = 79;

repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
Expand Down
26 changes: 25 additions & 1 deletion ydb/core/tx/columnshard/columnshard__write_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,34 @@
#include "engines/changes/abstract/abstract.h"
#include "engines/writer/compacted_blob_constructor.h"

#include <ydb/core/tx/limiter/usage/abstract.h>
#include <ydb/core/tx/limiter/usage/service.h>

#include <ydb/library/actors/core/log.h>

namespace NKikimr::NColumnShard {

class TDiskResourcesRequest: public NLimiter::IResourceRequest {
private:
using TBase = NLimiter::IResourceRequest;
std::shared_ptr<NOlap::TCompactedWriteController> WriteController;
const ui64 TabletId;

private:
virtual void DoOnResourceAllocated() override {
NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max()));
}

public:
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId)
: TBase(writeController->GetWriteVolume())
, WriteController(writeController)
, TabletId(tabletId)
{

}
};

void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
auto putStatus = ev->Get()->GetPutStatus();

Expand All @@ -32,7 +56,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
if (*needDraftTransaction) {
Execute(new TTxWriteDraft(this, writeController));
} else {
ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID()));
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T
for (auto&& b : portionWithBlobs.GetBlobs()) {
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId())));
b.RegisterBlobId(portionWithBlobs, task.GetBlobId());
WriteVolume += b.GetSize();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ class TCompactedWriteController : public NColumnShard::IWriteController {
private:
TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv;
TActorId DstActor;
ui64 WriteVolume = 0;

protected:
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
virtual void DoAbort(const TString& reason) override;

public:
const TBlobsAction& GetBlobsAction();
ui64 GetWriteVolume() const {
return WriteVolume;
}

TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv);
~TCompactedWriteController();
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/conveyor/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ void TDistributor::Bootstrap() {
}

void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
const auto now = TMonotonic::Now();
const TDuration dExecution = now - ev->Get()->GetStartInstant();
Counters.SolutionsRate->Inc();
Counters.ExecuteHistogram->Collect((TMonotonic::Now() - ev->Get()->GetStartInstant()).MilliSeconds());
Counters.ExecuteHistogram->Collect(dExecution.MilliSeconds());
if (Waiting.size()) {
auto task = Waiting.pop();
Counters.WaitingHistogram->Collect((TMonotonic::Now() - task.GetCreateInstant()).MilliSeconds());
Counters.WaitingHistogram->Collect((ev->Get()->GetStartInstant() - task.GetCreateInstant()).MilliSeconds());
task.OnBeforeStart();
Send(ev->Sender, new TEvInternal::TEvNewTask(task));
} else {
Expand Down
63 changes: 63 additions & 0 deletions ydb/core/tx/limiter/service/service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "service.h"

namespace NKikimr::NLimiter {

// Creates the limiter actor and publishes the configured limit through the
// InProgressLimit counter.
TLimiterActor::TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters)
    : LimiterName(limiterName)
    , Config(config)
    , Counters(LimiterName, baseCounters)
{
    Counters.InProgressLimit->Set(Config.GetLimit());
}

// Handles a resource request: it is granted immediately when nothing is in
// flight or when its volume still fits under the configured limit; otherwise
// it is appended to the waiting queue.
// NOTE(review): a fitting request is granted even if older requests are already
// waiting in RequestsQueue — confirm this ordering is intended.
void TLimiterActor::HandleMain(TEvExternal::TEvAskResource::TPtr& ev) {
    const auto now = TMonotonic::Now();
    const auto& request = ev->Get()->GetRequest();
    const ui64 volume = request->GetVolume();

    const bool mustWait = !RequestsInFlight.empty() && VolumeInFlight + volume > Config.GetLimit();
    if (mustWait) {
        RequestsQueue.emplace_back(now, request);
        VolumeInWaiting += volume;
    } else {
        VolumeInFlight += volume;
        RequestsInFlight.emplace_back(now, volume);
        // The first in-flight entry arms the wakeup that will later release it.
        if (RequestsInFlight.size() == 1) {
            Schedule(now + Config.GetPeriod(), new NActors::TEvents::TEvWakeup());
        }
        request->OnResourceAllocated();
        Counters.InProgressStart->Inc();
    }

    // Refresh the gauges after every state change.
    Counters.InProgressCount->Set(RequestsInFlight.size());
    Counters.InProgressVolume->Set(VolumeInFlight);
    Counters.WaitingQueueCount->Set(RequestsQueue.size());
    Counters.WaitingQueueVolume->Set(VolumeInWaiting);
}

// Wakeup handler: releases in-flight entries whose period has elapsed, then
// grants queued requests from the head of the queue while they fit under the
// limit, and finally re-arms the wakeup for the oldest entry still in flight.
void TLimiterActor::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) {
    const auto now = TMonotonic::Now();
    const auto period = Config.GetPeriod();
    AFL_VERIFY(RequestsInFlight.size());

    // Release everything that has been in flight for at least one period.
    while (!RequestsInFlight.empty() && RequestsInFlight.front().GetInstant() + period <= now) {
        AFL_VERIFY(RequestsInFlight.front().GetVolume() <= VolumeInFlight);
        VolumeInFlight -= RequestsInFlight.front().GetVolume();
        RequestsInFlight.pop_front();
    }
    if (RequestsInFlight.empty()) {
        AFL_VERIFY(!VolumeInFlight);
    }

    // Grant waiting requests in queue order. The head is always granted when
    // nothing is in flight, even if its volume alone exceeds the limit.
    while (!RequestsQueue.empty()) {
        const auto& waiting = RequestsQueue.front();
        const ui64 volume = waiting.GetRequest()->GetVolume();
        if (!RequestsInFlight.empty() && VolumeInFlight + volume > Config.GetLimit()) {
            break;
        }
        Counters.WaitingHistogram->Collect(static_cast<i64>((now - waiting.GetInstant()).MilliSeconds()), 1);
        VolumeInFlight += volume;
        RequestsInFlight.emplace_back(now, volume);
        waiting.GetRequest()->OnResourceAllocated();
        AFL_VERIFY(VolumeInWaiting >= RequestsQueue.front().GetRequest()->GetVolume());
        VolumeInWaiting -= volume;
        RequestsQueue.pop_front();
        Counters.InProgressStart->Inc();
    }

    // Keep exactly one wakeup pending while anything is in flight.
    if (!RequestsInFlight.empty()) {
        Schedule(RequestsInFlight.front().GetInstant() + period, new NActors::TEvents::TEvWakeup());
    }

    Counters.InProgressCount->Set(RequestsInFlight.size());
    Counters.InProgressVolume->Set(VolumeInFlight);
    Counters.WaitingQueueCount->Set(RequestsQueue.size());
    Counters.WaitingQueueVolume->Set(VolumeInWaiting);
}

}
101 changes: 101 additions & 0 deletions ydb/core/tx/limiter/service/service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#pragma once
#include <ydb/core/tx/columnshard/counters/common/owner.h>
#include <ydb/core/tx/limiter/usage/abstract.h>
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/events.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/accessor/accessor.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>

#include <queue>

namespace NKikimr::NLimiter {

// Monitoring signals of one limiter, created under the "Limiter/<limiterName>"
// module name.
class TCounters: public NColumnShard::TCommonCountersOwner {
private:
    using TBase = NColumnShard::TCommonCountersOwner;

public:
    // Waiting queue: number of requests and their total volume.
    const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueCount;
    const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueVolume;
    // Configured volume limit.
    const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressLimit;

    // In-flight requests: number of requests and their total volume.
    const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressCount;
    const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressVolume;

    // Incremented each time a request is granted.
    const ::NMonitoring::TDynamicCounters::TCounterPtr InProgressStart;

    // Time requests spent in the waiting queue, collected in milliseconds.
    const ::NMonitoring::THistogramPtr WaitingHistogram;

    TCounters(const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals)
        : TBase("Limiter/" + limiterName, baseSignals)
        , WaitingQueueCount(TBase::GetValue("WaitingQueue/Count"))
        , WaitingQueueVolume(TBase::GetValue("WaitingQueue/Volume"))
        , InProgressLimit(TBase::GetValue("InProgress/Limit/Volume"))
        , InProgressCount(TBase::GetValue("InProgress/Count"))
        , InProgressVolume(TBase::GetValue("InProgress/Volume"))
        , InProgressStart(TBase::GetDeriviative("InProgress"))
        , WaitingHistogram(TBase::GetHistogram("Waiting", NMonitoring::ExponentialHistogram(20, 2)))
    {
    }
};

// Actor that limits the volume of a resource granted per configured period.
// Granted requests are tracked in RequestsInFlight until the wakeup handler
// releases them; requests that do not fit wait in RequestsQueue.
class TLimiterActor: public NActors::TActorBootstrapped<TLimiterActor> {
private:
    const TString LimiterName;
    const TConfig Config;
    TCounters Counters;

    // A waiting request together with the instant it was queued.
    class TResourceRequest {
    private:
        YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero());
        YDB_READONLY_DEF(std::shared_ptr<IResourceRequest>, Request);

    public:
        TResourceRequest(const TMonotonic instant, const std::shared_ptr<IResourceRequest>& req)
            : Instant(instant)
            , Request(req)
        {
        }
    };

    // Bookkeeping for a granted request: when it was granted and how much
    // volume it holds.
    class TResourceRequestInFlight {
    private:
        YDB_READONLY(TMonotonic, Instant, TMonotonic::Zero());
        YDB_READONLY(ui64, Volume, 0);

    public:
        TResourceRequestInFlight(const TMonotonic instant, const ui64 volume)
            : Instant(instant)
            , Volume(volume)
        {
        }
    };

    // Running totals matching the contents of the two deques below.
    ui64 VolumeInFlight = 0;
    ui64 VolumeInWaiting = 0;
    std::deque<TResourceRequest> RequestsQueue;
    std::deque<TResourceRequestInFlight> RequestsInFlight;

    void HandleMain(TEvExternal::TEvAskResource::TPtr& ev);
    void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);

public:
    // The only state of the actor; any other event type is logged as an error
    // and trips a debug-only verification.
    STATEFN(StateMain) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvExternal::TEvAskResource, HandleMain);
            hFunc(NActors::TEvents::TEvWakeup, HandleMain);
            default:
                AFL_ERROR(NKikimrServices::TX_LIMITER)("limiter", LimiterName)("problem", "unexpected event")("type", ev->GetTypeRewrite());
                AFL_VERIFY_DEBUG(false)("type", ev->GetTypeRewrite());
                break;
        }
    }

    TLimiterActor(const TConfig& config, const TString& limiterName, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseCounters);

    // Switches straight into the main state; no other startup work is done.
    void Bootstrap() {
        Become(&TLimiterActor::StateMain);
    }
};

}
12 changes: 12 additions & 0 deletions ydb/core/tx/limiter/service/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Limiter service library: builds service.cpp (the limiter actor implementation).
LIBRARY()

SRCS(
service.cpp
)

# Depends on the limiter usage library and on the core protos.
PEERDIR(
ydb/core/tx/limiter/usage
ydb/core/protos
)

END()
5 changes: 5 additions & 0 deletions ydb/core/tx/limiter/usage/abstract.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "abstract.h"

namespace NKikimr::NLimiter {

}
Loading

0 comments on commit 19e4fbb

Please sign in to comment.