From 7389dc6ce17c5bf34f3de66bae5b602981726747 Mon Sep 17 00:00:00 2001 From: zverevgeny Date: Tue, 28 May 2024 15:42:53 +0300 Subject: [PATCH] YQ-2068 YT emu lookup source actor (#4869) --- .../yql/providers/yt/actors/ut/ya.make | 24 ++ .../yt/actors/ut/yql_yt_lookup_actor_ut.cpp | 182 +++++++++++++ ydb/library/yql/providers/yt/actors/ya.make | 19 ++ .../yt/actors/yql_yt_lookup_actor.cpp | 239 ++++++++++++++++++ .../providers/yt/actors/yql_yt_lookup_actor.h | 26 ++ .../yql/providers/yt/gateway/file/ya.make | 1 + .../gateway/file/yql_yt_file_comp_nodes.cpp | 69 +---- .../yt/gateway/file/yql_yt_file_text_yson.cpp | 74 ++++++ .../yt/gateway/file/yql_yt_file_text_yson.h | 14 + .../yql/providers/yt/proto/source.proto | 9 + ydb/library/yql/providers/yt/proto/ya.make | 11 + ydb/library/yql/providers/yt/ya.make | 2 + 12 files changed, 602 insertions(+), 68 deletions(-) create mode 100644 ydb/library/yql/providers/yt/actors/ut/ya.make create mode 100644 ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp create mode 100644 ydb/library/yql/providers/yt/actors/ya.make create mode 100644 ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp create mode 100644 ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h create mode 100644 ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp create mode 100644 ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h create mode 100644 ydb/library/yql/providers/yt/proto/source.proto create mode 100644 ydb/library/yql/providers/yt/proto/ya.make diff --git a/ydb/library/yql/providers/yt/actors/ut/ya.make b/ydb/library/yql/providers/yt/actors/ut/ya.make new file mode 100644 index 000000000000..fb8b00ce3a84 --- /dev/null +++ b/ydb/library/yql/providers/yt/actors/ut/ya.make @@ -0,0 +1,24 @@ +UNITTEST_FOR(ydb/library/yql/providers/yt/actors) + +PEERDIR( + ydb/library/yql/providers/yt/codec/codegen/no_llvm + ydb/library/yql/providers/yt/comp_nodes/no_llvm + ydb/library/yql/providers/yt/gateway/file + ydb/library/yql/minikql/codegen/no_llvm + ydb/library/actors/testlib + ydb/library/yql/public/udf + library/cpp/testing/unittest + ydb/library/yql/sql/pg + ydb/library/yql/public/udf/service/terminate_policy + ydb/library/yql/parser/pg_wrapper/interface + +) + +SRCS( + yql_yt_lookup_actor_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +END() + diff --git a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp new file mode 100644 index 000000000000..f35aada9308a --- /dev/null +++ b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp @@ -0,0 +1,182 @@ + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +using namespace NYql; +using namespace NActors; + +Y_UNIT_TEST_SUITE(YtLookupActor) { + +NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list members) { + NUdf::TUnboxedValue* items; + NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items); + for (size_t i = 0; i != members.size(); ++i) { + items[i] = NKikimr::NMiniKQL::MakeString(*(members.begin() + i)); + } + return result; +} + +bool CheckStructValue(const NUdf::TUnboxedValue& v, std::initializer_list members) { + for (size_t i = 0; i != members.size(); ++i) { + NUdf::TUnboxedValue m = v.GetElement(i); + if (m.AsStringRef() != *(members.begin() + i)) { + return false; + } + } + return true; +} + +//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread +class TCallLookupActor: public TActorBootstrapped { +public: + TCallLookupActor( + std::shared_ptr alloc, + NYql::NDq::IDqAsyncLookupSource* lookupSource, + NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp) + : Alloc(alloc) + , LookupSource(lookupSource) + , KeysToLookUp(std::move(keysToLookUp)) + { + } + + void Bootstrap() { + LookupSource->AsyncLookup(std::move(KeysToLookUp)); + auto guard = Guard(*Alloc); + KeysToLookUp.clear(); + KeysToLookUp.shrink_to_fit(); + } + +private: + static constexpr char ActorName[] = "TEST"; + +private: + std::shared_ptr Alloc; + NYql::NDq::IDqAsyncLookupSource* LookupSource; + NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp; +}; + +Y_UNIT_TEST(Lookup) { + auto alloc = std::make_shared(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false); + TIntrusivePtr functionRegistry = CreateFunctionRegistry(NKikimr::NMiniKQL::IBuiltinFunctionRegistry::TPtr()); + NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage"); + NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage); + NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc); + NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv); + + auto loggerConfig = NYql::NProto::TLoggingConfig(); + loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE); + NYql::NLog::InitLogger(loggerConfig, false); + + TTestActorRuntimeBase runtime; + runtime.Initialize(); + auto edge = runtime.AllocateEdgeActor(); + + NYql::NYt::NSource::TLookupSource source; + source.SetCluster("Plato"); + source.SetTable("Lookup"); + source.SetRowSpec(R"( +{"_yql_row_spec"={ + "Type"=["StructType";[ + ["hostname";["DataType";"String"]]; + ["network";["DataType";"String"]]; + ["fqdn";["DataType";"String"]]; + ["ip4";["DataType";"String"]]; + ["ip6";["DataType";"String"]] + ]]; +}} + )"); + + NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv}; + keyTypeBuilder.Add("hostname", typeBuilder.NewDataType(NUdf::EDataSlot::String, false)); + keyTypeBuilder.Add("network", typeBuilder.NewDataType(NUdf::EDataSlot::String, false)); + NKikimr::NMiniKQL::TStructTypeBuilder payloadTypeBuilder{typeEnv}; + payloadTypeBuilder.Add("fqdn", typeBuilder.NewDataType(NUdf::EDataSlot::String, true)); + payloadTypeBuilder.Add("ip4", typeBuilder.NewDataType(NUdf::EDataSlot::String, true)); + + TTempFileHandle lookupTable("lookup.txt"); + TString lookupTableData = R"( +{"hostname"="host1";"network"="vpc1";"fqdn"="host1.vpc1.net";"ip4"="192.168.1.1"; "ip6"="[xxxx:xxxx:xxxx:1111]"}; +{"hostname"="host2";"network"="vpc1";"fqdn"="host2.vpc1.net";"ip4"="192.168.1.2"; "ip6"="[xxxx:xxxx:xxxx:2222]"}; +{"hostname"="host1";"network"="vpc2";"fqdn"="host2.vpc2.net";"ip4"="192.168.2.1"; "ip6"="[xxxx:xxxx:xxxx:3333]"}; +{"hostname"="very very long hostname to for test 1";"network"="vpc1";"fqdn"="very very long fqdn for test 1";"ip4"="192.168.100.1"; "ip6"="[xxxx:xxxx:XXXX:1111]"}; +{"hostname"="very very long hostname to for test 2";"network"="vpc2";"fqdn"="very very long fqdn for test 2";"ip4"="192.168.100.2"; "ip6"="[xxxx:xxxx:XXXX:2222]"}; + )"; + lookupTable.Write(lookupTableData.data(), lookupTableData.size()); + const THashMap mapping = { + {"yt.Plato.Lookup", lookupTable.Name()} + }; + auto ytServices = NFile::TYtFileServices::Make( + nullptr, + mapping + ); + auto guard = Guard(*alloc.get()); + auto [lookupSource, actor] = NYql::NDq::CreateYtLookupActor( + ytServices, + edge, + alloc, + *functionRegistry, + std::move(source), + keyTypeBuilder.Build(), + payloadTypeBuilder.Build(), + typeEnv, + holderFactory, + 1'000'000); + runtime.Register(actor); + + NKikimr::NMiniKQL::TUnboxedValueVector keys {\ + CreateStructValue(holderFactory, {"host1", "vpc1"}), + CreateStructValue(holderFactory, {"host2", "vpc1"}), + CreateStructValue(holderFactory, {"host2", "vpc2"}), //NOT_FOUND expected + CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}), + }; + + guard.Release(); //let actors use alloc + + auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys)); + runtime.Register(callLookupActor); + + auto ev = runtime.GrabEdgeEventRethrow(edge); + auto guard2 = Guard(*alloc.get()); + NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data); + UNIT_ASSERT_EQUAL(4, lookupResult.size()); + { + auto& [k, v] = lookupResult[0]; + UNIT_ASSERT(CheckStructValue(k, {"host1", "vpc1"})); + UNIT_ASSERT(CheckStructValue(v, {"host1.vpc1.net", "192.168.1.1"})); + } + { + auto& [k, v] = lookupResult[1]; + UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc1"})); + UNIT_ASSERT(CheckStructValue(v, {"host2.vpc1.net", "192.168.1.2"})); + } + { + auto& [k, v] = lookupResult[2]; + UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc2"})); + UNIT_ASSERT(!v); + } + { + auto& [k, v] = lookupResult[3]; + UNIT_ASSERT(CheckStructValue(k, {"very very long hostname to for test 2", "vpc2"})); + UNIT_ASSERT(CheckStructValue(v, {"very very long fqdn for test 2", "192.168.100.2"})); + } + +} + +} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor) \ No newline at end of file diff --git a/ydb/library/yql/providers/yt/actors/ya.make b/ydb/library/yql/providers/yt/actors/ya.make new file mode 100644 index 000000000000..696a761a023c --- /dev/null +++ b/ydb/library/yql/providers/yt/actors/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + yql_yt_lookup_actor.cpp +) + +PEERDIR( + ydb/library/yql/minikql/computation + ydb/library/yql/providers/yt/proto + ydb/library/yql/dq/actors/compute + ydb/library/yql/minikql/computation + ydb/library/yql/public/types +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS(ut) \ No newline at end of file diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp new file mode 100644 index 000000000000..b788f402fb07 --- /dev/null +++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp @@ -0,0 +1,239 @@ +#include "yql_yt_lookup_actor.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace NYql::NDq { + +using namespace NActors; + +namespace { + + +enum class EColumnType { + None, + Key, + Payload +}; + +using TIndexedColumns = std::vector>; + + +//Note: TStringBuf is used, that refers to arg's memory. Be careful with object's lifetimes +THashMap MemberToIndex(const NKikimr::NMiniKQL::TStructType* s) { + THashMap result; + for (ui32 i = 0; i != s->GetMembersCount(); ++i) { + result[s->GetMemberName(i)] = i; + } + return result; +} + +} // namespace + +class TYtLookupActor + : public NYql::NDq::IDqAsyncLookupSource, + public NActors::TActorBootstrapped +{ + using TBase = NActors::TActorBootstrapped; +public: + TYtLookupActor( + NFile::TYtFileServices::TPtr ytServices, + NActors::TActorId parentId, + std::shared_ptr alloc, + const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + NYql::NYt::NSource::TLookupSource&& lookupSource, + const NKikimr::NMiniKQL::TStructType* keyType, + const NKikimr::NMiniKQL::TStructType* payloadType, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const size_t maxKeysInRequest) + : YtServices(ytServices) + , ParentId(std::move(parentId)) + , Alloc(alloc) + , FunctionRegistry(functionRegistry) + , LookupSource(std::move(lookupSource)) + , KeyType(keyType) + , PayloadType(payloadType) + , HolderFactory(holderFactory) + , TypeEnv(typeEnv) + , MaxKeysInRequest(maxKeysInRequest) + , KeyTypeHelper(keyType) + , Data(10, + KeyTypeHelper.GetValueHash(), + KeyTypeHelper.GetValueEqual() + ) + { + } + ~TYtLookupActor() { + auto guard = Guard(*Alloc); + KeyTypeHelper = TKeyTypeHelper{}; + Data = TTableData(0, KeyTypeHelper.GetValueHash(), KeyTypeHelper.GetValueEqual()); + } + + + void Bootstrap() { + YQL_CLOG(INFO, ProviderYt) << "New Yt proivider lookup source actor(ActorId=" << SelfId() << ") for" + << " cluster=" << LookupSource.GetCluster() + << ", table=" << LookupSource.GetTable(); + auto path = YtServices->GetTablePath(LookupSource.cluster(), LookupSource.table(), false); + + auto guard = Guard(*Alloc); + NCommon::TCodecContext codecCtx(TypeEnv, FunctionRegistry, &HolderFactory); + const auto tableCodecSpec = LookupSource.GetRowSpec(); + + const auto meta = TString("{\"") + YqlIOSpecTables + "\" = [" + tableCodecSpec + "]}"; + + TMkqlIOSpecs specs; + specs.Init(codecCtx, meta, {path}, TMaybe>{}); + TVector> files{{path, NYql::NFile::TColumnsInfo{}}}; + THolder input = MakeHolder(specs, HolderFactory, NYql::NFile::MakeTextYsonInputs(files), 0u, 1_MB); + + auto keyColumns = MemberToIndex(KeyType); + auto payloadColumns = MemberToIndex(PayloadType); + + std::vector> columnDestinations(specs.Inputs[0]->Fields.size()); + for (const auto& [k, f]: specs.Inputs[0]->Fields) { + if (const auto p = keyColumns.FindPtr(k)) { + columnDestinations[f.StructIndex] = {EColumnType::Key, *p}; + } else if (const auto p = payloadColumns.FindPtr(k)) { + columnDestinations[f.StructIndex] = {EColumnType::Payload, *p}; + } else { + columnDestinations[f.StructIndex] = {EColumnType::None, -1}; + } + } + NUdf::TUnboxedValue v; + //read all table data + for(;input->IsValid(); input->Next()) { + NUdf::TUnboxedValue inputValue = input->GetCurrent(); + NUdf::TUnboxedValue* keyItems; + NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(KeyType->GetMembersCount(), keyItems); + NUdf::TUnboxedValue* payloadItems; + NUdf::TUnboxedValue payload = HolderFactory.CreateDirectArrayHolder(PayloadType->GetMembersCount(), payloadItems); + for (size_t i = 0; i != columnDestinations.size(); ++i) { + switch(columnDestinations[i].first) { + case EColumnType::Key: + keyItems[columnDestinations[i].second] = inputValue.GetElement(i); + break; + case EColumnType::Payload: + payloadItems[columnDestinations[i].second] = inputValue.GetElement(i); + break; + case EColumnType::None: + break; + } + } + Data.emplace(std::move(key), std::move(payload)); + + } + Become(&TYtLookupActor::StateFunc); + } + + static constexpr char ActorName[] = "YT_PROVIDER_LOOKUP_ACTOR"; + +private: //IDqAsyncLookupSource + size_t GetMaxSupportedKeysInRequest() const override { + return MaxKeysInRequest; + } + void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override { + YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << keys.size() << " keys"; + Y_ABORT_IF(InProgress); + Y_ABORT_IF(keys.size() > MaxKeysInRequest); + InProgress = true; + auto guard = Guard(*Alloc); + NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult; + lookupResult.reserve(keys.size()); + for (const auto& k: keys) { + const auto it = Data.find(k); + lookupResult.emplace_back(k, it != Data.end() ? it->second : NUdf::TUnboxedValue{}); + } + auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(lookupResult)); + TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); + InProgress = false; + } + +private: //events + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvPoison, Handle); + ) + + void Handle(NActors::TEvents::TEvPoison::TPtr) { + PassAway(); + } + +private: + enum class EColumnDestination { + Key, + Payload + }; + +private: + NFile::TYtFileServices::TPtr YtServices; + const NActors::TActorId ParentId; + std::shared_ptr Alloc; + const NKikimr::NMiniKQL::IFunctionRegistry& FunctionRegistry; + NYql::NYt::NSource::TLookupSource LookupSource; + const NKikimr::NMiniKQL::TStructType* const KeyType; + const NKikimr::NMiniKQL::TStructType* const PayloadType; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; + const size_t MaxKeysInRequest; + std::atomic_bool InProgress; + using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper; + TKeyTypeHelper KeyTypeHelper; + using TTableData = std::unordered_map< + NUdf::TUnboxedValue, + NUdf::TUnboxedValue, + NKikimr::NMiniKQL::TValueHasher, + NKikimr::NMiniKQL::TValueEqual, + NKikimr::NMiniKQL::TMKQLAllocator> + >; + TTableData Data; +}; + +std::pair CreateYtLookupActor( + NFile::TYtFileServices::TPtr ytServices, + NActors::TActorId parentId, + std::shared_ptr alloc, + const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, + NYql::NYt::NSource::TLookupSource&& lookupSource, + const NKikimr::NMiniKQL::TStructType* keyType, + const NKikimr::NMiniKQL::TStructType* payloadType, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const size_t maxKeysInRequest) +{ + const auto actor = new TYtLookupActor( + ytServices, + parentId, + alloc, + functionRegistry, + typeEnv, + std::move(lookupSource), + keyType, + payloadType, + holderFactory, + maxKeysInRequest); + return {actor, actor}; +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h new file mode 100644 index 000000000000..0135111faf1f --- /dev/null +++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include +#include +//#include +#include + +namespace NYql::NDq { + +std::pair CreateYtLookupActor( + NFile::TYtFileServices::TPtr ytServices, + NActors::TActorId parentId, + std::shared_ptr alloc, + const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, + NYql::NYt::NSource::TLookupSource&& lookupSource, + const NKikimr::NMiniKQL::TStructType* keyType, + const NKikimr::NMiniKQL::TStructType* payloadType, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const size_t maxKeysInRequest +); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/yt/gateway/file/ya.make b/ydb/library/yql/providers/yt/gateway/file/ya.make index bfa1989f36b3..a557128c994c 100644 --- a/ydb/library/yql/providers/yt/gateway/file/ya.make +++ b/ydb/library/yql/providers/yt/gateway/file/ya.make @@ -5,6 +5,7 @@ SRCS( yql_yt_file_mkql_compiler.cpp yql_yt_file_services.cpp yql_yt_file.cpp + yql_yt_file_text_yson.cpp ) PEERDIR( diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp index 6ced7a838286..84f527b83ea0 100644 --- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp @@ -1,5 +1,6 @@ #include "yql_yt_file_comp_nodes.h" #include "yql_yt_file.h" +#include "yql_yt_file_text_yson.h" #include #include @@ -54,74 +55,6 @@ using namespace NKikimr::NMiniKQL; namespace { -struct TColumnsInfo { - TMaybe Columns; - TMaybe RenameColumns; -}; - -class TTextYsonInput: public NYT::TRawTableReader { -public: - TTextYsonInput(const TString& file, const TColumnsInfo& columnsInfo) { - TIFStream in(file); - - TBinaryYsonWriter writer(&Input_, ::NYson::EYsonType::ListFragment); - writer.OnBeginAttributes(); - writer.OnKeyedItem("row_index"); - writer.OnInt64Scalar(0); - writer.OnEndAttributes(); - writer.OnEntity(); - writer.OnListItem(); - NYT::NYson::IYsonConsumer* consumer = &writer; - THolder filter; - if (columnsInfo.Columns || columnsInfo.RenameColumns) { - TMaybe> columns; - TMaybe> renames; - if (columnsInfo.Columns) { - columns.ConstructInPlace(columnsInfo.Columns->Parts_.begin(), columnsInfo.Columns->Parts_.end()); - } - if (columnsInfo.RenameColumns) { - renames.ConstructInPlace(columnsInfo.RenameColumns->begin(), columnsInfo.RenameColumns->end()); - } - - filter.Reset(new TColumnFilteringConsumer(consumer, columns, renames)); - consumer = filter.Get(); - } - NYson::TYsonParser parser(consumer, &in, ::NYson::EYsonType::ListFragment); - parser.Parse(); - } - - bool Retry(const TMaybe& /* rangeIndex */, const TMaybe& /* rowIndex */, const std::exception_ptr& /* error */) override { - return false; - } - - void ResetRetries() override { - } - - bool HasRangeIndices() const override { - return false; - } - -protected: - size_t DoRead(void* buf, size_t len) override { - return Input_.Read(buf, len); - } - -private: - TBufferStream Input_; -}; - -TVector MakeTextYsonInputs(const TVector>& files) { - TVector rawReaders; - for (auto& file: files) { - if (!NFs::Exists(file.first)) { - rawReaders.emplace_back(nullptr); - continue; - } - rawReaders.emplace_back(MakeIntrusive(file.first, file.second)); - } - return rawReaders; -} - /////////////////////////////////////////////////////////////////////////////////////////////////////// class TFileInputStateWithTableState: public TFileInputState { diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp new file mode 100644 index 000000000000..f481ab71c5c4 --- /dev/null +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp @@ -0,0 +1,74 @@ +#include "yql_yt_file_text_yson.h" +#include +#include +#include +#include +#include +#include + +namespace NYql::NFile { + +class TTextYsonInput: public NYT::TRawTableReader { +public: + TTextYsonInput(const TString& file, const TColumnsInfo& columnsInfo) { + TIFStream in(file); + + TBinaryYsonWriter writer(&Input_, ::NYson::EYsonType::ListFragment); + writer.OnBeginAttributes(); + writer.OnKeyedItem("row_index"); + writer.OnInt64Scalar(0); + writer.OnEndAttributes(); + writer.OnEntity(); + writer.OnListItem(); + NYT::NYson::IYsonConsumer* consumer = &writer; + THolder filter; + if (columnsInfo.Columns || columnsInfo.RenameColumns) { + TMaybe> columns; + TMaybe> renames; + if (columnsInfo.Columns) { + columns.ConstructInPlace(columnsInfo.Columns->Parts_.begin(), columnsInfo.Columns->Parts_.end()); + } + if (columnsInfo.RenameColumns) { + renames.ConstructInPlace(columnsInfo.RenameColumns->begin(), columnsInfo.RenameColumns->end()); + } + + filter.Reset(new TColumnFilteringConsumer(consumer, columns, renames)); + consumer = filter.Get(); + } + NYson::TYsonParser parser(consumer, &in, ::NYson::EYsonType::ListFragment); + parser.Parse(); + } + + bool Retry(const TMaybe& /* rangeIndex */, const TMaybe& /* rowIndex */, const std::exception_ptr& /* error */) override { + return false; + } + + void ResetRetries() override { + } + + bool HasRangeIndices() const override { + return false; + } + +protected: + size_t DoRead(void* buf, size_t len) override { + return Input_.Read(buf, len); + } + +private: + TBufferStream Input_; +}; + +TVector MakeTextYsonInputs(const TVector>& files) { + TVector rawReaders; + for (auto& file: files) { + if (!NFs::Exists(file.first)) { + rawReaders.emplace_back(nullptr); + continue; + } + rawReaders.emplace_back(MakeIntrusive(file.first, file.second)); + } + return rawReaders; +} + +} //namespace NYql::NFile \ No newline at end of file diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h new file mode 100644 index 000000000000..85b903c84b08 --- /dev/null +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h @@ -0,0 +1,14 @@ +#pragma once +#include +#include + +namespace NYql::NFile { + +struct TColumnsInfo { + TMaybe Columns; + TMaybe RenameColumns; +}; + +TVector MakeTextYsonInputs(const TVector>& files); + +}//namespace NYql::NFile \ No newline at end of file diff --git a/ydb/library/yql/providers/yt/proto/source.proto b/ydb/library/yql/providers/yt/proto/source.proto new file mode 100644 index 000000000000..dedd5df130a4 --- /dev/null +++ b/ydb/library/yql/providers/yt/proto/source.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package NYql.NYt.NSource; + +message TLookupSource { + string Cluster = 1; + string Table = 2; + string RowSpec = 3; +} \ No newline at end of file diff --git a/ydb/library/yql/providers/yt/proto/ya.make b/ydb/library/yql/providers/yt/proto/ya.make new file mode 100644 index 000000000000..6ef74a07fcbd --- /dev/null +++ b/ydb/library/yql/providers/yt/proto/ya.make @@ -0,0 +1,11 @@ +PROTO_LIBRARY() + +ONLY_TAGS(CPP_PROTO) + + +SRCS( + source.proto +) + +END() + diff --git a/ydb/library/yql/providers/yt/ya.make b/ydb/library/yql/providers/yt/ya.make index 1a788e638716..0097c6cbfce4 100644 --- a/ydb/library/yql/providers/yt/ya.make +++ b/ydb/library/yql/providers/yt/ya.make @@ -1,4 +1,5 @@ RECURSE( + actors codec common comp_nodes @@ -8,5 +9,6 @@ RECURSE( lib mkql_dq opt + proto provider )