Skip to content

Commit

Permalink
fixes
Browse files: browse the repository at this point in the history
  • Loading branch information
MrLolthe1st committed May 7, 2024
1 parent cfc4f0c commit 1ff9882
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 46 deletions.
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/yt/common/yql_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false;
constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false;
constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1;
constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120);
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg","int8", "uint8", "int16", "uint16", "int32", "uint32", "int64", "uint64", "string", "yson", "json", "bool", "double", "tuple"};
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg", "tuple"};
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES = {"Int8", "Uint8", "Int16", "Uint16", "Int32", "Uint32", "Int64", "Uint64", "String", "Yson", "Json", "Bool", "Double"};

constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB;

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ TYtConfiguration::TYtConfiguration()
REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1);
REGISTER_SETTING(*this, DQRPCReaderTimeout);
REGISTER_SETTING(*this, BlockReaderSupportedTypes);
REGISTER_SETTING(*this, BlockReaderSupportedDataTypes);
REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, ApplyStoredConstraints)
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ struct TYtSettings {
NCommon::TConfSetting<size_t, true> DQRPCReaderInflight;
NCommon::TConfSetting<TDuration, true> DQRPCReaderTimeout;
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedDataTypes;

// Optimizers
NCommon::TConfSetting<bool, true> _EnableDq;
Expand Down
107 changes: 62 additions & 45 deletions ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,59 +46,34 @@ namespace {
ctx.IssueManager.RaiseIssue(info);
}

bool CheckSupportedTypes(const TSet<TString>& list, const TStructExprType* types, TExprContext& ctx) {
bool CheckSupportedTypes(const TSet<TString>& list, const TSet<TString>& dataTList, const TStructExprType* types, TExprContext& ctx) {
TSet<ETypeAnnotationKind> supported;
TSet<NUdf::EDataSlot> dataTypesSupported;
for (const auto &e: list) {
if (e == "pg") {
supported.insert(ETypeAnnotationKind::Pg);
} else if (e == "int8") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int8);
} else if (e == "uint8") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint8);
} else if (e == "int16") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int16);
} else if (e == "uint16") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint16);
} else if (e == "int32") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int32);
} else if (e == "uint32") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint32);
} else if (e == "int64") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int64);
} else if (e == "uint64") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint64);
} else if (e == "double") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Double);
} else if (e == "bool") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Bool);
} else if (e == "string") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::String);
} else if (e == "yson") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Yson);
} else if (e == "json") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Json);
} else if (e == "tuple") {
supported.emplace(ETypeAnnotationKind::Tuple);
} else if (e == "struct") {
supported.emplace(ETypeAnnotationKind::Struct);
} else if (e == "dict") {
supported.emplace(ETypeAnnotationKind::Dict);
} else if (e == "list") {
supported.emplace(ETypeAnnotationKind::List);
} else if (e == "variant") {
supported.emplace(ETypeAnnotationKind::Variant);
} else {
// Unknown type
AddInfo(ctx, TStringBuilder() << "unknown type: " << e);
return false;
}
}
if (dataTList.size()) {
supported.emplace(ETypeAnnotationKind::Data);
}
for (const auto &e: dataTList) {
dataTypesSupported.emplace(NUdf::GetDataSlot(e));
}
auto checkType = [&] (const TTypeAnnotationNode* type) {
if (type->GetKind() == ETypeAnnotationKind::Data) {
if (!supported.contains(ETypeAnnotationKind::Data)) {
Expand Down Expand Up @@ -126,16 +101,22 @@ namespace {
while (subT->GetKind() == ETypeAnnotationKind::Optional) {
subT = subT->Cast<TOptionalExprType>()->GetItemType();
}
if (subT->GetKind() == ETypeAnnotationKind::Tuple) {
if (!supported.contains(ETypeAnnotationKind::Tuple)) {
AddInfo(ctx, TStringBuilder() << "unsupported tuples");
if (subT->GetKind() == ETypeAnnotationKind::Tuple || subT->GetKind() == ETypeAnnotationKind::Struct
|| subT->GetKind() == ETypeAnnotationKind::Dict || subT->GetKind() == ETypeAnnotationKind::Variant
|| subT->GetKind() == ETypeAnnotationKind::List) {
if (!supported.contains(subT->GetKind())) {
AddInfo(ctx, TStringBuilder() << "unsupported " << subT->GetKind());
return false;
}
TVector<const TTypeAnnotationNode*> stack;
stack.push_back(subT);
while (!stack.empty()) {
auto el = stack.back();
stack.pop_back();
if (!supported.contains(el->GetKind())) {
AddInfo(ctx, TStringBuilder() << "unsupported " << el->GetKind());
return false;
}
if (el->GetKind() == ETypeAnnotationKind::Tuple) {
for (auto e: el->Cast<TTupleExprType>()->GetItems()) {
while (e->GetKind() == ETypeAnnotationKind::Optional) {
Expand All @@ -144,6 +125,42 @@ namespace {
stack.push_back(e);
}
continue;
} else if (el->GetKind() == ETypeAnnotationKind::Struct) {
for (auto e: el->Cast<TStructExprType>()->GetItems()) {
const TTypeAnnotationNode* c = e->GetItemType();
while (c->GetKind() == ETypeAnnotationKind::Optional) {
c = c->Cast<TOptionalExprType>()->GetItemType();
}
stack.push_back(c);
}
continue;

} else if (el->GetKind() == ETypeAnnotationKind::List) {
const TTypeAnnotationNode* c = el->Cast<TListExprType>()->GetItemType();
while (c->GetKind() == ETypeAnnotationKind::Optional) {
c = c->Cast<TOptionalExprType>()->GetItemType();
}
stack.push_back(c);
continue;
} else if (el->GetKind() == ETypeAnnotationKind::Dict) {
const TTypeAnnotationNode* c = el->Cast<TDictExprType>()->GetKeyType();
while (c->GetKind() == ETypeAnnotationKind::Optional) {
c = c->Cast<TOptionalExprType>()->GetItemType();
}
stack.push_back(c);
c = el->Cast<TDictExprType>()->GetPayloadType();
while (c->GetKind() == ETypeAnnotationKind::Optional) {
c = c->Cast<TOptionalExprType>()->GetItemType();
}
stack.push_back(c);
continue;
} else if (el->GetKind() == ETypeAnnotationKind::Variant) {
const TTypeAnnotationNode* c = el->Cast<TVariantExprType>()->GetUnderlyingType();
while (c->GetKind() == ETypeAnnotationKind::Optional) {
c = c->Cast<TOptionalExprType>()->GetItemType();
}
stack.push_back(c);
continue;
}
if (!checkType(el)) {
return false;
Expand Down Expand Up @@ -495,9 +512,9 @@ class TYtDqIntegration: public TDqIntegrationBase {
}

auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);

auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES);
const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
if (!CheckSupportedTypes(supportedTypes, structType, ctx)) {
if (!CheckSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx)) {
return false;
}
TVector<const TTypeAnnotationNode*> subTypeAnn(Reserve(structType->GetItems().size()));
Expand Down

0 comments on commit 1ff9882

Please sign in to comment.