Skip to content

Commit

Permalink
Collect and send backtraces for tx max/failed allocations in error me…
Browse files Browse the repository at this point in the history
…ssage (#10419)
  • Loading branch information
abyss7 authored Oct 15, 2024
1 parent f3a9d80 commit b3d9c13
Showing 1 changed file with 51 additions and 5 deletions.
56 changes: 51 additions & 5 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class TTxState : public TAtomicRefCount<TTxState> {
std::atomic<ui64> TxMaxAllocation = 0;
std::atomic<ui64> TxFailedAllocation = 0;

// TODO(ilezhankin): it's better to use std::atomic<std::shared_ptr<>> which is not supported at the moment.
std::atomic<TBackTrace*> TxMaxAllocationBacktrace = nullptr;
std::atomic<TBackTrace*> TxFailedAllocationBacktrace = nullptr;

public:
explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent,
const TString& database)
Expand All @@ -137,6 +141,11 @@ class TTxState : public TAtomicRefCount<TTxState> {
, Database(database)
{}

~TTxState() {
delete TxMaxAllocationBacktrace.load();
delete TxFailedAllocationBacktrace.load();
}

std::pair<TString, TString> MakePoolId() const {
return std::make_pair(Database, PoolId);
}
Expand All @@ -157,7 +166,14 @@ class TTxState : public TAtomicRefCount<TTxState> {
<< ", tx largest failed memory allocation: " << HumanReadableSize(TxFailedAllocation.load(), SF_BYTES)
<< ", tx total execution units: " << TxExecutionUnits.load()
<< ", started at: " << CreatedAt
<< " }";
<< " }" << Endl;

if (TxMaxAllocationBacktrace.load()) {
res << "TxMaxAllocationBacktrace:" << Endl << TxMaxAllocationBacktrace.load()->PrintToString();
}
if (TxFailedAllocationBacktrace.load()) {
res << "TxFailedAllocationBacktrace:" << Endl << TxFailedAllocationBacktrace.load()->PrintToString();
}

return res;
}
Expand All @@ -167,8 +183,23 @@ class TTxState : public TAtomicRefCount<TTxState> {
}

void AckFailedMemoryAlloc(ui64 memory) {
auto* oldBacktrace = TxFailedAllocationBacktrace.load();
ui64 maxAlloc = TxFailedAllocation.load();
while(maxAlloc < memory && !TxFailedAllocation.compare_exchange_weak(maxAlloc, memory));
bool exchanged = false;

while(maxAlloc < memory && !exchanged) {
exchanged = TxFailedAllocation.compare_exchange_weak(maxAlloc, memory);
}

if (exchanged) {
auto* newBacktrace = new TBackTrace();
newBacktrace->Capture();
if (TxFailedAllocationBacktrace.compare_exchange_strong(oldBacktrace, newBacktrace)) {
delete oldBacktrace;
} else {
delete newBacktrace;
}
}
}

void Released(TIntrusivePtr<TTaskState>& taskState, const TKqpResourcesRequest& resources) {
Expand All @@ -186,9 +217,6 @@ class TTxState : public TAtomicRefCount<TTxState> {
taskState->ScanQueryMemory -= resources.Memory;
Counters->RmMemory->Sub(resources.Memory);

ui64 maxAlloc = TxMaxAllocation.load();
while(maxAlloc < resources.Memory && !TxMaxAllocation.compare_exchange_weak(maxAlloc, resources.Memory));

TxExecutionUnits.fetch_sub(resources.ExecutionUnits);
taskState->ExecutionUnits -= resources.ExecutionUnits;
Counters->RmComputeActors->Sub(resources.ExecutionUnits);
Expand All @@ -210,6 +238,24 @@ class TTxState : public TAtomicRefCount<TTxState> {
Counters->RmExtraMemAllocs->Inc();
}

auto* oldBacktrace = TxMaxAllocationBacktrace.load();
ui64 maxAlloc = TxMaxAllocation.load();
bool exchanged = false;

while(maxAlloc < resources.Memory && !exchanged) {
exchanged = TxMaxAllocation.compare_exchange_weak(maxAlloc, resources.Memory);
}

if (exchanged) {
auto* newBacktrace = new TBackTrace();
newBacktrace->Capture();
if (TxMaxAllocationBacktrace.compare_exchange_strong(oldBacktrace, newBacktrace)) {
delete oldBacktrace;
} else {
delete newBacktrace;
}
}

TxExecutionUnits.fetch_add(resources.ExecutionUnits);
taskState->ExecutionUnits += resources.ExecutionUnits;
Counters->RmComputeActors->Add(resources.ExecutionUnits);
Expand Down

0 comments on commit b3d9c13

Please sign in to comment.