Skip to content

Commit

Permalink
Merge e1ffce2 into a431c94
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Nov 12, 2024
2 parents a431c94 + e1ffce2 commit 3f39b08
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 35 deletions.
4 changes: 1 addition & 3 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {


Expand All @@ -19,7 +17,7 @@ struct TActorFactory : public IActorFactory {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const override {

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#pragma once

#include "common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq::NRowDispatcher {

Expand All @@ -21,7 +22,7 @@ struct IActorFactory : public TThrRefBase {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
};
Expand Down
38 changes: 38 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "common.h"

#include <util/system/mutex.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq {

namespace {

class TPureCalcProgramFactory : public IPureCalcProgramFactory {
public:
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) override {
TGuard<TMutex> guard(FactoriesMutex);

const auto it = ProgramFactories.find(settings);
if (it != ProgramFactories.end()) {
return it->second;
}

return ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
NYql::NPureCalc::TProgramFactoryOptions()
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
)}).first->second;
}

private:
TMutex FactoriesMutex;
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
};

} // anonymous namespace

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
return MakeIntrusive<TPureCalcProgramFactory>();
}

} // namespace NFq
25 changes: 25 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <util/generic/ptr.h>

#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

class IPureCalcProgramFactory : public TThrRefBase {
public:
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;

struct TSettings {
bool EnabledLLVM = false;

std::strong_ordering operator<=>(const TSettings& other) const = default;
};

public:
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) = 0;
};

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();

} // namespace NFq
20 changes: 12 additions & 8 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,15 @@ class TJsonFilter::TImpl {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Sql(GenerateSql(whereFilter)) {
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Sql(GenerateSql(whereFilter, factorySettings)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");

// Program should be stateless because input values
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = pureCalcProgramFactory->MakePushStreamProgram(
Program = pureCalcProgramFactory->GetFactory(factorySettings)->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
Expand All @@ -291,8 +292,9 @@ class TJsonFilter::TImpl {
}

private:
TString GenerateSql(const TString& whereFilter) {
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
TStringStream str;
str << "PRAGMA config.flags(\"LLVM\", \"" << (factorySettings.EnabledLLVM ? "ON" : "OFF") << "\");\n";
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";

str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
Expand All @@ -312,8 +314,9 @@ TJsonFilter::TJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings)) {
}

TJsonFilter::~TJsonFilter() {
Expand All @@ -332,8 +335,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory, factorySettings));
}

} // namespace NFq
9 changes: 6 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include "common.h"

#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

Expand All @@ -15,7 +16,8 @@ class TJsonFilter {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings);

~TJsonFilter();

Expand All @@ -32,6 +34,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& types,
const TString& whereFilter,
TJsonFilter::TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings);

} // namespace NFq
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "row_dispatcher.h"
#include "common.h"
#include "coordinator.h"

#include <ydb/library/actors/core/actorid.h>
Expand Down Expand Up @@ -214,7 +215,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
TSet<TActorId> CoordinatorChangedSubscribers;
Expand Down Expand Up @@ -362,7 +363,7 @@ TRowDispatcher::TRowDispatcher(
const NYql::IPqGateway::TPtr& pqGateway)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, PureCalcProgramFactory(CreatePureCalcProgramFactory())
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
, LogPrefix("RowDispatcher: ")
Expand Down
17 changes: 10 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 PartitionId;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NYql::ITopicClient::TPtr TopicClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
const i64 BufferSize;
Expand Down Expand Up @@ -182,7 +182,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down Expand Up @@ -273,7 +273,7 @@ TTopicSession::TTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: TopicPath(topicPath)
Expand Down Expand Up @@ -727,10 +727,11 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
std::forward_as_tuple(ev)).first->second;
UpdateFieldsIds(clientInfo);

TString predicate = clientInfo.Settings.GetSource().GetPredicate();
const auto& source = clientInfo.Settings.GetSource();
TString predicate = source.GetPredicate();

// TODO: remove this when the re-parsing is removed from pq read actor
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
if (predicate.empty() && HasJsonColumns(source)) {
predicate = "WHERE TRUE";
}

Expand All @@ -742,7 +743,9 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
},
PureCalcProgramFactory);
PureCalcProgramFactory,
{.EnabledLLVM = source.GetEnabledLLVM()}
);
} else {
ClientsWithoutPredicate.insert(ev->Sender);
}
Expand Down Expand Up @@ -967,7 +970,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
Expand All @@ -8,7 +10,6 @@

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -25,7 +26,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/core/fq/libs/events/events.h>

#include <ydb/core/fq/libs/row_dispatcher/common.h>
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>

#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/basics/helpers.h>
#include <ydb/core/testlib/actor_helpers.h>

#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <library/cpp/testing/unittest/registar.h>

Expand All @@ -23,7 +23,7 @@ class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
, Runtime(true)
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
{}
Expand Down Expand Up @@ -65,7 +65,8 @@ class TFixture : public NUnitTest::TBaseFixture {
types,
whereFilter,
callback,
PureCalcProgramFactory);
PureCalcProgramFactory,
{.EnabledLLVM = false});
}

const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {
Expand Down Expand Up @@ -100,7 +101,7 @@ class TFixture : public NUnitTest::TBaseFixture {
});
}

NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
std::unique_ptr<NFq::TJsonFilter> Filter;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
ui32 /*partitionId*/,
NYdb::TDriver /*driver*/,
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/,
IPureCalcProgramFactory::TPtr /*pureCalcProgramFactory*/,
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
auto actorId = Runtime.AllocateEdgeActor();
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace {

Expand All @@ -27,7 +26,7 @@ const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
class TFixture : public NUnitTest::TBaseFixture {
public:
TFixture()
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
, Runtime(true)
{}

Expand Down Expand Up @@ -158,7 +157,7 @@ class TFixture : public NUnitTest::TBaseFixture {
return eventHolder->Get()->Record.MessagesSize();
}

NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
NActors::TActorId TopicSession;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
actors_factory.cpp
common.cpp
coordinator.cpp
json_filter.cpp
json_parser.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/proto/dq_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message TDqPqTopicSource {
string Predicate = 14;
bool SharedReading = 15;
string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m
bool EnabledLLVM = 17;
}

message TDqPqTopicSink {
Expand Down
Loading

0 comments on commit 3f39b08

Please sign in to comment.