Skip to content

Commit

Permalink
native memory control (#11559)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 13, 2024
1 parent f847a52 commit 7d51c2b
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 16 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ message TLimiterConfig {
message TGroupedMemoryLimiterConfig {
optional bool Enabled = 1 [default = true];
optional uint64 MemoryLimit = 2;
optional uint64 HardMemoryLimit = 3;
}

message TExternalIndexConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TReadContext {
const TActorId ResourceSubscribeActorId;
const TActorId ReadCoordinatorActorId;
const TComputeShardingPolicy ComputeShardingPolicy;
TAtomic AbortFlag = 0;

public:
template <class T>
Expand All @@ -61,6 +62,13 @@ class TReadContext {
return result;
}

void AbortWithError(const TString& errorMessage) {
if (AtomicCas(&AbortFlag, 1, 0)) {
NActors::TActivationContext::Send(
ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail(errorMessage)));
}
}

bool IsReverse() const {
return ReadMetadata->IsDescSorted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) {
}

void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
auto sourcePtr = Source.lock();
if (sourcePtr) {
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
}
}

TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class TAllocateMemoryStep: public IFetchingStep {
NColumnShard::TCounterGuard TasksGuard;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;

virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class TBaseMergeTask: public IDataTasksProcessor::ITask, public NGroupedMemoryMa
virtual bool DoApply(IDataReader& indexedDataRead) const override;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override {
Context->GetCommonContext()->AbortWithError("cannot allocate memory for merge task: '" + errorMessage + "'");
}

public:
TBaseMergeTask(const std::shared_ptr<TMergingContext>& mergingContext, const std::shared_ptr<TSpecialReadContext>& readContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation(
, AccessorsManager(context->GetDataAccessorsManager())
, Request(request)
, WaitingCountersGuard(context->GetCounters().GetFetcherAcessorsGuard())
, OwnerId(context->GetScanActorId()) {
, OwnerId(context->GetScanActorId())
, Context(context) {
}

void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage);
}

} // namespace NKikimr::NOlap::NReader::NSysView::NChunks
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,15 @@ class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema
std::shared_ptr<TDataAccessorsRequest> Request;
NColumnShard::TCounterGuard WaitingCountersGuard;
const NActors::TActorId OwnerId;
const std::shared_ptr<NReader::TReadContext> Context;

virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*selfPtr*/) override {
Guard = std::move(guard);
AccessorsManager->AskData(std::move(Request));
return true;
}
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;

virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
if (result.HasErrors()) {
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/service/allocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ class TAllocationInfo: public NColumnShard::TMonitoringObjectsCounter<TAllocatio
AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
"allocation_internal_group_id", AllocationInternalGroupId);
auto allocationResult = Stage->Allocate(AllocatedVolume);
if (allocationResult.IsFail()) {
AllocationFailed = true;
Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
return false;
}
const bool result = Allocation->OnAllocated(
std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
if (result) {
Stage->Allocate(AllocatedVolume);
} else {
Stage->Free(AllocatedVolume, false);
if (!result) {
Stage->Free(AllocatedVolume, true);
AllocationFailed = true;
}
Allocation = nullptr;
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/limiter/grouped_memory/service/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ class TStageCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr AllocatedChunks;
NMonitoring::TDynamicCounters::TCounterPtr WaitingBytes;
NMonitoring::TDynamicCounters::TCounterPtr WaitingChunks;
NMonitoring::TDynamicCounters::TCounterPtr AllocationFailCount;

public:
TStageCounters(const TCommonCountersOwner& owner, const TString& name)
: TBase(owner, "stage", name)
, AllocatedBytes(TBase::GetValue("Allocated/Bytes"))
, AllocatedChunks(TBase::GetValue("Allocated/Count"))
, WaitingBytes(TBase::GetValue("Waiting/Bytes"))
, WaitingChunks(TBase::GetValue("Waiting/Count")) {
, WaitingChunks(TBase::GetValue("Waiting/Count"))
, AllocationFailCount(TBase::GetValue("AllocationFails/Count")) {
}

void OnCannotAllocate() {
AllocationFailCount->Add(1);
}

void Add(const ui64 volume, const bool allocated) {
Expand Down
25 changes: 21 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/usage/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NOlap::NGroupedMemoryManager {

Expand Down Expand Up @@ -95,6 +96,7 @@ class TStageFeatures {
private:
YDB_READONLY_DEF(TString, Name);
YDB_READONLY(ui64, Limit, 0);
YDB_READONLY(ui64, HardLimit, 0);
YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
std::shared_ptr<TStageFeatures> Owner;
Expand All @@ -114,24 +116,34 @@ class TStageFeatures {
return Usage.Val() + Waiting.Val();
}

TStageFeatures(
const TString& name, const ui64 limit, const std::shared_ptr<TStageFeatures>& owner, const std::shared_ptr<TStageCounters>& counters)
TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
const std::shared_ptr<TStageCounters>& counters)
: Name(name)
, Limit(limit)
, HardLimit(hardLimit)
, Owner(owner)
, Counters(counters) {
}

void Allocate(const ui64 volume) {
[[nodiscard]] TConclusionStatus Allocate(const ui64 volume) {
if (HardLimit < Usage.Val() + volume) {
Counters->OnCannotAllocate();
return TConclusionStatus::Fail(TStringBuilder() << "limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ";");
}
Waiting.Sub(volume);
Usage.Add(volume);
if (Counters) {
Counters->Add(volume, true);
Counters->Sub(volume, false);
}
if (Owner) {
Owner->Allocate(volume);
const auto ownerResult = Owner->Allocate(volume);
if (ownerResult.IsFail()) {
Free(volume, true);
return ownerResult;
}
}
return TConclusionStatus::Success();
}

void Free(const ui64 volume, const bool allocated) {
Expand Down Expand Up @@ -199,6 +211,7 @@ class IAllocation {
YDB_READONLY(ui64, Identifier, Counter.Inc());
YDB_READONLY(ui64, Memory, 0);
bool Allocated = false;
virtual void DoOnAllocationImpossible(const TString& errorMessage) = 0;
virtual bool DoOnAllocated(
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) = 0;

Expand All @@ -216,6 +229,10 @@ class IAllocation {
return Allocated;
}

void OnAllocationImpossible(const TString& errorMessage) {
DoOnAllocationImpossible(errorMessage);
}

[[nodiscard]] bool OnAllocated(
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation);
};
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/limiter/grouped_memory/usage/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TGroupedMemoryLimiterCon
if (config.HasMemoryLimit()) {
MemoryLimit = config.GetMemoryLimit();
}
if (config.HasHardMemoryLimit()) {
HardMemoryLimit = config.GetHardMemoryLimit();
}
Enabled = config.GetEnabled();
return true;
}

TString TConfig::DebugString() const {
TStringBuilder sb;
sb << "MemoryLimit=" << MemoryLimit << ";Enabled=" << Enabled << ";";
sb << "MemoryLimit=" << MemoryLimit << ";HardMemoryLimit=" << HardMemoryLimit << ";Enabled=" << Enabled << ";";
return sb;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/limiter/grouped_memory/usage/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class TConfig {
private:
YDB_READONLY(bool, Enabled, true);
YDB_READONLY(ui64, MemoryLimit, ui64(3) << 30);
YDB_READONLY(ui64, HardMemoryLimit, ui64(10) << 30);

public:

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/limiter/grouped_memory/usage/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ class TServiceOperatorImpl {
private:
TConfig ServiceConfig = TConfig::BuildDisabledConfig();
std::shared_ptr<TCounters> Counters;
std::shared_ptr<TStageFeatures> DefaultStageFeatures = std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, nullptr, nullptr);
std::shared_ptr<TStageFeatures> DefaultStageFeatures =
std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, ((ui64)10) << 30, nullptr, nullptr);
using TSelf = TServiceOperatorImpl<TMemoryLimiterPolicy>;
static void Register(const TConfig& serviceConfig, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
Singleton<TSelf>()->Counters = std::make_shared<TCounters>(counters, TMemoryLimiterPolicy::Name);
Singleton<TSelf>()->ServiceConfig = serviceConfig;
Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>(
"GLOBAL", serviceConfig.GetMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general"));
Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>("GLOBAL", serviceConfig.GetMemoryLimit(),
serviceConfig.GetHardMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general"));
}
static const TString& GetMemoryLimiterName() {
Y_ABORT_UNLESS(TMemoryLimiterPolicy::Name.size() == 4);
Expand All @@ -35,7 +36,7 @@ class TServiceOperatorImpl {
} else {
AFL_VERIFY(Singleton<TSelf>()->DefaultStageFeatures);
return std::make_shared<TStageFeatures>(
name, limit, Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name));
name, limit, Max<ui64>(), Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name));
}
}

Expand Down

0 comments on commit 7d51c2b

Please sign in to comment.