Skip to content

Commit

Permalink
Merge f94d002 into 8d768df
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored May 28, 2024
2 parents 8d768df + f94d002 commit b02b1c7
Show file tree
Hide file tree
Showing 28 changed files with 63 additions and 64 deletions.
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
[=](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(alloc, task, statsMode);
});
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void TKqpComputeActor::DoBootstrap() {
settings.ReadRanges.push_back(readRange);
}

auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void TKqpScanComputeActor::DoBootstrap() {
};
}

auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
auto taskRunner = MakeDqTaskRunner(GetAllocatorPtr(), execCtx, settings, logger);
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch

TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
if (!TxResults.empty() && Y_LIKELY(AllocState)) {
with_lock(AllocState->Alloc) {
with_lock(*AllocState->Alloc) {
TxResults.crop(0);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TString BuildMemoryLimitExceptionMessage() const {
if (Request.TxAlloc) {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
<< ", current limit is " << Request.TxAlloc->Alloc.GetLimit() << " bytes.";
<< ", current limit is " << Request.TxAlloc->Alloc->GetLimit() << " bytes.";
}
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TKqpLiteralExecuter {
UpdateCounters();
}

void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
void RunTask(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

Expand Down Expand Up @@ -212,7 +212,7 @@ class TKqpLiteralExecuter {
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);

with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
with_lock (*alloc) { // allocator is used only by outputChannel->PopAll()
for (auto& taskOutput : task.Outputs) {
for (ui64 outputChannelId : taskOutput.Channels) {
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
Expand Down Expand Up @@ -277,7 +277,7 @@ class TKqpLiteralExecuter {

private:
void CleanupCtx() {
with_lock(Request.TxAlloc->Alloc) {
with_lock(*Request.TxAlloc->Alloc) {
TaskRunners.erase(TaskRunners.begin(), TaskRunners.end());
Request.Transactions.erase(Request.Transactions.begin(), Request.Transactions.end());
ComputeCtx.reset();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
return peepHoleStatus;
}

auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc);
auto guard = Guard(*SessionCtx->Query().QueryData->GetAllocState()->Alloc);

auto input = Build<TDqPhyStage>(ctx, pos)
.Inputs()
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/query_data/kqp_query_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,19 @@ void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsL

TTxAllocatorState::TTxAllocatorState(const IFunctionRegistry* functionRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators())
, TypeEnv(Alloc)
: Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators()))
, TypeEnv(*Alloc)
, MemInfo("TQueryData")
, HolderFactory(Alloc.Ref(), MemInfo, functionRegistry)
, HolderFactory(Alloc->Ref(), MemInfo, functionRegistry)
{
Alloc.Release();
Alloc->Release();
TimeProvider = timeProvider;
RandomProvider = randomProvider;
}

TTxAllocatorState::~TTxAllocatorState()
{
Alloc.Acquire();
Alloc->Acquire();
}

std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TTxAllocatorState::GetInternalBindingValue(
Expand Down Expand Up @@ -366,7 +366,7 @@ const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TStrin

auto it = Params.find(name);
if (it == Params.end()) {
with_lock(AllocState->Alloc) {
with_lock(*AllocState->Alloc) {
const auto& [type, uv] = GetParameterUnboxedValue(name);
NKikimrMiniKQL::TParams param;
ExportTypeToProto(type, *param.MutableType());
Expand All @@ -388,7 +388,7 @@ const Ydb::TypedValue* TQueryData::GetParameterTypedValue(const TString& name) {

auto it = ParamsProtobuf.find(name);
if (it == ParamsProtobuf.end()) {
with_lock(AllocState->Alloc) {
with_lock(*AllocState->Alloc) {
const auto& [type, uv] = GetParameterUnboxedValue(name);

auto& tv = ParamsProtobuf[name];
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/query_data/kqp_query_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct TTimeAndRandomProvider {

class TTxAllocatorState: public TTimeAndRandomProvider {
public:
NKikimr::NMiniKQL::TScopedAlloc Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo;
NKikimr::NMiniKQL::THolderFactory HolderFactory;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/runtime/kqp_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp


TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: LogFunc(logFunc)
, Alloc(alloc)
Expand Down Expand Up @@ -230,13 +230,13 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {

TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
if (memoryLimit) {
Alloc.SetLimit(*memoryLimit);
Alloc->SetLimit(*memoryLimit);
}
return TGuard(Alloc);
return TGuard(*Alloc);
}

TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
{
return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/runtime/kqp_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
public:
TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);

Expand Down Expand Up @@ -51,15 +51,15 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop
// otherwise use particular memory limit
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());

ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); }
ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }

const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }
private:
TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners;
TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
NYql::NDq::TLogFunc LogFunc;
NMiniKQL::TScopedAlloc& Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;

Expand All @@ -73,7 +73,7 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop


TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,11 +692,11 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ui64 mkqlMaxLimit = phaseLimitsProto.GetComputeNodeMemoryLimitBytes();
mkqlMaxLimit = mkqlMaxLimit ? mkqlMaxLimit : ui64(Settings.MkqlMaxMemoryLimit);

alloc->Alloc.SetLimit(mkqlInitialLimit);
alloc->Alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
alloc->Alloc->SetLimit(mkqlInitialLimit);
alloc->Alloc->Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
if (required < mkqlMaxLimit) {
LOG_D("Increase memory limit from " << currentLimit << " to " << required);
alloc->Alloc.SetLimit(required);
alloc->Alloc->SetLimit(required);
}
});

Expand Down Expand Up @@ -2325,7 +2325,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
TString BuildMemoryLimitExceptionMessage() const {
if (QueryState && QueryState->TxCtx) {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
<< ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc.GetLimit() << " bytes.";
<< ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc->GetLimit() << " bytes.";
} else {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorC
ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, GetUserDb(), EngineHost->GetSettings().DisableByKeyFilter);
ComputeCtx->Database = &txc.DB;

KqpAlloc = MakeHolder<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators());
KqpAlloc = std::make_shared<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators());
KqpTypeEnv = MakeHolder<TTypeEnvironment>(*KqpAlloc);
KqpAlloc->Release();

Expand Down Expand Up @@ -725,7 +725,7 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTra
settings.TerminateOnError = false;
Y_ABORT_UNLESS(KqpAlloc);
KqpAlloc->SetLimit(10_MB);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), *KqpAlloc.Get(), KqpExecCtx, settings, KqpLogFunc);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpAlloc, KqpExecCtx, settings, KqpLogFunc);
}

return *KqpTasksRunner;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TEngineBay : TNonCopyable {
auto guard = TGuard(*KqpAlloc);
KqpTypeEnv.Reset();
}
KqpAlloc.Reset();
KqpAlloc.reset();
}
KqpExecCtx = {};

Expand Down Expand Up @@ -128,7 +128,7 @@ class TEngineBay : TNonCopyable {
NYql::NDq::TLogFunc KqpLogFunc;
THolder<NUdf::IApplyContext> KqpApplyCtx;
THolder<NMiniKQL::TKqpDatashardComputeContext> ComputeCtx;
THolder<NMiniKQL::TScopedAlloc> KqpAlloc;
std::shared_ptr<NMiniKQL::TScopedAlloc> KqpAlloc;
THolder<NMiniKQL::TTypeEnvironment> KqpTypeEnv;
NYql::NDq::TDqTaskRunnerContext KqpExecCtx;
TIntrusivePtr<NKqp::TKqpTasksRunner> KqpTasksRunner;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TDqComputeActor : public TDqSyncComputeActorBase<TDqComputeActor> {
};
}

auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger);
auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ struct TComputeMemoryLimits {
};

using TTaskRunnerFactory = std::function<
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
TIntrusivePtr<IDqTaskRunner>(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
>;

void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class TLocalTaskRunnerActor
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
ParentId = ev->Sender;
auto settings = NDq::TDqTaskSettings(&ev->Get()->Task);
TaskRunner = Factory(*Alloc.get(), settings, ev->Get()->StatsMode, [this](const TString& message) {
TaskRunner = Factory(Alloc, settings, ev->Get()->StatsMode, [this](const TString& message) {
LOG_D(message);
});

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1011,10 +1011,10 @@ class TDqTaskRunner : public IDqTaskRunner {
}
};

TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
const TLogFunc& logFunc)
{
return new TDqTaskRunner(alloc, ctx, settings, logFunc);
return new TDqTaskRunner(*alloc, ctx, settings, logFunc);
}

} // namespace NYql::NDq
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyabl
};

TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(
NKikimr::NMiniKQL::TScopedAlloc& alloc,
const TDqTaskRunnerContext& ctx,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TDqTaskRunnerContext& ctx,
const TDqTaskRunnerSettings& settings,
const TLogFunc& logFunc
);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/actors/compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ IActor* CreateComputeActor(
}
}

auto taskRunnerFactory = [factory = options.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) {
auto taskRunnerFactory = [factory = options.Factory](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) {
Y_UNUSED(logger);
return factory->Get(alloc, task, statsMode, {});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class TLocalServiceHolder {
lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
[factory=lwmOptions.Factory](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
return factory->Get(alloc, task, statsMode);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ class TLocalExecutor: public TCounters
? CreateDeterministicRandomProvider(1)
: State->RandomProvider;

TScopedAlloc alloc(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
State->FunctionRegistry->SupportsSizedAllocators(),
false);
NDq::TDqTaskRunnerContext executionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,14 @@ class TTaskCommandExecutor {

Y_ABORT_UNLESS(!Alloc);
Y_ABORT_UNLESS(FunctionRegistry);
Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
Alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
FunctionRegistry->SupportsSizedAllocators(),
false
);

Runner = MakeDqTaskRunner(*Alloc.get(), Ctx, settings, nullptr);
Runner = MakeDqTaskRunner(Alloc, Ctx, settings, nullptr);
});

auto guard = Runner->BindAllocator(DqConfiguration->MemoryLimit.Get().GetOrElse(0));
Expand Down Expand Up @@ -770,7 +770,7 @@ class TTaskCommandExecutor {
result.Save(&output);
}
private:
std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
TTaskTransformFactory TaskTransformFactory;
NKikimr::NMiniKQL::IStatsRegistry* JobStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@ class TLocalFactory: public IProxyFactory {
ExecutionContext.PatternCache = patternCache;
}

ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, const TString& traceId) override {
ITaskRunner::TPtr GetOld(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskSettings& task, const TString& traceId) override {
return new TLocalTaskRunner(task, Get(alloc, task, NDqProto::DQ_STATS_MODE_BASIC, traceId));
}

TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override {
TIntrusivePtr<NDq::IDqTaskRunner> Get(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override {
Y_UNUSED(traceId);
NDq::TDqTaskRunnerSettings settings;
settings.TerminateOnError = TerminateOnError;
Expand Down
Loading

0 comments on commit b02b1c7

Please sign in to comment.