Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3010 Check fields count and types in raw format / to ydb stable #5071

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit) {
NYql::TIssues issues;
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting()));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) {
try {
TVector<TString> partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()};
Expand Down Expand Up @@ -223,6 +224,37 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

// Validates the schema restrictions that apply to the "raw" format.
//
// For any other format an empty issue list is returned. For "raw", every
// schema column whose name is not listed in partitionedBy is checked with
// ValidateStringType, and exactly one such column must be present. Every
// violation is added as a BAD_REQUEST issue; checking continues after the
// first violation so that all of them are reported together.
template<typename TScheme>
static NYql::TIssues ValidateRawFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
    NYql::TIssues issues;
    if (format != "raw"sv) {
        return issues;
    }

    ui64 realSchemaColumnsCount = 0;
    const TSet<TString> partitionedBySet{partitionedBy.begin(), partitionedBy.end()};

    for (const auto& column: schema.column()) {
        // Partitioning columns are neither counted nor type-checked.
        if (partitionedBySet.contains(column.name())) {
            continue;
        }
        if (!ValidateStringType(column.type())) {
            issues.AddIssue(MakeErrorIssue(
                Ydb::StatusIds::BAD_REQUEST,
                TStringBuilder{} << "Only string type column in schema supported in raw format (you have '"
                    << column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)"));
        }
        // A column with an unsupported type still counts towards the total.
        ++realSchemaColumnsCount;
    }

    if (realSchemaColumnsCount != 1) {
        issues.AddIssue(MakeErrorIssue(
            Ydb::StatusIds::BAD_REQUEST,
            TStringBuilder{} << "Only one column in schema supported in raw format (you have "
                << realSchemaColumnsCount << " fields)"));
    }
    return issues;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -416,6 +448,29 @@ struct TObjectStorageExternalSource : public IExternalSource {
return dataSlotColumns;
}

static std::vector<NYdb::TType> GetStringTypes() {
NYdb::TType stringType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::String).Build();
NYdb::TType utf8Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Utf8).Build();
NYdb::TType ysonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Yson).Build();
NYdb::TType jsonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Json).Build();
const std::vector<NYdb::TType> result {
stringType,
utf8Type,
ysonType,
jsonType,
NYdb::TTypeBuilder{}.Optional(stringType).Build(),
NYdb::TTypeBuilder{}.Optional(utf8Type).Build(),
NYdb::TTypeBuilder{}.Optional(ysonType).Build(),
NYdb::TTypeBuilder{}.Optional(jsonType).Build()
};
return result;
}

// Returns true when columnType equals (per NYdb::TypesEqual) one of the types
// produced by GetStringTypes; the list is built once and reused across calls.
static bool ValidateStringType(const NYdb::TType& columnType) {
    static const std::vector<NYdb::TType> availableTypes = GetStringTypes();
    for (const auto& candidate : availableTypes) {
        if (NYdb::TypesEqual(candidate, columnType)) {
            return true;
        }
    }
    return false;
}

private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
Expand All @@ -435,4 +490,8 @@ NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TSt
return TObjectStorageExternalSource::ValidateDateFormatSetting(formatSetting, matchAllSettings);
}

// Free-function entry point (declared in object_storage.h) for raw-format
// schema validation of a FederatedQuery::Schema. Forwards all arguments to
// TObjectStorageExternalSource::ValidateRawFormat and returns its issues.
NYql::TIssues ValidateRawFormat(const TString& format, const FederatedQuery::Schema& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
return TObjectStorageExternalSource::ValidateRawFormat(format, schema, partitionedBy);
}

}
2 changes: 2 additions & 0 deletions ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuer

NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);

NYql::TIssues ValidateRawFormat(const TString& format, const FederatedQuery::Schema& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
}
issues.AddIssues(NKikimr::NExternalSource::ValidateDateFormatSetting(dataStreams.format_setting(), true));
issues.AddIssues(NKikimr::NExternalSource::ValidateRawFormat(dataStreams.format(), dataStreams.schema(), google::protobuf::RepeatedPtrField<TString>()));
break;
}
case FederatedQuery::BindingSetting::BINDING_NOT_SET: {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/common/mkql/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ TRuntimeNode BuildParseCall(
if (parseItemStructType->GetMembersCount() == 0) {
return ctx.ProgramBuilder.NewStruct(parseItemType, {});
}
MKQL_ENSURE(parseItemStructType->GetMembersCount() == 1, "Only one column in schema supported in raw format");

bool isOptional;
const auto schemeType = UnpackOptionalData(
Expand Down
46 changes: 41 additions & 5 deletions ydb/library/yql/providers/common/provider/yql_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1378,13 +1378,49 @@ bool ValidateCompressionForOutput(std::string_view format, std::string_view comp
return false;
}

bool ValidateFormatForInput(std::string_view format, TExprContext& ctx) {
if (format.empty() || IsIn(FormatsForInput, format)) {
bool ValidateFormatForInput(
std::string_view format,
const TStructExprType* schemaStructRowType,
const std::function<bool(TStringBuf)>& excludeFields,
TExprContext& ctx) {
if (format.empty()) {
return true;
}
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
<< ". Use one of: " << JoinSeq(", ", FormatsForInput)));
return false;

if (!IsIn(FormatsForInput, format)) {
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
<< ". Use one of: " << JoinSeq(", ", FormatsForInput)));
return false;
}

if (schemaStructRowType && format == TStringBuf("raw")) {
ui64 realSchemaColumnsCount = 0;

for (const TItemExprType* item : schemaStructRowType->GetItems()) {
if (excludeFields && excludeFields(item->GetName())) {
continue;
}
const TTypeAnnotationNode* rowType = item->GetItemType();
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
}

if (rowType->GetKind() != ETypeAnnotationKind::Data
|| !IsDataTypeString(rowType->Cast<TDataExprType>()->GetSlot())) {
ctx.AddError(TIssue(TStringBuilder() << "Only string type column in schema supported in raw format (you have '"
<< item->GetName() << " " << FormatType(rowType) << "' field)"));
return false;
}
++realSchemaColumnsCount;
}

if (realSchemaColumnsCount != 1) {
ctx.AddError(TIssue(TStringBuilder() << "Only one column in schema supported in raw format (you have "
<< realSchemaColumnsCount << " fields)"));
return false;
}
}
return true;
}

bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/common/provider/yql_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);
bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx);

bool ValidateFormatForInput(std::string_view format, TExprContext& ctx);
bool ValidateFormatForInput(std::string_view format, const TStructExprType* schemaStructRowType, const std::function<bool(TStringBuf)>& excludeFields, TExprContext& ctx);
bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx);

bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,27 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

auto format = read.Format().Ref().Content();
if (!NCommon::ValidateFormatForInput(format, ctx)) {
TPqTopic topic = read.Topic();
if (!EnsureCallable(topic.Ref(), ctx)) {
return TStatus::Error;
}

if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) {
TVector<TString> columnOrder;
auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder);
if (!schema) {
return TStatus::Error;
}

TPqTopic topic = read.Topic();
if (!EnsureCallable(topic.Ref(), ctx)) {
auto format = read.Format().Ref().Content();
if (!NCommon::ValidateFormatForInput(
format,
schema->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(),
[](TStringBuf fieldName) {return FindPqMetaFieldDescriptorBySysColumn(TString(fieldName)); },
ctx)) {
return TStatus::Error;
}

TVector<TString> columnOrder;
auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder);
if (!schema) {
if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) {
return TStatus::Error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) ||
!NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx))
!NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), nullptr, nullptr, ctx))
{
return TStatus::Error;
}
Expand Down Expand Up @@ -438,13 +438,15 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
std::vector<TString> partitionedBy;
TString projection;
{
THashSet<TStringBuf> columns;
TS3Object s3Object(input->Child(TS3ReadObject::idx_Object));
auto format = s3Object.Format().Ref().Content();
const TStructExprType* structRowType = rowType->Cast<TStructExprType>();

THashSet<TStringBuf> columns;
for (const TItemExprType* item : structRowType->GetItems()) {
columns.emplace(item->GetName());
}

TS3Object s3Object(input->Child(TS3ReadObject::idx_Object));

if (TMaybeNode<TExprBase> settings = s3Object.Settings()) {
for (auto& settingNode : settings.Raw()->ChildrenList()) {
const TStringBuf name = settingNode->Head().Content();
Expand All @@ -461,13 +463,21 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "Table contains no columns except partitioning columns"));
return TStatus::Error;
}

}
if (name == "projection"sv) {
projection = settingNode->Tail().Content();
}
}
}

TSet<TString> partitionedBySet{partitionedBy.begin(), partitionedBy.end()};
if (!NCommon::ValidateFormatForInput(
format,
structRowType,
[partitionedBySet](TStringBuf fieldName) {return partitionedBySet.contains(fieldName); },
ctx)) {
return TStatus::Error;
}
}

if (!ValidateProjectionTypes(
Expand Down Expand Up @@ -550,7 +560,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

const auto format = input->Child(TS3Object::idx_Format)->Content();
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, ctx)) {
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, nullptr, nullptr, ctx)) {
return TStatus::Error;
}

Expand Down
14 changes: 14 additions & 0 deletions ydb/tests/fq/s3/test_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,3 +619,17 @@ def test_ast_in_failed_query_compilation(self, kikimr, s3, client):

ast = client.describe_query(query_id).result.query.ast.data
assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix):
    """Creating a raw-format object storage binding with no columns must report an issue."""
    kikimr.control_plane.wait_bootstrap(1)
    connection = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")
    # check_issues=False: the request is expected to come back with issues.
    binding = client.create_object_storage_binding(
        name=unique_prefix + "my_binding",
        path="fruits.csv",
        format="raw",
        connection_id=connection.result.connection_id,
        columns=[],
        check_issues=False,
    )
    reported_issues = str(binding.issues)
    assert "Only one column in schema supported in raw format" in reported_issues, reported_issues
17 changes: 17 additions & 0 deletions ydb/tests/fq/s3/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,20 @@ def test_precompute(self, kikimr, s3, client):
assert len(result_set.rows) == 1
assert result_set.rows[0].items[0].bytes_value == b"Pear"
assert result_set.rows[0].items[1].int32_value == 15

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_raw_empty_schema_query(self, kikimr, s3, client, unique_prefix):
    """A raw-format query with an empty SCHEMA () must fail with the one-column error."""
    self.create_bucket_and_upload_file("test.parquet", s3, kikimr)
    connection_name = unique_prefix + "fruitbucket"
    client.create_storage_connection(connection_name, "fbucket")
    sql = f'''
SELECT * FROM `{connection_name}`.`*`
WITH (format=raw, SCHEMA ());
'''

    created = client.create_query("test_raw_empty_schema", sql, type=fq.QueryContent.QueryType.ANALYTICS)
    query_id = created.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.FAILED)
    description = f"{client.describe_query(query_id).result}"
    assert r"Only one column in schema supported in raw format" in description
15 changes: 15 additions & 0 deletions ydb/tests/fq/yds/test_yds_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,18 @@ def test_yds_insert(self, client):
assert result_set.rows[0].items[1].text_value == 'xxx'
assert result_set.rows[1].items[0].int32_value == 456
assert result_set.rows[1].items[1].text_value == 'yyy'

@yq_v1
def test_raw_empty_schema_binding(self, kikimr, client, yq_version):
    """Creating a raw-format YDS binding with no columns must report an issue."""
    self.init_topics(f"pq_test_raw_empty_schema_binding_{yq_version}")
    database = os.getenv("YDB_DATABASE")
    endpoint = os.getenv("YDB_ENDPOINT")
    connection = client.create_yds_connection("myyds2", database, endpoint)
    assert not connection.issues, str(connection.issues)
    # check_issues=False: the request is expected to come back with issues.
    binding = client.create_yds_binding(
        name="my_binding",
        stream=self.input_topic,
        format="raw",
        connection_id=connection.result.connection_id,
        columns=[],
        check_issues=False,
    )
    reported_issues = str(binding.issues)
    assert "Only one column in schema supported in raw format" in reported_issues, reported_issues
Loading