From af1f1adec91858503394e016964b549a3939649c Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 27 May 2024 08:33:55 +0300 Subject: [PATCH] fraction cpu usage in conveyors --- ydb/core/protos/config.proto | 1 + ydb/core/tx/conveyor/service/service.cpp | 16 +++++++++--- ydb/core/tx/conveyor/service/service.h | 3 ++- ydb/core/tx/conveyor/service/worker.cpp | 31 +++++++++++++++++++++--- ydb/core/tx/conveyor/service/worker.h | 15 +++++++++--- ydb/core/tx/conveyor/usage/config.cpp | 19 +++++++++++++-- ydb/core/tx/conveyor/usage/config.h | 3 ++- 7 files changed, 74 insertions(+), 14 deletions(-) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2293d861341e..ce550316d18d 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -591,6 +591,7 @@ message TConveyorConfig { optional uint32 WorkersCount = 2; optional uint32 QueueSizeLimit = 3; optional double DefaultFractionOfThreadsCount = 4; + optional double WorkersCountDouble = 5; } message TExternalIndexConfig { diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp index 0ab75a217394..c51b7e2b64e9 100644 --- a/ydb/core/tx/conveyor/service/service.cpp +++ b/ydb/core/tx/conveyor/service/service.cpp @@ -16,7 +16,12 @@ void TDistributor::Bootstrap() { const ui32 workersCount = Config.GetWorkersCountForConveyor(NKqp::TStagePredictor::GetUsableThreads()); AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("action", "conveyor_registered")("actor_id", SelfId())("workers_count", workersCount)("config", Config.DebugString()); for (ui32 i = 0; i < workersCount; ++i) { - Workers.emplace_back(Register(new TWorker(ConveyorName))); + const double usage = Config.GetWorkerCPUUsage(i); + Workers.emplace_back(Register(new TWorker(ConveyorName, usage, SelfId()))); + if (usage < 1) { + AFL_VERIFY(!SlowWorkerId); + SlowWorkerId = Workers.back(); + } } Counters.AvailableWorkersCount->Set(Workers.size()); Counters.WorkersCountLimit->Set(Workers.size()); @@ -62,8 +67,13 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) { Counters.WaitingHistogram->Collect(0); wTask.OnBeforeStart(); - Send(Workers.back(), new TEvInternal::TEvNewTask(wTask)); - Workers.pop_back(); + if (Workers.size() == 1 || !SlowWorkerId || Workers.back() != *SlowWorkerId) { + Send(Workers.back(), new TEvInternal::TEvNewTask(wTask)); + Workers.pop_back(); + } else { + Send(Workers.front(), new TEvInternal::TEvNewTask(wTask)); + Workers.pop_front(); + } Counters.UseWorkerRate->Inc(); } else if (Waiting.size() < Config.GetQueueSizeLimit()) { Waiting.push(wTask); diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h index 9891e818a973..833952090905 100644 --- a/ydb/core/tx/conveyor/service/service.h +++ b/ydb/core/tx/conveyor/service/service.h @@ -75,7 +75,8 @@ class TDistributor: public TActorBootstrapped { const TConfig Config; const TString ConveyorName = "common"; TDequePriorityFIFO Waiting; - std::vector Workers; + std::deque Workers; + std::optional SlowWorkerId; TCounters Counters; THashMap> Signals; diff --git a/ydb/core/tx/conveyor/service/worker.cpp b/ydb/core/tx/conveyor/service/worker.cpp index a71e8dffda87..42c0b4e239d7 100644 --- a/ydb/core/tx/conveyor/service/worker.cpp +++ b/ydb/core/tx/conveyor/service/worker.cpp @@ -2,12 +2,35 @@ namespace NKikimr::NConveyor { -void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) { - auto& workerTask = ev->Get()->GetTask(); +void TWorker::ExecuteTask(const TWorkerTask& workerTask) { + std::optional start; + if (CPUUsage < 1) { + start = TMonotonic::Now(); + } if (workerTask.GetTask()->Execute(workerTask.GetTaskSignals())) { - TBase::Sender(workerTask, workerTask.GetTask()).SendTo(ev->Sender); + TBase::Sender(workerTask, workerTask.GetTask()).SendTo(DistributorId); + } else { + TBase::Sender(workerTask, workerTask.GetTask()->GetErrorMessage()).SendTo(DistributorId); + } + if (CPUUsage < 1) { + Schedule((TMonotonic::Now() - *start) * (1 - CPUUsage), new NActors::TEvents::TEvWakeup); + WaitWakeUp = true; + } +} + +void TWorker::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) { + WaitWakeUp = false; + if (WaitTask) { + ExecuteTask(*WaitTask); + WaitTask.reset(); + } +} + +void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) { + if (!WaitWakeUp) { + ExecuteTask(ev->Get()->GetTask()); } else { - TBase::Sender(workerTask, workerTask.GetTask()->GetErrorMessage()).SendTo(ev->Sender); + WaitTask = ev->Get()->GetTask(); } } diff --git a/ydb/core/tx/conveyor/service/worker.h b/ydb/core/tx/conveyor/service/worker.h index b87b5aacda40..0159f257c433 100644 --- a/ydb/core/tx/conveyor/service/worker.h +++ b/ydb/core/tx/conveyor/service/worker.h @@ -91,13 +91,20 @@ struct TEvInternal { class TWorker: public NActors::TActorBootstrapped { private: using TBase = NActors::TActorBootstrapped; -public: + const double CPUUsage = 1; + bool WaitWakeUp = false; + const NActors::TActorId DistributorId; + std::optional WaitTask; + void ExecuteTask(const TWorkerTask& workerTask); void HandleMain(TEvInternal::TEvNewTask::TPtr& ev); + void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev); +public: STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(TEvInternal::TEvNewTask, HandleMain); - default: + hFunc(NActors::TEvents::TEvWakeup, HandleMain); + default: ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite(); break; } @@ -107,8 +114,10 @@ class TWorker: public NActors::TActorBootstrapped { Become(&TWorker::StateMain); } - TWorker(const TString& conveyorName) + TWorker(const TString& conveyorName, const double cpuUsage, const NActors::TActorId& distributorId) : TBase("CONVEYOR::" + conveyorName + "::WORKER") + , CPUUsage(cpuUsage) + , DistributorId(distributorId) { } diff --git a/ydb/core/tx/conveyor/usage/config.cpp b/ydb/core/tx/conveyor/usage/config.cpp index d39f93a65e4e..2117e2d0870e 100644 --- a/ydb/core/tx/conveyor/usage/config.cpp +++ b/ydb/core/tx/conveyor/usage/config.cpp @@ -12,7 +12,9 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config) if (config.HasQueueSizeLimit()) { QueueSizeLimit = config.GetQueueSizeLimit(); } - if (config.HasWorkersCount()) { + if (config.HasWorkersCountDouble()) { + WorkersCount = config.GetWorkersCountDouble(); + } else if (config.HasWorkersCount()) { WorkersCount = config.GetWorkersCount(); } if (config.HasDefaultFractionOfThreadsCount()) { @@ -23,7 +25,7 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config) ui32 TConfig::GetWorkersCountForConveyor(const ui32 poolThreadsCount) const { if (WorkersCount) { - return *WorkersCount; + return std::ceil(*WorkersCount); } else if (DefaultFractionOfThreadsCount) { return Min(poolThreadsCount, Max(1, *DefaultFractionOfThreadsCount * poolThreadsCount)); } else { @@ -44,4 +46,17 @@ TString TConfig::DebugString() const { return sb; } +double TConfig::GetWorkerCPUUsage(const ui32 workerIdx) const { + if (!WorkersCount) { + return 1; + } + double wholePart; + const double fractionalPart = std::modf(*WorkersCount, &wholePart); + if (workerIdx + 1 <= wholePart) { + return 1; + } else { + return fractionalPart; + } +} + } diff --git a/ydb/core/tx/conveyor/usage/config.h b/ydb/core/tx/conveyor/usage/config.h index ed9b2c9d05d5..12ed6780a7f6 100644 --- a/ydb/core/tx/conveyor/usage/config.h +++ b/ydb/core/tx/conveyor/usage/config.h @@ -6,13 +6,14 @@ namespace NKikimr::NConveyor { class TConfig { private: - YDB_OPT(ui32, WorkersCount); + YDB_OPT(double, WorkersCount); YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024); YDB_READONLY_FLAG(Enabled, true); YDB_OPT(double, DefaultFractionOfThreadsCount); public: bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config); ui32 GetWorkersCountForConveyor(const ui32 poolThreadsCount) const; + double GetWorkerCPUUsage(const ui32 workerIdx) const; TString DebugString() const; };