From f83caa97a1ee0487b6e2173e803df792abeda4f4 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Mon, 6 Jan 2025 01:16:44 +0000 Subject: [PATCH] fix: Handle TransportError Exceptions thrown from gapic_publish --- google/cloud/pubsub_v1/publisher/_batch/thread.py | 6 +++++- tests/unit/pubsub_v1/publisher/batch/test_thread.py | 13 ++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index c4bf67c35..2afbe3761 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -24,6 +24,7 @@ from opentelemetry import trace import google.api_core.exceptions from google.api_core import gapic_v1 +from google.auth import exceptions as auth_exceptions from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher import futures @@ -342,7 +343,10 @@ def _commit(self) -> None: ) span.set_attribute(key="messaging.message.id", value=message_id) wrapper.end_create_span() - except google.api_core.exceptions.GoogleAPIError as exc: + except ( + google.api_core.exceptions.GoogleAPIError, + auth_exceptions.TransportError, + ) as exc: # We failed to publish, even after retries, so set the exception on # all futures and exit. self._status = base.BatchStatus.ERROR diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 32eaa3d98..ad8fa376b 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -31,6 +31,7 @@ import google.api_core.exceptions from google.api_core import gapic_v1 from google.auth import credentials +from google.auth import exceptions as auth_exceptions from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.publisher import exceptions @@ -329,7 +330,14 @@ def test_blocking__commit_wrong_messageid_length(): assert isinstance(future.exception(), exceptions.PublishError) -def test_block__commmit_api_error(): +@pytest.mark.parametrize( + "error", + [ + (google.api_core.exceptions.InternalServerError("Internal server error"),), + (auth_exceptions.TransportError("some transport error"),), + ], +) +def test_block__commmit_api_error(error): batch = create_batch() futures = ( batch.publish( @@ -345,7 +353,6 @@ def test_block__commmit_api_error(): ) # Make the API throw an error when publishing. - error = google.api_core.exceptions.InternalServerError("uh oh") patch = mock.patch.object(type(batch.client), "_gapic_publish", side_effect=error) with patch: @@ -353,7 +360,7 @@ def test_block__commmit_api_error(): for future in futures: assert future.done() - assert future.exception() == error + assert future.exception() == error[0] def test_block__commmit_retry_error():