Skip to content

Commit

Permalink
[RFC] Add checkpoint support for streamlookup (#9299)
Browse files Browse the repository at this point in the history
(cherry picked from commit c5d3c1a)
  • Loading branch information
yumkam committed Sep 24, 2024
1 parent 1ec7315 commit 39c1baa
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 10 deletions.
21 changes: 13 additions & 8 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1369,8 +1369,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

void PollAsyncInput() {
if (!Running) {
CA_LOG_T("Skip polling inputs and sources because not running");
return;
}

CA_LOG_T("Poll inputs");
for (auto& [inputIndex, transform] : InputTransformsMap) {
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
ContinueExecute(*resume);
}
}

// Don't produce any input from sources if we're about to save checkpoint.
if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
CA_LOG_T("Skip polling sources because of pending checkpoint");
return;
}
Expand All @@ -1381,13 +1393,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ContinueExecute(*resume);
}
}

CA_LOG_T("Poll inputs");
for (auto& [inputIndex, transform] : InputTransformsMap) {
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
ContinueExecute(*resume);
}
}
}

void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TInputTransformStreamLookupBase
}
}
finished = IsFinished();
return 0;
return AwaitingQueue.RowCount();
}

TMaybe<google::protobuf::Any> ExtraData() override {
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ class TLocalTaskRunnerActor
return false;
}
}
for (const auto transformId: InputTransforms) {
const auto t = TaskRunner->GetInputTransform(transformId);
if (t) {
auto [_, transform] = *t;
if (!transform->Empty()) {
return false;
}
if (transform->IsPending()) {
return false;
}
}
}
return true;
}

Expand Down Expand Up @@ -436,6 +448,7 @@ class TLocalTaskRunnerActor
for (auto i = 0; i != inputs.size(); ++i) {
if (auto t = TaskRunner->GetInputTransform(i)) {
inputTransforms[i] = *t;
InputTransforms.emplace(i);
}
}

Expand Down Expand Up @@ -488,6 +501,7 @@ class TLocalTaskRunnerActor
const TTxId TxId;
const ui64 TaskId;
THashSet<ui32> Inputs;
THashSet<ui32> InputTransforms;
THashSet<ui32> Sources;
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
THashSet<ui32> InputChannelsWithDisabledCheckpoints;
Expand Down
7 changes: 6 additions & 1 deletion ydb/library/yql/dq/runtime/dq_async_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NYql::NDq {
class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> {
using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>;
friend TBaseImpl;
bool Pending = false;
public:
TDqAsyncInputBufferStats PushStats;
TDqInputStats PopStats;
Expand All @@ -32,7 +33,7 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
}

void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
Y_ABORT_UNLESS(!batch.empty() || !space);
Pending = space != 0;
if (!batch.empty()) {
AddBatch(std::move(batch), space);
}
Expand All @@ -41,6 +42,10 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
virtual void Push(TDqSerializedBatch&&, i64) override {
YQL_ENSURE(!"Unimplemented");
}

bool IsPending() const override {
return Pending;
}
};

IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/runtime/dq_async_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class IDqAsyncInputBuffer : public IDqInput {
virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;

virtual void Finish() = 0;

virtual bool IsPending() const { return false; };
};

IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,
Expand Down

0 comments on commit 39c1baa

Please sign in to comment.