From 833b161ad58a61385170c69070c9e0073b18a34c Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 11 Dec 2024 11:23:52 +0000 Subject: [PATCH 1/3] Added parallel compilation --- .../libs/config/protos/row_dispatcher.proto | 5 + .../libs/row_dispatcher/events/data_plane.h | 3 + .../format_handler/filters/filters_set.cpp | 23 ++- .../format_handler/format_handler.cpp | 7 +- .../format_handler/ut/topic_filter_ut.cpp | 2 +- .../purecalc_compilation/compile_service.cpp | 173 ++++++++++++++++-- .../purecalc_compilation/compile_service.h | 4 +- .../purecalc_compilation/ya.make | 1 + .../fq/libs/row_dispatcher/row_dispatcher.cpp | 2 +- 9 files changed, 194 insertions(+), 26 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 2ee16c93ddef..02d4117a8018 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -25,6 +25,10 @@ message TJsonParserConfig { uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6 } +message TCompileServiceConfig { + uint64 ParallelCompilationLimit = 1; // 1 by default, 0 <=> unlimited +} + message TRowDispatcherConfig { bool Enabled = 1; uint64 TimeoutBeforeStartSessionSec = 2; @@ -32,5 +36,6 @@ message TRowDispatcherConfig { uint64 MaxSessionUsedMemory = 4; bool WithoutConsumer = 5; TJsonParserConfig JsonParser = 7; + TCompileServiceConfig CompileService = 8; TRowDispatcherCoordinatorConfig Coordinator = 6; } diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 891c729cf0f9..99610f5c5f66 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -51,6 +51,7 @@ struct TEvRowDispatcher { EvGetInternalStateResponse, EvPurecalcCompileRequest, EvPurecalcCompileResponse, + EvPurecalcCompileAbort, EvEnd, }; @@ -197,6 +198,8 @@ struct TEvRowDispatcher { NYql::NDqProto::StatusIds::StatusCode Status; NYql::TIssues Issues; }; + + struct TEvPurecalcCompileAbort : public NActors::TEventLocal {}; }; } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index ae88416712f3..571820fd6a59 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -14,7 +14,7 @@ class TTopicFilters : public ITopicFilters { NMonitoring::TDynamicCounters::TCounterPtr InFlightCompileRequests; NMonitoring::TDynamicCounters::TCounterPtr CompileErrors; - TCounters(NMonitoring::TDynamicCounterPtr counters) + explicit TCounters(NMonitoring::TDynamicCounterPtr counters) : Counters(counters) { Register(); @@ -83,6 +83,18 @@ class TTopicFilters : public ITopicFilters { NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId)); } + void AbortCompilation() { + if (!InFlightCompilationId) { + return; + } + + LOG_ROW_DISPATCHER_TRACE("Send abort compile request with id " << InFlightCompilationId); + NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, new TEvRowDispatcher::TEvPurecalcCompileAbort(), 0, InFlightCompilationId)); + + InFlightCompilationId = 0; + Self.Counters.InFlightCompileRequests->Dec(); + } + void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) { if (ev->Cookie != InFlightCompilationId) { LOG_ROW_DISPATCHER_DEBUG("Outdated compiler response ignored for id " << ev->Cookie << ", current compile id " << InFlightCompilationId); @@ -205,7 +217,14 @@ class TTopicFilters : public ITopicFilters { void RemoveFilter(NActors::TActorId filterId) override { LOG_ROW_DISPATCHER_TRACE("Remove filter with id " << filterId); - Filters.erase(filterId); + + const auto it = Filters.find(filterId); + if (it == Filters.end()) { + return; + } + + it->second.AbortCompilation(); + Filters.erase(it); } TFiltersStatistic GetStatistics() override { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index c0cdc70173d6..3ea677e990fa 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -333,8 +333,11 @@ class TTopicFormatHandler : public NActors::TActor, public } void Handle(NActors::TEvents::TEvPoison::TPtr&) { - with_lock(Alloc) { - Clients.clear(); + if (Filters) { + for (const auto& [clientId, _] : Clients) { + Filters->RemoveFilter(clientId); + } + Filters.Reset(); } PassAway(); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 469e4f87c5b2..55f6fd8f73d4 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -92,7 +92,7 @@ class TFiterFixture : public TBaseFixture { virtual void SetUp(NUnitTest::TTestContext& ctx) override { TBase::SetUp(ctx); - CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService()); + CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService({}, MakeIntrusive())); } virtual void TearDown(NUnitTest::TTestContext& ctx) override { diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp index 57e410f5ebb9..7589a6f38372 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -12,26 +13,47 @@ namespace NFq::NRowDispatcher { namespace { -class TPurecalcCompileService : public NActors::TActor { - using TBase = NActors::TActor; +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvCompileFinished = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + // Events + struct TEvCompileFinished : public NActors::TEventLocal { + TEvCompileFinished(NActors::TActorId requestActor, ui64 requestId) + : RequestActor(requestActor) + , RequestId(requestId) + {} + + const NActors::TActorId RequestActor; + const ui64 RequestId; + }; +}; +class TPurecalcCompileActor : public NActors::TActorBootstrapped { public: - TPurecalcCompileService() - : TBase(&TPurecalcCompileService::StateFunc) - , LogPrefix("TPurecalcCompileService: ") + TPurecalcCompileActor(NActors::TActorId owner, NYql::NPureCalc::IProgramFactoryPtr factory, TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr request) + : Owner(owner) + , Factory(factory) + , LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ") + , Request(std::move(request)) {} - STRICT_STFUNC(StateFunc, - hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); - ) + void Bootstrap() { + Y_DEFER { + Finish(); + }; - void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie); - IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder); + LOG_ROW_DISPATCHER_TRACE("Started compile request"); + IProgramHolder::TPtr programHolder = std::move(Request->Get()->ProgramHolder); TStatus status = TStatus::Success(); try { - programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings)); + programHolder->CreateProgram(Factory); } catch (const NYql::NPureCalc::TCompileError& error) { status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues()) .AddIssue(TStringBuilder() << "Final yql: " << error.GetYql()) @@ -41,15 +63,120 @@ class TPurecalcCompileService : public NActors::TActor } if (status.IsFail()) { - LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie); - Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, ev->Cookie); + LOG_ROW_DISPATCHER_ERROR("Compilation failed for request"); + Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, Request->Cookie); } else { - LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie); - Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie); + LOG_ROW_DISPATCHER_TRACE("Compilation completed for request"); + Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, Request->Cookie); } } private: + void Finish() { + Send(Owner, new TEvPrivate::TEvCompileFinished(Request->Sender, Request->Cookie)); + PassAway(); + } + +private: + const NActors::TActorId Owner; + const NYql::NPureCalc::IProgramFactoryPtr Factory; + const TString LogPrefix; + + TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr Request; +}; + +class TPurecalcCompileService : public NActors::TActor { + using TBase = NActors::TActor; + + struct TCounters { + const NMonitoring::TDynamicCounterPtr Counters; + + NMonitoring::TDynamicCounters::TCounterPtr ActiveCompileActors; + NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize; + + explicit TCounters(NMonitoring::TDynamicCounterPtr counters) + : Counters(counters) + { + Register(); + } + + private: + void Register() { + ActiveCompileActors = Counters->GetCounter("ActiveCompileActors", false); + CompileQueueSize = Counters->GetCounter("CompileQueueSize", false); + } + }; + +public: + TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) + : TBase(&TPurecalcCompileService::StateFunc) + , Config(config) + , InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : std::numeric_limits::max()) + , LogPrefix("TPurecalcCompileService: ") + , Counters(counters) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); + hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle) + hFunc(TEvPrivate::TEvCompileFinished, Handle); + ) + + void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { + const auto requestActor = ev->Sender; + const ui64 requestId = ev->Cookie; + LOG_ROW_DISPATCHER_TRACE("Add to compile queue request with id " << requestId << " from " << requestActor); + + // Remove old compile request + RemoveRequest(requestActor, requestId); + + // Add new request + RequestsQueue.emplace_back(std::move(ev)); + Y_ENSURE(RequestsIndex.emplace(std::make_pair(requestActor, requestId), --RequestsQueue.end()).second); + Counters.CompileQueueSize->Inc(); + + StartCompilation(); + } + + void Handle(TEvRowDispatcher::TEvPurecalcCompileAbort::TPtr& ev) { + LOG_ROW_DISPATCHER_TRACE("Abort compile request with id " << ev->Cookie << " from " << ev->Sender); + + RemoveRequest(ev->Sender, ev->Cookie); + } + + void Handle(TEvPrivate::TEvCompileFinished::TPtr& ev) { + LOG_ROW_DISPATCHER_TRACE("Compile finished for request with id " << ev->Get()->RequestId << " from " << ev->Get()->RequestActor); + + InFlightCompilations.erase(ev->Sender); + Counters.ActiveCompileActors->Dec(); + + StartCompilation(); + } + +private: + void RemoveRequest(NActors::TActorId requestActor, ui64 requestId) { + const auto it = RequestsIndex.find(std::make_pair(requestActor, requestId)); + if (it == RequestsIndex.end()) { + return; + } + + RequestsQueue.erase(it->second); + RequestsIndex.erase(it); + Counters.CompileQueueSize->Dec(); + } + + void StartCompilation() { + while (!RequestsQueue.empty() && InFlightCompilations.size() < InFlightLimit) { + auto request = std::move(RequestsQueue.front()); + RemoveRequest(request->Sender, request->Cookie); + + const auto factory = GetOrCreateFactory(request->Get()->Settings); + const auto compileActor = Register(new TPurecalcCompileActor(SelfId(), factory, std::move(request))); + Y_ENSURE(InFlightCompilations.emplace(compileActor).second); + Counters.ActiveCompileActors->Inc(); + } + } + NYql::NPureCalc::IProgramFactoryPtr GetOrCreateFactory(const TPurecalcCompileSettings& settings) { const auto it = ProgramFactories.find(settings); if (it != ProgramFactories.end()) { @@ -62,15 +189,23 @@ class TPurecalcCompileService : public NActors::TActor } private: + const NConfig::TCompileServiceConfig Config; + const ui64 InFlightLimit; const TString LogPrefix; + std::list RequestsQueue; + THashMap, std::list::iterator> RequestsIndex; + std::unordered_set InFlightCompilations; + std::map ProgramFactories; + + const TCounters Counters; }; -} // namespace { +} // anonymous namespace -NActors::IActor* CreatePurecalcCompileService() { - return new TPurecalcCompileService(); +NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) { + return new TPurecalcCompileService(config, counters); } } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h index 4675cf4d3be4..1ffd534ac99b 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h @@ -1,9 +1,11 @@ #pragma once +#include + #include namespace NFq::NRowDispatcher { -NActors::IActor* CreatePurecalcCompileService(); +NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters); } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make index a1508a8591b2..4f4b28718dfd 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( ydb/core/fq/libs/actors/logging + ydb/core/fq/libs/config/protos ydb/core/fq/libs/row_dispatcher/events ydb/core/fq/libs/row_dispatcher/format_handler/common ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 6e4aeebe0222..79820bfd75f7 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -471,7 +471,7 @@ void TRowDispatcher::Bootstrap() { auto coordinatorId = Register(NewCoordinator(SelfId(), config, YqSharedResources, Tenant, Counters).release()); Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release()); - CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService()); + CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService(Config.GetCompileService(), Counters)); Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing()); Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics()); From 0f21c64fd8f56dccaecee411a0e5e8470073ff21 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 11 Dec 2024 11:28:40 +0000 Subject: [PATCH 2/3] Fixed default value --- ydb/core/fq/libs/config/protos/row_dispatcher.proto | 2 +- .../row_dispatcher/purecalc_compilation/compile_service.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 02d4117a8018..e05dd9b82edc 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -26,7 +26,7 @@ message TJsonParserConfig { } message TCompileServiceConfig { - uint64 ParallelCompilationLimit = 1; // 1 by default, 0 <=> unlimited + uint64 ParallelCompilationLimit = 1; // 1 by default } message TRowDispatcherConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp index 7589a6f38372..8314b1c304be 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp @@ -111,7 +111,7 @@ class TPurecalcCompileService : public NActors::TActor TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) : TBase(&TPurecalcCompileService::StateFunc) , Config(config) - , InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : std::numeric_limits::max()) + , InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : 1) , LogPrefix("TPurecalcCompileService: ") , Counters(counters) {} From 6c8320c61c6d2058e5ee67fb0edd1430052717da Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Dec 2024 07:53:27 +0000 Subject: [PATCH 3/3] Added ActorName into compile actors --- .../row_dispatcher/purecalc_compilation/compile_service.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp index 8314b1c304be..e79c26c17ace 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp @@ -43,6 +43,8 @@ class TPurecalcCompileActor : public NActors::TActorBootstrapped , Counters(counters) {} + static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_SERVICE"; + STRICT_STFUNC(StateFunc, hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle)