From 041ec57c39cfb096c87851e7077dad8ca321a9ff Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Wed, 25 Sep 2024 17:19:50 +0200 Subject: [PATCH] s3 listing strategy has been fixed (#9499) --- .../s3/provider/yql_s3_listing_strategy.cpp | 20 ++--- ydb/tests/fq/s3/conftest.py | 4 +- ydb/tests/fq/s3/test_s3_1.py | 78 +++++++++++++++++++ ydb/tests/tools/fq_runner/kikimr_utils.py | 12 +++ 4 files changed, 103 insertions(+), 11 deletions(-) diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp index c448f8b14c6c..3fa5b08e6b4b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -27,6 +27,11 @@ IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& option namespace { +TString ParseBasePath(const TString& path) { + TString basePath = TString{TStringBuf{path}.RBefore('/')}; + return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath; +} + using namespace NThreading; using namespace NS3Lister; @@ -497,15 +502,10 @@ class TBFSDirectoryResolverIterator : public IS3Lister { return NextDirectoryListeningChunk; } - static TString ParseBasePath(const TString& path) { - TString basePath = TString{TStringBuf{path}.RBefore('/')}; - return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath; - } - void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) { result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)}); for (auto& directoryPrefix : DirectoryPrefixQueue) { - result.Directories.push_back({.Path = directoryPrefix}); + result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)}); } DirectoryPrefixQueue.clear(); } @@ -524,10 +524,10 @@ class TBFSDirectoryResolverIterator : public IS3Lister { } } else { for (auto& directoryPrefix : listingResult.Directories) { - result.Directories.push_back({.Path = directoryPrefix.Path}); + result.Directories.push_back({.Path = ParseBasePath(directoryPrefix.Path)}); } for (auto& directoryPrefix : DirectoryPrefixQueue) { - result.Directories.push_back({.Path = directoryPrefix}); + result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)}); } DirectoryPrefixQueue.clear(); } @@ -775,10 +775,10 @@ class TConcurrentBFSDirectoryResolverIterator : public IS3Lister { // TODO: add verification auto result = TListEntries{.Objects = Objects, .ListedObjectSize = ListedObjectSize}; for (auto& directoryPrefix : DirectoryPrefixQueue) { - result.Directories.push_back({.Path = directoryPrefix}); + result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)}); } for (auto& directoryPrefix: InProgressPaths) { - result.Directories.push_back({.Path = directoryPrefix}); + result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)}); } for (auto& directoryEntry : Directories) { result.Directories.push_back(directoryEntry); diff --git a/ydb/tests/fq/s3/conftest.py b/ydb/tests/fq/s3/conftest.py index 42965ba92bab..5393ad70d666 100644 --- a/ydb/tests/fq/s3/conftest.py +++ b/ydb/tests/fq/s3/conftest.py @@ -8,9 +8,10 @@ from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support -from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension +from ydb.tests.tools.fq_runner.kikimr_utils import AddAllowConcurrentListingsExtension from ydb.tests.tools.fq_runner.kikimr_utils import AddDataInflightExtension from ydb.tests.tools.fq_runner.kikimr_utils import AddFormatSizeLimitExtension +from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension @@ -89,6 +90,7 @@ def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external return [ AddFormatSizeLimitExtension(), AddInflightExtension(), + AddAllowConcurrentListingsExtension(), AddDataInflightExtension(), DefaultConfigExtension(s3.s3_url), YQv2Extension(yq_version, kikimr_settings.get("is_replace_if_exists", False)), diff --git a/ydb/tests/fq/s3/test_s3_1.py b/ydb/tests/fq/s3/test_s3_1.py index 0f4260bdf4cc..fc7c6868af0e 100644 --- a/ydb/tests/fq/s3/test_s3_1.py +++ b/ydb/tests/fq/s3/test_s3_1.py @@ -557,3 +557,81 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre assert result_set.rows[5].items[1].int32_value == 15 assert result_set.rows[5].items[2].int32_value == 33 assert sum(kikimr.control_plane.get_metering(1)) == 10 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) + @pytest.mark.parametrize("kikimr_params", [{"allow_concurrent_listings": True}], indirect=True) + def test_top_level_listing_2(self, kikimr, s3, client, runtime_listing, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''Fruit,Price,Weight +Banana,3,100 +Apple,2,22 +Pear,15,33''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-08-09.csv', ContentType='text/plain') + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-09-08.csv', ContentType='text/plain') + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-08-08.csv', ContentType='text/plain') + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='/a/2024-08-08.csv', ContentType='text/plain') + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='/b/test.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "test_top_level_listing_2" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; + + SELECT * + FROM `{storage_connection_name}`.`/2024-08-*` + WITH (format=csv_with_names, SCHEMA ( + Fruit String NOT NULL, + Price Int NOT NULL, + Weight Int NOT NULL + ) + ); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 3 + assert result_set.columns[0].name == "Fruit" + assert result_set.columns[0].type.type_id == ydb.Type.STRING + assert result_set.columns[1].name == "Price" + assert result_set.columns[1].type.type_id == ydb.Type.INT32 + assert result_set.columns[2].name == "Weight" + assert result_set.columns[2].type.type_id == ydb.Type.INT32 + assert len(result_set.rows) == 6 + assert result_set.rows[0].items[0].bytes_value == b"Banana" + assert result_set.rows[0].items[1].int32_value == 3 + assert result_set.rows[0].items[2].int32_value == 100 + assert result_set.rows[1].items[0].bytes_value == b"Apple" + assert result_set.rows[1].items[1].int32_value == 2 + assert result_set.rows[1].items[2].int32_value == 22 + assert result_set.rows[2].items[0].bytes_value == b"Pear" + assert result_set.rows[2].items[1].int32_value == 15 + assert result_set.rows[2].items[2].int32_value == 33 + assert result_set.rows[3].items[0].bytes_value == b"Banana" + assert result_set.rows[3].items[1].int32_value == 3 + assert result_set.rows[3].items[2].int32_value == 100 + assert result_set.rows[4].items[0].bytes_value == b"Apple" + assert result_set.rows[4].items[1].int32_value == 2 + assert result_set.rows[4].items[2].int32_value == 22 + assert result_set.rows[5].items[0].bytes_value == b"Pear" + assert result_set.rows[5].items[1].int32_value == 15 + assert result_set.rows[5].items[2].int32_value == 33 + assert sum(kikimr.control_plane.get_metering(1)) == 10 diff --git a/ydb/tests/tools/fq_runner/kikimr_utils.py b/ydb/tests/tools/fq_runner/kikimr_utils.py index 773f76080379..b404a3b9942f 100644 --- a/ydb/tests/tools/fq_runner/kikimr_utils.py +++ b/ydb/tests/tools/fq_runner/kikimr_utils.py @@ -50,6 +50,18 @@ def apply_to_kikimr(self, request, kikimr): del request.param["inflight"] +class AddAllowConcurrentListingsExtension(ExtensionPoint): + def is_applicable(self, request): + return (hasattr(request, 'param') + and isinstance(request.param, dict) + and "allow_concurrent_listings" in request.param) + + def apply_to_kikimr(self, request, kikimr): + kikimr.allow_concurrent_listings = request.param["allow_concurrent_listings"] + kikimr.compute_plane.fq_config['gateways']['s3']['allow_concurrent_listings'] = kikimr.allow_concurrent_listings + del request.param["allow_concurrent_listings"] + + class AddDataInflightExtension(ExtensionPoint): def is_applicable(self, request): return (hasattr(request, 'param')