From 04bfec63a8ebb3419387b1090ff5faf62502141e Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 31 May 2019 14:24:15 -0700 Subject: [PATCH 1/3] separate publish futures from streaming pull futures documentation --- pubsub/google/cloud/pubsub_v1/futures.py | 23 ++++++------ .../cloud/pubsub_v1/publisher/futures.py | 35 +++++++++++++------ .../cloud/pubsub_v1/subscriber/futures.py | 5 ++- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/futures.py b/pubsub/google/cloud/pubsub_v1/futures.py index 39688f291dbf..34cf2edf1b1e 100644 --- a/pubsub/google/cloud/pubsub_v1/futures.py +++ b/pubsub/google/cloud/pubsub_v1/futures.py @@ -87,18 +87,12 @@ def done(self): return self._exception != self._SENTINEL or self._result != self._SENTINEL def result(self, timeout=None): - """Return the message ID, or raise an exception. - - This blocks until the message has successfully been published, and - returns the message ID. + """Resolve the future and return a value where appropriate. Args: timeout (Union[int, float]): The number of seconds before this call times out and raises TimeoutError. - Returns: - str: The message ID. - Raises: ~.pubsub_v1.TimeoutError: If the request times out. Exception: For undefined exceptions in the underlying @@ -115,9 +109,6 @@ def result(self, timeout=None): def exception(self, timeout=None): """Return the exception raised by the call, if any. - This blocks until the message has successfully been published, and - returns the exception. If the call succeeded, return None. - Args: timeout (Union[int, float]): The number of seconds before this call times out and raises TimeoutError. @@ -139,15 +130,21 @@ def exception(self, timeout=None): # Okay, this batch had an error; this should return it. return self._exception - def add_done_callback(self, fn): + def add_done_callback(self, callback): """Attach the provided callable to the future. The provided function is called, with this future as its only argument, when the future finishes running. + + Args: + callback (Callable): The function to call. + + Returns: + None """ if self.done(): - return fn(self) - self._callbacks.append(fn) + return callback(self) + self._callbacks.append(callback) def set_result(self, result): """Set the result of the future to the provided result. diff --git a/pubsub/google/cloud/pubsub_v1/publisher/futures.py b/pubsub/google/cloud/pubsub_v1/publisher/futures.py index a47f50e00a0d..894eff41f16b 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/futures.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/futures.py @@ -36,13 +36,28 @@ class Future(futures.Future): :class:`threading.Event` will be created and used. """ - # The publishing-side subclass does not need any special behavior - # at this time. - # - # However, there is still a subclass so that if someone attempts - # isinstance checks against a publisher-returned or subscriber-returned - # future, trying either one against the other returns False. - pass - - -__all__ = ("Future",) + def result(self, timeout=None): + """Return the message ID or raise an exception. + + This blocks until the message has been published successfully and + returns the message ID unless an exception is raised. + + Args: + timeout (Union[int, float]): The number of seconds before this call + times out and raises TimeoutError. + + Returns: + str: The message ID. + + Raises: + ~.pubsub_v1.TimeoutError: If the request times out. + Exception: For undefined exceptions in the underlying + call execution. + """ + # Attempt to get the exception if there is one. + # If there is not one, then we know everything worked, and we can + # return an appropriate value. + err = self.exception(timeout=timeout) + if err is None: + return self._result + raise err diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py index f3c06416083b..12504c18b5df 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py @@ -46,5 +46,8 @@ def cancel(self): return self._manager.close() def cancelled(self): - """bool: True if the subscription has been cancelled.""" + """ + returns: + bool: ``True`` if the subscription has been cancelled. + """ return self._cancelled From 5985c2144c62f7b98159cbd27f6c8b36a6ccede0 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 31 May 2019 15:13:08 -0700 Subject: [PATCH 2/3] remove trailing whitespaces --- pubsub/google/cloud/pubsub_v1/futures.py | 2 +- pubsub/google/cloud/pubsub_v1/publisher/futures.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/futures.py b/pubsub/google/cloud/pubsub_v1/futures.py index 34cf2edf1b1e..21d5d810199f 100644 --- a/pubsub/google/cloud/pubsub_v1/futures.py +++ b/pubsub/google/cloud/pubsub_v1/futures.py @@ -137,7 +137,7 @@ def add_done_callback(self, callback): when the future finishes running. Args: - callback (Callable): The function to call. + callback (Callable): The function to call. Returns: None diff --git a/pubsub/google/cloud/pubsub_v1/publisher/futures.py b/pubsub/google/cloud/pubsub_v1/publisher/futures.py index 894eff41f16b..8fec17d2d64f 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/futures.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/futures.py @@ -40,14 +40,14 @@ def result(self, timeout=None): """Return the message ID or raise an exception. This blocks until the message has been published successfully and - returns the message ID unless an exception is raised. + returns the message ID unless an exception is raised. Args: timeout (Union[int, float]): The number of seconds before this call times out and raises TimeoutError. - Returns: - str: The message ID. + Returns: + str: The message ID. Raises: ~.pubsub_v1.TimeoutError: If the request times out. From edf8b8de1b918ce2a08bdff46864b6606af8df47 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 3 Jun 2019 15:49:38 -0700 Subject: [PATCH 3/3] Add test --- .../publisher/test_futures_publisher.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 pubsub/tests/unit/pubsub_v1/publisher/test_futures_publisher.py diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_futures_publisher.py b/pubsub/tests/unit/pubsub_v1/publisher/test_futures_publisher.py new file mode 100644 index 000000000000..eb32d05185b6 --- /dev/null +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_futures_publisher.py @@ -0,0 +1,32 @@ +# Copyright 2019, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import pytest + +from google.cloud.pubsub_v1.publisher import futures + + +class TestFuture(object): + def test_result_on_success(self): + future = futures.Future() + future.set_result("570307942214048") + assert future.result() == "570307942214048" + + def test_result_on_failure(self): + future = futures.Future() + future.set_exception(RuntimeError("Something bad happened.")) + with pytest.raises(RuntimeError): + future.result()