diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h index 3dba02ca9456..33d977e08903 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h @@ -1,6 +1,7 @@ #pragma once +#include #include #include #include @@ -55,6 +56,18 @@ namespace NYql::NDq { } } + void TrySetIncreaseMemoryLimitCallbackWithRSSControl(NKikimr::NMiniKQL::TScopedAlloc* alloc) { + if (CanAllocateExtraMemory) { + alloc->Ref().SetIncreaseMemoryLimitCallback([this, alloc](ui64 limit, ui64 required) { + RequestExtraMemory(required - limit, alloc); + auto currentRSS = NMemInfo::GetMemInfo().RSS; + if (currentRSS > 10_GB) { + alloc->SetMaximumLimitValueReached(true); + } + }); + } + } + void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) { if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) { alloc->ReleaseFreePages(); @@ -113,6 +126,7 @@ namespace NYql::NDq { private: void RequestExtraMemory(ui64 memory, NKikimr::NMiniKQL::TScopedAlloc* alloc) { + alloc->GetAllocated(); memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize); if (MemoryLimits.MkqlProgramHardMemoryLimit && MkqlMemoryLimit + memory > MemoryLimits.MkqlProgramHardMemoryLimit) { diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index c9fb46ad2d7c..a05960f1aa90 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -423,7 +423,11 @@ class TLocalTaskRunnerActor auto guard = TaskRunner->BindAllocator(MemoryQuota ? TMaybe(MemoryQuota->GetMkqlMemoryLimit()) : Nothing()); if (MemoryQuota) { - MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex()); + if (settings.GetEnableSpilling()) { + MemoryQuota->TrySetIncreaseMemoryLimitCallbackWithRSSControl(guard.GetMutex()); + } else { + MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex()); + } } TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx); diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index c63b5a486925..b3d3d620bb8c 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -213,6 +213,10 @@ class TScopedAlloc { Release(); } + void SetMaximumLimitValueReached(bool IsReached) { + MyState_.SetMaximumLimitValueReached(IsReached); + } + private: const bool InitiallyAcquired_; TAllocState MyState_;