Skip to content

Commit

Permalink
chore(pubsub): add subscriber role test for streaming (#9507)
Browse files Browse the repository at this point in the history
Pulling the messages using a streaming pull should work with accounts
having only the pubsub.subscriber role. This commits add a test that
covers this aspect.
  • Loading branch information
plamut authored Oct 31, 2019
1 parent cdcc278 commit df185b9
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import datetime
import itertools
import operator as op
import os
import threading
import time

Expand Down Expand Up @@ -488,6 +489,45 @@ def test_streaming_pull_max_messages(
finally:
subscription_future.cancel() # trigger clean shutdown

@pytest.mark.skipif(
"KOKORO_GFILE_DIR" not in os.environ,
reason="Requires Kokoro environment with a limited subscriber service account.",
)
def test_streaming_pull_subscriber_permissions_sufficient(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):

# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and subscribe to it
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)

# A service account granting only the pubsub.subscriber role must be used.
filename = os.path.join(
os.environ["KOKORO_GFILE_DIR"], "pubsub-subscriber-service-account.json"
)
streaming_pull_subscriber = type(subscriber).from_service_account_file(filename)

# Subscribe to the topic, publish a message, and verify that subscriber
# successfully pulls and processes it.
callback = StreamingPullCallback(processing_time=0.01, resolve_at_msg_count=1)
future = streaming_pull_subscriber.subscribe(subscription_path, callback)
self._publish_messages(publisher, topic_path, batch_sizes=[1])

try:
callback.done_future.result(timeout=10)
except exceptions.TimeoutError:
pytest.fail(
"Timeout: receiving/processing streamed messages took too long."
)
else:
assert 1 in callback.seen_message_ids
finally:
future.cancel()

def _publish_messages(self, publisher, topic_path, batch_sizes):
"""Publish ``count`` messages in batches and wait until completion."""
publish_futures = []
Expand Down

0 comments on commit df185b9

Please sign in to comment.