Skip to content

Commit

Permalink
don't lock on zero memory change (#6926)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jul 24, 2024
1 parent bf2dd5d commit 4981a3b
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 365 deletions.
44 changes: 21 additions & 23 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,75 +12,71 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
// Builds the per-task memory quota manager.
// `limit` is forwarded to the TGuaranteeQuotaManager base as both of its arguments;
// the resource manager, node state and tx/task handles are moved into members.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// `ui64 txId` / `ui64 taskId` lines and the `TIntrusivePtr ... tx` / `task` lines
// (and the matching initializers below) are presumably the before/after variants
// of the same parameters rather than four distinct parameters — confirm against the commit.
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
, NRm::EKqpMemoryPool memoryPool
, std::shared_ptr<IKqpNodeState> state
, ui64 txId
, ui64 taskId
, TIntrusivePtr<NRm::TTxState> tx
, TIntrusivePtr<NRm::TTaskState> task
, ui64 limit
, ui64 reasonableSpillingTreshold)
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
, ResourceManager(std::move(resourceManager))
, MemoryPool(memoryPool)
, State(std::move(state))
, TxId(txId)
, TaskId(taskId)
, Tx(std::move(tx))
, Task(std::move(task))
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
{
}

// On destruction: reports task termination to State (only when State is set),
// passing the Success flag last stored by TerminateHandler, then releases the
// task's resources through ResourceManager.
// NOTE(review): the paired OnTaskTerminate / FreeResources lines look like the
// before/after variants from the diff rendering, not two consecutive calls — confirm.
// NOTE(review): ResourceManager (and Tx / Task in the pointer-based variant) are
// dereferenced without a null check; presumably the factory always supplies them — verify.
~TMemoryQuotaManager() override {
if (State) {
State->OnTaskTerminate(TxId, TaskId, Success);
State->OnTaskTerminate(Tx->TxId, Task->TaskId, Success);
}

ResourceManager->FreeResources(TxId, TaskId);
ResourceManager->FreeResources(Tx, Task);
}

// Asks ResourceManager for `extraSize` additional bytes from MemoryPool on behalf
// of this task.
// Returns false (after logging a KQP_COMPUTE warning with the tx id, task id and
// requested size) when the allocation result evaluates to false; returns true otherwise.
// NOTE(review): the two `auto result = ...AllocateResources(` openers and the duplicated
// tx_id / task_id log fields are presumably before/after variants from the diff rendering.
bool AllocateExtraQuota(ui64 extraSize) override {
auto result = ResourceManager->AllocateResources(TxId, TaskId,
auto result = ResourceManager->AllocateResources(Tx, Task,
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});

if (!result) {
AFL_WARN(NKikimrServices::KQP_COMPUTE)
("problem", "cannot_allocate_memory")
("tx_id", TxId)
("task_id", TaskId)
("tx_id", Tx->TxId)
("task_id", Task->TaskId)
("memory", extraSize);

return false;
}

// Remembers the query-wide total reported by the resource manager.
// NOTE(review): may belong only to the pre-change variant, since one variant of
// IsReasonableToUseSpilling reads the total from Tx instead — confirm.
TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory;

return true;
}

// Returns `extraSize` bytes of MemoryPool memory to ResourceManager for this task.
// NOTE(review): the first FreeResources call (TxId, TaskId) and the
// request / FitRequest pair are presumably the before/after variants from the diff
// rendering, not two frees in sequence — confirm.
void FreeExtraQuota(ui64 extraSize) override {
ResourceManager->FreeResources(TxId, TaskId,
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
);
NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize};
// The request is passed through Task->FitRequest before being freed.
// NOTE(review): FitRequest is not shown here; presumably it adjusts the request to
// what the task actually holds — verify against TTaskState.
ResourceManager->FreeResources(Tx, Task, Task->FitRequest(request));
}

// Reports whether the query's allocated memory has reached ReasonableSpillingTreshold.
// NOTE(review): the two return statements are presumably the before/after variants from
// the diff rendering (locally cached total vs. Tx->GetExtraMemoryAllocatedSize()); as
// literal code the second return would be unreachable — confirm which one is current.
bool IsReasonableToUseSpilling() const override {
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
}

// Returns a textual description of the transaction's memory usage for diagnostics.
// NOTE(review): the two return statements are presumably the before/after variants from
// the diff rendering (resource-manager lookup by TxId vs. Tx->ToString()) — confirm.
TString MemoryConsumptionDetails() const override {
return ResourceManager->GetTxResourcesUsageDebugInfo(TxId);
return Tx->ToString();
}

// Logs the compute actor's completion at debug level (tx id, task id, success flag and
// the issues rendered on one line) and stores `success` in Success, which the destructor
// later forwards to State->OnTaskTerminate.
// NOTE(review): the two field-chain lines are presumably the before/after variants from
// the diff rendering, not one chained log statement — confirm.
void TerminateHandler(bool success, const NYql::TIssues& issues) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
("problem", "finish_compute_actor")
("tx_id", TxId)("task_id", TaskId)("success", success)("message", issues.ToOneLineString());
("tx_id", Tx->TxId)("task_id", Task->TaskId)("success", success)("message", issues.ToOneLineString());
Success = success;
}

// Resource manager used for every allocate/free call made by this quota manager.
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
// Memory pool named in each TKqpResourcesRequest built by this class.
NRm::EKqpMemoryPool MemoryPool;
// Optional node state; when set, it is notified of task termination in the destructor.
std::shared_ptr<IKqpNodeState> State;
// NOTE(review): the ui64 ids and the TIntrusivePtr handles below are presumably the
// before/after variants from the diff rendering (ids replaced by state objects) — confirm.
ui64 TxId;
ui64 TaskId;
TIntrusivePtr<NRm::TTxState> Tx;
TIntrusivePtr<NRm::TTaskState> Task;
// Outcome recorded by TerminateHandler and reported from the destructor; defaults to true.
bool Success = true;
// Query-wide allocated total last reported by AllocateResources.
ui64 TotalQueryAllocationsSize = 0;
// Allocation size at or above which IsReasonableToUseSpilling returns true.
ui64 ReasonableSpillingTreshold = 0;
};

Expand Down Expand Up @@ -126,8 +122,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
resourcesRequest.ExecutionUnits = 1;
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit;

TIntrusivePtr<NRm::TTaskState> task = MakeIntrusive<NRm::TTaskState>(args.Task->GetId(), args.TxInfo->CreatedAt);

auto rmResult = ResourceManager_->AllocateResources(
args.TxId, args.Task->GetId(), resourcesRequest);
args.TxInfo, task, resourcesRequest);

if (!rmResult) {
return NRm::TKqpRMAllocateResult{rmResult};
Expand Down Expand Up @@ -158,8 +156,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
ResourceManager_,
args.MemoryPool,
std::move(args.State),
args.TxId,
args.Task->GetId(),
std::move(args.TxInfo),
std::move(task),
limit,
ReasonableSpillingTreshold.load());

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct IKqpNodeComputeActorFactory {
const NActors::TActorId& ExecuterId;
const ui64 TxId;
NYql::NDqProto::TDqTask* Task;
TIntrusivePtr<NRm::TTxState> TxInfo;
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
NWilson::TTraceId TraceId;
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,10 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
// Resource-manager counters registered in KqpGroup under the "RM/" name prefix.
RmExternalMemory = KqpGroup->GetCounter("RM/ExternalMemory", false);
RmNotEnoughMemory = KqpGroup->GetCounter("RM/NotEnoughMemory", true);
RmNotEnoughComputeActors = KqpGroup->GetCounter("RM/NotEnoughComputeActors", true);
// NOTE(review): this counter is registered as "Rm/..." while every sibling here uses
// "RM/..."; looks like a typo that would put it under a differently named path —
// confirm before renaming, since the counter name is visible to monitoring consumers.
RmOnStartAllocs = KqpGroup->GetCounter("Rm/OnStartAllocs", true);
RmExtraMemAllocs = KqpGroup->GetCounter("RM/ExtraMemAllocs", true);
RmExtraMemFree = KqpGroup->GetCounter("RM/ExtraMemFree", true);
RmOnCompleteFree = KqpGroup->GetCounter("RM/OnCompleteFree", true);
RmInternalError = KqpGroup->GetCounter("RM/InternalError", true);
RmSnapshotLatency = KqpGroup->GetHistogram(
"RM/SnapshotLatency", NMonitoring::ExponentialHistogram(20, 2, 1));
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounterPtr WorkloadManagerGroup;

::NMonitoring::TDynamicCounters::TCounterPtr FullScansExecuted;

// Lease updates counters
::NMonitoring::THistogramPtr LeaseUpdateLatency;
::NMonitoring::THistogramPtr RunActorLeaseUpdateBacklog;
Expand All @@ -377,6 +377,9 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughMemory;
::NMonitoring::TDynamicCounters::TCounterPtr RmNotEnoughComputeActors;
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemAllocs;
::NMonitoring::TDynamicCounters::TCounterPtr RmOnStartAllocs;
::NMonitoring::TDynamicCounters::TCounterPtr RmExtraMemFree;
::NMonitoring::TDynamicCounters::TCounterPtr RmOnCompleteFree;
::NMonitoring::TDynamicCounters::TCounterPtr RmInternalError;
NMonitoring::THistogramPtr RmSnapshotLatency;
NMonitoring::THistogramPtr NodeServiceStartEventDelivery;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,16 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
auto& task = TasksGraph.GetTask(taskId);
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
NYql::NDq::TComputeRuntimeSettings settings;
if (!TxInfo) {
TxInfo = MakeIntrusive<NRm::TTxState>(
TxId, TInstant::Now(), ResourceManager_->GetCounters());
}

auto startResult = CaFactory_->CreateKqpComputeActor({
.ExecuterId = ExecuterId,
.TxId = TxId,
.Task = taskDesc,
.TxInfo = TxInfo,
.RuntimeSettings = settings,
.TraceId = NWilson::TTraceId(ExecuterSpan.GetTraceId()),
.Arena = TasksGraph.GetMeta().GetArenaIntrusivePtr(),
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class TKqpPlanner {
TString SerializedGUCSettings;
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory> CaFactory_;
TIntrusivePtr<NRm::TTxState> TxInfo;

public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,16 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
rlPath.ConstructInPlace(msgRtSettings.GetRlPath());
}

TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
txId, TInstant::Now(), ResourceManager_->GetCounters());

const ui32 tasksCount = msg.GetTasks().size();
for (auto& dqTask: *msg.MutableTasks()) {
auto result = CaFactory_->CreateKqpComputeActor({
.ExecuterId = request.Executer,
.TxId = txId,
.Task = &dqTask,
.TxInfo = txInfo,
.RuntimeSettings = runtimeSettingsBase,
.TraceId = NWilson::TTraceId(ev->TraceId),
.Arena = ev->Get()->Arena,
Expand Down
Loading

0 comments on commit 4981a3b

Please sign in to comment.