Skip to content

Commit

Permalink
Merge 6c8320c into ac47d76
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Dec 23, 2024
2 parents ac47d76 + 6c8320c commit b61d2b4
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 26 deletions.
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ message TJsonParserConfig {
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
}

message TCompileServiceConfig {
uint64 ParallelCompilationLimit = 1; // 1 by default
}

message TRowDispatcherConfig {
bool Enabled = 1;
uint64 TimeoutBeforeStartSessionSec = 2;
uint64 SendStatusPeriodSec = 3;
uint64 MaxSessionUsedMemory = 4;
bool WithoutConsumer = 5;
TJsonParserConfig JsonParser = 7;
TCompileServiceConfig CompileService = 8;
TRowDispatcherCoordinatorConfig Coordinator = 6;
}
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/events/data_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct TEvRowDispatcher {
EvGetInternalStateResponse,
EvPurecalcCompileRequest,
EvPurecalcCompileResponse,
EvPurecalcCompileAbort,
EvEnd,
};

Expand Down Expand Up @@ -197,6 +198,8 @@ struct TEvRowDispatcher {
NYql::NDqProto::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};

struct TEvPurecalcCompileAbort : public NActors::TEventLocal<TEvPurecalcCompileAbort, EEv::EvPurecalcCompileAbort> {};
};

} // namespace NFq
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TTopicFilters : public ITopicFilters {
NMonitoring::TDynamicCounters::TCounterPtr InFlightCompileRequests;
NMonitoring::TDynamicCounters::TCounterPtr CompileErrors;

TCounters(NMonitoring::TDynamicCounterPtr counters)
explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
Register();
Expand Down Expand Up @@ -83,6 +83,18 @@ class TTopicFilters : public ITopicFilters {
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId));
}

void AbortCompilation() {
if (!InFlightCompilationId) {
return;
}

LOG_ROW_DISPATCHER_TRACE("Send abort compile request with id " << InFlightCompilationId);
NActors::TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, new TEvRowDispatcher::TEvPurecalcCompileAbort(), 0, InFlightCompilationId));

InFlightCompilationId = 0;
Self.Counters.InFlightCompileRequests->Dec();
}

void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) {
if (ev->Cookie != InFlightCompilationId) {
LOG_ROW_DISPATCHER_DEBUG("Outdated compiler response ignored for id " << ev->Cookie << ", current compile id " << InFlightCompilationId);
Expand Down Expand Up @@ -205,7 +217,14 @@ class TTopicFilters : public ITopicFilters {

void RemoveFilter(NActors::TActorId filterId) override {
LOG_ROW_DISPATCHER_TRACE("Remove filter with id " << filterId);
Filters.erase(filterId);

const auto it = Filters.find(filterId);
if (it == Filters.end()) {
return;
}

it->second.AbortCompilation();
Filters.erase(it);
}

TFiltersStatistic GetStatistics() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void Handle(NActors::TEvents::TEvPoison::TPtr&) {
with_lock(Alloc) {
Clients.clear();
if (Filters) {
for (const auto& [clientId, _] : Clients) {
Filters->RemoveFilter(clientId);
}
Filters.Reset();
}
PassAway();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TFiterFixture : public TBaseFixture {
virtual void SetUp(NUnitTest::TTestContext& ctx) override {
TBase::SetUp(ctx);

CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService());
CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService({}, MakeIntrusive<NMonitoring::TDynamicCounters>()));
}

virtual void TearDown(NUnitTest::TTestContext& ctx) override {
Expand Down
177 changes: 158 additions & 19 deletions ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>

#include <yql/essentials/public/purecalc/common/interface.h>
Expand All @@ -12,26 +13,49 @@ namespace NFq::NRowDispatcher {

namespace {

class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
using TBase = NActors::TActor<TPurecalcCompileService>;
struct TEvPrivate {
// Event ids
enum EEv : ui32 {
EvCompileFinished = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");

// Events
struct TEvCompileFinished : public NActors::TEventLocal<TEvCompileFinished, EvCompileFinished> {
TEvCompileFinished(NActors::TActorId requestActor, ui64 requestId)
: RequestActor(requestActor)
, RequestId(requestId)
{}

const NActors::TActorId RequestActor;
const ui64 RequestId;
};
};

class TPurecalcCompileActor : public NActors::TActorBootstrapped<TPurecalcCompileActor> {
public:
TPurecalcCompileService()
: TBase(&TPurecalcCompileService::StateFunc)
, LogPrefix("TPurecalcCompileService: ")
TPurecalcCompileActor(NActors::TActorId owner, NYql::NPureCalc::IProgramFactoryPtr factory, TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr request)
: Owner(owner)
, Factory(factory)
, LogPrefix(TStringBuilder() << "TPurecalcCompileActor " << request->Sender << " [id " << request->Cookie << "]: ")
, Request(std::move(request))
{}

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
)
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_ACTOR";

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie);
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);
void Bootstrap() {
Y_DEFER {
Finish();
};

LOG_ROW_DISPATCHER_TRACE("Started compile request");
IProgramHolder::TPtr programHolder = std::move(Request->Get()->ProgramHolder);

TStatus status = TStatus::Success();
try {
programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings));
programHolder->CreateProgram(Factory);
} catch (const NYql::NPureCalc::TCompileError& error) {
status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues())
.AddIssue(TStringBuilder() << "Final yql: " << error.GetYql())
Expand All @@ -41,15 +65,122 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
}

if (status.IsFail()) {
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie);
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, ev->Cookie);
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request");
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorDescription()), 0, Request->Cookie);
} else {
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie);
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request");
Send(Request->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, Request->Cookie);
}
}

private:
void Finish() {
Send(Owner, new TEvPrivate::TEvCompileFinished(Request->Sender, Request->Cookie));
PassAway();
}

private:
const NActors::TActorId Owner;
const NYql::NPureCalc::IProgramFactoryPtr Factory;
const TString LogPrefix;

TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr Request;
};

class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService> {
using TBase = NActors::TActor<TPurecalcCompileService>;

struct TCounters {
const NMonitoring::TDynamicCounterPtr Counters;

NMonitoring::TDynamicCounters::TCounterPtr ActiveCompileActors;
NMonitoring::TDynamicCounters::TCounterPtr CompileQueueSize;

explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
Register();
}

private:
void Register() {
ActiveCompileActors = Counters->GetCounter("ActiveCompileActors", false);
CompileQueueSize = Counters->GetCounter("CompileQueueSize", false);
}
};

public:
TPurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters)
: TBase(&TPurecalcCompileService::StateFunc)
, Config(config)
, InFlightLimit(Config.GetParallelCompilationLimit() ? Config.GetParallelCompilationLimit() : 1)
, LogPrefix("TPurecalcCompileService: ")
, Counters(counters)
{}

static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_COMPILE_SERVICE";

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
hFunc(TEvRowDispatcher::TEvPurecalcCompileAbort, Handle)
hFunc(TEvPrivate::TEvCompileFinished, Handle);
)

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
const auto requestActor = ev->Sender;
const ui64 requestId = ev->Cookie;
LOG_ROW_DISPATCHER_TRACE("Add to compile queue request with id " << requestId << " from " << requestActor);

// Remove old compile request
RemoveRequest(requestActor, requestId);

// Add new request
RequestsQueue.emplace_back(std::move(ev));
Y_ENSURE(RequestsIndex.emplace(std::make_pair(requestActor, requestId), --RequestsQueue.end()).second);
Counters.CompileQueueSize->Inc();

StartCompilation();
}

void Handle(TEvRowDispatcher::TEvPurecalcCompileAbort::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Abort compile request with id " << ev->Cookie << " from " << ev->Sender);

RemoveRequest(ev->Sender, ev->Cookie);
}

void Handle(TEvPrivate::TEvCompileFinished::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Compile finished for request with id " << ev->Get()->RequestId << " from " << ev->Get()->RequestActor);

InFlightCompilations.erase(ev->Sender);
Counters.ActiveCompileActors->Dec();

StartCompilation();
}

private:
void RemoveRequest(NActors::TActorId requestActor, ui64 requestId) {
const auto it = RequestsIndex.find(std::make_pair(requestActor, requestId));
if (it == RequestsIndex.end()) {
return;
}

RequestsQueue.erase(it->second);
RequestsIndex.erase(it);
Counters.CompileQueueSize->Dec();
}

void StartCompilation() {
while (!RequestsQueue.empty() && InFlightCompilations.size() < InFlightLimit) {
auto request = std::move(RequestsQueue.front());
RemoveRequest(request->Sender, request->Cookie);

const auto factory = GetOrCreateFactory(request->Get()->Settings);
const auto compileActor = Register(new TPurecalcCompileActor(SelfId(), factory, std::move(request)));
Y_ENSURE(InFlightCompilations.emplace(compileActor).second);
Counters.ActiveCompileActors->Inc();
}
}

NYql::NPureCalc::IProgramFactoryPtr GetOrCreateFactory(const TPurecalcCompileSettings& settings) {
const auto it = ProgramFactories.find(settings);
if (it != ProgramFactories.end()) {
Expand All @@ -62,15 +193,23 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
}

private:
const NConfig::TCompileServiceConfig Config;
const ui64 InFlightLimit;
const TString LogPrefix;

std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr> RequestsQueue;
THashMap<std::pair<NActors::TActorId, ui64>, std::list<TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr>::iterator> RequestsIndex;
std::unordered_set<NActors::TActorId> InFlightCompilations;

std::map<TPurecalcCompileSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;

const TCounters Counters;
};

} // namespace {
} // anonymous namespace

NActors::IActor* CreatePurecalcCompileService() {
return new TPurecalcCompileService();
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters) {
return new TPurecalcCompileService(config, counters);
}

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>

#include <ydb/library/actors/core/actor.h>

namespace NFq::NRowDispatcher {

NActors::IActor* CreatePurecalcCompileService();
NActors::IActor* CreatePurecalcCompileService(const NConfig::TCompileServiceConfig& config, NMonitoring::TDynamicCounterPtr counters);

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(

PEERDIR(
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/row_dispatcher/events
ydb/core/fq/libs/row_dispatcher/format_handler/common
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ void TRowDispatcher::Bootstrap() {
auto coordinatorId = Register(NewCoordinator(SelfId(), config, YqSharedResources, Tenant, Counters).release());
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());

CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService());
CompileServiceActorId = Register(NRowDispatcher::CreatePurecalcCompileService(Config.GetCompileService(), Counters));

Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Expand Down

0 comments on commit b61d2b4

Please sign in to comment.