Skip to content

Commit

Permalink
YQL-17542 finalize split sync async CAs (#1689)
Browse files Browse the repository at this point in the history
* Yql 17542 simplify alloc in compute actor (#1452)

* YQL-17542 Simplify allocator usage in ComputeActors

* fix build

* fix dup

* YQL-17542 move SaveState LoadState (#1474)

* YQL-17703 always use sized allocator in CA (#1522)

* YQL-17542 split FillIoMaps (#1537)

* YQL-17755 fix drying input up (#1604)

* YQL-17542 split stat (#1553)

* YQL-17542 remove transition guards (#1610)

* YQL-17542 get rid of std::any in handling sources state (#1635)

* YQL-17755 ut for TComputeActorAsyncInputHelperTest::PollAsyncInput (#1626)

* YQL-17542 move TaskRunner dependent Execute to TDqSyncComputeActorBase (#1599)

* YQL-17542 move TaskRunner dependent Execute to TDqSyncComputeActorBase (#1666)
  • Loading branch information
zverevgeny authored Feb 9, 2024
1 parent 9608841 commit ae6ef0c
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 183 deletions.
8 changes: 2 additions & 6 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev)
}
}

void TKqpScanComputeActor::PollSources(std::any prev) {
void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) {
if (!ScanData || ScanData->IsFinished()) {
return;
}
const auto hasNewMemoryPred = [&]() {
if (!prev.has_value()) {
return false;
}
const ui64 freeSpace = CalculateFreeSpace();
const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
return freeSpace > prevFreeSpace;
};
if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) {
return;
}
const ui32 freeSpace = CalculateFreeSpace();
const ui64 freeSpace = CalculateFreeSpace();
CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace);
for (auto&& i : Fetchers) {
Send(i, new TEvScanExchange::TEvAckData(freeSpace));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
: 0ul;
}

std::any GetSourcesState() override {
ui64 GetSourcesState() {
if (!ScanData) {
return 0;
}
return CalculateFreeSpace();
}

void PollSources(std::any prev) override;
void PollSources(ui64 prevFreeSpace);

void PassAway() override {
if (TaskRunner) {
Expand Down
24 changes: 6 additions & 18 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,18 +461,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
return inputChannel->FreeSpace;
}

TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() override {
return TypeEnv->BindAllocator();
}

std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> MaybeBindAllocator() override {
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard;
if (TypeEnv) {
guard.emplace(TypeEnv->BindAllocator());
}
return guard;
}

void OnTaskRunnerCreated(NTaskRunnerActor::TEvTaskRunnerCreateFinished::TPtr& ev) {
const auto& secureParams = ev->Get()->SecureParams;
const auto& taskParams = ev->Get()->TaskParams;
Expand All @@ -483,7 +471,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
Stat->AddCounters2(ev->Get()->Sensors);
}
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges);
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);

{
// say "Hello" to executer
Expand Down Expand Up @@ -517,7 +505,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC

MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit;
ProfileStats = std::move(ev->Get()->ProfileStats);
auto sourcesState = GetSourcesState();
auto status = ev->Get()->RunStatus;

CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState
Expand All @@ -536,10 +523,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}
}

if (status != ERunStatus::Finished) {
PollSources(std::move(sourcesState));
}

if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
WatermarksTracker.PopPendingWatermark();
Expand Down Expand Up @@ -801,6 +784,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
return TaskRunnerStats.Get();
}

const NYql::NDq::TDqMeteringStats* GetMeteringStats() override {
// TODO: support async CA
return nullptr;
}

template<typename TSecond>
TVector<ui32> GetIds(const THashMap<ui64, TSecond>& collection) {
TVector<ui32> ids;
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,6 @@ struct TComputeMemoryLimits {
IMemoryQuotaManager::TPtr MemoryQuotaManager;
};

//temporary flag to integarate changes in interface
#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1

using TTaskRunnerFactory = std::function<
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ struct TComputeActorAsyncInputHelper {
Pause(*watermark);
}
}
const bool emptyBatch = batch.empty();
AsyncInputPush(std::move(batch), space, finished);
if (!batch.empty()) {
if (!emptyBatch) {
// If we have read some data, we must run such reading again
// to process the case when async input notified us about new data
// but we haven't read all of it.
Expand Down
Loading

0 comments on commit ae6ef0c

Please sign in to comment.