diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 13e81d281f42..7ffb4a580194 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -16,6 +16,7 @@ import datetime import itertools +import operator as op import threading import time @@ -183,6 +184,176 @@ def test_subscribe_to_messages_async_callbacks( future.cancel() +def test_creating_subscriptions_with_non_default_settings( + publisher, subscriber, project, topic_path, subscription_path, cleanup +): + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # create a topic and a subscription, customize the latter's policy + publisher.create_topic(topic_path) + + msg_retention_duration = {"seconds": 911} + expiration_policy = {"ttl": {"seconds": 90210}} + new_subscription = subscriber.create_subscription( + subscription_path, + topic_path, + ack_deadline_seconds=30, + retain_acked_messages=True, + message_retention_duration=msg_retention_duration, + expiration_policy=expiration_policy, + ) + + # fetch the subscription and check its settings + project_path = subscriber.project_path(project) + subscriptions = subscriber.list_subscriptions(project_path) + + subscriptions = [sub for sub in subscriptions if sub.topic == topic_path] + assert len(subscriptions) == 1 + subscription = subscriptions[0] + + assert subscription == new_subscription + assert subscription.ack_deadline_seconds == 30 + assert subscription.retain_acked_messages + assert subscription.message_retention_duration.seconds == 911 + assert subscription.expiration_policy.ttl.seconds == 90210 + + +def test_listing_project_topics(publisher, project, cleanup): + topic_paths = [ + publisher.topic_path(project, "topic-{}".format(i) + unique_resource_id(".")) + for i in range(1, 4) + ] + for topic in topic_paths: + cleanup.append((publisher.delete_topic, topic)) + publisher.create_topic(topic) + + project_path = publisher.project_path(project) + project_topics = publisher.list_topics(project_path) + project_topics = set(t.name for t in project_topics) + + # there might be other topics in the project, thus do a "is subset" check + assert set(topic_paths) <= project_topics + + +def test_listing_project_subscriptions(publisher, subscriber, project, cleanup): + # create topics + topic_paths = [ + publisher.topic_path(project, "topic-1" + unique_resource_id(".")), + publisher.topic_path(project, "topic-2" + unique_resource_id(".")), + ] + for topic in topic_paths: + cleanup.append((publisher.delete_topic, topic)) + publisher.create_topic(topic) + + # create subscriptions + subscription_paths = [ + subscriber.subscription_path( + project, "sub-{}".format(i) + unique_resource_id(".") + ) + for i in range(1, 4) + ] + for i, subscription in enumerate(subscription_paths): + topic = topic_paths[i % 2] + cleanup.append((subscriber.delete_subscription, subscription)) + subscriber.create_subscription(subscription, topic) + + # retrieve subscriptions and check that the list matches the expected + project_path = subscriber.project_path(project) + subscriptions = subscriber.list_subscriptions(project_path) + subscriptions = set(s.name for s in subscriptions) + + # there might be other subscriptions in the project, thus do a "is subset" check + assert set(subscription_paths) <= subscriptions + + +def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup): + # create topics + topic_paths = [ + publisher.topic_path(project, "topic-1" + unique_resource_id(".")), + publisher.topic_path(project, "topic-2" + unique_resource_id(".")), + ] + for topic in topic_paths: + cleanup.append((publisher.delete_topic, topic)) + publisher.create_topic(topic) + + # create subscriptions + subscription_paths = [ + subscriber.subscription_path( + project, "sub-{}".format(i) + unique_resource_id(".") + ) + for i in range(1, 4) + ] + for i, subscription in enumerate(subscription_paths): + topic = topic_paths[i % 2] + cleanup.append((subscriber.delete_subscription, subscription)) + subscriber.create_subscription(subscription, topic) + + # retrieve subscriptions and check that the list matches the expected + subscriptions = publisher.list_topic_subscriptions(topic_paths[0]) + subscriptions = set(subscriptions) + + assert subscriptions == {subscription_paths[0], subscription_paths[2]} + + +def test_managing_topic_iam_policy(publisher, topic_path, cleanup): + cleanup.append((publisher.delete_topic, topic_path)) + + # create a topic and customize its policy + publisher.create_topic(topic_path) + topic_policy = publisher.get_iam_policy(topic_path) + + topic_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"]) + topic_policy.bindings.add( + role="roles/pubsub.viewer", members=["group:cloud-logs@google.com"] + ) + new_policy = publisher.set_iam_policy(topic_path, topic_policy) + + # fetch the topic policy again and check its values + topic_policy = publisher.get_iam_policy(topic_path) + assert topic_policy.bindings == new_policy.bindings + assert len(topic_policy.bindings) == 2 + + bindings = sorted(topic_policy.bindings, key=op.attrgetter("role")) + assert bindings[0].role == "roles/pubsub.editor" + assert bindings[0].members == ["domain:google.com"] + + assert bindings[1].role == "roles/pubsub.viewer" + assert bindings[1].members == ["group:cloud-logs@google.com"] + + +def test_managing_subscription_iam_policy( + publisher, subscriber, topic_path, subscription_path, cleanup +): + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # create a topic and a subscription, customize the latter's policy + publisher.create_topic(topic_path) + subscriber.create_subscription(subscription_path, topic_path) + sub_policy = subscriber.get_iam_policy(subscription_path) + + sub_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"]) + sub_policy.bindings.add( + role="roles/pubsub.viewer", members=["group:cloud-logs@google.com"] + ) + new_policy = subscriber.set_iam_policy(subscription_path, sub_policy) + + # fetch the subscription policy again and check its values + sub_policy = subscriber.get_iam_policy(subscription_path) + assert sub_policy.bindings == new_policy.bindings + assert len(sub_policy.bindings) == 2 + + bindings = sorted(sub_policy.bindings, key=op.attrgetter("role")) + assert bindings[0].role == "roles/pubsub.editor" + assert bindings[0].members == ["domain:google.com"] + + assert bindings[1].role == "roles/pubsub.viewer" + assert bindings[1].members == ["group:cloud-logs@google.com"] + + class TestStreamingPull(object): def test_streaming_pull_callback_error_propagation( self, publisher, topic_path, subscriber, subscription_path, cleanup