Skip to content

Commit

Permalink
YQL-17542 get rid of std::any in handling sources state (ydb-platform…
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny committed Feb 8, 2024
1 parent aef7c6c commit 7997234
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
8 changes: 2 additions & 6 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev)
}
}

void TKqpScanComputeActor::PollSources(std::any prev) {
void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) {
if (!ScanData || ScanData->IsFinished()) {
return;
}
const auto hasNewMemoryPred = [&]() {
if (!prev.has_value()) {
return false;
}
const ui64 freeSpace = CalculateFreeSpace();
const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
return freeSpace > prevFreeSpace;
};
if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) {
return;
}
const ui32 freeSpace = CalculateFreeSpace();
const ui64 freeSpace = CalculateFreeSpace();
CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace);
for (auto&& i : Fetchers) {
Send(i, new TEvScanExchange::TEvAckData(freeSpace));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
: 0ul;
}

std::any GetSourcesState() override {
ui64 GetSourcesState() {
if (!ScanData) {
return 0;
}
return CalculateFreeSpace();
}

void PollSources(std::any prev) override;
void PollSources(ui64 prevFreeSpace);

void PassAway() override {
if (TaskRunner) {
Expand Down
20 changes: 11 additions & 9 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

virtual void DoExecuteImpl() {
auto sourcesState = GetSourcesState();
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();

PollAsyncInput();
ERunStatus status = TaskRunner->Run();

CA_LOG_T("Resume execution, run status: " << status);

if (status != ERunStatus::Finished) {
PollSources(std::move(sourcesState));
static_cast<TDerived*>(this)->PollSources(std::move(sourcesState));
}

if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
Expand Down Expand Up @@ -1049,13 +1049,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
protected:
// virtual methods (TODO: replace with static_cast<TDerived*>(this)->Foo()

virtual std::any GetSourcesState() {
return nullptr;
}

virtual void PollSources(std::any /* state */) {
}

virtual void TerminateSources(const TIssues& /* issues */, bool /* success */) {
}

Expand All @@ -1071,6 +1064,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
return true;
}

protected:
// methods that are called via static_cast<TDerived*>(this) and may be overriden by a dervied class
void* GetSourcesState() const {
return nullptr;
}
void PollSources(void* /* state */) {
}


protected:
void HandleExecuteBase(TEvDqCompute::TEvResumeExecution::TPtr&) {
ResumeEventScheduled = false;
Expand Down

0 comments on commit 7997234

Please sign in to comment.