From 002867f3e171735b65eaa57ecbd730b677557ce7 Mon Sep 17 00:00:00 2001 From: Evgeny Zverev Date: Wed, 31 Jan 2024 19:41:34 +0300 Subject: [PATCH] YQL-17542 move SaveState LoadState --- .../dq/actors/compute/dq_compute_actor_impl.h | 26 +------------------ .../compute/dq_sync_compute_actor_base.h | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5b4a3fa59793..6c45c15c8741 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -741,22 +741,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return true; } - void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override { - CA_LOG_D("Save state"); - NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram(); - mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); - NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData(); - data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); - data.SetBlob(TaskRunner->Save()); - - for (auto& [inputIndex, source] : SourcesMap) { - YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); - NDqProto::TSourceState& sourceState = *state.AddSources(); - source.AsyncInput->SaveState(checkpoint, sourceState); - sourceState.SetInputIndex(inputIndex); - } - } - void CommitState(const NDqProto::TCheckpoint& checkpoint) override { CA_LOG_D("Commit state"); for (auto& [inputIndex, source] : SourcesMap) { @@ -810,15 +794,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } - virtual void DoLoadRunnerState(TString&& blob) { - TMaybe error = Nothing(); - try { - TaskRunner->Load(blob); - } catch (const std::exception& e) { - error = e.what(); - } - Checkpoints->AfterStateLoading(error); - } + virtual void DoLoadRunnerState(TString&& blob) = 0; void LoadState(NDqProto::TComputeActorState&& state) override { CA_LOG_D("Load state"); diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 8d2ffb00924b..000b0d8564e9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -44,6 +44,32 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseMutableStateData(); + data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); + data.SetBlob(this->TaskRunner->Save()); + + for (auto& [inputIndex, source] : this->SourcesMap) { + YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); + NDqProto::TSourceState& sourceState = *state.AddSources(); + source.AsyncInput->SaveState(checkpoint, sourceState); + sourceState.SetInputIndex(inputIndex); + } + } + + void DoLoadRunnerState(TString&& blob) override { + TMaybe error = Nothing(); + try { + this->TaskRunner->Load(blob); + } catch (const std::exception& e) { + error = e.what(); + } + this->Checkpoints->AfterStateLoading(error); + } + void SetTaskRunner(const TIntrusivePtr& taskRunner) { this->TaskRunner = taskRunner; }