Skip to content

Commit

Permalink
Fractional CPU usage in conveyors
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed May 27, 2024
1 parent 0342dc0 commit af1f1ad
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 14 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ message TConveyorConfig {
optional uint32 WorkersCount = 2;
optional uint32 QueueSizeLimit = 3;
optional double DefaultFractionOfThreadsCount = 4;
optional double WorkersCountDouble = 5;
}

message TExternalIndexConfig {
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/tx/conveyor/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ void TDistributor::Bootstrap() {
const ui32 workersCount = Config.GetWorkersCountForConveyor(NKqp::TStagePredictor::GetUsableThreads());
AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("action", "conveyor_registered")("actor_id", SelfId())("workers_count", workersCount)("config", Config.DebugString());
for (ui32 i = 0; i < workersCount; ++i) {
Workers.emplace_back(Register(new TWorker(ConveyorName)));
const double usage = Config.GetWorkerCPUUsage(i);
Workers.emplace_back(Register(new TWorker(ConveyorName, usage, SelfId())));
if (usage < 1) {
AFL_VERIFY(!SlowWorkerId);
SlowWorkerId = Workers.back();
}
}
Counters.AvailableWorkersCount->Set(Workers.size());
Counters.WorkersCountLimit->Set(Workers.size());
Expand Down Expand Up @@ -62,8 +67,13 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
Counters.WaitingHistogram->Collect(0);

wTask.OnBeforeStart();
Send(Workers.back(), new TEvInternal::TEvNewTask(wTask));
Workers.pop_back();
if (Workers.size() == 1 || !SlowWorkerId || Workers.back() != *SlowWorkerId) {
Send(Workers.back(), new TEvInternal::TEvNewTask(wTask));
Workers.pop_back();
} else {
Send(Workers.front(), new TEvInternal::TEvNewTask(wTask));
Workers.pop_front();
}
Counters.UseWorkerRate->Inc();
} else if (Waiting.size() < Config.GetQueueSizeLimit()) {
Waiting.push(wTask);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/conveyor/service/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class TDistributor: public TActorBootstrapped<TDistributor> {
const TConfig Config;
const TString ConveyorName = "common";
TDequePriorityFIFO Waiting;
std::vector<TActorId> Workers;
std::deque<TActorId> Workers;
std::optional<NActors::TActorId> SlowWorkerId;
TCounters Counters;
THashMap<TString, std::shared_ptr<TTaskSignals>> Signals;

Expand Down
31 changes: 27 additions & 4 deletions ydb/core/tx/conveyor/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,35 @@

namespace NKikimr::NConveyor {

void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
auto& workerTask = ev->Get()->GetTask();
void TWorker::ExecuteTask(const TWorkerTask& workerTask) {
std::optional<TMonotonic> start;
if (CPUUsage < 1) {
start = TMonotonic::Now();
}
if (workerTask.GetTask()->Execute(workerTask.GetTaskSignals())) {
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()).SendTo(ev->Sender);
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()).SendTo(DistributorId);
} else {
TBase::Sender<TEvInternal::TEvTaskProcessedResult>(workerTask, workerTask.GetTask()->GetErrorMessage()).SendTo(DistributorId);
}
if (CPUUsage < 1) {
Schedule((TMonotonic::Now() - *start) * (1 - CPUUsage), new NActors::TEvents::TEvWakeup);
WaitWakeUp = true;
}
}

void TWorker::HandleMain(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) {
    // The throttling pause has elapsed: the worker may process tasks again.
    WaitWakeUp = false;
    if (!WaitTask) {
        return;
    }
    // A task arrived while we were paused — run it now and clear the buffer.
    ExecuteTask(*WaitTask);
    WaitTask.reset();
}

void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
    // Entry point for a task handed out by the distributor.
    // If the worker is currently in its fractional-CPU idle gap, buffer the
    // task until the wakeup fires; otherwise execute it immediately.
    // (Removed a stale pre-refactor line that referenced the undeclared
    // identifier `workerTask` and sent a spurious error result to ev->Sender.)
    if (!WaitWakeUp) {
        ExecuteTask(ev->Get()->GetTask());
    } else {
        WaitTask = ev->Get()->GetTask();
    }
}

Expand Down
15 changes: 12 additions & 3 deletions ydb/core/tx/conveyor/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,20 @@ struct TEvInternal {
class TWorker: public NActors::TActorBootstrapped<TWorker> {
private:
using TBase = NActors::TActorBootstrapped<TWorker>;
public:
const double CPUUsage = 1;
bool WaitWakeUp = false;
const NActors::TActorId DistributorId;
std::optional<TWorkerTask> WaitTask;
void ExecuteTask(const TWorkerTask& workerTask);
void HandleMain(TEvInternal::TEvNewTask::TPtr& ev);
void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
public:

STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvInternal::TEvNewTask, HandleMain);
default:
hFunc(NActors::TEvents::TEvWakeup, HandleMain);
default:
ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite();
break;
}
Expand All @@ -107,8 +114,10 @@ class TWorker: public NActors::TActorBootstrapped<TWorker> {
Become(&TWorker::StateMain);
}

TWorker(const TString& conveyorName)
TWorker(const TString& conveyorName, const double cpuUsage, const NActors::TActorId& distributorId)
: TBase("CONVEYOR::" + conveyorName + "::WORKER")
, CPUUsage(cpuUsage)
, DistributorId(distributorId)
{

}
Expand Down
19 changes: 17 additions & 2 deletions ydb/core/tx/conveyor/usage/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config)
if (config.HasQueueSizeLimit()) {
QueueSizeLimit = config.GetQueueSizeLimit();
}
if (config.HasWorkersCount()) {
if (config.HasWorkersCountDouble()) {
WorkersCount = config.GetWorkersCountDouble();
} else if (config.HasWorkersCount()) {
WorkersCount = config.GetWorkersCount();
}
if (config.HasDefaultFractionOfThreadsCount()) {
Expand All @@ -23,7 +25,7 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config)

ui32 TConfig::GetWorkersCountForConveyor(const ui32 poolThreadsCount) const {
if (WorkersCount) {
return *WorkersCount;
return std::ceil(*WorkersCount);
} else if (DefaultFractionOfThreadsCount) {
return Min<ui32>(poolThreadsCount, Max<ui32>(1, *DefaultFractionOfThreadsCount * poolThreadsCount));
} else {
Expand All @@ -44,4 +46,17 @@ TString TConfig::DebugString() const {
return sb;
}

double TConfig::GetWorkerCPUUsage(const ui32 workerIdx) const {
    // CPU share [0..1] for the worker with the given index.
    // Without an explicit (possibly fractional) workers count, every worker
    // runs at full capacity.
    if (!WorkersCount) {
        return 1;
    }
    // Split the configured count into whole workers plus a fractional tail:
    // the first floor(count) workers are full-speed; the one extra worker
    // (created by the ceil() in GetWorkersCountForConveyor) gets the remainder.
    double integralPart;
    const double fraction = std::modf(*WorkersCount, &integralPart);
    return (workerIdx + 1 <= integralPart) ? 1 : fraction;
}

}
3 changes: 2 additions & 1 deletion ydb/core/tx/conveyor/usage/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ namespace NKikimr::NConveyor {

class TConfig {
private:
    // Workers count may be fractional: the integer part is the number of
    // full-speed workers; the fractional part is the CPU share of one extra
    // throttled worker (see GetWorkerCPUUsage).
    // (Removed the stale duplicate declaration `YDB_OPT(ui32, WorkersCount)`
    // left over from the pre-change version — a redeclaration error.)
    YDB_OPT(double, WorkersCount);
    YDB_READONLY(ui32, QueueSizeLimit, 256 * 1024);
    YDB_READONLY_FLAG(Enabled, true);
    YDB_OPT(double, DefaultFractionOfThreadsCount);
public:
    bool DeserializeFromProto(const NKikimrConfig::TConveyorConfig& config);
    // Number of workers to spawn, honoring explicit count / thread-pool fraction.
    ui32 GetWorkersCountForConveyor(const ui32 poolThreadsCount) const;
    // CPU share [0..1] assigned to the worker with the given index.
    double GetWorkerCPUUsage(const ui32 workerIdx) const;
    TString DebugString() const;
};

Expand Down

0 comments on commit af1f1ad

Please sign in to comment.