From 770e322d7cd3666dd677725071a01648f3f3f494 Mon Sep 17 00:00:00 2001 From: Evgeny Zverev Date: Wed, 7 Feb 2024 13:04:17 +0000 Subject: [PATCH] YQL-17542 move TaskRunner dependent Execute to TDqSyncComputeActorBase --- .../dq/actors/compute/dq_compute_actor_impl.h | 54 +++++++++---------- .../compute/dq_sync_compute_actor_base.h | 50 ++++++++--------- 2 files changed, 52 insertions(+), 52 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 0ee3ca32cce2..888a8456b33d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1458,33 +1458,32 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->RegisterWithSameMailbox(source.Actor); } for (auto& [inputIndex, transform] : InputTransformsMap) { - Y_ABORT_UNLESS(TaskRunner); - transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); - Y_ABORT_UNLESS(AsyncIoFactory); - const auto& inputDesc = Task.GetInputs(inputIndex); - CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString()); - try { - std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform( - IDqAsyncIoFactory::TInputTransformArguments { - .InputDesc = inputDesc, - .InputIndex = inputIndex, - .StatsLevel = collectStatsLevel, - .TxId = TxId, - .TaskId = Task.GetId(), - .TransformInput = transform.InputBuffer, - .SecureParams = secureParams, - .TaskParams = taskParams, - .ComputeActorId = this->SelfId(), - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder, - .Alloc = Alloc, - .TraceId = ComputeActorSpan.GetTraceId() - }); - } catch (const std::exception& ex) { - throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); - } - this->RegisterWithSameMailbox(transform.Actor); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); + Y_ABORT_UNLESS(AsyncIoFactory); + const auto& inputDesc = Task.GetInputs(inputIndex); + CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString()); + try { + std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform( + IDqAsyncIoFactory::TInputTransformArguments { + .InputDesc = inputDesc, + .InputIndex = inputIndex, + .StatsLevel = collectStatsLevel, + .TxId = TxId, + .TaskId = Task.GetId(), + .TransformInput = transform.InputBuffer, + .SecureParams = secureParams, + .TaskParams = taskParams, + .ComputeActorId = this->SelfId(), + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder, + .Alloc = Alloc, + .TraceId = ComputeActorSpan.GetTraceId() + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); + } + this->RegisterWithSameMailbox(transform.Actor); } for (auto& [outputIndex, transform] : OutputTransformsMap) { transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); @@ -2031,7 +2030,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped const IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; const NDqProto::ECheckpointingMode CheckpointingMode; - TIntrusivePtr TaskRunner; TDqComputeActorChannels* Channels = nullptr; TDqComputeActorCheckpoints* Checkpoints = nullptr; THashMap InputChannelsMap; // Channel id -> Channel info diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index f40d8c4b51e1..7e21910ca760 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -49,7 +49,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase(this)->GetSourcesState(); TBase::PollAsyncInput(); - ERunStatus status = this->TaskRunner->Run(); + ERunStatus status = TaskRunner->Run(); CA_LOG_T("Resume execution, run status: " << status); @@ -65,13 +65,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseTaskRunner.Reset(); + TaskRunner.Reset(); } void InvalidateMeminfo() override { - if (this->TaskRunner) { - this->TaskRunner->GetAllocator().InvalidateMemInfo(); - this->TaskRunner->GetAllocator().DisableStrictAllocationCheck(); + if (TaskRunner) { + TaskRunner->GetAllocator().InvalidateMemInfo(); + TaskRunner->GetAllocator().DisableStrictAllocationCheck(); } } @@ -81,7 +81,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseMutableStateData(); data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); - data.SetBlob(this->TaskRunner->Save()); + data.SetBlob(TaskRunner->Save()); for (auto& [inputIndex, source] : this->SourcesMap) { YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); @@ -94,7 +94,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase error = Nothing(); try { - this->TaskRunner->Load(blob); + TaskRunner->Load(blob); } catch (const std::exception& e) { error = e.what(); } @@ -102,11 +102,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase& taskRunner) { - this->TaskRunner = taskRunner; + TaskRunner = taskRunner; } void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { - YQL_ENSURE(this->TaskRunner); + YQL_ENSURE(TaskRunner); auto guard = TBase::BindAllocator(); auto* alloc = guard.GetMutex(); @@ -118,49 +118,49 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseMemoryLimits.ChannelBufferSize; limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; - this->TaskRunner->Prepare(this->Task, limits, execCtx); + TaskRunner->Prepare(this->Task, limits, execCtx); for (auto& [channelId, channel] : this->InputChannelsMap) { - channel.Channel = this->TaskRunner->GetInputChannel(channelId); + channel.Channel = TaskRunner->GetInputChannel(channelId); } for (auto& [inputIndex, source] : this->SourcesMap) { - source.Buffer = this->TaskRunner->GetSource(inputIndex); + source.Buffer = TaskRunner->GetSource(inputIndex); Y_ABORT_UNLESS(source.Buffer); } for (auto& [inputIndex, transform] : this->InputTransformsMap) { - std::tie(transform.InputBuffer, transform.Buffer) = this->TaskRunner->GetInputTransform(inputIndex); + std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex); } for (auto& [channelId, channel] : this->OutputChannelsMap) { - channel.Channel = this->TaskRunner->GetOutputChannel(channelId); + channel.Channel = TaskRunner->GetOutputChannel(channelId); } for (auto& [outputIndex, transform] : this->OutputTransformsMap) { - std::tie(transform.Buffer, transform.OutputBuffer) = this->TaskRunner->GetOutputTransform(outputIndex); + std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex); } for (auto& [outputIndex, sink] : this->SinksMap) { - sink.Buffer = this->TaskRunner->GetSink(outputIndex); + sink.Buffer = TaskRunner->GetSink(outputIndex); } TBase::FillIoMaps( - this->TaskRunner->GetHolderFactory(), - this->TaskRunner->GetTypeEnv(), - this->TaskRunner->GetSecureParams(), - this->TaskRunner->GetTaskParams(), - this->TaskRunner->GetReadRanges(), - this->TaskRunner->GetRandomProvider() + TaskRunner->GetHolderFactory(), + TaskRunner->GetTypeEnv(), + TaskRunner->GetSecureParams(), + TaskRunner->GetTaskParams(), + TaskRunner->GetReadRanges(), + TaskRunner->GetRandomProvider() ); } const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override { - return this->TaskRunner ? this->TaskRunner->GetStats() : nullptr; + return TaskRunner ? TaskRunner->GetStats() : nullptr; } const NYql::NDq::TDqMeteringStats* GetMeteringStats() override { - return this->TaskRunner ? this->TaskRunner->GetMeteringStats() : nullptr; + return TaskRunner ? TaskRunner->GetMeteringStats() : nullptr; } protected: @@ -171,6 +171,8 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase TaskRunner; + }; } //namespace NYql::NDq