Skip to content

Latest commit

 

History

History
274 lines (197 loc) · 7.93 KB

pubsub-usage.rst

File metadata and controls

274 lines (197 loc) · 7.93 KB

Using the API

Authentication / Configuration

Manage topics for a project

Create a new topic for the default project:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> topic.create()  # API request

Check for the existence of a topic:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> topic.exists()  # API request
True

List topics for the default project:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topics, next_page_token = client.list_topics()  # API request
>>> [topic.name for topic in topics]
['topic_name']

Delete a topic:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> topic.delete()  # API request

Publish messages to a topic

Publish a single message to a topic, without attributes:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> topic.publish('this is the message_payload')  # API request
<message_id>

Publish a single message to a topic, with attributes:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> topic.publish('this is another message_payload',
...               attr1='value1', attr2='value2')  # API request
<message_id>

Publish a set of messages to a topic (as a single request):

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> with topic.batch() as batch:
...     batch.publish('this is the first message_payload')
...     batch.publish('this is the second message_payload',
...                   attr1='value1', attr2='value2')
>>> list(batch)
[<message_id1>, <message_id2>]

Note

The only API request happens during the __exit__() of the batch used as a context manager.

Manage subscriptions to topics

Create a new pull subscription for a topic:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> subscription.create()  # API request

Create a new pull subscription for a topic with a non-default ACK deadline:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name', ack_deadline=90)
>>> subscription.create()  # API request

Create a new push subscription for a topic:

>>> from gcloud import pubsub
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name',
...                                   push_endpoint=ENDPOINT)
>>> subscription.create()  # API request

Check for the existence of a subscription:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> subscription.exists()  # API request
True

Convert a pull subscription to push:

>>> from gcloud import pubsub
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> subscription.modify_push_configuration(push_endpoint=ENDPOINT)  # API request

Convert a push subscription to pull:

>>> from gcloud import pubsub
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name',
...                                   push_endpoint=ENDPOINT)
>>> subscription.modify_push_configuration(push_endpoint=None)  # API request

List subscriptions for a topic:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> subscriptions, next_page_token = client.list_subscriptions(
...     topic_name='topic_name')  # API request
>>> [subscription.name for subscription in subscriptions]
['subscription_name']

List all subscriptions for the default project:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> subscriptions, next_page_token = client.list_subscriptions()  # API request
>>> [subscription.name for subscription in subscriptions]
['subscription_name']

Delete a subscription:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> subscription.delete()  # API request

Pull messages from a subscription

Fetch pending messages for a pull subscription:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
...     batch.publish('this is the first message_payload')
...     batch.publish('this is the second message_payload',
...                   attr1='value1', attr2='value2')
>>> received = subscription.pull()  # API request
>>> messages = [recv[1] for recv in received]
>>> [message.message_id for message in messages]
[<message_id1>, <message_id2>]
>>> [message.data for message in messages]
['this is the first message_payload', 'this is the second message_payload']
>>> [message.attributes for message in messages]
[{}, {'attr1': 'value1', 'attr2': 'value2'}]

Note that received messages must be acknowledged, or else the back-end will re-send them later:

>>> ack_ids = [recv[0] for recv in received]
>>> subscription.acknowledge(ack_ids)

Fetch a limited number of pending messages for a pull subscription:

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
...     batch.publish('this is the first message_payload')
...     batch.publish('this is the second message_payload',
...                   attr1='value1', attr2='value2')
>>> received = subscription.pull(max_messages=1)  # API request
>>> messages = [recv[1] for recv in received]
>>> [message.message_id for message in messages]
[<message_id1>]

Fetch messages for a pull subscription without blocking (none pending):

>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = topic.subscription('subscription_name')
>>> received = subscription.pull(return_immediately=True)  # API request
>>> messages = [recv[1] for recv in received]
>>> [message.message_id for message in messages]
[]