Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ RD enable LLVM in purecalc filters #11442

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {


Expand All @@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const override {

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#pragma once

#include "common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq::NRowDispatcher {

Expand All @@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
};
Expand Down
42 changes: 42 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "common.h"

#include <util/system/mutex.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq {

namespace {

class TPureCalcProgramFactory : public IPureCalcProgramFactory {
public:
TPureCalcProgramFactory() {
CreateFactory({.EnabledLLVM = false});
CreateFactory({.EnabledLLVM = true});
}

NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
const auto it = ProgramFactories.find(settings);
Y_ENSURE(it != ProgramFactories.end());
return it->second;
}

private:
void CreateFactory(const TSettings& settings) {
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
NYql::NPureCalc::TProgramFactoryOptions()
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
)});
}

private:
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
};

} // anonymous namespace

// Creates the TPureCalcProgramFactory implementation and returns it behind the
// IPureCalcProgramFactory interface pointer.
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
    IPureCalcProgramFactory::TPtr factory = MakeIntrusive<TPureCalcProgramFactory>();
    return factory;
}

} // namespace NFq
25 changes: 25 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <util/generic/ptr.h>

#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

// Interface of a provider that hands out purecalc program factories selected
// by a TSettings value. Instances are reference-counted through TThrRefBase
// and are passed around as TPtr.
class IPureCalcProgramFactory : public TThrRefBase {
public:
    using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;

    // Parameters that select which program factory is requested.
    struct TSettings {
        // Requests a factory with LLVM enabled; disabled by default.
        bool EnabledLLVM = false;

        // Defaulted member-wise three-way comparison.
        std::strong_ordering operator<=>(const TSettings& rhs) const = default;
    };

    // Returns the program factory that corresponds to `settings`.
    virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
};

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();

} // namespace NFq
20 changes: 12 additions & 8 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,15 @@ class TJsonFilter::TImpl {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Sql(GenerateSql(whereFilter)) {
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Sql(GenerateSql(whereFilter, factorySettings)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");

// Program should be stateless because input values
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = pureCalcProgramFactory->MakePushStreamProgram(
Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
Expand All @@ -291,8 +292,9 @@ class TJsonFilter::TImpl {
}

private:
TString GenerateSql(const TString& whereFilter) {
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
TStringStream str;
str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n";
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";

str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
Expand All @@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) {
}

TJsonFilter::~TJsonFilter() {
Expand All @@ -332,8 +335,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings));
}

} // namespace NFq
9 changes: 6 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include "common.h"

#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>
#include <yql/essentials/public/udf/udf_data_type.h>
#include <yql/essentials/public/udf/udf_value.h>

Expand All @@ -17,7 +18,8 @@ class TJsonFilter {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings);

~TJsonFilter();

Expand All @@ -34,6 +36,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TJsonFilter::TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings);

} // namespace NFq
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "row_dispatcher.h"
#include "common.h"
#include "coordinator.h"

#include <ydb/library/actors/core/actorid.h>
Expand Down Expand Up @@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
TSet<TActorId> CoordinatorChangedSubscribers;
Expand Down Expand Up @@ -362,7 +363,7 @@ TRowDispatcher::TRowDispatcher(
const NYql::IPqGateway::TPtr& pqGateway)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, PureCalcProgramFactory(CreatePureCalcProgramFactory())
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
, LogPrefix("RowDispatcher: ")
Expand Down
17 changes: 10 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 PartitionId;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NYql::ITopicClient::TPtr TopicClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
const i64 BufferSize;
Expand Down Expand Up @@ -190,7 +190,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down Expand Up @@ -281,7 +281,7 @@ TTopicSession::TTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: TopicPath(topicPath)
Expand Down Expand Up @@ -737,10 +737,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
std::forward_as_tuple(ev)).first->second;
UpdateFieldsIds(clientInfo);

TString predicate = clientInfo.Settings.GetSource().GetPredicate();
const auto& source = clientInfo.Settings.GetSource();
TString predicate = source.GetPredicate();

// TODO: remove this when the re-parsing is removed from pq read actor
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
if (predicate.empty() && HasJsonColumns(source)) {
predicate = "WHERE TRUE";
}

Expand All @@ -752,7 +753,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
},
PureCalcProgramFactory);
PureCalcProgramFactory,
{.EnabledLLVM = source.GetEnabledLLVM()}
);
} else {
ClientsWithoutPredicate.insert(ev->Sender);
}
Expand Down Expand Up @@ -993,7 +996,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
Expand All @@ -8,7 +10,6 @@

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -25,7 +26,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/core/fq/libs/events/events.h>

#include <ydb/core/fq/libs/row_dispatcher/common.h>
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>

#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/basics/helpers.h>
#include <ydb/core/testlib/actor_helpers.h>

#include <yql/essentials/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <library/cpp/testing/unittest/registar.h>

Expand All @@ -23,7 +23,7 @@ class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
, Runtime(true)
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
{}
Expand Down Expand Up @@ -65,7 +65,8 @@ class TFixture : public NUnitTest::TBaseFixture {
types,
whereFilter,
callback,
PureCalcProgramFactory);
PureCalcProgramFactory,
{.EnabledLLVM = false});
}

const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {
Expand Down Expand Up @@ -100,7 +101,7 @@ class TFixture : public NUnitTest::TBaseFixture {
});
}

NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
std::unique_ptr<NFq::TJsonFilter> Filter;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
ui32 /*partitionId*/,
NYdb::TDriver /*driver*/,
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/,
IPureCalcProgramFactory::TPtr /*pureCalcProgramFactory*/,
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
auto actorId = Runtime.AllocateEdgeActor();
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace {

Expand All @@ -27,7 +26,7 @@ const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
class TFixture : public NUnitTest::TBaseFixture {
public:
TFixture()
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
, Runtime(true)
{}

Expand Down Expand Up @@ -158,7 +157,7 @@ class TFixture : public NUnitTest::TBaseFixture {
return eventHolder->Get()->Record.MessagesSize();
}

NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
NActors::TActorId TopicSession;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
actors_factory.cpp
common.cpp
coordinator.cpp
json_filter.cpp
json_parser.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/proto/dq_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message TDqPqTopicSource {
string Predicate = 14;
bool SharedReading = 15;
string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m
bool EnabledLLVM = 17;
}

message TDqPqTopicSink {
Expand Down
Loading
Loading