Skip to content

Commit

Permalink
Merge cfc4f0c into 04cdfda
Browse files Browse the repository at this point in the history
  • Loading branch information
MrLolthe1st authored May 3, 2024
2 parents 04cdfda + cfc4f0c commit 7051cbc
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <util/system/types.h>
#include <util/datetime/base.h>
#include <util/generic/size_literals.h>
#include <util/generic/set.h>

namespace NYql {

Expand Down Expand Up @@ -55,6 +56,7 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false;
constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false;
constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1;
constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120);
const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg","int8", "uint8", "int16", "uint16", "int32", "uint32", "int64", "uint64", "string", "yson", "json", "bool", "double", "tuple"};

constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB;

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ TYtConfiguration::TYtConfiguration()
REGISTER_SETTING(*this, UseRPCReaderInDQ);
REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1);
REGISTER_SETTING(*this, DQRPCReaderTimeout);
REGISTER_SETTING(*this, BlockReaderSupportedTypes);
REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, ApplyStoredConstraints)
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ struct TYtSettings {
NCommon::TConfSetting<bool, true> UseRPCReaderInDQ;
NCommon::TConfSetting<size_t, true> DQRPCReaderInflight;
NCommon::TConfSetting<TDuration, true> DQRPCReaderTimeout;
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;

// Optimizers
NCommon::TConfSetting<bool, true> _EnableDq;
Expand Down
123 changes: 123 additions & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,124 @@ static const THashSet<TStringBuf> POOL_TREES_WHITELIST = {"physical", "cloud",

using namespace NNodes;

namespace {
void AddInfo(TExprContext& ctx, const TString& msg) {
TIssue info("Can't use block reader: " + msg);
info.Severity = TSeverityIds::S_INFO;
ctx.IssueManager.RaiseIssue(info);
}

bool CheckSupportedTypes(const TSet<TString>& list, const TStructExprType* types, TExprContext& ctx) {
TSet<ETypeAnnotationKind> supported;
TSet<NUdf::EDataSlot> dataTypesSupported;
for (const auto &e: list) {
if (e == "pg") {
supported.insert(ETypeAnnotationKind::Pg);
} else if (e == "int8") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int8);
} else if (e == "uint8") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint8);
} else if (e == "int16") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int16);
} else if (e == "uint16") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint16);
} else if (e == "int32") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int32);
} else if (e == "uint32") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint32);
} else if (e == "int64") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Int64);
} else if (e == "uint64") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Uint64);
} else if (e == "double") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Double);
} else if (e == "bool") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Bool);
} else if (e == "string") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::String);
} else if (e == "yson") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Yson);
} else if (e == "json") {
supported.emplace(ETypeAnnotationKind::Data);
dataTypesSupported.emplace(NUdf::EDataSlot::Json);
} else if (e == "tuple") {
supported.emplace(ETypeAnnotationKind::Tuple);
} else {
// Unknown type
AddInfo(ctx, TStringBuilder() << "unknown type: " << e);
return false;
}
}
auto checkType = [&] (const TTypeAnnotationNode* type) {
if (type->GetKind() == ETypeAnnotationKind::Data) {
if (!supported.contains(ETypeAnnotationKind::Data)) {
AddInfo(ctx, TStringBuilder() << "unsupported data types");
return false;
}
if (!dataTypesSupported.contains(type->Cast<TDataExprType>()->GetSlot())) {
AddInfo(ctx, TStringBuilder() << "unsupported data type: " << type->Cast<TDataExprType>()->GetSlot());
return false;
}
} else if (type->GetKind() == ETypeAnnotationKind::Pg) {
if (!supported.contains(ETypeAnnotationKind::Pg)) {
AddInfo(ctx, TStringBuilder() << "unsupported pg");
return false;
}
} else {
AddInfo(ctx, TStringBuilder() << "unsupported annotation kind: " << type->GetKind());
return false;
}
return true;
};

for (auto sub: types->GetItems()) {
auto subT = sub->GetItemType();
while (subT->GetKind() == ETypeAnnotationKind::Optional) {
subT = subT->Cast<TOptionalExprType>()->GetItemType();
}
if (subT->GetKind() == ETypeAnnotationKind::Tuple) {
if (!supported.contains(ETypeAnnotationKind::Tuple)) {
AddInfo(ctx, TStringBuilder() << "unsupported tuples");
return false;
}
TVector<const TTypeAnnotationNode*> stack;
stack.push_back(subT);
while (!stack.empty()) {
auto el = stack.back();
stack.pop_back();
if (el->GetKind() == ETypeAnnotationKind::Tuple) {
for (auto e: el->Cast<TTupleExprType>()->GetItems()) {
while (e->GetKind() == ETypeAnnotationKind::Optional) {
e = e->Cast<TOptionalExprType>()->GetItemType();
}
stack.push_back(e);
}
continue;
}
if (!checkType(el)) {
return false;
}
}
} else if (!checkType(subT)) {
return false;
}
}
return true;
}
};

class TYtDqIntegration: public TDqIntegrationBase {
public:
TYtDqIntegration(TYtState* state)
Expand Down Expand Up @@ -375,8 +493,13 @@ class TYtDqIntegration: public TDqIntegrationBase {
if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
return false;
}

auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);

const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
if (!CheckSupportedTypes(supportedTypes, structType, ctx)) {
return false;
}
TVector<const TTypeAnnotationNode*> subTypeAnn(Reserve(structType->GetItems().size()));
for (const auto& type: structType->GetItems()) {
subTypeAnn.emplace_back(type->GetItemType());
Expand Down

0 comments on commit 7051cbc

Please sign in to comment.