Skip to content

Commit

Permalink
YQ-3010 Check fields count and types in raw format / to ydb stable (y…
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored and uzhastik committed Jun 20, 2024
1 parent 65c72c4 commit 57dd8de
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 20 deletions.
59 changes: 59 additions & 0 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit) {
NYql::TIssues issues;
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting()));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) {
try {
TVector<TString> partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()};
Expand Down Expand Up @@ -223,6 +224,37 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

template<typename TScheme>
static NYql::TIssues ValidateRawFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
NYql::TIssues issues;
if (format != "raw"sv) {
return issues;
}

ui64 realSchemaColumnsCount = 0;
Ydb::Column lastColumn;
TSet<TString> partitionedBySet{partitionedBy.begin(), partitionedBy.end()};

for (const auto& column: schema.column()) {
if (partitionedBySet.contains(column.name())) {
continue;
}
if (!ValidateStringType(column.type())) {
issues.AddIssue(MakeErrorIssue(
Ydb::StatusIds::BAD_REQUEST,
TStringBuilder{} << TStringBuilder() << "Only string type column in schema supported in raw format (you have '"
<< column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)"));
}
++realSchemaColumnsCount;
}

if (realSchemaColumnsCount != 1) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder{} << TStringBuilder() << "Only one column in schema supported in raw format (you have "
<< realSchemaColumnsCount << " fields)"));
}
return issues;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -416,6 +448,29 @@ struct TObjectStorageExternalSource : public IExternalSource {
return dataSlotColumns;
}

static std::vector<NYdb::TType> GetStringTypes() {
NYdb::TType stringType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::String).Build();
NYdb::TType utf8Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Utf8).Build();
NYdb::TType ysonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Yson).Build();
NYdb::TType jsonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Json).Build();
const std::vector<NYdb::TType> result {
stringType,
utf8Type,
ysonType,
jsonType,
NYdb::TTypeBuilder{}.Optional(stringType).Build(),
NYdb::TTypeBuilder{}.Optional(utf8Type).Build(),
NYdb::TTypeBuilder{}.Optional(ysonType).Build(),
NYdb::TTypeBuilder{}.Optional(jsonType).Build()
};
return result;
}

static bool ValidateStringType(const NYdb::TType& columnType) {
static const std::vector<NYdb::TType> availableTypes = GetStringTypes();
return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) != availableTypes.end();
}

private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
Expand All @@ -435,4 +490,8 @@ NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TSt
return TObjectStorageExternalSource::ValidateDateFormatSetting(formatSetting, matchAllSettings);
}

NYql::TIssues ValidateRawFormat(const TString& format, const FederatedQuery::Schema& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
return TObjectStorageExternalSource::ValidateRawFormat(format, schema, partitionedBy);
}

}
2 changes: 2 additions & 0 deletions ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuer

NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);

NYql::TIssues ValidateRawFormat(const TString& format, const FederatedQuery::Schema& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
}
issues.AddIssues(NKikimr::NExternalSource::ValidateDateFormatSetting(dataStreams.format_setting(), true));
issues.AddIssues(NKikimr::NExternalSource::ValidateRawFormat(dataStreams.format(), dataStreams.schema(), google::protobuf::RepeatedPtrField<TString>()));
break;
}
case FederatedQuery::BindingSetting::BINDING_NOT_SET: {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/common/mkql/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ TRuntimeNode BuildParseCall(
if (parseItemStructType->GetMembersCount() == 0) {
return ctx.ProgramBuilder.NewStruct(parseItemType, {});
}
MKQL_ENSURE(parseItemStructType->GetMembersCount() == 1, "Only one column in schema supported in raw format");

bool isOptional;
const auto schemeType = UnpackOptionalData(
Expand Down
46 changes: 41 additions & 5 deletions ydb/library/yql/providers/common/provider/yql_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1378,13 +1378,49 @@ bool ValidateCompressionForOutput(std::string_view format, std::string_view comp
return false;
}

bool ValidateFormatForInput(std::string_view format, TExprContext& ctx) {
if (format.empty() || IsIn(FormatsForInput, format)) {
bool ValidateFormatForInput(
std::string_view format,
const TStructExprType* schemaStructRowType,
const std::function<bool(TStringBuf)>& excludeFields,
TExprContext& ctx) {
if (format.empty()) {
return true;
}
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
<< ". Use one of: " << JoinSeq(", ", FormatsForInput)));
return false;

if (!IsIn(FormatsForInput, format)) {
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
<< ". Use one of: " << JoinSeq(", ", FormatsForInput)));
return false;
}

if (schemaStructRowType && format == TStringBuf("raw")) {
ui64 realSchemaColumnsCount = 0;

for (const TItemExprType* item : schemaStructRowType->GetItems()) {
if (excludeFields && excludeFields(item->GetName())) {
continue;
}
const TTypeAnnotationNode* rowType = item->GetItemType();
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
}

if (rowType->GetKind() != ETypeAnnotationKind::Data
|| !IsDataTypeString(rowType->Cast<TDataExprType>()->GetSlot())) {
ctx.AddError(TIssue(TStringBuilder() << "Only string type column in schema supported in raw format (you have '"
<< item->GetName() << " " << FormatType(rowType) << "' field)"));
return false;
}
++realSchemaColumnsCount;
}

if (realSchemaColumnsCount != 1) {
ctx.AddError(TIssue(TStringBuilder() << "Only one column in schema supported in raw format (you have "
<< realSchemaColumnsCount << " fields)"));
return false;
}
}
return true;
}

bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/common/provider/yql_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);
bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx);

bool ValidateFormatForInput(std::string_view format, TExprContext& ctx);
bool ValidateFormatForInput(std::string_view format, const TStructExprType* schemaStructRowType, const std::function<bool(TStringBuf)>& excludeFields, TExprContext& ctx);
bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx);

bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,27 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

auto format = read.Format().Ref().Content();
if (!NCommon::ValidateFormatForInput(format, ctx)) {
TPqTopic topic = read.Topic();
if (!EnsureCallable(topic.Ref(), ctx)) {
return TStatus::Error;
}

if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) {
TVector<TString> columnOrder;
auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder);
if (!schema) {
return TStatus::Error;
}

TPqTopic topic = read.Topic();
if (!EnsureCallable(topic.Ref(), ctx)) {
auto format = read.Format().Ref().Content();
if (!NCommon::ValidateFormatForInput(
format,
schema->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(),
[](TStringBuf fieldName) {return FindPqMetaFieldDescriptorBySysColumn(TString(fieldName)); },
ctx)) {
return TStatus::Error;
}

TVector<TString> columnOrder;
auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder);
if (!schema) {
if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) ||
!NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx))
!NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), nullptr, nullptr, ctx))
{
return TStatus::Error;
}
Expand Down Expand Up @@ -438,13 +438,15 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
std::vector<TString> partitionedBy;
TString projection;
{
THashSet<TStringBuf> columns;
TS3Object s3Object(input->Child(TS3ReadObject::idx_Object));
auto format = s3Object.Format().Ref().Content();
const TStructExprType* structRowType = rowType->Cast<TStructExprType>();

THashSet<TStringBuf> columns;
for (const TItemExprType* item : structRowType->GetItems()) {
columns.emplace(item->GetName());
}

TS3Object s3Object(input->Child(TS3ReadObject::idx_Object));

if (TMaybeNode<TExprBase> settings = s3Object.Settings()) {
for (auto& settingNode : settings.Raw()->ChildrenList()) {
const TStringBuf name = settingNode->Head().Content();
Expand All @@ -461,13 +463,21 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "Table contains no columns except partitioning columns"));
return TStatus::Error;
}

}
if (name == "projection"sv) {
projection = settingNode->Tail().Content();
}
}
}

TSet<TString> partitionedBySet{partitionedBy.begin(), partitionedBy.end()};
if (!NCommon::ValidateFormatForInput(
format,
structRowType,
[partitionedBySet](TStringBuf fieldName) {return partitionedBySet.contains(fieldName); },
ctx)) {
return TStatus::Error;
}
}

if (!ValidateProjectionTypes(
Expand Down Expand Up @@ -550,7 +560,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

const auto format = input->Child(TS3Object::idx_Format)->Content();
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, ctx)) {
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, nullptr, nullptr, ctx)) {
return TStatus::Error;
}

Expand Down
14 changes: 14 additions & 0 deletions ydb/tests/fq/s3/test_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,3 +619,17 @@ def test_ast_in_failed_query_compilation(self, kikimr, s3, client):

ast = client.describe_query(query_id).result.query.ast.data
assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix):
kikimr.control_plane.wait_bootstrap(1)
connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")
binding_response = client.create_object_storage_binding(name=unique_prefix + "my_binding",
path="fruits.csv",
format="raw",
connection_id=connection_response.result.connection_id,
columns=[],
check_issues=False)
assert "Only one column in schema supported in raw format" in str(binding_response.issues), str(
binding_response.issues)
17 changes: 17 additions & 0 deletions ydb/tests/fq/s3/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,20 @@ def test_precompute(self, kikimr, s3, client):
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].bytes_value == b"Pear"
assert result_set.rows[0].items[1].int32_value == 15

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_raw_empty_schema_query(self, kikimr, s3, client, unique_prefix):
self.create_bucket_and_upload_file("test.parquet", s3, kikimr)
storage_connection_name = unique_prefix + "fruitbucket"
client.create_storage_connection(storage_connection_name, "fbucket")
sql = f'''
SELECT * FROM `{storage_connection_name}`.`*`
WITH (format=raw, SCHEMA ());
'''

query_id = client.create_query("test_raw_empty_schema", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
describe_string = "{}".format(describe_result)
assert r"Only one column in schema supported in raw format" in describe_string
15 changes: 15 additions & 0 deletions ydb/tests/fq/yds/test_yds_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,18 @@ def test_yds_insert(self, client):
assert result_set.rows[0].items[1].text_value == 'xxx'
assert result_set.rows[1].items[0].int32_value == 456
assert result_set.rows[1].items[1].text_value == 'yyy'

@yq_v1
def test_raw_empty_schema_binding(self, kikimr, client, yq_version):
self.init_topics(f"pq_test_raw_empty_schema_binding_{yq_version}")
connection_response = client.create_yds_connection("myyds2", os.getenv("YDB_DATABASE"),
os.getenv("YDB_ENDPOINT"))
assert not connection_response.issues, str(connection_response.issues)
binding_response = client.create_yds_binding(name="my_binding",
stream=self.input_topic,
format="raw",
connection_id=connection_response.result.connection_id,
columns=[],
check_issues=False)
assert "Only one column in schema supported in raw format" in str(binding_response.issues), str(
binding_response.issues)

0 comments on commit 57dd8de

Please sign in to comment.