Skip to content

Commit

Permalink
Merge pull request #3307 from geigerj/subscription-fields
Browse files Browse the repository at this point in the history
Add new subscription fields
  • Loading branch information
geigerj authored Apr 18, 2017
2 parents 7100b84 + 7c90041 commit 331e616
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 35 deletions.
13 changes: 3 additions & 10 deletions core/google/cloud/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,9 @@ def _timedelta_to_duration_pb(timedelta_val):
:rtype: :class:`google.protobuf.duration_pb2.Duration`
:returns: A duration object equivalent to the time delta.
"""
seconds_decimal = timedelta_val.total_seconds()
# Truncate the parts other than the integer.
seconds = int(seconds_decimal)
if seconds_decimal < 0:
signed_micros = timedelta_val.microseconds - 10**6
else:
signed_micros = timedelta_val.microseconds
# Convert nanoseconds to microseconds.
nanos = 1000 * signed_micros
return duration_pb2.Duration(seconds=seconds, nanos=nanos)
duration_pb = duration_pb2.Duration()
duration_pb.FromTimedelta(timedelta_val)
return duration_pb


def _duration_pb_to_timedelta(duration_pb):
Expand Down
26 changes: 22 additions & 4 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from google.cloud._helpers import _to_bytes
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud._helpers import make_secure_channel
from google.cloud._http import DEFAULT_USER_AGENT
from google.cloud.exceptions import Conflict
Expand Down Expand Up @@ -276,7 +277,9 @@ def list_subscriptions(self, project, page_size=0, page_token=None):
return GAXIterator(self._client, page_iter, item_to_value)

def subscription_create(self, subscription_path, topic_path,
ack_deadline=None, push_endpoint=None):
ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""API call: create a subscription
See:
Expand All @@ -302,6 +305,18 @@ def subscription_create(self, subscription_path, topic_path,
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.
:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.
:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.
:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
Expand All @@ -310,13 +325,16 @@ def subscription_create(self, subscription_path, topic_path,
else:
push_config = None

if ack_deadline is None:
ack_deadline = 0
if message_retention_duration is not None:
message_retention_duration = _timedelta_to_duration_pb(
message_retention_duration)

try:
sub_pb = self._gax_api.create_subscription(
subscription_path, topic_path,
push_config=push_config, ack_deadline_seconds=ack_deadline)
push_config=push_config, ack_deadline_seconds=ack_deadline,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
Expand Down
27 changes: 26 additions & 1 deletion pubsub/google/cloud/pubsub/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os

from google.cloud import _http
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud.environment_vars import PUBSUB_EMULATOR
from google.cloud.iterator import HTTPIterator

Expand Down Expand Up @@ -295,7 +296,9 @@ def list_subscriptions(self, project, page_size=None, page_token=None):
extra_params=extra_params)

def subscription_create(self, subscription_path, topic_path,
ack_deadline=None, push_endpoint=None):
ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""API call: create a subscription
See:
Expand All @@ -321,6 +324,18 @@ def subscription_create(self, subscription_path, topic_path,
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.
:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.
:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.
:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
Expand All @@ -333,6 +348,16 @@ def subscription_create(self, subscription_path, topic_path,
if push_endpoint is not None:
resource['pushConfig'] = {'pushEndpoint': push_endpoint}

if retain_acked_messages is not None:
resource['retainAckedMessages'] = retain_acked_messages

if message_retention_duration is not None:
pb = _timedelta_to_duration_pb(message_retention_duration)
resource['messageRetentionDuration'] = {
'seconds': pb.seconds,
'nanos': pb.nanos
}

return self.api_request(method='PUT', path=path, data=resource)

def subscription_get(self, subscription_path):
Expand Down
39 changes: 35 additions & 4 deletions pubsub/google/cloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Define API Subscriptions."""

import datetime

from google.cloud.exceptions import NotFound
from google.cloud.pubsub._helpers import topic_name_from_path
from google.cloud.pubsub.iam import Policy
Expand Down Expand Up @@ -43,6 +45,19 @@ class Subscription(object):
(Optional) URL to which messages will be pushed by the back-end. If
not set, the application must pull messages.
:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.
:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.
:type client: :class:`~google.cloud.pubsub.client.Client`
:param client:
(Optional) The client to use. If not passed, falls back to the
Expand All @@ -57,6 +72,7 @@ class Subscription(object):
"""

def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None,
retain_acked_messages=None, message_retention_duration=None,
client=None):

if client is None and topic is None:
Expand All @@ -71,6 +87,8 @@ def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None,
self._project = self._client.project
self.ack_deadline = ack_deadline
self.push_endpoint = push_endpoint
self.retain_acked_messages = retain_acked_messages
self.message_retention_duration = message_retention_duration

@classmethod
def from_api_repr(cls, resource, client, topics=None):
Expand Down Expand Up @@ -107,10 +125,21 @@ def from_api_repr(cls, resource, client, topics=None):
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
push_endpoint = push_config.get('pushEndpoint')
retain_acked_messages = resource.get('retainAckedMessages')
resource_duration = resource.get('duration', {})
message_retention_duration = datetime.timedelta(
seconds=resource_duration.get('seconds', 0),
microseconds=resource_duration.get('nanos', 0) / 1000)
if topic is None:
return cls(name, ack_deadline=ack_deadline,
push_endpoint=push_endpoint, client=client)
return cls(name, topic, ack_deadline, push_endpoint)
push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration,
client=client)
return cls(name, topic=topic, ack_deadline=ack_deadline,
push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)

@property
def project(self):
Expand Down Expand Up @@ -182,8 +211,10 @@ def create(self, client=None):
client = self._require_client(client)
api = client.subscriber_api
api.subscription_create(
self.full_name, self.topic.full_name, self.ack_deadline,
self.push_endpoint)
self.full_name, self.topic.full_name,
ack_deadline=self.ack_deadline, push_endpoint=self.push_endpoint,
retain_acked_messages=self.retain_acked_messages,
message_retention_duration=self.message_retention_duration)

def exists(self, client=None):
"""API call: test existence of the subscription via a GET request
Expand Down
22 changes: 19 additions & 3 deletions pubsub/google/cloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def __init__(self, name, client, timestamp_messages=False):
self._client = client
self.timestamp_messages = timestamp_messages

def subscription(self, name, ack_deadline=None, push_endpoint=None):
def subscription(self, name, ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""Creates a subscription bound to the current topic.
Example: pull-mode subcription, default paramter values
Expand Down Expand Up @@ -85,11 +87,25 @@ def subscription(self, name, ack_deadline=None, push_endpoint=None):
back-end. If not set, the application must pull
messages.
:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.
:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.
:rtype: :class:`Subscription`
:returns: The subscription created with the passed in arguments.
"""
return Subscription(name, self, ack_deadline=ack_deadline,
push_endpoint=push_endpoint)
return Subscription(
name, self, ack_deadline=ack_deadline, push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)

@classmethod
def from_api_repr(cls, resource, client):
Expand Down
24 changes: 24 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import unittest

Expand Down Expand Up @@ -155,6 +156,26 @@ def test_create_subscription_w_ack_deadline(self):
self.assertEqual(subscription.ack_deadline, 120)
self.assertIs(subscription.topic, topic)

def test_create_subscription_w_message_retention(self):
TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-')
topic = Config.CLIENT.topic(TOPIC_NAME)
self.assertFalse(topic.exists())
topic.create()
self.to_delete.append(topic)
SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id()
duration = datetime.timedelta(hours=12)
subscription = topic.subscription(
SUBSCRIPTION_NAME, retain_acked_messages=True,
message_retention_duration=duration)
self.assertFalse(subscription.exists())
subscription.create()
self.to_delete.append(subscription)
self.assertTrue(subscription.exists())
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
self.assertTrue(subscription.retain_acked_messages)
self.assertEqual(subscription.message_retention_duration, duration)
self.assertIs(subscription.topic, topic)

def test_list_subscriptions(self):
TOPIC_NAME = 'list-sub' + unique_resource_id('-')
topic = Config.CLIENT.topic(TOPIC_NAME)
Expand Down Expand Up @@ -287,3 +308,6 @@ def test_subscription_iam_policy(self):
policy.viewers = viewers
new_policy = subscription.set_iam_policy(policy)
self.assertEqual(new_policy.viewers, policy.viewers)

# TODO(geigerj): set retain_acked_messages=True in snapshot system test once
# PR #3303 is merged
Loading

0 comments on commit 331e616

Please sign in to comment.