From c94189536aa1f1b711b694407df51fa90a161800 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 12 Jun 2024 20:41:48 -0400 Subject: [PATCH 01/16] Add retry instance that records throttling metric. --- sdks/python/apache_beam/io/gcp/gcsio.py | 8 +- sdks/python/apache_beam/io/gcp/gcsio_retry.py | 62 +++++++++++++++ .../apache_beam/io/gcp/gcsio_retry_test.py | 75 +++++++++++++++++++ 3 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 sdks/python/apache_beam/io/gcp/gcsio_retry.py create mode 100644 sdks/python/apache_beam/io/gcp/gcsio_retry_test.py diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index b2f8bd4a2da7a..beffb18a17179 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -138,11 +138,11 @@ class GcsIO(object): """Google Cloud Storage I/O client.""" def __init__(self, storage_client=None, pipeline_options=None): # type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None + if not pipeline_options: + pipeline_options = PipelineOptions() + elif isinstance(pipeline_options, dict): + pipeline_options = PipelineOptions.from_dictionary(pipeline_options) if storage_client is None: - if not pipeline_options: - pipeline_options = PipelineOptions() - elif isinstance(pipeline_options, dict): - pipeline_options = PipelineOptions.from_dictionary(pipeline_options) storage_client = create_storage_client(pipeline_options) self.client = storage_client self._rewrite_cb = None diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py new file mode 100644 index 0000000000000..0f004d6400d5c --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" +Throttling Handler for GCSIO +""" + +import logging +import math +import random +import time + +from apache_beam.metrics.metric import Metrics +from google.api_core import retry +from google.api_core import exceptions as api_exceptions +from google.cloud.storage.retry import _should_retry + +_LOGGER = logging.getLogger(__name__) + +__all__ = ['DEFAULT_RETRY_WITH_THROTTLING'] + + +class ThrottlingHandler(object): + _THROTTLED_SECS = Metrics.counter('gcsio', "cumulativeThrottlingSeconds") + + def __init__(self, max_retries=10, max_retry_wait=600): + self._max_retries = max_retries + self._max_retry_wait = max_retry_wait + self._num_retries = 0 + self._total_retry_wait = 0 + + def _get_next_wait_time(self): + wait_time = 2**self._num_retries + max_jitter = wait_time / 4.0 + wait_time += random.uniform(-max_jitter, max_jitter) + return max(1, min(wait_time, self._max_retry_wait)) + + def __call__(self, exc): + if isinstance(exc, api_exceptions.TooManyRequests): + _LOGGER.debug('Caught GCS quota error (%s), retrying.', exc.reason) + self._num_retries += 1 + sleep_seconds = self._get_next_wait_time() + ThrottlingHandler._THROTTLED_SECS.inc(math.ceil(sleep_seconds)) + time.sleep(sleep_seconds) + + +DEFAULT_RETRY_WITH_THROTTLING = retry.Retry( + predicate=_should_retry, on_error=ThrottlingHandler()) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py new file mode 100644 index 0000000000000..b9dd777ac9eab --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Tests for Throttling Handler of GCSIO.""" + +import unittest +from unittest.mock import Mock + +from apache_beam.metrics.execution import MetricsContainer +from apache_beam.metrics.execution import MetricsEnvironment +from apache_beam.metrics.metricbase import MetricName +from apache_beam.runners.worker import statesampler +from apache_beam.utils import counters + +try: + from apache_beam.io.gcp import gcsio_retry + from google.api_core import exceptions as api_exceptions +except ImportError: + gcsio_retry = None + api_exceptions = None + + +@unittest.skipIf((gcsio_retry is None or api_exceptions is None), 'GCP dependencies are not installed') +class TestGCSIORetry(unittest.TestCase): + def test_retry_on_non_retriable(self): + mock = Mock(side_effect=[ + Exception('Something wrong!'), + ]) + retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING + with self.assertRaises(Exception): + retry(mock)() + + def test_retry_on_throttling(self): + mock = Mock( + side_effect=[api_exceptions.TooManyRequests("Slow down!"), 12345]) + retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING + + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'my_step', 'my_state', metrics_container=MetricsContainer('my_step')) + try: + sampler.start() + with state: + container = MetricsEnvironment.current_container() + + self.assertEqual( + container.get_counter( + MetricName('gcsio', + "cumulativeThrottlingSeconds")).get_cumulative(), + 0) + + self.assertEqual(12345, retry(mock)()) + + self.assertGreater( + container.get_counter( + MetricName('gcsio', + "cumulativeThrottlingSeconds")).get_cumulative(), + 1) + finally: + sampler.stop() From c65d32c04a56b438f951b5a7c40e871c9a13c859 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 12 Jun 2024 21:37:38 -0400 Subject: [PATCH 02/16] Use retry with throttling counters by default. Add pipeline option. 
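A minimal sketch of how the new helper is meant to be consumed (illustrative
only, not part of the diff below; names match this patch and need the GCP
extras installed):

    from apache_beam.io.gcp import gcsio_retry
    from apache_beam.options.pipeline_options import PipelineOptions

    # Intended default: a google.api_core Retry whose on_error callback
    # also increments the gcsio/cumulativeThrottlingSeconds counter.
    policy = gcsio_retry.get_retry(PipelineOptions([]))

    # Opting out falls back to the stock google-cloud-storage DEFAULT_RETRY.
    policy = gcsio_retry.get_retry(
        PipelineOptions(['--no_gcsio_throttling_counters']))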
--- sdks/python/apache_beam/io/gcp/gcsio.py | 31 +++++++++++++------ sdks/python/apache_beam/io/gcp/gcsio_retry.py | 16 ++++++++-- .../apache_beam/io/gcp/gcsio_retry_test.py | 7 +++-- .../apache_beam/options/pipeline_options.py | 6 ++++ 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index beffb18a17179..67c3a4fb9d6a2 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -43,6 +43,7 @@ from apache_beam import version as beam_version from apache_beam.internal.gcp import auth +from apache_beam.io.gcp import gcsio_retry from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils import retry @@ -147,6 +148,7 @@ def __init__(self, storage_client=None, pipeline_options=None): self.client = storage_client self._rewrite_cb = None self.bucket_to_project_number = {} + self._storage_client_retry = gcsio_retry.get_retry(pipeline_options) def get_project_number(self, bucket): if bucket not in self.bucket_to_project_number: @@ -159,7 +161,8 @@ def get_project_number(self, bucket): def get_bucket(self, bucket_name): """Returns an object bucket from its name, or None if it does not exist.""" try: - return self.client.lookup_bucket(bucket_name) + return self.client.lookup_bucket( + bucket_name, retry=self._storage_client_retry) except NotFound: return None @@ -180,7 +183,7 @@ def create_bucket( bucket_or_name=bucket, project=project, location=location, - ) + retry=self._storage_client_retry) if kms_key: bucket.default_kms_key_name(kms_key) bucket.patch() @@ -213,10 +216,11 @@ def open( if mode == 'r' or mode == 'rb': blob = bucket.blob(blob_name) - return BeamBlobReader(blob, chunk_size=read_buffer_size) + return BeamBlobReader( + blob, chunk_size=read_buffer_size, retry=self._storage_client_retry) elif mode == 'w' or mode == 'wb': blob = bucket.blob(blob_name) - return BeamBlobWriter(blob, mime_type) + return BeamBlobWriter(blob, mime_type, retry=self._storage_client_retry) else: raise ValueError('Invalid file open mode: %s.' 
% mode) @@ -448,7 +452,7 @@ def _gcs_object(self, path): """ bucket_name, blob_name = parse_gcs_path(path) bucket = self.client.bucket(bucket_name) - blob = bucket.get_blob(blob_name) + blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) if blob: return blob else: @@ -496,7 +500,8 @@ def list_files(self, path, with_metadata=False): else: _LOGGER.debug("Starting the size estimation of the input") bucket = self.client.bucket(bucket_name) - response = self.client.list_blobs(bucket, prefix=prefix) + response = self.client.list_blobs( + bucket, prefix=prefix, retry=self._storage_client_retry) for item in response: file_name = 'gs://%s/%s' % (item.bucket.name, item.name) if file_name not in file_info: @@ -544,18 +549,24 @@ def is_soft_delete_enabled(self, gcs_path): class BeamBlobReader(BlobReader): - def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE): - super().__init__(blob, chunk_size=chunk_size) + def __init__( + self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, retry=DEFAULT_RETRY): + super().__init__(blob, chunk_size=chunk_size, retry=retry) self.mode = "r" class BeamBlobWriter(BlobWriter): def __init__( - self, blob, content_type, chunk_size=16 * 1024 * 1024, ignore_flush=True): + self, + blob, + content_type, + chunk_size=16 * 1024 * 1024, + ignore_flush=True, + retry=DEFAULT_RETRY): super().__init__( blob, content_type=content_type, chunk_size=chunk_size, ignore_flush=ignore_flush, - retry=DEFAULT_RETRY) + retry=retry) self.mode = "w" diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py index 0f004d6400d5c..04c7741a16a50 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py @@ -27,11 +27,14 @@ from apache_beam.metrics.metric import Metrics from google.api_core import retry from google.api_core import exceptions as api_exceptions -from google.cloud.storage.retry import _should_retry +from google.cloud.storage.retry import _should_retry # pylint: disable=protected-access +from google.cloud.storage.retry import DEFAULT_RETRY + +from apache_beam.options.pipeline_options import GoogleCloudOptions _LOGGER = logging.getLogger(__name__) -__all__ = ['DEFAULT_RETRY_WITH_THROTTLING'] +__all__ = ['DEFAULT_RETRY_WITH_THROTTLING_COUNTERS'] class ThrottlingHandler(object): @@ -58,5 +61,12 @@ def __call__(self, exc): time.sleep(sleep_seconds) -DEFAULT_RETRY_WITH_THROTTLING = retry.Retry( +DEFAULT_RETRY_WITH_THROTTLING_COUNTERS = retry.Retry( predicate=_should_retry, on_error=ThrottlingHandler()) + + +def get_retry(pipeline_options): + if pipeline_options.view_as(GoogleCloudOptions).no_gcsio_throttling_counters: + return DEFAULT_RETRY + else: + return DEFAULT_RETRY_WITH_THROTTLING_COUNTERS diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py index b9dd777ac9eab..c824c9917eb46 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py @@ -34,20 +34,21 @@ api_exceptions = None -@unittest.skipIf((gcsio_retry is None or api_exceptions is None), 'GCP dependencies are not installed') +@unittest.skipIf((gcsio_retry is None or api_exceptions is None), + 'GCP dependencies are not installed') class TestGCSIORetry(unittest.TestCase): def test_retry_on_non_retriable(self): mock = Mock(side_effect=[ Exception('Something wrong!'), ]) - retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING + retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS with 
self.assertRaises(Exception): retry(mock)() def test_retry_on_throttling(self): mock = Mock( side_effect=[api_exceptions.TooManyRequests("Slow down!"), 12345]) - retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING + retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS sampler = statesampler.StateSampler('', counters.CounterFactory()) statesampler.set_current_tracker(sampler) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 42aee47a957e8..35ff1a3b5d101 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -900,6 +900,12 @@ def _add_argparse_args(cls, parser): 'Controls the OAuth scopes that will be requested when creating ' 'GCP credentials. Note: If set programmatically, must be set as a ' 'list of strings')) + parser.add_argument( + '--no_gcsio_throttling_counters', + default='false', + action='store_true', + help='Throttling counters in GcsIO is enabled by default. Set ' + '--no_gcsio_throttling_counters to avoid it.') def _create_default_gcs_bucket(self): try: From f6979cc8c81a24796b9271c976ee6562ef8251b4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 12 Jun 2024 22:11:03 -0400 Subject: [PATCH 03/16] Fix lint --- sdks/python/apache_beam/io/gcp/gcsio_retry.py | 6 +++--- sdks/python/apache_beam/io/gcp/gcsio_retry_test.py | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py index 04c7741a16a50..c6cacf68d6639 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py @@ -24,12 +24,12 @@ import random import time -from apache_beam.metrics.metric import Metrics -from google.api_core import retry from google.api_core import exceptions as api_exceptions -from google.cloud.storage.retry import _should_retry # pylint: disable=protected-access +from google.api_core import retry from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import _should_retry # pylint: disable=protected-access +from apache_beam.metrics.metric import Metrics from apache_beam.options.pipeline_options import GoogleCloudOptions _LOGGER = logging.getLogger(__name__) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py index c824c9917eb46..fadde0ef89a3e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py @@ -74,3 +74,7 @@ def test_retry_on_throttling(self): 1) finally: sampler.stop() + + +if __name__ == '__main__': + unittest.main() From ba5aa42ef59533373bbaaa3ec0c4d02654c2d677 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 13 Jun 2024 10:28:43 -0400 Subject: [PATCH 04/16] Fix broken tests. 
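Among the fixes below, the option default changes from the string 'false' to
the boolean False. With action='store_true', any non-empty string default is
truthy, so the flag read as set even when it was never passed. A standalone
illustration (plain argparse, outside Beam):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--no_gcsio_throttling_counters', default='false', action='store_true')

    args = parser.parse_args([])  # flag not passed on the command line
    # The value is the non-empty string 'false', which is truthy, so the
    # throttling counters appeared disabled by default.
    assert args.no_gcsio_throttling_counters == 'false'
    assert bool(args.no_gcsio_throttling_counters) is True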
--- sdks/python/apache_beam/io/gcp/gcsio_test.py | 10 +++++++--- sdks/python/apache_beam/options/pipeline_options.py | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index c1356b53095a7..2db5a44db18fe 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -31,6 +31,7 @@ try: from apache_beam.io.gcp import gcsio + from apache_beam.io.gcp.gcsio_retry import DEFAULT_RETRY_WITH_THROTTLING_COUNTERS from google.cloud.exceptions import BadRequest, NotFound except ImportError: NotFound = None @@ -80,7 +81,7 @@ def get_file(self, bucket, blob): holder = folder.get_blob(blob.name) return holder - def list_blobs(self, bucket_or_path, prefix=None): + def list_blobs(self, bucket_or_path, prefix=None, **unused_kwargs): bucket = self.get_bucket(bucket_or_path.name) if not prefix: return list(bucket.blobs.values()) @@ -124,7 +125,7 @@ def copy_blob(self, blob, dest, new_name=None): dest.add_blob(new_blob) return new_blob - def get_blob(self, blob_name): + def get_blob(self, blob_name, **unused_kwargs): bucket = self._get_canonical_bucket() if blob_name in bucket.blobs: return bucket.blobs[blob_name] @@ -472,7 +473,10 @@ def test_file_buffered_read_call(self): with mock.patch('apache_beam.io.gcp.gcsio.BeamBlobReader') as reader: self.gcs.open(file_name, read_buffer_size=read_buffer_size) - reader.assert_called_with(blob, chunk_size=read_buffer_size) + reader.assert_called_with( + blob, + chunk_size=read_buffer_size, + retry=DEFAULT_RETRY_WITH_THROTTLING_COUNTERS) def test_file_write_call(self): file_name = 'gs://gcsio-test/write_file' diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 35ff1a3b5d101..7b001a1ac9afd 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -902,7 +902,7 @@ def _add_argparse_args(cls, parser): 'list of strings')) parser.add_argument( '--no_gcsio_throttling_counters', - default='false', + default=False, action='store_true', help='Throttling counters in GcsIO is enabled by default. Set ' '--no_gcsio_throttling_counters to avoid it.') From 588b5d3f8b77291f380435217019ef7fff9eea26 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Sep 2024 00:36:04 -0400 Subject: [PATCH 05/16] Retrieve a more accurate throttling time from the caller frame. 
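The handler no longer sleeps itself (google.api_core's retry loop owns the
backoff), so it reads the pending sleep out of the caller's stack frame. The
trick, reduced to a standalone toy (illustrative only; the real handler
guards against missing frames, and the local name 'next_sleep' is an
api_core implementation detail):

    import inspect

    def handler():
        # Walk one frame up to whoever called us and read one of its locals,
        # the way ThrottlingHandler recovers next_sleep from
        # _retry_error_helper() in google/api_core/retry/retry_base.py.
        caller = inspect.currentframe().f_back
        return caller.f_locals.get('next_sleep', 0)

    def retry_loop_stand_in():
        next_sleep = 2.5  # stands in for the backoff api_core computed
        return handler()

    assert retry_loop_stand_in() == 2.5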
--- sdks/python/apache_beam/io/gcp/gcsio_retry.py | 33 +++++++++---------- .../apache_beam/io/gcp/gcsio_retry_test.py | 4 ++- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py index c6cacf68d6639..d7f85d41390ee 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py @@ -19,10 +19,9 @@ Throttling Handler for GCSIO """ +import inspect import logging import math -import random -import time from google.api_core import exceptions as api_exceptions from google.api_core import retry @@ -40,25 +39,25 @@ class ThrottlingHandler(object): _THROTTLED_SECS = Metrics.counter('gcsio', "cumulativeThrottlingSeconds") - def __init__(self, max_retries=10, max_retry_wait=600): - self._max_retries = max_retries - self._max_retry_wait = max_retry_wait - self._num_retries = 0 - self._total_retry_wait = 0 - - def _get_next_wait_time(self): - wait_time = 2**self._num_retries - max_jitter = wait_time / 4.0 - wait_time += random.uniform(-max_jitter, max_jitter) - return max(1, min(wait_time, self._max_retry_wait)) - def __call__(self, exc): if isinstance(exc, api_exceptions.TooManyRequests): _LOGGER.debug('Caught GCS quota error (%s), retrying.', exc.reason) - self._num_retries += 1 - sleep_seconds = self._get_next_wait_time() + # TODO: revist the logic here when gcs client library supports error + # callbacks + frame = inspect.currentframe() + if frame is None: + _LOGGER.warning('cannot inspect the current stack frame') + return + + prev_frame = frame.f_back + if prev_frame is None: + _LOGGER.warning('cannot inspect the caller stack frame') + return + + # next_sleep is one of the arguments in the caller + # i.e. _retry_error_helper() in google/api_core/retry/retry_base.py + sleep_seconds = prev_frame.f_locals.get("next_sleep", 0) ThrottlingHandler._THROTTLED_SECS.inc(math.ceil(sleep_seconds)) - time.sleep(sleep_seconds) DEFAULT_RETRY_WITH_THROTTLING_COUNTERS = retry.Retry( diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py index fadde0ef89a3e..29f060090b962 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py @@ -47,7 +47,9 @@ def test_retry_on_non_retriable(self): def test_retry_on_throttling(self): mock = Mock( - side_effect=[api_exceptions.TooManyRequests("Slow down!"), 12345]) + side_effect=[api_exceptions.TooManyRequests("Slow down!"), + api_exceptions.TooManyRequests("Slow down again!"), + 12345]) retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS sampler = statesampler.StateSampler('', counters.CounterFactory()) From a82e9f6d9cac776a583b83a7bcb7a2c43ee975b3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Sep 2024 23:35:33 -0400 Subject: [PATCH 06/16] Apply yapf and linter --- sdks/python/apache_beam/io/gcp/gcsio.py | 6 +++--- sdks/python/apache_beam/io/gcp/gcsio_retry_test.py | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 9d7f42adc760b..2e0efcaa7808e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -227,7 +227,7 @@ def open( return BeamBlobReader( blob, chunk_size=read_buffer_size, - enable_read_bucket_metric=self.enable_read_bucket_metric, + enable_read_bucket_metric=self.enable_read_bucket_metric, 
retry=self._storage_client_retry) elif mode == 'w' or mode == 'wb': blob = bucket.blob(blob_name) @@ -569,7 +569,7 @@ def __init__( self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, - enable_read_bucket_metric=False, + enable_read_bucket_metric=False, retry=DEFAULT_RETRY): super().__init__(blob, chunk_size=chunk_size, retry=retry) self.enable_read_bucket_metric = enable_read_bucket_metric @@ -587,7 +587,7 @@ def read(self, size=-1): class BeamBlobWriter(BlobWriter): def __init__( - self, + self, blob, content_type, chunk_size=16 * 1024 * 1024, diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py index 29f060090b962..618361ea45723 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py @@ -47,9 +47,11 @@ def test_retry_on_non_retriable(self): def test_retry_on_throttling(self): mock = Mock( - side_effect=[api_exceptions.TooManyRequests("Slow down!"), - api_exceptions.TooManyRequests("Slow down again!"), - 12345]) + side_effect=[ + api_exceptions.TooManyRequests("Slow down!"), + api_exceptions.TooManyRequests("Slow down again!"), + 12345 + ]) retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS sampler = statesampler.StateSampler('', counters.CounterFactory()) From e9c246bff77d98e4473b6e57872ba16939296859 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 12:09:23 -0400 Subject: [PATCH 07/16] Refactoring copy and delete - Remove extra retries for copy, delete, _gcs_object. - Remove the use of client.batch() as the function has no built-in retry. --- sdks/python/apache_beam/io/gcp/gcsio.py | 97 +++++++------------- sdks/python/apache_beam/io/gcp/gcsio_test.py | 6 +- 2 files changed, 39 insertions(+), 64 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 2e0efcaa7808e..5f3cef4366121 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -47,7 +47,6 @@ from apache_beam.metrics.metric import Metrics from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.utils import retry from apache_beam.utils.annotations import deprecated __all__ = ['GcsIO', 'create_storage_client'] @@ -239,8 +238,6 @@ def open( else: raise ValueError('Invalid file open mode: %s.' % mode) - @retry.with_exponential_backoff( - retry_filter=retry.retry_on_server_errors_and_timeout_filter) def delete(self, path): """Deletes the object at the given GCS path. @@ -250,7 +247,11 @@ def delete(self, path): bucket_name, blob_name = parse_gcs_path(path) try: bucket = self.client.bucket(bucket_name) - bucket.delete_blob(blob_name) + blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) + generation = getattr(blob, "generation", None) + bucket.delete_blob(blob_name, + if_generation_match=generation, + retry=self._storage_client_retry) except NotFound: return @@ -267,33 +268,18 @@ def delete_batch(self, paths): succeeded or the relevant exception if the operation failed. 
""" final_results = [] - s = 0 - if not isinstance(paths, list): paths = list(iter(paths)) - while s < len(paths): - if (s + MAX_BATCH_OPERATION_SIZE) < len(paths): - current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE] - else: - current_paths = paths[s:] - current_batch = self.client.batch(raise_exception=False) - with current_batch: - for path in current_paths: - bucket_name, blob_name = parse_gcs_path(path) - bucket = self.client.bucket(bucket_name) - bucket.delete_blob(blob_name) - - for i, path in enumerate(current_paths): - error_code = None - resp = current_batch._responses[i] - if resp.status_code >= 400 and resp.status_code != 404: - error_code = resp.status_code - final_results.append((path, error_code)) - - s += MAX_BATCH_OPERATION_SIZE - + for path in paths: + error_code = None + try: + self.delete(path) + except Exception as e: + error_code = getattr(e, "code", None) + if error_code is None: + error_code = getattr(e, "status_code", None) + + final_results.append((path, error_code)) return final_results - @retry.with_exponential_backoff( - retry_filter=retry.retry_on_server_errors_and_timeout_filter) def copy(self, src, dest): """Copies the given GCS object from src to dest. @@ -302,16 +288,22 @@ def copy(self, src, dest): dest: GCS file path pattern in the form gs:///. Raises: - TimeoutError: on timeout. + Any exceptions during copying """ src_bucket_name, src_blob_name = parse_gcs_path(src) dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True) src_bucket = self.client.bucket(src_bucket_name) - src_blob = src_bucket.blob(src_blob_name) + src_blob = src_bucket.get_blob(src_blob_name) + if src_blob is None: + raise NotFound("source blob %s not found during copying" % src) + src_generation = getattr(src_blob, "generation", None) dest_bucket = self.client.bucket(dest_bucket_name) if not dest_blob_name: dest_blob_name = None - src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name) + src_bucket.copy_blob(src_blob, dest_bucket, + new_name=dest_blob_name, + source_generation=src_generation, + retry=self._storage_client_retry) def copy_batch(self, src_dest_pairs): """Copies the given GCS objects from src to dest. @@ -326,32 +318,16 @@ def copy_batch(self, src_dest_pairs): succeeded or the relevant exception if the operation failed. 
""" final_results = [] - s = 0 - while s < len(src_dest_pairs): - if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs): - current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE] - else: - current_pairs = src_dest_pairs[s:] - current_batch = self.client.batch(raise_exception=False) - with current_batch: - for pair in current_pairs: - src_bucket_name, src_blob_name = parse_gcs_path(pair[0]) - dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1]) - src_bucket = self.client.bucket(src_bucket_name) - src_blob = src_bucket.blob(src_blob_name) - dest_bucket = self.client.bucket(dest_bucket_name) - - src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name) - - for i, pair in enumerate(current_pairs): - error_code = None - resp = current_batch._responses[i] - if resp.status_code >= 400: - error_code = resp.status_code - final_results.append((pair[0], pair[1], error_code)) - - s += MAX_BATCH_OPERATION_SIZE - + for src, dest in src_dest_pairs: + error_code = None + try: + self.copy(src, dest) + except Exception as e: + error_code = getattr(e, "code", None) + if error_code is None: + error_code = getattr(e, "status_code", None) + + final_results.append((src, dest, error_code)) return final_results # We intentionally do not decorate this method with a retry, since the @@ -455,8 +431,6 @@ def _status(self, path): file_status['size'] = gcs_object.size return file_status - @retry.with_exponential_backoff( - retry_filter=retry.retry_on_server_errors_and_timeout_filter) def _gcs_object(self, path): """Returns a gcs object for the given path @@ -552,8 +526,7 @@ def _updated_to_seconds(updated): def is_soft_delete_enabled(self, gcs_path): try: bucket_name, _ = parse_gcs_path(gcs_path) - # set retry timeout to 5 seconds when checking soft delete policy - bucket = self.get_bucket(bucket_name, retry=DEFAULT_RETRY.with_timeout(5)) + bucket = self.get_bucket(bucket_name) if (bucket.soft_delete_policy is not None and bucket.soft_delete_policy.retention_duration_seconds > 0): return True diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 2a06d168dd4ba..116091e725a93 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -20,6 +20,7 @@ import logging import os +import random import unittest from datetime import datetime @@ -121,7 +122,7 @@ def add_blob(self, blob): def blob(self, name): return self._create_blob(name) - def copy_blob(self, blob, dest, new_name=None): + def copy_blob(self, blob, dest, new_name=None, **kwargs): if self.get_blob(blob.name) is None: raise NotFound("source blob not found") if not new_name: @@ -147,7 +148,7 @@ def lookup_blob(self, name): def set_default_kms_key_name(self, name): self.default_kms_key_name = name - def delete_blob(self, name): + def delete_blob(self, name, **kwargs): bucket = self._get_canonical_bucket() if name in bucket.blobs: del bucket.blobs[name] @@ -176,6 +177,7 @@ def __init__( self.updated = updated self._fail_when_getting_metadata = fail_when_getting_metadata self._fail_when_reading = fail_when_reading + self.generation = random.randint(0, (1 << 63) - 1)) def delete(self): self.bucket.delete_blob(self.name) From 5ab098dc4f33f300b48dfb4af79fdd67a37f5570 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 13:28:01 -0400 Subject: [PATCH 08/16] Fix a typo and apply yapf --- sdks/python/apache_beam/io/gcp/gcsio.py | 7 +++++-- sdks/python/apache_beam/io/gcp/gcsio_test.py | 2 +- 2 files changed, 6 insertions(+), 3 
deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 5f3cef4366121..419b1faf7e731 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -249,7 +249,8 @@ def delete(self, path): bucket = self.client.bucket(bucket_name) blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) generation = getattr(blob, "generation", None) - bucket.delete_blob(blob_name, + bucket.delete_blob( + blob_name, if_generation_match=generation, retry=self._storage_client_retry) except NotFound: @@ -300,7 +301,9 @@ def copy(self, src, dest): dest_bucket = self.client.bucket(dest_bucket_name) if not dest_blob_name: dest_blob_name = None - src_bucket.copy_blob(src_blob, dest_bucket, + src_bucket.copy_blob( + src_blob, + dest_bucket, new_name=dest_blob_name, source_generation=src_generation, retry=self._storage_client_retry) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 116091e725a93..d42fd7d1f2a9b 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -177,7 +177,7 @@ def __init__( self.updated = updated self._fail_when_getting_metadata = fail_when_getting_metadata self._fail_when_reading = fail_when_reading - self.generation = random.randint(0, (1 << 63) - 1)) + self.generation = random.randint(0, (1 << 63) - 1) def delete(self): self.bucket.delete_blob(self.name) From f82230d3b52f3bc2b8bbbd92d5fe33d3aa1daf0c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 19:44:58 -0400 Subject: [PATCH 09/16] Use counter instead of counters in pipeline option. Additionally, the variable name for the new retry object is changed. Add a new pipeline option to enable the use of blob generation to mitigate race conditions (at the expense of more http requests) --- sdks/python/apache_beam/io/gcp/gcsio.py | 21 +++++++++++++------ sdks/python/apache_beam/io/gcp/gcsio_retry.py | 8 +++---- sdks/python/apache_beam/io/gcp/gcsio_test.py | 4 ++-- .../apache_beam/options/pipeline_options.py | 12 ++++++++--- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 419b1faf7e731..aa62335f2396c 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -156,6 +156,8 @@ def __init__(self, storage_client=None, pipeline_options=None): self._rewrite_cb = None self.bucket_to_project_number = {} self._storage_client_retry = gcsio_retry.get_retry(pipeline_options) + self._use_blob_generation = getattr( + google_cloud_options, 'enable_gcsio_blob_generation', False) def get_project_number(self, bucket): if bucket not in self.bucket_to_project_number: @@ -247,8 +249,11 @@ def delete(self, path): bucket_name, blob_name = parse_gcs_path(path) try: bucket = self.client.bucket(bucket_name) - blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) - generation = getattr(blob, "generation", None) + if self._use_blob_generation: + blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) + generation = getattr(blob, "generation", None) + else: + generation = None bucket.delete_blob( blob_name, if_generation_match=generation, @@ -294,10 +299,14 @@ def copy(self, src, dest): src_bucket_name, src_blob_name = parse_gcs_path(src) dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True) src_bucket = self.client.bucket(src_bucket_name) - 
src_blob = src_bucket.get_blob(src_blob_name) - if src_blob is None: - raise NotFound("source blob %s not found during copying" % src) - src_generation = getattr(src_blob, "generation", None) + if self._use_blob_generation: + src_blob = src_bucket.get_blob(src_blob_name) + if src_blob is None: + raise NotFound("source blob %s not found during copying" % src) + src_generation = getattr(src_blob, "generation", None) + else: + src_blob = src_bucket.blob(src_blob_name) + src_generation = None dest_bucket = self.client.bucket(dest_bucket_name) if not dest_blob_name: dest_blob_name = None diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py index d7f85d41390ee..87be6157763e5 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py @@ -33,7 +33,7 @@ _LOGGER = logging.getLogger(__name__) -__all__ = ['DEFAULT_RETRY_WITH_THROTTLING_COUNTERS'] +__all__ = ['DEFAULT_RETRY_WITH_THROTTLING_COUNTER'] class ThrottlingHandler(object): @@ -60,12 +60,12 @@ def __call__(self, exc): ThrottlingHandler._THROTTLED_SECS.inc(math.ceil(sleep_seconds)) -DEFAULT_RETRY_WITH_THROTTLING_COUNTERS = retry.Retry( +DEFAULT_RETRY_WITH_THROTTLING_COUNTER = retry.Retry( predicate=_should_retry, on_error=ThrottlingHandler()) def get_retry(pipeline_options): - if pipeline_options.view_as(GoogleCloudOptions).no_gcsio_throttling_counters: + if pipeline_options.view_as(GoogleCloudOptions).no_gcsio_throttling_counter: return DEFAULT_RETRY else: - return DEFAULT_RETRY_WITH_THROTTLING_COUNTERS + return DEFAULT_RETRY_WITH_THROTTLING_COUNTER diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index d42fd7d1f2a9b..19df15dcf7fab 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -37,7 +37,7 @@ try: from apache_beam.io.gcp import gcsio - from apache_beam.io.gcp.gcsio_retry import DEFAULT_RETRY_WITH_THROTTLING_COUNTERS + from apache_beam.io.gcp.gcsio_retry import DEFAULT_RETRY_WITH_THROTTLING_COUNTER from google.cloud.exceptions import BadRequest, NotFound except ImportError: NotFound = None @@ -538,7 +538,7 @@ def test_file_buffered_read_call(self): blob, chunk_size=read_buffer_size, enable_read_bucket_metric=False, - retry=DEFAULT_RETRY_WITH_THROTTLING_COUNTERS) + retry=DEFAULT_RETRY_WITH_THROTTLING_COUNTER) def test_file_write_call(self): file_name = 'gs://gcsio-test/write_file' diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index cbfde02393d6f..69f65307937c7 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -940,11 +940,17 @@ def _add_argparse_args(cls, parser): 'Create metrics reporting the approximate number of bytes written per ' 'bucket.') parser.add_argument( - '--no_gcsio_throttling_counters', + '--no_gcsio_throttling_counter', default=False, action='store_true', - help='Throttling counters in GcsIO is enabled by default. Set ' - '--no_gcsio_throttling_counters to avoid it.') + help='Throttling counter in GcsIO is enabled by default. 
Set ' + '--no_gcsio_throttling_counter to avoid it.') + parser.add_argument( + '--enable_gcsio_blob_generation', + default=False, + action='store_true', + help='Use blob generation when mutating blobs in GCSIO to ' + 'overcome race conditions.') def _create_default_gcs_bucket(self): try: From cfe3ec7559336e40ad21d696e30722363963254a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 22:17:42 -0400 Subject: [PATCH 10/16] Parameterize existing tests for the new pipeline options. --- .../io/gcp/gcsio_integration_test.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index fad638136804e..4d0cf6e6c8d23 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -33,6 +33,7 @@ import uuid import mock +from parameterized import parameterized_class import pytest from apache_beam.io.filesystems import FileSystems @@ -51,6 +52,8 @@ @unittest.skipIf(gcsio is None, 'GCP dependencies are not installed') +@parameterized_class(('no_gcsio_throttling_counter', 'enable_gcsio_blob_generation'), + [(False, False), (False, True), (True, False), (True, True)]) class GcsIOIntegrationTest(unittest.TestCase): INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt' @@ -92,14 +95,41 @@ def _verify_copy(self, src, dest, dest_kms_key_name=None): @pytest.mark.it_postcommit def test_copy(self): + self.gcsio = gcsio.GcsIO(pipeline_options={ + "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, + "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation}) src = self.INPUT_FILE dest = self.gcs_tempdir + '/test_copy' self.gcsio.copy(src, dest) self._verify_copy(src, dest) + unknown_src = self.test_pipeline.get_option('temp_location') + \ + '/gcs_it-' + str(uuid.uuid4()) + with self.assertRaises(NotFound): + self.gcsio.copy(unknown_src, dest) + + @pytest.mark.it_postcommit + def test_copy_and_delete(self): + self.gcsio = gcsio.GcsIO(pipeline_options={ + "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, + "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation}) + src = self.INPUT_FILE + dest = self.gcs_tempdir + '/test_copy' + + self.gcsio.copy(src, dest) + self._verify_copy(src, dest) + + self.gcsio.delete(dest) + + # no exception if we delete an nonexistent file. 
+ self.gcsio.delete(dest) + @pytest.mark.it_postcommit def test_batch_copy_and_delete(self): + self.gcsio = gcsio.GcsIO(pipeline_options={ + "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, + "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation}) num_copies = 10 srcs = [self.INPUT_FILE] * num_copies dests = [ From 3f01a7647e77ad87156fbb2fc42d699949abc1aa Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 22:22:41 -0400 Subject: [PATCH 11/16] Apply yapf --- .../io/gcp/gcsio_integration_test.py | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 4d0cf6e6c8d23..1bddfa497a2e9 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -52,8 +52,9 @@ @unittest.skipIf(gcsio is None, 'GCP dependencies are not installed') -@parameterized_class(('no_gcsio_throttling_counter', 'enable_gcsio_blob_generation'), - [(False, False), (False, True), (True, False), (True, True)]) +@parameterized_class( + ('no_gcsio_throttling_counter', 'enable_gcsio_blob_generation'), + [(False, False), (False, True), (True, False), (True, True)]) class GcsIOIntegrationTest(unittest.TestCase): INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt' @@ -70,7 +71,6 @@ def setUp(self): self.gcs_tempdir = ( self.test_pipeline.get_option('temp_location') + '/gcs_it-' + str(uuid.uuid4())) - self.gcsio = gcsio.GcsIO() def tearDown(self): FileSystems.delete([self.gcs_tempdir + '/']) @@ -95,9 +95,11 @@ def _verify_copy(self, src, dest, dest_kms_key_name=None): @pytest.mark.it_postcommit def test_copy(self): - self.gcsio = gcsio.GcsIO(pipeline_options={ - "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, - "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation}) + self.gcsio = gcsio.GcsIO( + pipeline_options={ + "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, + "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation + }) src = self.INPUT_FILE dest = self.gcs_tempdir + '/test_copy' @@ -111,9 +113,11 @@ def test_copy(self): @pytest.mark.it_postcommit def test_copy_and_delete(self): - self.gcsio = gcsio.GcsIO(pipeline_options={ - "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, - "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation}) + self.gcsio = gcsio.GcsIO( + pipeline_options={ + "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, + "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation + }) src = self.INPUT_FILE dest = self.gcs_tempdir + '/test_copy' @@ -127,9 +131,11 @@ def test_copy_and_delete(self): @pytest.mark.it_postcommit def test_batch_copy_and_delete(self): - self.gcsio = gcsio.GcsIO(pipeline_options={ - "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, - "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation}) + self.gcsio = gcsio.GcsIO( + pipeline_options={ + "no_gcsio_throttling_counter": self.no_gcsio_throttling_counter, + "enable_gcsio_blob_generation": self.enable_gcsio_blob_generation + }) num_copies = 10 srcs = [self.INPUT_FILE] * num_copies dests = [ @@ -182,6 +188,7 @@ def test_batch_copy_and_delete(self): @mock.patch('apache_beam.io.gcp.gcsio.default_gcs_bucket_name') @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') def test_create_default_bucket(self, 
mock_default_gcs_bucket_name): + self.gcsio = gcsio.GcsIO() google_cloud_options = self.test_pipeline.options.view_as( GoogleCloudOptions) # overwrite kms option here, because get_or_create_default_gcs_bucket() From 572527dd3f73400a924ca2e31e156523a0f84734 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 22:55:28 -0400 Subject: [PATCH 12/16] Fix a typo. --- sdks/python/apache_beam/io/gcp/gcsio_retry_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py index 618361ea45723..750879ae0284c 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py @@ -41,7 +41,7 @@ def test_retry_on_non_retriable(self): mock = Mock(side_effect=[ Exception('Something wrong!'), ]) - retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS + retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTER with self.assertRaises(Exception): retry(mock)() @@ -52,7 +52,7 @@ def test_retry_on_throttling(self): api_exceptions.TooManyRequests("Slow down again!"), 12345 ]) - retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS + retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTER sampler = statesampler.StateSampler('', counters.CounterFactory()) statesampler.set_current_tracker(sampler) From 4ff13377591d9be92a6ffcbd1c00525bf31c9312 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 23:05:42 -0400 Subject: [PATCH 13/16] Revert the change of copy_batch and delete_batch and add warning in their docstring. --- sdks/python/apache_beam/io/gcp/gcsio.py | 71 ++++++++++++++++++------- 1 file changed, 51 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index aa62335f2396c..a052f3480f72c 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -263,6 +263,7 @@ def delete(self, path): def delete_batch(self, paths): """Deletes the objects at the given GCS paths. + Warning: any exception during batch delete will NOT be retried. Args: paths: List of GCS file path patterns or Dict with GCS file path patterns @@ -274,16 +275,29 @@ def delete_batch(self, paths): succeeded or the relevant exception if the operation failed. 
""" final_results = [] - for path in paths: - error_code = None - try: - self.delete(path) - except Exception as e: - error_code = getattr(e, "code", None) - if error_code is None: - error_code = getattr(e, "status_code", None) - - final_results.append((path, error_code)) + s = 0 + if not isinstance(paths, list): paths = list(iter(paths)) + while s < len(paths): + if (s + MAX_BATCH_OPERATION_SIZE) < len(paths): + current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE] + else: + current_paths = paths[s:] + current_batch = self.client.batch(raise_exception=False) + with current_batch: + for path in current_paths: + bucket_name, blob_name = parse_gcs_path(path) + bucket = self.client.bucket(bucket_name) + bucket.delete_blob(blob_name) + + for i, path in enumerate(current_paths): + error_code = None + resp = current_batch._responses[i] + if resp.status_code >= 400 and resp.status_code != 404: + error_code = resp.status_code + final_results.append((path, error_code)) + + s += MAX_BATCH_OPERATION_SIZE + return final_results def copy(self, src, dest): @@ -319,6 +333,7 @@ def copy(self, src, dest): def copy_batch(self, src_dest_pairs): """Copies the given GCS objects from src to dest. + Warning: any exception during batch copy will NOT be retried. Args: src_dest_pairs: list of (src, dest) tuples of gs:/// files @@ -330,16 +345,32 @@ def copy_batch(self, src_dest_pairs): succeeded or the relevant exception if the operation failed. """ final_results = [] - for src, dest in src_dest_pairs: - error_code = None - try: - self.copy(src, dest) - except Exception as e: - error_code = getattr(e, "code", None) - if error_code is None: - error_code = getattr(e, "status_code", None) - - final_results.append((src, dest, error_code)) + s = 0 + while s < len(src_dest_pairs): + if (s + MAX_BATCH_OPERATION_SIZE) < len(src_dest_pairs): + current_pairs = src_dest_pairs[s:s + MAX_BATCH_OPERATION_SIZE] + else: + current_pairs = src_dest_pairs[s:] + current_batch = self.client.batch(raise_exception=False) + with current_batch: + for pair in current_pairs: + src_bucket_name, src_blob_name = parse_gcs_path(pair[0]) + dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1]) + src_bucket = self.client.bucket(src_bucket_name) + src_blob = src_bucket.blob(src_blob_name) + dest_bucket = self.client.bucket(dest_bucket_name) + + src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name) + + for i, pair in enumerate(current_pairs): + error_code = None + resp = current_batch._responses[i] + if resp.status_code >= 400: + error_code = resp.status_code + final_results.append((pair[0], pair[1], error_code)) + + s += MAX_BATCH_OPERATION_SIZE + return final_results # We intentionally do not decorate this method with a retry, since the From 9d61c282eab1ae9eafde3c4ef3a25e938b64cf62 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 16 Sep 2024 23:39:09 -0400 Subject: [PATCH 14/16] Fix lint --- sdks/python/apache_beam/io/gcp/gcsio_integration_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 1bddfa497a2e9..07a5fb5df5535 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -33,8 +33,8 @@ import uuid import mock -from parameterized import parameterized_class import pytest +from parameterized import parameterized_class from apache_beam.io.filesystems import FileSystems from apache_beam.options.pipeline_options 
import GoogleCloudOptions From cebdacfd9cd6670d91e188480071c90684b57c86 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Sep 2024 14:10:40 -0400 Subject: [PATCH 15/16] Minor change according to code review. --- sdks/python/apache_beam/io/gcp/gcsio.py | 15 ++++++++------- sdks/python/apache_beam/io/gcp/gcsio_retry.py | 2 +- .../apache_beam/options/pipeline_options.py | 2 +- sdks/python/tox.ini | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index a052f3480f72c..22a33fa13c63b 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -247,13 +247,14 @@ def delete(self, path): path: GCS file path pattern in the form gs:///. """ bucket_name, blob_name = parse_gcs_path(path) + bucket = self.client.bucket(bucket_name) + if self._use_blob_generation: + # blob can be None if not found + blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) + generation = getattr(blob, "generation", None) + else: + generation = None try: - bucket = self.client.bucket(bucket_name) - if self._use_blob_generation: - blob = bucket.get_blob(blob_name, retry=self._storage_client_retry) - generation = getattr(blob, "generation", None) - else: - generation = None bucket.delete_blob( blob_name, if_generation_match=generation, @@ -317,7 +318,7 @@ def copy(self, src, dest): src_blob = src_bucket.get_blob(src_blob_name) if src_blob is None: raise NotFound("source blob %s not found during copying" % src) - src_generation = getattr(src_blob, "generation", None) + src_generation = src_blob.generation else: src_blob = src_bucket.blob(src_blob_name) src_generation = None diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py index 87be6157763e5..29fd71c5195b4 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_retry.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py @@ -42,7 +42,7 @@ class ThrottlingHandler(object): def __call__(self, exc): if isinstance(exc, api_exceptions.TooManyRequests): _LOGGER.debug('Caught GCS quota error (%s), retrying.', exc.reason) - # TODO: revist the logic here when gcs client library supports error + # TODO: revisit the logic here when gcs client library supports error # callbacks frame = inspect.currentframe() if frame is None: diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 69f65307937c7..0c5a2f961a468 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -950,7 +950,7 @@ def _add_argparse_args(cls, parser): default=False, action='store_true', help='Use blob generation when mutating blobs in GCSIO to ' - 'overcome race conditions.') + 'mitigate race conditions at the cost of more HTTP requests.') def _create_default_gcs_bucket(self): try: diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index d733fd17fb6bc..9fd3098d39152 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -60,7 +60,7 @@ list_dependencies_command = {envbindir}/python {envbindir}/pip freeze commands_pre = python --version pip --version - pip check + # pip check bash {toxinidir}/scripts/run_tox_cleanup.sh commands_post = bash {toxinidir}/scripts/run_tox_cleanup.sh From 2316d5984673eb79ce62deb595fb092424424231 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 17 Sep 2024 22:46:49 -0400 Subject: [PATCH 16/16] Restore the previous tox.ini that got 
accidentally changed. --- sdks/python/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 9fd3098d39152..d733fd17fb6bc 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -60,7 +60,7 @@ list_dependencies_command = {envbindir}/python {envbindir}/pip freeze commands_pre = python --version pip --version - # pip check + pip check bash {toxinidir}/scripts/run_tox_cleanup.sh commands_post = bash {toxinidir}/scripts/run_tox_cleanup.sh
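
Taken together, the series leaves both behaviors switchable per pipeline. A
hedged usage sketch (bucket names are placeholders, and actually running it
requires GCP credentials):

    from apache_beam.io.gcp.gcsio import GcsIO
    from apache_beam.options.pipeline_options import PipelineOptions

    opts = PipelineOptions([
        # Guard copy/delete with blob generations (source_generation on
        # copy, if_generation_match on delete), trading extra HTTP requests
        # for protection against concurrent mutation. Off by default.
        '--enable_gcsio_blob_generation',
        # Uncomment to fall back to the stock retry and skip the
        # gcsio/cumulativeThrottlingSeconds counter:
        # '--no_gcsio_throttling_counter',
    ])

    gcs = GcsIO(pipeline_options=opts)
    gcs.copy('gs://example-src/kinglear.txt', 'gs://example-dst/kinglear.txt')
    gcs.delete('gs://example-dst/kinglear.txt')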