diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 61d02c5303cab..b38ef213f1f0c 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -2,6 +2,10 @@ ## 7.0.0b2 (Unreleased) +**New Features** + +* Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic. + **BugFixes** * Fig bug where http_proxy and transport_type in ServiceBusClient are not propagated into Sender/Receiver creation properly. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 9a790baded6cc..57eaac2b73eb7 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -222,3 +222,38 @@ def get_queue_receiver(self, queue_name, **kwargs): ) return receiver + + def get_topic_sender(self, topic_name, **kwargs): + # type: (str, Any) -> ServiceBusSender + """Get ServiceBusSender for the specific topic. + + :param str topic_name: The path of specific Service Bus Topic the client connects to. + :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. + Default value is 3. + :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. + Default value is 0.8. + :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. + :rtype: ~azure.servicebus.ServiceBusSender + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START create_topic_sender_from_sb_client_sync] + :end-before: [END create_topic_sender_from_sb_client_sync] + :language: python + :dedent: 4 + :caption: Create a new instance of the ServiceBusSender from ServiceBusClient. + + """ + sender = ServiceBusSender( + fully_qualified_namespace=self.fully_qualified_namespace, + topic_name=topic_name, + credential=self._credential, + logging_enable=self._config.logging_enable, + transport_type=self._config.transport_type, + http_proxy=self._config.http_proxy, + connection=self._connection, + **kwargs + ) + + return sender diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 5eaf4a502bcc8..b9313d6b3627a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -255,8 +255,10 @@ def from_connection_string( """Create a ServiceBusSender from a connection string. :param conn_str: The connection string of a Service Bus. - :keyword str queue_name: The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided. - :keyword str topic_name: The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided. + :keyword str queue_name: The path of specific Service Bus Queue the client connects to. + Only one of queue_name or topic_name can be provided. + :keyword str topic_name: The path of specific Service Bus Topic the client connects to. + Only one of queue_name or topic_name can be provided. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index bab8617e73e5f..5e47c9e9e327a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -220,3 +220,38 @@ def get_queue_receiver(self, queue_name, **kwargs): ) return receiver + + def get_topic_sender(self, topic_name, **kwargs): + # type: (str, Any) -> ServiceBusSender + """Get ServiceBusSender for the specific topic. + + :param str topic_name: The path of specific Service Bus Topic the client connects to. + :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. + Default value is 3. + :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. + Default value is 0.8. + :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. + :rtype: ~azure.servicebus.aio.ServiceBusSender + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START create_topic_sender_from_sb_client_async] + :end-before: [END create_topic_sender_from_sb_client_async] + :language: python + :dedent: 4 + :caption: Create a new instance of the ServiceBusSender from ServiceBusClient. + + """ + sender = ServiceBusSender( + fully_qualified_namespace=self.fully_qualified_namespace, + topic_name=topic_name, + credential=self._credential, + logging_enable=self._config.logging_enable, + transport_type=self._config.transport_type, + http_proxy=self._config.http_proxy, + connection=self._connection, + **kwargs + ) + + return sender diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 1bab7a6bc5b1c..de2f75c7a0bdf 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -46,8 +46,10 @@ class ServiceBusSender(BaseHandlerAsync, SenderMixin): implements a particular interface for getting tokens. It accepts :class:`ServiceBusSharedKeyCredential`, or credential objects generated by the azure-identity library and objects that implement the `get_token(self, *scopes)` method. - :keyword str queue_name: The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided. - :keyword str topic_name: The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided. + :keyword str queue_name: The path of specific Service Bus Queue the client connects to. + Only one of queue_name or topic_name can be provided. + :keyword str topic_name: The path of specific Service Bus Topic the client connects to. + Only one of queue_name or topic_name can be provided. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index e2c2d451beef5..cea388a0090a8 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -16,6 +16,9 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - [send_queue.py](./sync_samples/send_queue.py) ([async version](./async_samples/send_queue_async.py)) - Examples to send messages to a service bus queue: - From a connection string - Enabling Logging +- [send_topic.py](./sync_samples/send_topic.py) ([async version](./async_samples/send_topic_async.py)) - Examples to send messages to a service bus topic: + - From a connection string + - Enabling Logging - [receive_queue.py](./sync_samples/receive_queue.py) ([async_version](./async_samples/receive_queue_async.py)) - Examples to receive messages from a service bus queue: - Receive messages - [receive_peek.py](./sync_samples/receive_peek.py) ([async_version](./async_samples/receive_peek_async.py)) - Examples to peek messages from a service bus queue: diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index 7ea3fe9e993e5..f0dd36daee4e5 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -88,6 +88,17 @@ async def example_create_servicebus_sender_async(): async with servicebus_client: queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name) # [END create_servicebus_sender_from_sb_client_async] + + # [START create_topic_sender_from_sb_client_async] + import os + from azure.servicebus import ServiceBusClient + servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] + topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] + servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) + async with servicebus_client: + queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name) + # [END create_topic_sender_from_sb_client_async] + return queue_sender diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/send_topic_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/send_topic_async.py new file mode 100644 index 0000000000000..6b210a540eee9 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/send_topic_async.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Example to show sending message(s) to a Service Bus Topic asynchronously. +""" + +# pylint: disable=C0111 + +import os +import asyncio +from azure.servicebus import Message +from azure.servicebus.aio import ServiceBusClient + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +TOPIC_NAME = os.environ["SERVICE_BUS_TOPIC_NAME"] + + +async def send_single_message(sender): + message = Message("DATA" * 64) + await sender.send(message) + + +async def send_batch_message(sender): + batch_message = await sender.create_batch() + while True: + try: + batch_message.add(Message("DATA" * 256)) + except ValueError: + # BatchMessage object reaches max_size. + # New BatchMessage object can be created here to send more data. + break + await sender.send(batch_message) + + +async def main(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) + + async with servicebus_client: + sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME) + async with sender: + await send_single_message(sender) + await send_batch_message(sender) + + print("Send message is done.") + + +loop = asyncio.get_event_loop() +loop.run_until_complete(main()) diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 9a00ca1f02eb0..a8b638767ac4e 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -85,6 +85,17 @@ def example_create_servicebus_sender_sync(): with servicebus_client: queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name) # [END create_servicebus_sender_from_sb_client_sync] + + # [START create_topic_sender_from_sb_client_sync] + import os + from azure.servicebus import ServiceBusClient + servicebus_connection_str = os.environ['SERVICE_BUS_CONNECTION_STR'] + topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] + servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) + with servicebus_client: + queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name) + # [END create_topic_sender_from_sb_client_sync] + return queue_sender diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/send_topic.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/send_topic.py new file mode 100644 index 0000000000000..37bca424a1cd5 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/send_topic.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Example to show sending message(s) to a Service Bus Topic. +""" + +# pylint: disable=C0111 + +import os +from azure.servicebus import ServiceBusClient, Message + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +TOPIC_NAME = os.environ["SERVICE_BUS_TOPIC_NAME"] + + +def send_single_message(sender): + message = Message("DATA" * 64) + sender.send(message) + + +def send_batch_message(sender): + batch_message = sender.create_batch() + while True: + try: + batch_message.add(Message("DATA" * 256)) + except ValueError: + # BatchMessage object reaches max_size. + # New BatchMessage object can be created here to send more data. + break + sender.send(batch_message) + + +servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) +with servicebus_client: + sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME) + with sender: + send_single_message(sender) + send_batch_message(sender) + +print("Send message is done.") diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_topic_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_topic_async.py new file mode 100644 index 0000000000000..a481eab2c0bb6 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_topic_async.py @@ -0,0 +1,63 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import asyncio +import logging +import sys +import os +import pytest +import time +from datetime import datetime, timedelta + +from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer, CachedResourceGroupPreparer + +from azure.servicebus.aio import ServiceBusClient, ServiceBusSharedKeyCredential +from azure.servicebus._common.message import Message +from servicebus_preparer import ( + ServiceBusNamespacePreparer, + ServiceBusTopicPreparer, + CachedServiceBusNamespacePreparer, + CachedServiceBusTopicPreparer +) +from utilities import get_logger, print_message + +_logger = get_logger(logging.DEBUG) + + +class ServiceBusTopicsAsyncTests(AzureMgmtTestCase): + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + async def test_topic_by_servicebus_client_conn_str_send_basic(self, servicebus_namespace_connection_string, servicebus_topic, **kwargs): + + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False + ) as sb_client: + async with sb_client.get_topic_sender(servicebus_topic.name) as sender: + message = Message(b"Sample topic message") + await sender.send(message) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + async def test_topic_by_sas_token_credential_conn_str_send_basic(self, servicebus_namespace, servicebus_namespace_key_name, servicebus_namespace_primary_key, servicebus_topic, **kwargs): + fully_qualified_namespace = servicebus_namespace.name + '.servicebus.windows.net' + async with ServiceBusClient( + fully_qualified_namespace=fully_qualified_namespace, + credential=ServiceBusSharedKeyCredential( + policy=servicebus_namespace_key_name, + key=servicebus_namespace_primary_key + ), + logging_enable=False + ) as sb_client: + async with sb_client.get_topic_sender(servicebus_topic.name) as sender: + message = Message(b"Sample topic message") + await sender.send(message) diff --git a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py index f0140c0ee5ac7..be201f1f8800b 100644 --- a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py +++ b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py @@ -179,7 +179,6 @@ def remove_resource(self, name, **kwargs): self.client.topics.delete(group.name, namespace.name, name, polling=False) - class ServiceBusSubscriptionPreparer(_ServiceBusChildResourcePreparer): def __init__(self, name_prefix='', @@ -243,8 +242,6 @@ def _get_topic(self, **kwargs): raise AzureTestError(template.format(ServiceBusTopicPreparer.__name__)) - - class ServiceBusQueuePreparer(_ServiceBusChildResourcePreparer): def __init__(self, name_prefix='', @@ -433,5 +430,7 @@ def _get_queue(self, **kwargs): 'decorator @{} in front of this service bus preparer.' raise AzureTestError(template.format(ServiceBusQueuePreparer.__name__)) + CachedServiceBusNamespacePreparer = functools.partial(ServiceBusNamespacePreparer, use_cache=True) CachedServiceBusQueuePreparer = functools.partial(ServiceBusQueuePreparer, use_cache=True) +CachedServiceBusTopicPreparer = functools.partial(ServiceBusTopicPreparer, use_cache=True) diff --git a/sdk/servicebus/azure-servicebus/tests/test_topic.py b/sdk/servicebus/azure-servicebus/tests/test_topic.py new file mode 100644 index 0000000000000..22de2b6b546b3 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/test_topic.py @@ -0,0 +1,80 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import logging +import sys +import os +import pytest +import time +from datetime import datetime, timedelta + +from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer, CachedResourceGroupPreparer + +from azure.servicebus import ServiceBusClient, ServiceBusSharedKeyCredential +from azure.servicebus._common.message import Message +from servicebus_preparer import ( + ServiceBusNamespacePreparer, + ServiceBusTopicPreparer, + CachedServiceBusNamespacePreparer, + CachedServiceBusTopicPreparer +) +from utilities import get_logger, print_message + +_logger = get_logger(logging.DEBUG) + + +class ServiceBusTopicsTests(AzureMgmtTestCase): + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + def test_topic_by_servicebus_client_conn_str_send_basic(self, servicebus_namespace_connection_string, servicebus_topic, **kwargs): + + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False + ) as sb_client: + with sb_client.get_topic_sender(servicebus_topic.name) as sender: + message = Message(b"Sample topic message") + sender.send(message) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + def test_topic_by_sas_token_credential_conn_str_send_basic(self, servicebus_namespace, servicebus_namespace_key_name, servicebus_namespace_primary_key, servicebus_topic, **kwargs): + fully_qualified_namespace = servicebus_namespace.name + '.servicebus.windows.net' + with ServiceBusClient( + fully_qualified_namespace=fully_qualified_namespace, + credential=ServiceBusSharedKeyCredential( + policy=servicebus_namespace_key_name, + key=servicebus_namespace_primary_key + ), + logging_enable=False + ) as sb_client: + with sb_client.get_topic_sender(servicebus_topic.name) as sender: + message = Message(b"Sample topic message") + sender.send(message) + + @pytest.mark.skip(reason="Pending management apis") + @pytest.mark.liveTest + @pytest.mark.live_test_only + @RandomNameResourceGroupPreparer() + @ServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusTopicPreparer(name_prefix='servicebustest') + def test_topic_by_servicebus_client_list_topics(self, servicebus_namespace, servicebus_namespace_key_name, servicebus_namespace_primary_key, servicebus_topic, **kwargs): + + client = ServiceBusClient( + service_namespace=servicebus_namespace.name, + shared_access_key_name=servicebus_namespace_key_name, + shared_access_key_value=servicebus_namespace_primary_key, + debug=False) + + topics = client.list_topics() + assert len(topics) >= 1 + # assert all(isinstance(t, TopicClient) for t in topics)