Skip to content

Commit

Permalink
Merge pull request #2086 from tseaver/2077-2079-2080-flaky-pubsub-sys…
Browse files Browse the repository at this point in the history
…tem-tests

Address flaky pubsub system tests
  • Loading branch information
tseaver authored Aug 16, 2016
2 parents ffb3dc7 + 4078404 commit ac330c4
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions system_tests/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def _all_created(result):
self.assertEqual(len(created), len(subscriptions_to_create))

def test_message_pull_mode_e2e(self):
import operator
topic = Config.CLIENT.topic(DEFAULT_TOPIC_NAME,
timestamp_messages=True)
self.assertFalse(topic.exists())
Expand All @@ -169,15 +170,24 @@ def test_message_pull_mode_e2e(self):
topic.publish(MESSAGE_1, extra=EXTRA_1)
topic.publish(MESSAGE_2, extra=EXTRA_2)

received = subscription.pull(max_messages=2)
ack_ids = [recv[0] for recv in received]
subscription.acknowledge(ack_ids)
messages = [recv[1] for recv in received]
class Hoover(object):

def _by_timestamp(message):
return message.timestamp
def __init__(self):
self.received = []

message1, message2 = sorted(messages, key=_by_timestamp)
def done(self, *dummy):
return len(self.received) == 2

def suction(self):
with subscription.auto_ack(max_messages=2) as ack:
self.received.extend(ack.values())

hoover = Hoover()
retry = RetryInstanceState(hoover.done)
retry(hoover.suction)()

message1, message2 = sorted(hoover.received,
key=operator.attrgetter('timestamp'))
self.assertEqual(message1.data, MESSAGE_1)
self.assertEqual(message1.attributes['extra'], EXTRA_1)
self.assertEqual(message2.data, MESSAGE_2)
Expand Down Expand Up @@ -243,6 +253,7 @@ def test_subscription_iam_policy(self):
self.assertEqual(new_policy.viewers, policy.viewers)

def test_fetch_delete_subscription_w_deleted_topic(self):
from gcloud.iterator import MethodIterator
TO_DELETE = 'delete-me' + unique_resource_id('-')
ORPHANED = 'orphaned' + unique_resource_id('-')
topic = Config.CLIENT.topic(TO_DELETE)
Expand All @@ -251,13 +262,15 @@ def test_fetch_delete_subscription_w_deleted_topic(self):
subscription.create()
topic.delete()

all_subs = []
token = None
while True:
subs, token = Config.CLIENT.list_subscriptions(page_token=token)
all_subs.extend(subs)
if token is None:
break
def _fetch():
return list(MethodIterator(Config.CLIENT.list_subscriptions))

def _found_orphan(result):
names = [subscription.name for subscription in result]
return ORPHANED in names

retry_until_found_orphan = RetryResult(_found_orphan)
all_subs = retry_until_found_orphan(_fetch)()

created = [subscription for subscription in all_subs
if subscription.name == ORPHANED]
Expand All @@ -267,8 +280,9 @@ def test_fetch_delete_subscription_w_deleted_topic(self):
def _no_topic(instance):
return instance.topic is None

retry = RetryInstanceState(_no_topic, max_tries=6)
retry(orphaned.reload)()
# Wait for the topic to clear: up to 63 seconds (2 ** 6 - 1)
retry_until_no_topic = RetryInstanceState(_no_topic, max_tries=7)
retry_until_no_topic(orphaned.reload)()

self.assertTrue(orphaned.topic is None)
orphaned.delete()

0 comments on commit ac330c4

Please sign in to comment.