From 39c1baa54022ac00e70c9eddb7a628aeb35a5732 Mon Sep 17 00:00:00 2001 From: yumkam Date: Wed, 18 Sep 2024 11:15:22 +0300 Subject: [PATCH] [RFC] Add checkpoint support for streamlookup (#9299) (cherry picked from commit c5d3c1ad8e3650c570cbcff1b96277f32fcb0b27) --- .../dq/actors/compute/dq_compute_actor_impl.h | 21 ++++++++++++------- .../dq_input_transform_lookup.cpp | 2 +- .../task_runner/task_runner_actor_local.cpp | 14 +++++++++++++ ydb/library/yql/dq/runtime/dq_async_input.cpp | 7 ++++++- ydb/library/yql/dq/runtime/dq_async_input.h | 2 ++ 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e87acb1f7b7f..bcc9cc89784b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1369,8 +1369,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } void PollAsyncInput() { + if (!Running) { + CA_LOG_T("Skip polling inputs and sources because not running"); + return; + } + + CA_LOG_T("Poll inputs"); + for (auto& [inputIndex, transform] : InputTransformsMap) { + if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { + ContinueExecute(*resume); + } + } + // Don't produce any input from sources if we're about to save checkpoint. - if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) { + if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) { CA_LOG_T("Skip polling sources because of pending checkpoint"); return; } @@ -1381,13 +1393,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ContinueExecute(*resume); } } - - CA_LOG_T("Poll inputs"); - for (auto& [inputIndex, transform] : InputTransformsMap) { - if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { - ContinueExecute(*resume); - } - } } void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 77b53fff73fa..23f9cd0b702e 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -184,7 +184,7 @@ class TInputTransformStreamLookupBase } } finished = IsFinished(); - return 0; + return AwaitingQueue.RowCount(); } TMaybe ExtraData() override { diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 33cb50221d17..27cef00055d1 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -143,6 +143,18 @@ class TLocalTaskRunnerActor return false; } } + for (const auto transformId: InputTransforms) { + const auto t = TaskRunner->GetInputTransform(transformId); + if (t) { + auto [_, transform] = *t; + if (!transform->Empty()) { + return false; + } + if (transform->IsPending()) { + return false; + } + } + } return true; } @@ -436,6 +448,7 @@ class TLocalTaskRunnerActor for (auto i = 0; i != inputs.size(); ++i) { if (auto t = TaskRunner->GetInputTransform(i)) { inputTransforms[i] = *t; + InputTransforms.emplace(i); } } @@ -488,6 +501,7 @@ class TLocalTaskRunnerActor const TTxId TxId; const ui64 TaskId; THashSet Inputs; + THashSet InputTransforms; THashSet Sources; TIntrusivePtr TaskRunner; THashSet InputChannelsWithDisabledCheckpoints; diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp index 7d515e5cb31e..9f5c1704813c 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp @@ -6,6 +6,7 @@ namespace NYql::NDq { class TDqAsyncInputBuffer : public TDqInputImpl { using TBaseImpl = TDqInputImpl; friend TBaseImpl; + bool Pending = false; public: TDqAsyncInputBufferStats PushStats; TDqInputStats PopStats; @@ -32,7 +33,7 @@ class TDqAsyncInputBuffer : public TDqInputImpl