Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add checkpoint support for streamlookup (backport #9299) #9719

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1369,8 +1369,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

void PollAsyncInput() {
if (!Running) {
CA_LOG_T("Skip polling inputs and sources because not running");
return;
}

CA_LOG_T("Poll inputs");
for (auto& [inputIndex, transform] : InputTransformsMap) {
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
ContinueExecute(*resume);
}
}

// Don't produce any input from sources if we're about to save checkpoint.
if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
CA_LOG_T("Skip polling sources because of pending checkpoint");
return;
}
Expand All @@ -1381,13 +1393,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ContinueExecute(*resume);
}
}

CA_LOG_T("Poll inputs");
for (auto& [inputIndex, transform] : InputTransformsMap) {
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
ContinueExecute(*resume);
}
}
}

void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TInputTransformStreamLookupBase
}
}
finished = IsFinished();
return 0;
return AwaitingQueue.RowCount();
}

TMaybe<google::protobuf::Any> ExtraData() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ class TLocalTaskRunnerActor
return false;
}
}
for (const auto transformId: InputTransforms) {
const auto t = TaskRunner->GetInputTransform(transformId);
if (t) {
auto [_, transform] = *t;
if (!transform->Empty()) {
return false;
}
if (transform->IsPending()) {
return false;
}
}
}
return true;
}

Expand Down Expand Up @@ -436,6 +448,7 @@ class TLocalTaskRunnerActor
for (auto i = 0; i != inputs.size(); ++i) {
if (auto t = TaskRunner->GetInputTransform(i)) {
inputTransforms[i] = *t;
InputTransforms.emplace(i);
}
}

Expand Down Expand Up @@ -488,6 +501,7 @@ class TLocalTaskRunnerActor
const TTxId TxId;
const ui64 TaskId;
THashSet<ui32> Inputs;
THashSet<ui32> InputTransforms;
THashSet<ui32> Sources;
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
THashSet<ui32> InputChannelsWithDisabledCheckpoints;
Expand Down
7 changes: 6 additions & 1 deletion ydb/library/yql/dq/runtime/dq_async_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NYql::NDq {
class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> {
using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>;
friend TBaseImpl;
bool Pending = false;
public:
TDqAsyncInputBufferStats PushStats;
TDqInputStats PopStats;
Expand All @@ -32,7 +33,7 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
}

void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
Y_ABORT_UNLESS(!batch.empty() || !space);
Pending = space != 0;
if (!batch.empty()) {
AddBatch(std::move(batch), space);
}
Expand All @@ -41,6 +42,10 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
virtual void Push(TDqSerializedBatch&&, i64) override {
YQL_ENSURE(!"Unimplemented");
}

bool IsPending() const override {
return Pending;
}
};

IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/runtime/dq_async_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class IDqAsyncInputBuffer : public IDqInput {
virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;

virtual void Finish() = 0;

virtual bool IsPending() const { return false; };
};

IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,
Expand Down
Loading