From b33909680df2d7545c917ffb326d6d8e03035a69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D0=BE=D0=B6=D0=B8=D0=BB=D0=BE=D0=B2?= Date: Fri, 24 May 2024 15:20:48 +0300 Subject: [PATCH] Add type white-list for supported types on blockreader (#4268) --- .../providers/yt/common/yql_configuration.h | 14 +++ .../providers/yt/common/yql_yt_settings.cpp | 12 ++ .../yql/providers/yt/common/yql_yt_settings.h | 2 + .../yt/provider/yql_yt_dq_integration.cpp | 113 +++++++++++++++++- 4 files changed, 140 insertions(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/yt/common/yql_configuration.h b/ydb/library/yql/providers/yt/common/yql_configuration.h index c31fa10074e8..86e8a0dae5ea 100644 --- a/ydb/library/yql/providers/yt/common/yql_configuration.h +++ b/ydb/library/yql/providers/yt/common/yql_configuration.h @@ -1,8 +1,11 @@ #pragma once +#include + #include #include #include +#include namespace NYql { @@ -55,6 +58,17 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false; constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false; constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1; constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120); +const TSet DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg", "tuple"}; +const TSet DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES = + { + NUdf::EDataSlot::Int8, NUdf::EDataSlot::Uint8, + NUdf::EDataSlot::Int16, NUdf::EDataSlot::Uint16, + NUdf::EDataSlot::Int32, NUdf::EDataSlot::Uint32, + NUdf::EDataSlot::Int64, NUdf::EDataSlot::Uint64, + NUdf::EDataSlot::Bool, NUdf::EDataSlot::Double, + NUdf::EDataSlot::String, NUdf::EDataSlot::Json, + NUdf::EDataSlot::Yson, NUdf::EDataSlot::Utf8 + }; constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB; diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp index d7cf522a33f8..47ba43a6f559 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp @@ -2,6 +2,7 @@ #include #include +#include #include @@ -460,6 +461,17 @@ TYtConfiguration::TYtConfiguration() REGISTER_SETTING(*this, UseRPCReaderInDQ); REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1); REGISTER_SETTING(*this, DQRPCReaderTimeout); + REGISTER_SETTING(*this, BlockReaderSupportedTypes); + REGISTER_SETTING(*this, BlockReaderSupportedDataTypes) + .Parser([](const TString& v) { + TSet vec; + StringSplitter(v).SplitBySet(",").AddTo(&vec); + TSet res; + for (auto& s: vec) { + res.emplace(NUdf::GetDataSlot(s)); + } + return res; + }); REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0); REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0); REGISTER_SETTING(*this, ApplyStoredConstraints) diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index 5e8f5f2f8a8e..0ecda360bcec 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -196,6 +196,8 @@ struct TYtSettings { NCommon::TConfSetting UseRPCReaderInDQ; NCommon::TConfSetting DQRPCReaderInflight; NCommon::TConfSetting DQRPCReaderTimeout; + NCommon::TConfSetting, true> BlockReaderSupportedTypes; + NCommon::TConfSetting, true> BlockReaderSupportedDataTypes; // Optimizers NCommon::TConfSetting _EnableDq; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 87cde0e90a34..1a4ff22d8834 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -39,6 +39,108 @@ static const THashSet POOL_TREES_WHITELIST = {"physical", "cloud", using namespace NNodes; +namespace { + void BlockReaderAddInfo(TExprContext& ctx, const TPosition& pos, const TString& msg) { + ctx.IssueManager.RaiseIssue(YqlIssue(pos, EYqlIssueCode::TIssuesIds_EIssueCode_INFO, "Can't use block reader: " + msg)); + } + + bool CheckBlockReaderSupportedTypes(const TSet& list, const TSet& dataTypesSupported, const TStructExprType* types, TExprContext& ctx, const TPosition& pos) { + TSet supported; + for (const auto &e: list) { + if (e == "pg") { + supported.insert(ETypeAnnotationKind::Pg); + } else if (e == "tuple") { + supported.emplace(ETypeAnnotationKind::Tuple); + } else if (e == "struct") { + supported.emplace(ETypeAnnotationKind::Struct); + } else if (e == "dict") { + supported.emplace(ETypeAnnotationKind::Dict); + } else if (e == "list") { + supported.emplace(ETypeAnnotationKind::List); + } else if (e == "variant") { + supported.emplace(ETypeAnnotationKind::Variant); + } else { + // Unknown type + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unknown type: " << e); + return false; + } + } + if (dataTypesSupported.size()) { + supported.emplace(ETypeAnnotationKind::Data); + } + auto checkType = [&] (const TTypeAnnotationNode* type) { + if (type->GetKind() == ETypeAnnotationKind::Data) { + if (!supported.contains(ETypeAnnotationKind::Data)) { + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported data types"); + return false; + } + if (!dataTypesSupported.contains(type->Cast()->GetSlot())) { + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported data type: " << type->Cast()->GetSlot()); + return false; + } + } else if (type->GetKind() == ETypeAnnotationKind::Pg) { + if (!supported.contains(ETypeAnnotationKind::Pg)) { + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported pg"); + return false; + } + auto name = type->Cast()->GetName(); + if (name == "float4" && !dataTypesSupported.contains(NUdf::EDataSlot::Float)) { + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "PgFloat4 unsupported yet since float is no supported"); + return false; + } + } else { + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported annotation kind: " << type->GetKind()); + return false; + } + return true; + }; + + TVector stack; + + for (auto sub: types->GetItems()) { + auto subT = sub->GetItemType(); + stack.push_back(subT); + } + while (!stack.empty()) { + auto el = stack.back(); + stack.pop_back(); + if (el->GetKind() == ETypeAnnotationKind::Optional) { + stack.push_back(el->Cast()->GetItemType()); + continue; + } + if (!supported.contains(el->GetKind())) { + BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported " << el->GetKind()); + return false; + } + if (el->GetKind() == ETypeAnnotationKind::Tuple) { + for (auto e: el->Cast()->GetItems()) { + stack.push_back(e); + } + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Struct) { + for (auto e: el->Cast()->GetItems()) { + stack.push_back(e->GetItemType()); + } + continue; + } else if (el->GetKind() == ETypeAnnotationKind::List) { + stack.push_back(el->Cast()->GetItemType()); + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Dict) { + stack.push_back(el->Cast()->GetKeyType()); + stack.push_back(el->Cast()->GetPayloadType()); + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Variant) { + stack.push_back(el->Cast()->GetUnderlyingType()); + continue; + } + if (!checkType(el)) { + return false; + } + } + return true; + } +}; + class TYtDqIntegration: public TDqIntegrationBase { public: TYtDqIntegration(TYtState* state) @@ -375,18 +477,26 @@ class TYtDqIntegration: public TDqIntegrationBase { if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) { return false; } - + + auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES); + auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES); const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast()->GetItems().back())->Cast(); + if (!CheckBlockReaderSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx, ctx.GetPosition(node.Pos()))) { + return false; + } + TVector subTypeAnn(Reserve(structType->GetItems().size())); for (const auto& type: structType->GetItems()) { subTypeAnn.emplace_back(type->GetItemType()); } if (!State_->Types->ArrowResolver) { + BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "no arrow resolver provided"); return false; } if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { + BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "arrow resolver don't support these types"); return false; } @@ -394,6 +504,7 @@ class TYtDqIntegration: public TDqIntegrationBase { for (size_t i = 0; i < sectionList.Size(); ++i) { auto section = sectionList.Item(i); if (!NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::SysColumns).empty()) { + BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "system column"); return false; } }