// Patch overview (unified diff over four files under ydb/library/yql/providers/yt):
//   * common/yql_configuration.h  - adds DEFAULT_BLOCK_READER_SUPPORTED_TYPES, a set of type-name
//     strings ("pg", "int8", ..., "tuple").
//   * common/yql_yt_settings.cpp / common/yql_yt_settings.h - register and declare the
//     BlockReaderSupportedTypes setting.
//   * provider/yql_yt_dq_integration.cpp - adds AddInfo() and CheckSupportedTypes() in an anonymous
//     namespace and calls CheckSupportedTypes() right after the UseRPCReaderInDQ check; when it
//     fails the calling code returns false.
// NOTE(review): in this copy the patch's line breaks are collapsed and the text between angle
// brackets (include targets, template arguments) appears to be missing, e.g. "#include #include",
// "TSet DEFAULT_...", "Cast()". Confirm against the upstream commit before applying.
diff --git a/ydb/library/yql/providers/yt/common/yql_configuration.h b/ydb/library/yql/providers/yt/common/yql_configuration.h index c31fa10074e8..6dbb1733a1f5 100644 --- a/ydb/library/yql/providers/yt/common/yql_configuration.h +++ b/ydb/library/yql/providers/yt/common/yql_configuration.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace NYql { @@ -55,6 +56,7 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false; constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false; constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1; constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120); +const TSet DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg","int8", "uint8", "int16", "uint16", "int32", "uint32", "int64", "uint64", "string", "yson", "json", "bool", "double", "tuple"}; constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB; diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp index d7cf522a33f8..3932bbff587f 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp @@ -460,6 +460,7 @@ TYtConfiguration::TYtConfiguration() REGISTER_SETTING(*this, UseRPCReaderInDQ); REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1); REGISTER_SETTING(*this, DQRPCReaderTimeout); + REGISTER_SETTING(*this, BlockReaderSupportedTypes); REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0); REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0); REGISTER_SETTING(*this, ApplyStoredConstraints) diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index 5e8f5f2f8a8e..fb6ef82030c1 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -196,6 +196,7 @@ struct TYtSettings { NCommon::TConfSetting UseRPCReaderInDQ; NCommon::TConfSetting DQRPCReaderInflight; 
NCommon::TConfSetting DQRPCReaderTimeout; + NCommon::TConfSetting, true> BlockReaderSupportedTypes; // Optimizers NCommon::TConfSetting _EnableDq; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 87cde0e90a34..f0a8af774dcb 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -39,6 +39,124 @@ static const THashSet POOL_TREES_WHITELIST = {"physical", "cloud", using namespace NNodes; +namespace { + /* Raises an issue with severity S_INFO through ctx.IssueManager; the issue text is a fixed block-reader prefix followed by msg. */ void AddInfo(TExprContext& ctx, const TString& msg) { + TIssue info("Can't use block reader: " + msg); + info.Severity = TSeverityIds::S_INFO; + ctx.IssueManager.RaiseIssue(info); + } + + /* Returns true only when every name in list is recognized and every member type of types (after unwrapping Optional, and walking into tuples) is allowed by list; otherwise reports the reason via AddInfo and returns false. */ bool CheckSupportedTypes(const TSet& list, const TStructExprType* types, TExprContext& ctx) { + TSet supported; + TSet dataTypesSupported; + /* Translate the configured type names into annotation kinds (supported) and data slots (dataTypesSupported). */ for (const auto &e: list) { + if (e == "pg") { + supported.insert(ETypeAnnotationKind::Pg); + } else if (e == "int8") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Int8); + } else if (e == "uint8") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Uint8); + } else if (e == "int16") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Int16); + } else if (e == "uint16") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Uint16); + } else if (e == "int32") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Int32); + } else if (e == "uint32") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Uint32); + } else if (e == "int64") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Int64); + } else if (e == "uint64") { + 
supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Uint64); + } else if (e == "double") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Double); + } else if (e == "bool") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Bool); + } else if (e == "string") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::String); + } else if (e == "yson") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Yson); + } else if (e == "json") { + supported.emplace(ETypeAnnotationKind::Data); + dataTypesSupported.emplace(NUdf::EDataSlot::Json); + } else if (e == "tuple") { + supported.emplace(ETypeAnnotationKind::Tuple); + } else { + // Unknown type + AddInfo(ctx, TStringBuilder() << "unknown type: " << e); + return false; + } + } + /* Leaf check: accepts Data whose slot is in dataTypesSupported, or Pg when Pg is enabled; any other annotation kind is rejected with an info issue. */ auto checkType = [&] (const TTypeAnnotationNode* type) { + if (type->GetKind() == ETypeAnnotationKind::Data) { + if (!supported.contains(ETypeAnnotationKind::Data)) { + AddInfo(ctx, TStringBuilder() << "unsupported data types"); + return false; + } + if (!dataTypesSupported.contains(type->Cast()->GetSlot())) { + AddInfo(ctx, TStringBuilder() << "unsupported data type: " << type->Cast()->GetSlot()); + return false; + } + } else if (type->GetKind() == ETypeAnnotationKind::Pg) { + if (!supported.contains(ETypeAnnotationKind::Pg)) { + AddInfo(ctx, TStringBuilder() << "unsupported pg"); + return false; + } + } else { + AddInfo(ctx, TStringBuilder() << "unsupported annotation kind: " << type->GetKind()); + return false; + } + return true; + }; + + /* Check each struct member after stripping all Optional layers from its type. */ for (auto sub: types->GetItems()) { + auto subT = sub->GetItemType(); + while (subT->GetKind() == ETypeAnnotationKind::Optional) { + subT = subT->Cast()->GetItemType(); + } + if (subT->GetKind() == ETypeAnnotationKind::Tuple) { + if (!supported.contains(ETypeAnnotationKind::Tuple)) { + AddInfo(ctx, 
TStringBuilder() << "unsupported tuples"); + return false; + } + /* Walk nested tuples with an explicit stack; Optional wrappers on tuple elements are removed before pushing, and every non-tuple element goes through checkType. */ TVector stack; + stack.push_back(subT); + while (!stack.empty()) { + auto el = stack.back(); + stack.pop_back(); + if (el->GetKind() == ETypeAnnotationKind::Tuple) { + for (auto e: el->Cast()->GetItems()) { + while (e->GetKind() == ETypeAnnotationKind::Optional) { + e = e->Cast()->GetItemType(); + } + stack.push_back(e); + } + continue; + } + if (!checkType(el)) { + return false; + } + } + } else if (!checkType(subT)) { + return false; + } + } + return true; + } +}; + class TYtDqIntegration: public TDqIntegrationBase { public: TYtDqIntegration(TYtState* state) @@ -375,10 +493,16 @@ class TYtDqIntegration: public TDqIntegrationBase { if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) { return false; } + + /* Allowed type names are looked up by the data source's cluster name; DEFAULT_BLOCK_READER_SUPPORTED_TYPES is used when the setting has no value. */ auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES); const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast()->GetItems().back())->Cast(); + if (!CheckSupportedTypes(supportedTypes, structType, ctx)) { + return false; + } TVector subTypeAnn(Reserve(structType->GetItems().size())); for (const auto& type: structType->GetItems()) { + /* NOTE(review): the line-comment fragment that follows looks like unfinished, commented-out code; confirm whether it should be removed. */ //if (type->GetKind() subTypeAnn.emplace_back(type->GetItemType()); }