From 1ff988218905e38d06396ab7d71b5a9dbf635f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D0=BE=D0=B6=D0=B8=D0=BB=D0=BE=D0=B2?= Date: Tue, 7 May 2024 12:03:29 +0300 Subject: [PATCH] fixes --- .../providers/yt/common/yql_configuration.h | 3 +- .../providers/yt/common/yql_yt_settings.cpp | 1 + .../yql/providers/yt/common/yql_yt_settings.h | 1 + .../yt/provider/yql_yt_dq_integration.cpp | 107 ++++++++++-------- 4 files changed, 66 insertions(+), 46 deletions(-) diff --git a/ydb/library/yql/providers/yt/common/yql_configuration.h b/ydb/library/yql/providers/yt/common/yql_configuration.h index 6dbb1733a1f5..1ac13148409b 100644 --- a/ydb/library/yql/providers/yt/common/yql_configuration.h +++ b/ydb/library/yql/providers/yt/common/yql_configuration.h @@ -56,7 +56,8 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false; constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false; constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1; constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120); -const TSet DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg","int8", "uint8", "int16", "uint16", "int32", "uint32", "int64", "uint64", "string", "yson", "json", "bool", "double", "tuple"}; +const TSet DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg", "tuple"}; +const TSet DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES = {"Int8", "Uint8", "Int16", "Uint16", "Int32", "Uint32", "Int64", "Uint64", "String", "Yson", "Json", "Bool", "Double"}; constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB; diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp index 3932bbff587f..a3ce0915840f 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp @@ -461,6 +461,7 @@ TYtConfiguration::TYtConfiguration() REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1); REGISTER_SETTING(*this, DQRPCReaderTimeout); REGISTER_SETTING(*this, BlockReaderSupportedTypes); + REGISTER_SETTING(*this, BlockReaderSupportedDataTypes); REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0); REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0); REGISTER_SETTING(*this, ApplyStoredConstraints) diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index fb6ef82030c1..cb0c7aac029b 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -197,6 +197,7 @@ struct TYtSettings { NCommon::TConfSetting DQRPCReaderInflight; NCommon::TConfSetting DQRPCReaderTimeout; NCommon::TConfSetting, true> BlockReaderSupportedTypes; + NCommon::TConfSetting, true> BlockReaderSupportedDataTypes; // Optimizers NCommon::TConfSetting _EnableDq; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 5117efabcfc3..8ba44ad97d8a 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -46,59 +46,34 @@ namespace { ctx.IssueManager.RaiseIssue(info); } - bool CheckSupportedTypes(const TSet& list, const TStructExprType* types, TExprContext& ctx) { + bool CheckSupportedTypes(const TSet& list, const TSet& dataTList, const TStructExprType* types, TExprContext& ctx) { TSet supported; TSet dataTypesSupported; for (const auto &e: list) { if (e == "pg") { supported.insert(ETypeAnnotationKind::Pg); - } else if (e == "int8") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Int8); - } else if (e == "uint8") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Uint8); - } else if (e == "int16") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Int16); - } else if (e == "uint16") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Uint16); - } else if (e == "int32") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Int32); - } else if (e == "uint32") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Uint32); - } else if (e == "int64") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Int64); - } else if (e == "uint64") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Uint64); - } else if (e == "double") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Double); - } else if (e == "bool") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Bool); - } else if (e == "string") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::String); - } else if (e == "yson") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Yson); - } else if (e == "json") { - supported.emplace(ETypeAnnotationKind::Data); - dataTypesSupported.emplace(NUdf::EDataSlot::Json); } else if (e == "tuple") { supported.emplace(ETypeAnnotationKind::Tuple); + } else if (e == "struct") { + supported.emplace(ETypeAnnotationKind::Struct); + } else if (e == "dict") { + supported.emplace(ETypeAnnotationKind::Dict); + } else if (e == "list") { + supported.emplace(ETypeAnnotationKind::List); + } else if (e == "variant") { + supported.emplace(ETypeAnnotationKind::Variant); } else { // Unknown type AddInfo(ctx, TStringBuilder() << "unknown type: " << e); return false; } } + if (dataTList.size()) { + supported.emplace(ETypeAnnotationKind::Data); + } + for (const auto &e: dataTList) { + dataTypesSupported.emplace(NUdf::GetDataSlot(e)); + } auto checkType = [&] (const TTypeAnnotationNode* type) { if (type->GetKind() == ETypeAnnotationKind::Data) { if (!supported.contains(ETypeAnnotationKind::Data)) { @@ -126,9 +101,11 @@ namespace { while (subT->GetKind() == ETypeAnnotationKind::Optional) { subT = subT->Cast()->GetItemType(); } - if (subT->GetKind() == ETypeAnnotationKind::Tuple) { - if (!supported.contains(ETypeAnnotationKind::Tuple)) { - AddInfo(ctx, TStringBuilder() << "unsupported tuples"); + if (subT->GetKind() == ETypeAnnotationKind::Tuple || subT->GetKind() == ETypeAnnotationKind::Struct + || subT->GetKind() == ETypeAnnotationKind::Dict || subT->GetKind() == ETypeAnnotationKind::Variant + || subT->GetKind() == ETypeAnnotationKind::List) { + if (!supported.contains(subT->GetKind())) { + AddInfo(ctx, TStringBuilder() << "unsupported " << subT->GetKind()); return false; } TVector stack; @@ -136,6 +113,10 @@ namespace { while (!stack.empty()) { auto el = stack.back(); stack.pop_back(); + if (!supported.contains(el->GetKind())) { + AddInfo(ctx, TStringBuilder() << "unsupported " << el->GetKind()); + return false; + } if (el->GetKind() == ETypeAnnotationKind::Tuple) { for (auto e: el->Cast()->GetItems()) { while (e->GetKind() == ETypeAnnotationKind::Optional) { @@ -144,6 +125,42 @@ namespace { stack.push_back(e); } continue; + } else if (el->GetKind() == ETypeAnnotationKind::Struct) { + for (auto e: el->Cast()->GetItems()) { + const TTypeAnnotationNode* c = e->GetItemType(); + while (c->GetKind() == ETypeAnnotationKind::Optional) { + c = c->Cast()->GetItemType(); + } + stack.push_back(c); + } + continue; + + } else if (el->GetKind() == ETypeAnnotationKind::List) { + const TTypeAnnotationNode* c = el->Cast()->GetItemType(); + while (c->GetKind() == ETypeAnnotationKind::Optional) { + c = c->Cast()->GetItemType(); + } + stack.push_back(c); + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Dict) { + const TTypeAnnotationNode* c = el->Cast()->GetKeyType(); + while (c->GetKind() == ETypeAnnotationKind::Optional) { + c = c->Cast()->GetItemType(); + } + stack.push_back(c); + c = el->Cast()->GetPayloadType(); + while (c->GetKind() == ETypeAnnotationKind::Optional) { + c = c->Cast()->GetItemType(); + } + stack.push_back(c); + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Variant) { + const TTypeAnnotationNode* c = el->Cast()->GetUnderlyingType(); + while (c->GetKind() == ETypeAnnotationKind::Optional) { + c = c->Cast()->GetItemType(); + } + stack.push_back(c); + continue; } if (!checkType(el)) { return false; @@ -495,9 +512,9 @@ class TYtDqIntegration: public TDqIntegrationBase { } auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES); - + auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES); const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast()->GetItems().back())->Cast(); - if (!CheckSupportedTypes(supportedTypes, structType, ctx)) { + if (!CheckSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx)) { return false; } TVector subTypeAnn(Reserve(structType->GetItems().size()));