Skip to content

Commit

Permalink
Merge 51f5e62 into 434b4dd
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Aug 26, 2024
2 parents 434b4dd + 51f5e62 commit 01245d2
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 19 deletions.
28 changes: 21 additions & 7 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
Expand Down Expand Up @@ -64,7 +65,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
}

if (auto issues = Validate(schema, objectStorage, PathsLimit)) {
if (auto issues = Validate(schema, objectStorage, PathsLimit, general.location())) {
ythrow TExternalSourceException() << issues.ToString();
}

Expand Down Expand Up @@ -133,11 +134,18 @@ struct TObjectStorageExternalSource : public IExternalSource {
}

template<typename TScheme, typename TObjectStorage>
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit) {
static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit, const TString& location) {
NYql::TIssues issues;
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting()));
if (TString errorString = NYql::NS3::ValidateWildcards(location)) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Location '" << location << "' contains invalid wildcard: " << errorString));
}
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) {
if (hasPartitioning) {
if (NYql::NS3::HasWildcards(location)) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Location '" << location << "' contains wildcards"));
}
try {
TVector<TString> partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()};
issues.AddIssues(ValidateProjectionColumns(schema, partitionedBy));
Expand All @@ -157,11 +165,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) {
static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting, const TString& location, bool hasPartitioning) {
NYql::TIssues issues;
issues.AddIssues(ValidateDateFormatSetting(formatSetting));
for (const auto& [key, value]: formatSetting) {
if (key == "file_pattern"sv) {
if (TString errorString = NYql::NS3::ValidateWildcards(value)) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "File pattern '" << value << "' contains invalid wildcard: " << errorString));
}
if (value && !hasPartitioning && !location.EndsWith("/")) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Path pattern cannot be used with file_pattern"));
}
continue;
}

Expand Down Expand Up @@ -616,8 +630,8 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer);
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit);
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location) {
return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit, location);
}

NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegEx
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit, const TString& location);

NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);

Expand Down
26 changes: 26 additions & 0 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
general.mutable_attributes()->insert({"projection.h", "b"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Partition by must always be specified");
}

// Checks that Pack() throws TExternalSourceException with the expected message
// for an invalid wildcard in the location, an invalid or misplaced file_pattern,
// and a wildcard location combined with partitioned_by.
Y_UNIT_TEST(WildcardsValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false);
NKikimrExternalSources::TSchema schema;

{ // location
NKikimrExternalSources::TGeneral general;
// A lone '{' is an unterminated wildcard group.
general.set_location("{");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '{' contains invalid wildcard:");
}

{ // file pattern
NKikimrExternalSources::TGeneral general;
// Location is left unset here; only the file_pattern value is malformed.
general.mutable_attributes()->insert({"file_pattern", "{"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "File pattern '{' contains invalid wildcard:");
// Same attributes, now with a location that does not end with '/':
// the combination with file_pattern is expected to be rejected as well.
general.set_location("/test_file");
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Path pattern cannot be used with file_pattern");
}

{ // partitioned by
NKikimrExternalSources::TGeneral general;
// A wildcard in the location is expected to be rejected once partitioned_by is set.
general.set_location("*");
general.mutable_attributes()->insert({"partitioned_by", "[year]"});
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Location '*' contains wildcards");
}
}
}

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
case FederatedQuery::BindingSetting::kObjectStorage:
const FederatedQuery::ObjectStorageBinding objectStorage = setting.object_storage();
for (const auto& subset: objectStorage.subset()) {
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit));
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit, subset.path_pattern()));
}
break;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/utils/scheme_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescriptio
columnDesc.SetNotNull(columnIt->second.NotNull);
}
NKikimrExternalSources::TGeneral general;
general.set_location(settings.Location);
auto& attributes = *general.mutable_attributes();
for (const auto& [key, value]: settings.SourceTypeParameters) {
attributes.insert({key, value});
Expand Down
58 changes: 56 additions & 2 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
const TString bucket = "test_bucket1";
const TString object = TStringBuilder() << "test_" << GetSymbolsString(' ', '~', "{}") << "_object";
const TString object = TStringBuilder() << "test_" << GetSymbolsString(' ', '~', "*?{}") << "_object";

CreateBucketWithObject(bucket, object, TEST_CONTENT);

Expand Down Expand Up @@ -1802,7 +1802,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {

Y_UNIT_TEST(TestReadEmptyFileWithCsvFormat) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString bucket = "test_bucket1";
const TString bucket = "test_bucket12";

CreateBucketWithObject(bucket, "test_object", "");

Expand Down Expand Up @@ -1840,6 +1840,60 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
}

// Checks that script queries reading from an ObjectStorage data source fail with a
// wildcard validation issue when the path or FILE_PATTERN contains an unterminated '{'.
Y_UNIT_TEST(TestWildcardValidation) {
const TString bucket = "test_bucket13";

CreateBucket(bucket);

auto kikimr = NTestUtils::MakeKikimrRunner();

// Create the external data source that both sub-cases read from.
auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);)",
"location"_a = GetBucketLocation(bucket)
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto db = kikimr->GetQueryClient();

{ // path validation
const TString sql = R"(
SELECT * FROM `/Root/external_data_source`.`/{` WITH (
SCHEMA = (data String),
FORMAT = "csv_with_names"
))";

// Submitting the script succeeds; the failure is asserted on the finished operation below.
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Path '/{' contains invalid wildcard:");
}

{ // file pattern validation
const TString sql = R"(
SELECT * FROM `/Root/external_data_source`.`/` WITH (
SCHEMA = (data String),
FORMAT = "csv_with_names",
FILE_PATTERN = "{"
))";

// Same flow as above: submission succeeds, the finished operation must be Failed.
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "File pattern '{' contains invalid wildcard:");
}
}
}

} // namespace NKikimr::NKqp
22 changes: 22 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,28 @@ Y_UNIT_TEST_SUITE(KqpFederatedSchemeTest) {
};
TestInvalidDropForExternalTableWithAuth(queryClientExecutor, "generic_query");
}

// Checks that CREATE EXTERNAL TABLE with an unterminated '{' in LOCATION is rejected
// with SCHEME_ERROR and a wildcard validation issue.
Y_UNIT_TEST(ExternalTableDdlLocationValidation) {
auto kikimr = NTestUtils::MakeKikimrRunner();
auto db = kikimr->GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
// The data source and the table with the malformed LOCATION are sent as one scheme query.
auto query = TStringBuilder() << R"(
CREATE EXTERNAL DATA SOURCE `/Root/ExternalDataSource` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="my-bucket",
AUTH_METHOD="NONE"
);
CREATE EXTERNAL TABLE `/Root/ExternalTable` (
Key Uint64,
Value String
) WITH (
DATA_SOURCE="/Root/ExternalDataSource",
LOCATION="{"
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Location '{' contains invalid wildcard:");
}
}

} // namespace NKikimr::NKqp
8 changes: 4 additions & 4 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5146,7 +5146,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
month Int64 NOT NULL
) WITH (
DATA_SOURCE=")" << externalDataSourceName << R"(",
LOCATION="/folder1/*",
LOCATION="/folder1/",
FORMAT="json_as_string",
`projection.enabled`="true",
`projection.year.type`="integer",
Expand All @@ -5171,7 +5171,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(externalTable.ExternalTableInfo);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 4);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetDataSourcePath(), externalDataSourceName);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/*");
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/");
}

Y_UNIT_TEST(CreateExternalTableWithUpperCaseSettings) {
Expand All @@ -5194,7 +5194,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Month Int64 NOT NULL
) WITH (
DATA_SOURCE=")" << externalDataSourceName << R"(",
LOCATION="/folder1/*",
LOCATION="/folder1/",
FORMAT="json_as_string",
`projection.enabled`="true",
`projection.Year.type`="integer",
Expand All @@ -5219,7 +5219,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(externalTable.ExternalTableInfo);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.ColumnsSize(), 4);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetDataSourcePath(), externalDataSourceName);
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/*");
UNIT_ASSERT_VALUES_EQUAL(externalTable.ExternalTableInfo->Description.GetLocation(), "/folder1/");
}

Y_UNIT_TEST(DoubleCreateExternalTable) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/external_sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ message TSchema {

message TGeneral {
map<string, string> attributes = 1 [(Ydb.size).le = 100];
optional string location = 2;
}

message TObjectStorage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,
} else {
re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
}
Y_ENSURE(re->ok());

const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(DEBUG, ProviderS3)
Expand Down
33 changes: 29 additions & 4 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,21 @@ TString RegexFromWildcards(const std::string_view& pattern) {
for (const char& c : escaped) {
switch (c) {
case '{':
result << "(?:";
group = true;
if (group) {
result << "\\{";
} else {
result << "(?:";
group = true;
}
slash = false;
break;
case '}':
result << ')';
group = false;
if (group) {
result << ')';
group = false;
} else {
result << "\\}";
}
slash = false;
break;
case ',':
Expand Down Expand Up @@ -89,7 +97,24 @@ TString RegexFromWildcards(const std::string_view& pattern) {
break;
}
}
Y_ENSURE(!group, "Found unterminated group");
Y_ENSURE(!slash, "Expected symbol after slash");
return result;
}

// Checks that every '{' group opened in a wildcard pattern is closed by a '}'.
//
// Scans `pattern` left to right. The first '{' seen while no group is open marks the
// start of a group; further '{' characters inside an open group are ignored. Any '}'
// clears the open group (a '}' with no open group is a no-op).
//
// Returns an empty string when no group is left open at the end of the pattern,
// otherwise a message containing the zero-based position of the unterminated '{'.
TString ValidateWildcards(const std::string_view& pattern) {
    std::optional<size_t> openBrace;
    size_t offset = 0;
    for (const char symbol : pattern) {
        switch (symbol) {
            case '{':
                // Only the brace that opens the group is remembered.
                if (!openBrace.has_value()) {
                    openBrace = offset;
                }
                break;
            case '}':
                openBrace.reset();
                break;
            default:
                break;
        }
        ++offset;
    }

    if (!openBrace.has_value()) {
        return {};
    }
    return TStringBuilder() << "found unterminated group start at position " << *openBrace;
}

}
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ TString EscapeRegex(const TString& str);
TString EscapeRegex(const std::string_view& str);

TString RegexFromWildcards(const std::string_view& pattern);
TString ValidateWildcards(const std::string_view& pattern);

}
21 changes: 21 additions & 0 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ Y_UNIT_TEST_SUITE(TPathTests) {
UNIT_ASSERT_VALUES_EQUAL(NormalizePath("/a/b/c/"), "a/b/c/");
UNIT_ASSERT_VALUES_EQUAL(NormalizePath("///a/b/c///"), "a/b/c/");
}

// Asserts that `wildcards` passes ValidateWildcards (empty error string) and that
// RegexFromWildcards converts it to exactly `expectedRegex`.
void TestRegexFromWildcardsSuccess(const TString& wildcards, const TString& expectedRegex) {
TString errorString = ValidateWildcards(wildcards);
// The validation error doubles as the failure message.
UNIT_ASSERT_C(errorString.empty(), errorString);
UNIT_ASSERT_VALUES_EQUAL(RegexFromWildcards(wildcards), expectedRegex);
}

// Asserts that `wildcards` is rejected both ways: ValidateWildcards returns a string
// containing `expectedError`, and RegexFromWildcards throws a yexception whose message
// contains `expectedException`.
void TestRegexFromWildcardsFail(const TString& wildcards, const TString& expectedException, const TString& expectedError) {
UNIT_ASSERT_STRING_CONTAINS(ValidateWildcards(wildcards), expectedError);
UNIT_ASSERT_EXCEPTION_CONTAINS(RegexFromWildcards(wildcards), yexception, expectedException);
}

// Covers wildcard-to-regex conversion for groups, '*' / '?', stray braces,
// and the unterminated-group failure.
Y_UNIT_TEST(TestRegexFromWildcards) {
// ',' outside a group is escaped; inside a group it becomes an alternation '|'.
TestRegexFromWildcardsSuccess("first,test\\_{alt1,alt2}_text", "first\\,test\\\\_(?:alt1|alt2)_text");
// '*' maps to '.*', '?' maps to '.', and a literal '.' is escaped.
TestRegexFromWildcardsSuccess("hello.*world?str", "hello\\..*world.str");
// Empty and single-alternative groups, with wildcards inside a group.
TestRegexFromWildcardsSuccess("many_{},{alt1,al?t2,al*t3},{alt4}_alts", "many_(?:)\\,(?:alt1|al.t2|al.*t3)\\,(?:alt4)_alts");
// '}' with no open group is emitted as a literal.
TestRegexFromWildcardsSuccess("hello}{}}world", "hello\\}(?:)\\}world");
// '{' inside an already open group is emitted as a literal.
TestRegexFromWildcardsSuccess("hello{{{}world", "hello(?:\\{\\{)world");

// The last '{' (position 8) is never closed.
TestRegexFromWildcardsFail("hello{}}{world", "Found unterminated group", "found unterminated group start at position 8");
}
}

}
9 changes: 9 additions & 0 deletions ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,10 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
if (!FindFilePattern(settings, ctx, filePattern)) {
return false;
}
if (TString errorString = NS3::ValidateWildcards(filePattern)) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "File pattern '" << filePattern << "' contains invalid wildcard: " << errorString));
return false;
}
const TString effectiveFilePattern = filePattern ? filePattern : "*";

TVector<TString> paths;
Expand Down Expand Up @@ -763,6 +767,11 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
}

for (const auto& path : paths) {
if (TString errorString = NS3::ValidateWildcards(path)) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "Path '" << path << "' contains invalid wildcard: " << errorString));
return false;
}

// each path in CONCAT() can generate multiple list requests for explicit partitioning
TVector<TListRequest> reqs;

Expand Down

0 comments on commit 01245d2

Please sign in to comment.