Skip to content

Commit

Permalink
Merge 002867f into a456eb6
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored Jan 31, 2024
2 parents a456eb6 + 002867f commit 60a4c00
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
26 changes: 1 addition & 25 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,22 +741,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
return true;
}

void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
CA_LOG_D("Save state");
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
data.SetBlob(TaskRunner->Save());

for (auto& [inputIndex, source] : SourcesMap) {
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
NDqProto::TSourceState& sourceState = *state.AddSources();
source.AsyncInput->SaveState(checkpoint, sourceState);
sourceState.SetInputIndex(inputIndex);
}
}

void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
CA_LOG_D("Commit state");
for (auto& [inputIndex, source] : SourcesMap) {
Expand Down Expand Up @@ -810,15 +794,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}
}

virtual void DoLoadRunnerState(TString&& blob) {
TMaybe<TString> error = Nothing();
try {
TaskRunner->Load(blob);
} catch (const std::exception& e) {
error = e.what();
}
Checkpoints->AfterStateLoading(error);
}
virtual void DoLoadRunnerState(TString&& blob) = 0;

void LoadState(NDqProto::TComputeActorState&& state) override {
CA_LOG_D("Load state");
Expand Down
26 changes: 26 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,32 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
return inputTransformInfo.Buffer.Get();
}
protected:
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
CA_LOG_D("Save state");
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
data.SetBlob(this->TaskRunner->Save());

for (auto& [inputIndex, source] : this->SourcesMap) {
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
NDqProto::TSourceState& sourceState = *state.AddSources();
source.AsyncInput->SaveState(checkpoint, sourceState);
sourceState.SetInputIndex(inputIndex);
}
}

void DoLoadRunnerState(TString&& blob) override {
TMaybe<TString> error = Nothing();
try {
this->TaskRunner->Load(blob);
} catch (const std::exception& e) {
error = e.what();
}
this->Checkpoints->AfterStateLoading(error);
}

void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
this->TaskRunner = taskRunner;
}
Expand Down

0 comments on commit 60a4c00

Please sign in to comment.