Skip to content

Commit

Permalink
samples: fix flaky tests (#233)
Browse files Browse the repository at this point in the history
* fix: reorder tests

* fix: increase timeout

* fix: use backoff

feat: Enable server side flow control by default with the option to turn it off (#231)

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server. If flow_control.max_messages > 0 or flow_control.max_bytes > 0, flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature --in case they encouter issues with server side flow control-- can pass in use_legacy_flow_control=True in SubscriberClient.subscribe().

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server.
If flow_control.max_messages > 0 or flow_control.max_bytes > 0, flow control will be enforced
at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature
--in case they encouter issues with server side flow control-- can pass in
use_legacy_flow_control=true in subscriberclient.subscribe().

Co-authored-by: Tianzi Cai <tianzi@google.com>

fix: replace AssertionError with NotFound

fix: add another pytest fixture in failing test

remove backoff

* add py version in resource names

* keep pulling until response is not None

* use fstrings

* change scope to session and set retry deadline

* lint and increase timeout to 90 for dlq receive
  • Loading branch information
anguillanneuf authored Dec 11, 2020
1 parent f3f4e5a commit 7c532a2
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 76 deletions.
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-pubsub==2.1.0
google-cloud-pubsub==2.2.0
31 changes: 14 additions & 17 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ def create_subscription(project_id, topic_id, subscription_id):


def create_subscription_with_dead_letter_topic(
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
project_id, topic_id, subscription_id, dead_letter_topic_id, max_delivery_attempts=5
):
"""Create a subscription with dead letter policy."""
# [START pubsub_dead_letter_create_subscription]
Expand Down Expand Up @@ -122,7 +121,7 @@ def create_subscription_with_dead_letter_topic(

dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
max_delivery_attempts=max_delivery_attempts,
)

with subscriber:
Expand Down Expand Up @@ -264,8 +263,7 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint):


def update_subscription_with_dead_letter_policy(
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
project_id, topic_id, subscription_id, dead_letter_topic_id, max_delivery_attempts=5
):
"""Update a subscription's dead letter policy."""
# [START pubsub_dead_letter_update_subscription]
Expand Down Expand Up @@ -304,7 +302,7 @@ def update_subscription_with_dead_letter_policy(
# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
max_delivery_attempts=max_delivery_attempts,
)

# Construct the subscription with the dead letter policy you expect to have
Expand Down Expand Up @@ -483,6 +481,7 @@ def callback(message):
def synchronous_pull(project_id, subscription_id):
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
from google.api_core import retry
from google.cloud import pubsub_v1

# TODO(developer)
Expand All @@ -497,9 +496,11 @@ def synchronous_pull(project_id, subscription_id):
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
# The subscriber pulls a specific number of messages.
# The subscriber pulls a specific number of messages. The actual
# number of messages pulled may be smaller than max_messages.
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
retry=retry.Retry(deadline=300),
)

ack_ids = []
Expand All @@ -526,6 +527,7 @@ def synchronous_pull_with_lease_management(project_id, subscription_id):
import sys
import time

from google.api_core import retry
from google.cloud import pubsub_v1

multiprocessing.log_to_stderr()
Expand All @@ -541,7 +543,8 @@ def synchronous_pull_with_lease_management(project_id, subscription_id):
subscription_path = subscriber.subscription_path(project_id, subscription_id)

response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": 3}
request={"subscription": subscription_path, "max_messages": 3},
retry=retry.Retry(deadline=300),
)

# Start a process for each message based on its size modulo 10.
Expand Down Expand Up @@ -682,10 +685,7 @@ def callback(message):
create_with_dead_letter_policy_parser.add_argument("subscription_id")
create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
create_with_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
"max_delivery_attempts", type=int, nargs="?", default=5
)

create_push_parser = subparsers.add_parser(
Expand Down Expand Up @@ -719,10 +719,7 @@ def callback(message):
update_dead_letter_policy_parser.add_argument("subscription_id")
update_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
update_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
"max_delivery_attempts", type=int, nargs="?", default=5
)

remove_dead_letter_policy_parser = subparsers.add_parser(
Expand Down
121 changes: 63 additions & 58 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import os
import sys
import uuid

import backoff
Expand All @@ -23,25 +24,26 @@
import subscriber

UUID = uuid.uuid4().hex
PY_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
TOPIC = "subscription-test-topic-" + UUID
DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID
SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID
SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID
SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID
SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID
ENDPOINT = "https://{}.appspot.com/push".format(PROJECT_ID)
NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT_ID)
TOPIC = f"subscription-test-topic-{PY_VERSION}-{UUID}"
DEAD_LETTER_TOPIC = f"subscription-test-dead-letter-topic-{PY_VERSION}-{UUID}"
SUBSCRIPTION_ADMIN = f"subscription-test-subscription-admin-{PY_VERSION}-{UUID}"
SUBSCRIPTION_ASYNC = f"subscription-test-subscription-async-{PY_VERSION}-{UUID}"
SUBSCRIPTION_SYNC = f"subscription-test-subscription-sync-{PY_VERSION}-{UUID}"
SUBSCRIPTION_DLQ = f"subscription-test-subscription-dlq-{PY_VERSION}-{UUID}"
ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push"
NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2"
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
UPDATED_MAX_DELIVERY_ATTEMPTS = 20


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC)

Expand All @@ -55,7 +57,7 @@ def topic(publisher_client):
publisher_client.delete_topic(request={"topic": topic.name})


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def dead_letter_topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT_ID, DEAD_LETTER_TOPIC)

Expand All @@ -69,14 +71,14 @@ def dead_letter_topic(publisher_client):
publisher_client.delete_topic(request={"topic": dead_letter_topic.name})


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def subscriber_client():
subscriber_client = pubsub_v1.SubscriberClient()
yield subscriber_client
subscriber_client.close()


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def subscription_admin(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT_ID, SUBSCRIPTION_ADMIN
Expand All @@ -94,7 +96,7 @@ def subscription_admin(subscriber_client, topic):
yield subscription.name


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def subscription_sync(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT_ID, SUBSCRIPTION_SYNC
Expand All @@ -114,7 +116,7 @@ def subscription_sync(subscriber_client, topic):
subscriber_client.delete_subscription(request={"subscription": subscription.name})


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def subscription_async(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT_ID, SUBSCRIPTION_ASYNC
Expand All @@ -134,7 +136,7 @@ def subscription_async(subscriber_client, topic):
subscriber_client.delete_subscription(request={"subscription": subscription.name})


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def subscription_dlq(subscriber_client, topic, dead_letter_topic):
from google.cloud.pubsub_v1.types import DeadLetterPolicy

Expand All @@ -161,6 +163,13 @@ def subscription_dlq(subscriber_client, topic, dead_letter_topic):
subscriber_client.delete_subscription(request={"subscription": subscription.name})


def _publish_messages(publisher_client, topic, **attrs):
for n in range(5):
data = f"message {n}".encode("utf-8")
publish_future = publisher_client.publish(topic, data, **attrs)
publish_future.result()


def test_list_in_topic(subscription_admin, capsys):
@backoff.on_exception(backoff.expo, AssertionError, max_time=60)
def eventually_consistent_test():
Expand Down Expand Up @@ -219,10 +228,25 @@ def test_create_subscription_with_dead_letter_policy(
assert f"After {DEFAULT_MAX_DELIVERY_ATTEMPTS} delivery attempts." in out


def test_receive_with_delivery_attempts(
publisher_client, topic, dead_letter_topic, subscription_dlq, capsys
):
_publish_messages(publisher_client, topic)

subscriber.receive_messages_with_delivery_attempts(PROJECT_ID, SUBSCRIPTION_DLQ, 90)

out, _ = capsys.readouterr()
assert f"Listening for messages on {subscription_dlq}.." in out
assert "With delivery attempts: " in out


def test_update_dead_letter_policy(subscription_dlq, dead_letter_topic, capsys):
_ = subscriber.update_subscription_with_dead_letter_policy(
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC,
UPDATED_MAX_DELIVERY_ATTEMPTS
PROJECT_ID,
TOPIC,
SUBSCRIPTION_DLQ,
DEAD_LETTER_TOPIC,
UPDATED_MAX_DELIVERY_ATTEMPTS,
)

out, _ = capsys.readouterr()
Expand All @@ -231,6 +255,16 @@ def test_update_dead_letter_policy(subscription_dlq, dead_letter_topic, capsys):
assert f"max_delivery_attempts: {UPDATED_MAX_DELIVERY_ATTEMPTS}" in out


def test_remove_dead_letter_policy(subscription_dlq, capsys):
subscription_after_update = subscriber.remove_dead_letter_policy(
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ
)

out, _ = capsys.readouterr()
assert subscription_dlq in out
assert subscription_after_update.dead_letter_policy.dead_letter_topic == ""


def test_create_subscription_with_ordering(
subscriber_client, subscription_admin, capsys
):
Expand Down Expand Up @@ -293,13 +327,6 @@ def eventually_consistent_test():
eventually_consistent_test()


def _publish_messages(publisher_client, topic, **attrs):
for n in range(5):
data = "message {}".format(n).encode("utf-8")
publish_future = publisher_client.publish(topic, data, **attrs)
publish_future.result()


def test_receive(publisher_client, topic, subscription_async, capsys):
_publish_messages(publisher_client, topic)

Expand Down Expand Up @@ -340,6 +367,17 @@ def test_receive_with_flow_control(publisher_client, topic, subscription_async,
assert "message" in out


def test_listen_for_errors(publisher_client, topic, subscription_async, capsys):

_publish_messages(publisher_client, topic)

subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)

out, _ = capsys.readouterr()
assert subscription_async in out
assert "threw an exception" in out


def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys):
_publish_messages(publisher_client, topic)

Expand All @@ -360,36 +398,3 @@ def test_receive_synchronously_with_lease(

out, _ = capsys.readouterr()
assert f"Received and acknowledged 3 messages from {subscription_sync}." in out


def test_listen_for_errors(publisher_client, topic, subscription_async, capsys):

_publish_messages(publisher_client, topic)

subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)

out, _ = capsys.readouterr()
assert subscription_async in out
assert "threw an exception" in out


def test_receive_with_delivery_attempts(
publisher_client, topic, subscription_dlq, capsys
):
_publish_messages(publisher_client, topic)

subscriber.receive_messages_with_delivery_attempts(PROJECT_ID, SUBSCRIPTION_DLQ, 15)

out, _ = capsys.readouterr()
assert f"Listening for messages on {subscription_dlq}.." in out
assert "With delivery attempts: " in out


def test_remove_dead_letter_policy(subscription_dlq, capsys):
subscription_after_update = subscriber.remove_dead_letter_policy(
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ
)

out, _ = capsys.readouterr()
assert subscription_dlq in out
assert subscription_after_update.dead_letter_policy.dead_letter_topic == ""

0 comments on commit 7c532a2

Please sign in to comment.