Skip to content

Commit

Permalink
Add type white-list for supported types on blockreader (ydb-platform#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MrLolthe1st committed May 24, 2024
1 parent 4137016 commit b339096
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 1 deletion.
14 changes: 14 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_configuration.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#pragma once

#include <ydb/library/yql/public/udf/udf_data_type.h>

#include <util/system/types.h>
#include <util/datetime/base.h>
#include <util/generic/size_literals.h>
#include <util/generic/set.h>

namespace NYql {

Expand Down Expand Up @@ -55,6 +58,17 @@ constexpr bool DEFAULT_JOIN_COMMON_USE_MULTI_OUT = false;
constexpr bool DEFAULT_USE_RPC_READER_IN_DQ = false;
constexpr size_t DEFAULT_RPC_READER_INFLIGHT = 1;
constexpr TDuration DEFAULT_RPC_READER_TIMEOUT = TDuration::Seconds(120);
// Type-kind names accepted by the block reader when the BlockReaderSupportedTypes
// setting is not set.
// `inline` makes this a single shared object; a plain namespace-scope `const` in a
// header has internal linkage and would be constructed separately in every
// translation unit that includes this file.
inline const TSet<TString> DEFAULT_BLOCK_READER_SUPPORTED_TYPES = {"pg", "tuple"};

// Data slots accepted by the block reader when the BlockReaderSupportedDataTypes
// setting is not set. Note that Float is not part of the default list.
inline const TSet<NUdf::EDataSlot> DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES = {
    NUdf::EDataSlot::Int8,   NUdf::EDataSlot::Uint8,
    NUdf::EDataSlot::Int16,  NUdf::EDataSlot::Uint16,
    NUdf::EDataSlot::Int32,  NUdf::EDataSlot::Uint32,
    NUdf::EDataSlot::Int64,  NUdf::EDataSlot::Uint64,
    NUdf::EDataSlot::Bool,   NUdf::EDataSlot::Double,
    NUdf::EDataSlot::String, NUdf::EDataSlot::Json,
    NUdf::EDataSlot::Yson,   NUdf::EDataSlot::Utf8,
};

constexpr auto DEFAULT_SWITCH_MEMORY_LIMIT = 128_MB;

Expand Down
12 changes: 12 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>

#include <library/cpp/yson/node/node_io.h>

Expand Down Expand Up @@ -460,6 +461,17 @@ TYtConfiguration::TYtConfiguration()
REGISTER_SETTING(*this, UseRPCReaderInDQ);
REGISTER_SETTING(*this, DQRPCReaderInflight).Lower(1);
REGISTER_SETTING(*this, DQRPCReaderTimeout);
REGISTER_SETTING(*this, BlockReaderSupportedTypes);
REGISTER_SETTING(*this, BlockReaderSupportedDataTypes)
    .Parser([](const TString& v) {
        // The raw setting value is a comma-separated list of data slot names.
        TSet<TString> slotNames;
        StringSplitter(v).SplitBySet(",").AddTo(&slotNames);

        // Resolve every name through NUdf::GetDataSlot and collect the results.
        TSet<NUdf::EDataSlot> slots;
        for (const auto& slotName : slotNames) {
            slots.emplace(NUdf::GetDataSlot(slotName));
        }
        return slots;
    });
REGISTER_SETTING(*this, MaxCpuUsageToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, MaxReplicationFactorToFuseMultiOuts).Lower(1.0);
REGISTER_SETTING(*this, ApplyStoredConstraints)
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ struct TYtSettings {
NCommon::TConfSetting<bool, true> UseRPCReaderInDQ;
NCommon::TConfSetting<size_t, true> DQRPCReaderInflight;
NCommon::TConfSetting<TDuration, true> DQRPCReaderTimeout;
// White-lists for the block reader: allowed type-kind names (e.g. "pg", "tuple")
// and allowed data slots. The DEFAULT_BLOCK_READER_SUPPORTED_* constants are used
// as fallbacks when these settings are not set.
NCommon::TConfSetting<TSet<TString>, true> BlockReaderSupportedTypes;
NCommon::TConfSetting<TSet<NUdf::EDataSlot>, true> BlockReaderSupportedDataTypes;

// Optimizers
NCommon::TConfSetting<bool, true> _EnableDq;
Expand Down
113 changes: 112 additions & 1 deletion ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,108 @@ static const THashSet<TStringBuf> POOL_TREES_WHITELIST = {"physical", "cloud",

using namespace NNodes;

namespace {

// Raises an INFO-level issue at `pos` saying why the block reader is not used.
// The reported text is "Can't use block reader: " followed by `msg`.
void BlockReaderAddInfo(TExprContext& ctx, const TPosition& pos, const TString& msg) {
    ctx.IssueManager.RaiseIssue(YqlIssue(pos, EYqlIssueCode::TIssuesIds_EIssueCode_INFO, "Can't use block reader: " + msg));
}

// Checks that every column type of `types` (including nested types) is covered by
// the block reader white-lists.
//
//   list               - white-listed type-kind names; recognized values are
//                        "pg", "tuple", "struct", "dict", "list" and "variant".
//   dataTypesSupported - white-listed data slots; Data types are allowed only
//                        when this set is non-empty.
//   types              - row struct whose member types are checked.
//   ctx, pos           - where the explanatory INFO issue is raised on failure.
//
// Optional is always unwrapped and never needs to be white-listed. Containers are
// expanded into their element types; a variant's underlying type is checked like
// any other type, so its kind has to be white-listed as well.
//
// Returns true when all types are supported. On the first unsupported type (or an
// unrecognized name in `list`) raises an INFO issue and returns false.
bool CheckBlockReaderSupportedTypes(const TSet<TString>& list, const TSet<NUdf::EDataSlot>& dataTypesSupported, const TStructExprType* types, TExprContext& ctx, const TPosition& pos) {
    // Translate the textual white-list into annotation kinds.
    TSet<ETypeAnnotationKind> supported;
    for (const auto& e : list) {
        if (e == "pg") {
            supported.emplace(ETypeAnnotationKind::Pg);
        } else if (e == "tuple") {
            supported.emplace(ETypeAnnotationKind::Tuple);
        } else if (e == "struct") {
            supported.emplace(ETypeAnnotationKind::Struct);
        } else if (e == "dict") {
            supported.emplace(ETypeAnnotationKind::Dict);
        } else if (e == "list") {
            supported.emplace(ETypeAnnotationKind::List);
        } else if (e == "variant") {
            supported.emplace(ETypeAnnotationKind::Variant);
        } else {
            // Unknown type
            BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unknown type: " << e);
            return false;
        }
    }
    if (!dataTypesSupported.empty()) {
        supported.emplace(ETypeAnnotationKind::Data);
    }

    // Validates a non-container type. The caller has already verified that the
    // kind itself is white-listed, so only the slot / pg type name is examined here.
    const auto checkLeaf = [&](const TTypeAnnotationNode* type) {
        const auto kind = type->GetKind();
        if (kind == ETypeAnnotationKind::Data) {
            const auto slot = type->Cast<TDataExprType>()->GetSlot();
            if (!dataTypesSupported.contains(slot)) {
                BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported data type: " << slot);
                return false;
            }
        } else if (kind == ETypeAnnotationKind::Pg) {
            const auto& name = type->Cast<TPgExprType>()->GetName();
            // pg float4 is accepted only when the Float data slot is white-listed.
            if (name == "float4" && !dataTypesSupported.contains(NUdf::EDataSlot::Float)) {
                BlockReaderAddInfo(ctx, pos, TStringBuilder() << "PgFloat4 unsupported yet since float is no supported");
                return false;
            }
        } else {
            // Fail closed: a white-listed kind without a dedicated check is rejected.
            BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported annotation kind: " << kind);
            return false;
        }
        return true;
    };

    // Iterative depth-first walk over the column types and everything nested in them.
    TVector<const TTypeAnnotationNode*> stack;
    for (const auto* column : types->GetItems()) {
        stack.push_back(column->GetItemType());
    }

    while (!stack.empty()) {
        const auto* el = stack.back();
        stack.pop_back();

        const auto kind = el->GetKind();
        if (kind == ETypeAnnotationKind::Optional) {
            stack.push_back(el->Cast<TOptionalExprType>()->GetItemType());
            continue;
        }
        if (!supported.contains(kind)) {
            BlockReaderAddInfo(ctx, pos, TStringBuilder() << "unsupported " << kind);
            return false;
        }

        switch (kind) {
            case ETypeAnnotationKind::Tuple:
                for (const auto* item : el->Cast<TTupleExprType>()->GetItems()) {
                    stack.push_back(item);
                }
                break;
            case ETypeAnnotationKind::Struct:
                for (const auto* member : el->Cast<TStructExprType>()->GetItems()) {
                    stack.push_back(member->GetItemType());
                }
                break;
            case ETypeAnnotationKind::List:
                stack.push_back(el->Cast<TListExprType>()->GetItemType());
                break;
            case ETypeAnnotationKind::Dict:
                stack.push_back(el->Cast<TDictExprType>()->GetKeyType());
                stack.push_back(el->Cast<TDictExprType>()->GetPayloadType());
                break;
            case ETypeAnnotationKind::Variant:
                stack.push_back(el->Cast<TVariantExprType>()->GetUnderlyingType());
                break;
            default:
                if (!checkLeaf(el)) {
                    return false;
                }
                break;
        }
    }
    return true;
}

} // namespace

class TYtDqIntegration: public TDqIntegrationBase {
public:
TYtDqIntegration(TYtState* state)
Expand Down Expand Up @@ -375,25 +477,34 @@ class TYtDqIntegration: public TDqIntegrationBase {
if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) {
return false;
}


auto supportedTypes = State_->Configuration->BlockReaderSupportedTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_TYPES);
auto supportedDataTypes = State_->Configuration->BlockReaderSupportedDataTypes.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_BLOCK_READER_SUPPORTED_DATA_TYPES);
const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back())->Cast<TStructExprType>();
if (!CheckBlockReaderSupportedTypes(supportedTypes, supportedDataTypes, structType, ctx, ctx.GetPosition(node.Pos()))) {
return false;
}

TVector<const TTypeAnnotationNode*> subTypeAnn(Reserve(structType->GetItems().size()));
for (const auto& type: structType->GetItems()) {
subTypeAnn.emplace_back(type->GetItemType());
}

if (!State_->Types->ArrowResolver) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "no arrow resolver provided");
return false;
}

if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "arrow resolver don't support these types");
return false;
}

const TYtSectionList& sectionList = wrap.Input().Cast<TYtReadTable>().Input();
for (size_t i = 0; i < sectionList.Size(); ++i) {
auto section = sectionList.Item(i);
if (!NYql::GetSettingAsColumnList(section.Settings().Ref(), EYtSettingType::SysColumns).empty()) {
BlockReaderAddInfo(ctx, ctx.GetPosition(node.Pos()), "system column");
return false;
}
}
Expand Down

0 comments on commit b339096

Please sign in to comment.