diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index f6a48b208c7b..d4cd835e37fe 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev) } } -void TKqpScanComputeActor::PollSources(std::any prev) { +void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) { if (!ScanData || ScanData->IsFinished()) { return; } const auto hasNewMemoryPred = [&]() { - if (!prev.has_value()) { - return false; - } const ui64 freeSpace = CalculateFreeSpace(); - const ui64 prevFreeSpace = std::any_cast(prev); return freeSpace > prevFreeSpace; }; if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) { return; } - const ui32 freeSpace = CalculateFreeSpace(); + const ui64 freeSpace = CalculateFreeSpace(); CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace); for (auto&& i : Fetchers) { Send(i, new TEvScanExchange::TEvAckData(freeSpace)); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h index ccf548819a8c..dd9d78091ec6 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h @@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBaseFreeSpace; } - TGuard BindAllocator() override { - return TypeEnv->BindAllocator(); - } - - std::optional> MaybeBindAllocator() override { - std::optional> guard; - if (TypeEnv) { - guard.emplace(TypeEnv->BindAllocator()); - } - return guard; - } - void OnTaskRunnerCreated(NTaskRunnerActor::TEvTaskRunnerCreateFinished::TPtr& ev) { const auto& secureParams = ev->Get()->SecureParams; const auto& taskParams = ev->Get()->TaskParams; @@ -483,7 +471,7 @@ class TDqAsyncComputeActor : public 
TDqComputeActorBaseAddCounters2(ev->Get()->Sensors); } TypeEnv = const_cast(&typeEnv); - FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges); + FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr); { // say "Hello" to executer @@ -517,7 +505,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseGet()->MkqlMemoryLimit; ProfileStats = std::move(ev->Get()->ProfileStats); - auto sourcesState = GetSourcesState(); auto status = ev->Get()->RunStatus; CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState @@ -536,10 +523,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseGet()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) { ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark()); WatermarksTracker.PopPendingWatermark(); @@ -801,6 +784,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase TVector GetIds(const THashMap& collection) { TVector ids; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index c3710181e7e5..88c5d8e458e9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -364,9 +364,6 @@ struct TComputeMemoryLimits { IMemoryQuotaManager::TPtr MemoryQuotaManager; }; -//temporary flag to integarate changes in interface -#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1 - using TTaskRunnerFactory = std::function< TIntrusivePtr(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc) >; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index 1c41ac1ebd9c..95a8cc1ffd91 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ 
b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -83,8 +83,9 @@ struct TComputeActorAsyncInputHelper { Pause(*watermark); } } + const bool emptyBatch = batch.empty(); AsyncInputPush(std::move(batch), space, finished); - if (!batch.empty()) { + if (!emptyBatch) { // If we have read some data, we must run such reading again // to process the case when async input notified us about new data // but we haven't read all of it. diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5b4a3fa59793..888a8456b33d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -186,7 +186,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped Alloc = std::make_shared( __LOCATION__, NKikimr::TAlignedPagePoolCounters(), - FunctionRegistry->SupportsSizedAllocators(), + true, false ); InitMonCounters(taskCounters); @@ -304,30 +304,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ReportStats(TInstant::Now(), ESendStats::IfPossible); } if (Terminated) { - TaskRunner.Reset(); + DoTerminateImpl(); MemoryQuota.Reset(); MemoryLimits.MemoryQuotaManager.reset(); } } - virtual void DoExecuteImpl() { - auto sourcesState = GetSourcesState(); - - PollAsyncInput(); - ERunStatus status = TaskRunner->Run(); - - CA_LOG_T("Resume execution, run status: " << status); - - if (status != ERunStatus::Finished) { - PollSources(std::move(sourcesState)); - } - - if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) { - Checkpoints->DoCheckpoint(); - } - - ProcessOutputsImpl(status); - } + virtual void DoExecuteImpl() = 0; + virtual void DoTerminateImpl() {} virtual bool DoHandleChannelsAfterFinishImpl() { Y_ABORT_UNLESS(Checkpoints); @@ -477,7 +461,7 @@ 
class TDqComputeActorBase : public NActors::TActorBootstrapped } { - auto guard = MaybeBindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound + auto guard = BindAllocator(); // Source/Sink could destroy mkql values inside PassAway, which requires allocator to be bound for (auto& [_, source] : SourcesMap) { if (source.Actor) { @@ -606,12 +590,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped InternalError(statusCode, TIssues({std::move(issue)})); } + virtual void InvalidateMeminfo() {} + void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) { CA_LOG_E(InternalErrorLogString(statusCode, issues)); - if (TaskRunner) { - TaskRunner->GetAllocator().InvalidateMemInfo(); - TaskRunner->GetAllocator().DisableStrictAllocationCheck(); - } + InvalidateMeminfo(); State = NDqProto::COMPUTE_STATE_FAILURE; ReportStateAndMaybeDie(statusCode, issues); } @@ -741,22 +724,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return true; } - void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override { - CA_LOG_D("Save state"); - NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram(); - mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); - NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData(); - data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); - data.SetBlob(TaskRunner->Save()); - - for (auto& [inputIndex, source] : SourcesMap) { - YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); - NDqProto::TSourceState& sourceState = *state.AddSources(); - source.AsyncInput->SaveState(checkpoint, sourceState); - sourceState.SetInputIndex(inputIndex); - } - } - void CommitState(const NDqProto::TCheckpoint& checkpoint) override { CA_LOG_D("Commit state"); for (auto& [inputIndex, source] : SourcesMap) { 
@@ -810,15 +777,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } - virtual void DoLoadRunnerState(TString&& blob) { - TMaybe error = Nothing(); - try { - TaskRunner->Load(blob); - } catch (const std::exception& e) { - error = e.what(); - } - Checkpoints->AfterStateLoading(error); - } + virtual void DoLoadRunnerState(TString&& blob) = 0; void LoadState(NDqProto::TComputeActorState&& state) override { CA_LOG_D("Load state"); @@ -1073,13 +1032,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped protected: // virtual methods (TODO: replace with static_cast(this)->Foo() - virtual std::any GetSourcesState() { - return nullptr; - } - - virtual void PollSources(std::any /* state */) { - } - virtual void TerminateSources(const TIssues& /* issues */, bool /* success */) { } @@ -1087,18 +1039,16 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped TerminateSources(TIssues({TIssue(message)}), success); } - virtual TGuard BindAllocator() { - return TaskRunner->BindAllocator(); - } - - virtual std::optional> MaybeBindAllocator() { - return TaskRunner->BindAllocator(); + TGuard BindAllocator() { + return Guard(GetAllocator()); } virtual bool SayHelloOnBootstrap() { return true; } + + protected: void HandleExecuteBase(TEvDqCompute::TEvResumeExecution::TPtr&) { ResumeEventScheduled = false; @@ -1469,19 +1419,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const THashMap& secureParams, const THashMap& taskParams, - const TVector& readRanges) + const TVector& readRanges, + IRandomProvider* randomProvider + ) { - if (TaskRunner) { - for (auto& [channelId, channel] : InputChannelsMap) { - channel.Channel = TaskRunner->GetInputChannel(channelId); - } - } auto collectStatsLevel = StatsModeToCollectStatsLevel(RuntimeSettings.StatsMode); for (auto& [inputIndex, source] : SourcesMap) { - if constexpr (!TDerived::HasAsyncTaskRunner) { - source.Buffer = 
TaskRunner->GetSource(inputIndex); - Y_ABORT_UNLESS(source.Buffer); - } Y_ABORT_UNLESS(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); Y_ABORT_UNLESS(inputDesc.HasSource()); @@ -1503,7 +1446,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TypeEnv = typeEnv, .HolderFactory = holderFactory, .TaskCounters = TaskCounters, - .Alloc = TaskRunner ? Alloc : nullptr, + .Alloc = Alloc, .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager, .SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr), .Arena = Task.GetArena(), @@ -1515,71 +1458,59 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped this->RegisterWithSameMailbox(source.Actor); } for (auto& [inputIndex, transform] : InputTransformsMap) { - if constexpr (!TDerived::HasAsyncTaskRunner) { - transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); - std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex); - Y_ABORT_UNLESS(AsyncIoFactory); - const auto& inputDesc = Task.GetInputs(inputIndex); - CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString()); - try { - std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform( - IDqAsyncIoFactory::TInputTransformArguments { - .InputDesc = inputDesc, - .InputIndex = inputIndex, - .StatsLevel = collectStatsLevel, - .TxId = TxId, - .TaskId = Task.GetId(), - .TransformInput = transform.InputBuffer, - .SecureParams = secureParams, - .TaskParams = taskParams, - .ComputeActorId = this->SelfId(), - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder, - .Alloc = Alloc, - .TraceId = ComputeActorSpan.GetTraceId() - }); - } catch (const std::exception& ex) { - throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); - } - this->RegisterWithSameMailbox(transform.Actor); - } - } 
- if (TaskRunner) { - for (auto& [channelId, channel] : OutputChannelsMap) { - channel.Channel = TaskRunner->GetOutputChannel(channelId); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); + Y_ABORT_UNLESS(AsyncIoFactory); + const auto& inputDesc = Task.GetInputs(inputIndex); + CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString()); + try { + std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform( + IDqAsyncIoFactory::TInputTransformArguments { + .InputDesc = inputDesc, + .InputIndex = inputIndex, + .StatsLevel = collectStatsLevel, + .TxId = TxId, + .TaskId = Task.GetId(), + .TransformInput = transform.InputBuffer, + .SecureParams = secureParams, + .TaskParams = taskParams, + .ComputeActorId = this->SelfId(), + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder, + .Alloc = Alloc, + .TraceId = ComputeActorSpan.GetTraceId() + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what(); } + this->RegisterWithSameMailbox(transform.Actor); } for (auto& [outputIndex, transform] : OutputTransformsMap) { - if (TaskRunner) { - transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); - std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex); - Y_ABORT_UNLESS(AsyncIoFactory); - const auto& outputDesc = Task.GetOutputs(outputIndex); - CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString()); - try { - std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( - IDqAsyncIoFactory::TOutputTransformArguments { - .OutputDesc = outputDesc, - .OutputIndex = outputIndex, - .StatsLevel = collectStatsLevel, - .TxId = TxId, - .TransformOutput = transform.OutputBuffer, - .Callback = static_cast(this), - 
.SecureParams = secureParams, - .TaskParams = taskParams, - .TypeEnv = typeEnv, - .HolderFactory = holderFactory, - .ProgramBuilder = *transform.ProgramBuilder - }); - } catch (const std::exception& ex) { - throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); - } - this->RegisterWithSameMailbox(transform.Actor); + transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); + Y_ABORT_UNLESS(AsyncIoFactory); + const auto& outputDesc = Task.GetOutputs(outputIndex); + CA_LOG_D("Create transform for output " << outputIndex << " " << outputDesc.ShortDebugString()); + try { + std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( + IDqAsyncIoFactory::TOutputTransformArguments { + .OutputDesc = outputDesc, + .OutputIndex = outputIndex, + .StatsLevel = collectStatsLevel, + .TxId = TxId, + .TransformOutput = transform.OutputBuffer, + .Callback = static_cast(this), + .SecureParams = secureParams, + .TaskParams = taskParams, + .TypeEnv = typeEnv, + .HolderFactory = holderFactory, + .ProgramBuilder = *transform.ProgramBuilder + }); + } catch (const std::exception& ex) { + throw yexception() << "Failed to create output transform " << outputDesc.GetTransform().GetType() << ": " << ex.what(); } + this->RegisterWithSameMailbox(transform.Actor); } for (auto& [outputIndex, sink] : SinksMap) { - if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); } Y_ABORT_UNLESS(AsyncIoFactory); const auto& outputDesc = Task.GetOutputs(outputIndex); Y_ABORT_UNLESS(outputDesc.HasSink()); @@ -1597,7 +1528,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TaskParams = taskParams, .TypeEnv = typeEnv, .HolderFactory = holderFactory, - .RandomProvider = TaskRunner ? 
TaskRunner->GetRandomProvider() : nullptr + .RandomProvider = randomProvider }); } catch (const std::exception& ex) { throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what(); @@ -1804,9 +1735,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } - virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() { - return TaskRunner ? TaskRunner->GetStats() : nullptr; - } + virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() = 0; + virtual const NYql::NDq::TDqMeteringStats* GetMeteringStats() = 0; virtual const IDqAsyncOutputBuffer* GetSink(ui64, const TAsyncOutputInfoBase& sinkInfo) const { return sinkInfo.Buffer.Get(); @@ -1866,8 +1796,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped for (auto& [inputIndex, sourceInfo] : SourcesMap) { if (auto* source = sourceInfo.AsyncInput) { - // TODO: support async CA - source->FillExtraStats(protoTask, last, TaskRunner ? TaskRunner->GetMeteringStats() : nullptr); + source->FillExtraStats(protoTask, last, GetMeteringStats()); } } FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, RuntimeSettings.GetCollectStatsLevel()); @@ -1972,8 +1901,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } if (auto* transform = transformInfo.AsyncInput) { - // TODO: support async CA - transform->FillExtraStats(protoTask, last, TaskRunner ? 
TaskRunner->GetMeteringStats() : 0); + transform->FillExtraStats(protoTask, last, GetMeteringStats()); } } @@ -2102,7 +2030,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped const IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; const NDqProto::ECheckpointingMode CheckpointingMode; - TIntrusivePtr TaskRunner; TDqComputeActorChannels* Channels = nullptr; TDqComputeActorCheckpoints* Checkpoints = nullptr; THashMap InputChannelsMap; // Channel id -> Channel info diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 8d2ffb00924b..7e21910ca760 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -44,15 +44,73 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase(this)->GetSourcesState(); + + TBase::PollAsyncInput(); + ERunStatus status = TaskRunner->Run(); + + CA_LOG_T("Resume execution, run status: " << status); + + if (status != ERunStatus::Finished) { + static_cast(this)->PollSources(std::move(sourcesState)); + } + + if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && this->Checkpoints && this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && TBase::ReadyToCheckpoint()) { + this->Checkpoints->DoCheckpoint(); + } + + TBase::ProcessOutputsImpl(status); + } + + void DoTerminateImpl() override { + TaskRunner.Reset(); + } + + void InvalidateMeminfo() override { + if (TaskRunner) { + TaskRunner->GetAllocator().InvalidateMemInfo(); + TaskRunner->GetAllocator().DisableStrictAllocationCheck(); + } + } + + void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override { + CA_LOG_D("Save state"); + NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram(); + 
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); + NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData(); + data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); + data.SetBlob(TaskRunner->Save()); + + for (auto& [inputIndex, source] : this->SourcesMap) { + YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); + NDqProto::TSourceState& sourceState = *state.AddSources(); + source.AsyncInput->SaveState(checkpoint, sourceState); + sourceState.SetInputIndex(inputIndex); + } + } + + void DoLoadRunnerState(TString&& blob) override { + TMaybe error = Nothing(); + try { + TaskRunner->Load(blob); + } catch (const std::exception& e) { + error = e.what(); + } + this->Checkpoints->AfterStateLoading(error); + } + void SetTaskRunner(const TIntrusivePtr& taskRunner) { - this->TaskRunner = taskRunner; + TaskRunner = taskRunner; } void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { - YQL_ENSURE(this->TaskRunner); + YQL_ENSURE(TaskRunner); - auto guard = this->TaskRunner->BindAllocator(this->MemoryQuota->GetMkqlMemoryLimit()); + auto guard = TBase::BindAllocator(); auto* alloc = guard.GetMutex(); + alloc->SetLimit(this->MemoryQuota->GetMkqlMemoryLimit()); this->MemoryQuota->TrySetIncreaseMemoryLimitCallback(alloc); @@ -60,15 +118,61 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseMemoryLimits.ChannelBufferSize; limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; - this->TaskRunner->Prepare(this->Task, limits, execCtx); + TaskRunner->Prepare(this->Task, limits, execCtx); + + for (auto& [channelId, channel] : this->InputChannelsMap) { + channel.Channel = TaskRunner->GetInputChannel(channelId); + } + + for (auto& [inputIndex, source] : this->SourcesMap) { + source.Buffer = TaskRunner->GetSource(inputIndex); + Y_ABORT_UNLESS(source.Buffer); + } + + for (auto& [inputIndex, transform] : this->InputTransformsMap) { + 
std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex); + } + + for (auto& [channelId, channel] : this->OutputChannelsMap) { + channel.Channel = TaskRunner->GetOutputChannel(channelId); + } + + for (auto& [outputIndex, transform] : this->OutputTransformsMap) { + std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex); + } + + for (auto& [outputIndex, sink] : this->SinksMap) { + sink.Buffer = TaskRunner->GetSink(outputIndex); + } TBase::FillIoMaps( - this->TaskRunner->GetHolderFactory(), - this->TaskRunner->GetTypeEnv(), - this->TaskRunner->GetSecureParams(), - this->TaskRunner->GetTaskParams(), - this->TaskRunner->GetReadRanges()); + TaskRunner->GetHolderFactory(), + TaskRunner->GetTypeEnv(), + TaskRunner->GetSecureParams(), + TaskRunner->GetTaskParams(), + TaskRunner->GetReadRanges(), + TaskRunner->GetRandomProvider() + ); + } + + const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override { + return TaskRunner ? TaskRunner->GetStats() : nullptr; } + + const NYql::NDq::TDqMeteringStats* GetMeteringStats() override { + return TaskRunner ? 
TaskRunner->GetMeteringStats() : nullptr; + } + +protected: + // methods that are called via static_cast(this) and may be overridden by a derived class + void* GetSourcesState() const { + return nullptr; + } + void PollSources(void* /* state */) { + } + + TIntrusivePtr TaskRunner; + }; } //namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp new file mode 100644 index 000000000000..137816bcbe57 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/ut/dq_compute_actor_async_input_helper_ut.cpp @@ -0,0 +1,84 @@ + +#include +#include +#include +#include + +#undef IS_CTX_LOG_PRIORITY_ENABLED +#define IS_CTX_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component, sampleBy) false +#include + +namespace NYql::NDq { + +Y_UNIT_TEST_SUITE(TComputeActorAsyncInputHelperTest) { + + struct TDummyDqComputeActorAsyncInput: IDqComputeActorAsyncInput { + TDummyDqComputeActorAsyncInput() { + Batch.emplace_back(NUdf::TUnboxedValue{}); + Batch.emplace_back(NUdf::TUnboxedValue{}); + } + ui64 GetInputIndex() const override { + return 4; + } + + const TDqAsyncStats& GetIngressStats() const override{ + static TDqAsyncStats stats; + return stats; + } + + i64 GetAsyncInputData( + NKikimr::NMiniKQL::TUnboxedValueBatch& batch, + TMaybe& watermark, + bool& finished, + i64 freeSpace) override + { + Y_ABORT_IF(Batch.empty()); + batch = Batch; + Y_UNUSED(watermark); + Y_UNUSED(finished); + Y_UNUSED(freeSpace); + return 2; + } + + // Checkpointing.
+ void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) override { + Y_UNUSED(checkpoint); + Y_UNUSED(state); + } + void CommitState(const NDqProto::TCheckpoint& checkpoint) override { + Y_UNUSED(checkpoint); + } + void LoadState(const NDqProto::TSourceState& state) override { + Y_UNUSED(state); + } + + void PassAway() override {} + NKikimr::NMiniKQL::TUnboxedValueBatch Batch; + }; + + struct TDummyAsyncInputHelper: TComputeActorAsyncInputHelper{ + using TComputeActorAsyncInputHelper::TComputeActorAsyncInputHelper; + i64 GetFreeSpace() const override{ + return 10; + } + void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) override{ + batch.clear(); + Y_UNUSED(space); + Y_UNUSED(finished); + return; + } + }; + + Y_UNIT_TEST(PollAsyncInput) { + NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, true); + TDummyDqComputeActorAsyncInput input; + TDummyAsyncInputHelper helper("MyPrefix", 13, NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED); + helper.AsyncInput = &input; + TDqComputeActorMetrics metrics{NMonitoring::TDynamicCounterPtr{}}; + TDqComputeActorWatermarks watermarks(NActors::TActorIdentity{NActors::TActorId{}}, TTxId{}, 7); + auto result = helper.PollAsyncInput(metrics, watermarks, 20); + UNIT_ASSERT(result && EResumeSource::CAPollAsync == *result); + } +} + +} //namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/ut/ya.make b/ydb/library/yql/dq/actors/compute/ut/ya.make index 6dfca084a033..617f9e30f149 100644 --- a/ydb/library/yql/dq/actors/compute/ut/ya.make +++ b/ydb/library/yql/dq/actors/compute/ut/ya.make @@ -1,6 +1,7 @@ UNITTEST_FOR(ydb/library/yql/dq/actors/compute) SRCS( + dq_compute_actor_async_input_helper_ut.cpp dq_compute_issues_buffer_ut.cpp dq_source_watermark_tracker_ut.cpp ) @@ -12,4 +13,6 @@ PEERDIR( ydb/library/yql/sql/pg_dummy ) +YQL_LAST_ABI_VERSION() + END() diff --git 
a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h index 295e5ce693ca..f9acea3d2464 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h @@ -50,8 +50,6 @@ struct ITaskRunnerActorFactory { THolder&& memoryQuota = {}) = 0; }; -// temporary for YQL-17542 -#define Y_YQL_DQ_TASK_RUNNER_ACTOR_FACTORY_COMPATIBILITY_1 ITaskRunnerActorFactory::TPtr CreateLocalTaskRunnerActorFactory(const TTaskRunnerFactory& factory); } // namespace NTaskRunnerActor