diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 8980f96e6114..6bf95e2c79bd 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -16,6 +16,7 @@ * Remove `is_anonymous_accessible` from management entities. * Remove `support_ordering` from `create_queue` and `QueueProperties` * Remove `enable_subscription_partitioning` from `create_topic` and `TopicProperties` +* `get_dead_letter_[queue,subscription]_receiver()` has been removed. To connect to a dead letter queue, utilize the `sub_queue` parameter of `get_[queue,subscription]_receiver()` provided with a value from the `SubQueue` enum ## 7.0.0b5 (2020-08-10) diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index ffa3c82ba069..3a2bded25653 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -282,7 +282,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: #### [DeadLetter][deadletter_reference] -Transfer the message from the primary queue into a special "dead-letter sub-queue" where it can be accessed using the `ServiceBusClient.get__deadletter_receiver` function and consumed from like any other receiver. (see sample [here](./samples/sync_samples/receive_deadlettered_messages.py)) +Transfer the message from the primary queue into a special "dead-letter sub-queue" where it can be accessed using the `ServiceBusClient.get__receiver` function with parameter `sub_queue=SubQueue.DeadLetter` and consumed from like any other receiver. (see sample [here](./samples/sync_samples/receive_deadlettered_messages.py)) ```Python from azure.servicebus import ServiceBusClient diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py index 826e54f3e087..3b9ec8358e35 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py @@ -15,7 +15,7 @@ from ._servicebus_session import ServiceBusSession from ._base_handler import ServiceBusSharedKeyCredential from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage -from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE +from ._common.constants import ReceiveSettleMode, SubQueue from ._common.auto_lock_renewer import AutoLockRenew TransportType = constants.TransportType @@ -26,7 +26,7 @@ 'PeekMessage', 'ReceivedMessage', 'ReceiveSettleMode', - 'NEXT_AVAILABLE', + 'SubQueue', 'ServiceBusClient', 'ServiceBusReceiver', 'ServiceBusSessionReceiver', diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py index 00182a290432..4e1185b0c4d2 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py @@ -115,6 +115,11 @@ class SessionFilter(Enum): NextAvailable = 0 +class SubQueue(Enum): + DeadLetter = 1 + TransferDeadLetter = 2 + + ANNOTATION_SYMBOL_PARTITION_KEY = types.AMQPSymbol(_X_OPT_PARTITION_KEY) ANNOTATION_SYMBOL_VIA_PARTITION_KEY = types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY) ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index be27075ea2de..0f90b3f15aea 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -13,6 +13,7 @@ from ._servicebus_session_receiver import ServiceBusSessionReceiver from ._common._configuration import Configuration from ._common.utils import create_authentication, generate_dead_letter_entity_name +from ._common.constants import SubQueue if TYPE_CHECKING: from azure.core.credentials import TokenCredential @@ -192,6 +193,9 @@ def get_queue_receiver(self, queue_name, **kwargs): """Get ServiceBusReceiver for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. + :keyword Optional[SubQueue] sub_queue: If specified, the subqueue this receiver will connect to. + This includes the DeadLetter and TransferDeadLetter queues, holds messages that can't be delivered to any + receiver or messages that can't be processed. The default is None, meaning connect to the primary queue. :keyword mode: The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete @@ -222,10 +226,16 @@ def get_queue_receiver(self, queue_name, **kwargs): """ + sub_queue = kwargs.get('sub_queue', None) + if sub_queue and sub_queue in SubQueue: + queue_name = generate_dead_letter_entity_name( + queue_name=queue_name, + transfer_deadletter=(sub_queue == SubQueue.TransferDeadLetter) + ) # pylint: disable=protected-access handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, - queue_name=queue_name, + entity_name=queue_name, credential=self._credential, logging_enable=self._config.logging_enable, transport_type=self._config.transport_type, @@ -237,69 +247,6 @@ def get_queue_receiver(self, queue_name, **kwargs): self._handlers.append(handler) return handler - def get_queue_deadletter_receiver(self, queue_name, **kwargs): - # type: (str, Any) -> ServiceBusReceiver - """Get ServiceBusReceiver for the dead-letter queue which is the secondary subqueue provided by - the specific Queue, it holds messages that can't be delivered to any receiver or messages that can't - be processed. - - :param str queue_name: The path of specific Service Bus Queue the client connects to. - :keyword mode: The mode with which messages will be retrieved from the entity. The two options - are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given - lock period before they will be removed from the queue. Messages received with ReceiveAndDelete - will be immediately removed from the queue, and cannot be subsequently rejected or re-received if - the client fails to process the message. The default mode is PeekLock. - :paramtype mode: ~azure.servicebus.ReceiveSettleMode - :keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will - automatically stop receiving. The default value is 0, meaning no timeout. - :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. - Default value is 3. - :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. - Default value is 0.8. - :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. - :keyword bool transfer_deadletter: Whether to connect to the transfer dead-letter queue, or the standard - dead-letter queue. The transfer dead-letter queue holds messages that have failed to be transferred in - ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint. - :keyword int prefetch: The maximum number of messages to cache with each request to the service. - This setting is only for advanced performance tuning. Increasing this value will improve message throughput - performance but increase the chance that messages will expire while they are cached if they're not - processed fast enough. - The default value is 0, meaning messages will be received from the service and processed one at a time. - In the case of prefetch being 0, `ServiceBusReceiver.receive` would try to cache `max_batch_size` (if provided) - within its request to the service. - :rtype: ~azure.servicebus.ServiceBusReceiver - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_queue_deadletter_receiver_from_sb_client_sync] - :end-before: [END create_queue_deadletter_receiver_from_sb_client_sync] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient. - - - """ - # pylint: disable=protected-access - entity_name = generate_dead_letter_entity_name( - queue_name=queue_name, - transfer_deadletter=kwargs.get('transfer_deadletter', False) - ) - handler = ServiceBusReceiver( - fully_qualified_namespace=self.fully_qualified_namespace, - entity_name=entity_name, - credential=self._credential, - logging_enable=self._config.logging_enable, - transport_type=self._config.transport_type, - http_proxy=self._config.http_proxy, - connection=self._connection, - is_dead_letter_receiver=True, - user_agent=self._config.user_agent, - **kwargs - ) - self._handlers.append(handler) - return handler - def get_topic_sender(self, topic_name, **kwargs): # type: (str, Any) -> ServiceBusSender """Get ServiceBusSender for the specific topic. @@ -343,6 +290,9 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): :param str topic_name: The name of specific Service Bus Topic the client connects to. :param str subscription_name: The name of specific Service Bus Subscription under the given Service Bus Topic. + :keyword Optional[SubQueue] sub_queue: If specified, the subqueue this receiver will connect to. + This includes the DeadLetter and TransferDeadLetter queues, holds messages that can't be delivered to any + receiver or messages that can't be processed. The default is None, meaning connect to the primary queue. :keyword mode: The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete @@ -377,83 +327,37 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): """ # pylint: disable=protected-access - handler = ServiceBusReceiver( - fully_qualified_namespace=self.fully_qualified_namespace, - topic_name=topic_name, - subscription_name=subscription_name, - credential=self._credential, - logging_enable=self._config.logging_enable, - transport_type=self._config.transport_type, - http_proxy=self._config.http_proxy, - connection=self._connection, - user_agent=self._config.user_agent, - **kwargs - ) - self._handlers.append(handler) - return handler - - def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs): - # type: (str, str, Any) -> ServiceBusReceiver - """Get ServiceBusReceiver for the dead-letter queue which is the secondary subqueue provided by - the specific topic subscription, it holds messages that can't be delivered to any receiver or messages that - can't be processed. - - :param str topic_name: The name of specific Service Bus Topic the client connects to. - :param str subscription_name: The name of specific Service Bus Subscription - under the given Service Bus Topic. - :keyword mode: The mode with which messages will be retrieved from the entity. The two options - are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given - lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete - will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if - the client fails to process the message. The default mode is PeekLock. - :paramtype mode: ~azure.servicebus.ReceiveSettleMode - :keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will - automatically stop receiving. The default value is 0, meaning no timeout. - :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. - Default value is 3. - :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. - Default value is 0.8. - :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. - :keyword bool transfer_deadletter: Whether to connect to the transfer dead-letter queue, or the standard - dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in - ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint. - :keyword int prefetch: The maximum number of messages to cache with each request to the service. - This setting is only for advanced performance tuning. Increasing this value will improve message throughput - performance but increase the chance that messages will expire while they are cached if they're not - processed fast enough. - The default value is 0, meaning messages will be received from the service and processed one at a time. - In the case of prefetch being 0, `ServiceBusReceiver.receive` would try to cache `max_batch_size` (if provided) - within its request to the service. - :rtype: ~azure.servicebus.ServiceBusReceiver - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_subscription_deadletter_receiver_from_sb_client_sync] - :end-before: [END create_subscription_deadletter_receiver_from_sb_client_sync] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient. - - - """ - entity_name = generate_dead_letter_entity_name( - topic_name=topic_name, - subscription_name=subscription_name, - transfer_deadletter=kwargs.get('transfer_deadletter', False) - ) - handler = ServiceBusReceiver( - fully_qualified_namespace=self.fully_qualified_namespace, - entity_name=entity_name, - credential=self._credential, - logging_enable=self._config.logging_enable, - transport_type=self._config.transport_type, - http_proxy=self._config.http_proxy, - connection=self._connection, - is_dead_letter_receiver=True, - user_agent=self._config.user_agent, - **kwargs - ) + sub_queue = kwargs.get('sub_queue', None) + if sub_queue and sub_queue in SubQueue: + entity_name = generate_dead_letter_entity_name( + topic_name=topic_name, + subscription_name=subscription_name, + transfer_deadletter=(sub_queue == SubQueue.TransferDeadLetter) + ) + handler = ServiceBusReceiver( + fully_qualified_namespace=self.fully_qualified_namespace, + entity_name=entity_name, + credential=self._credential, + logging_enable=self._config.logging_enable, + transport_type=self._config.transport_type, + http_proxy=self._config.http_proxy, + connection=self._connection, + user_agent=self._config.user_agent, + **kwargs + ) + else: + handler = ServiceBusReceiver( + fully_qualified_namespace=self.fully_qualified_namespace, + topic_name=topic_name, + subscription_name=subscription_name, + credential=self._credential, + logging_enable=self._config.logging_enable, + transport_type=self._config.transport_type, + http_proxy=self._config.http_proxy, + connection=self._connection, + user_agent=self._config.user_agent, + **kwargs + ) self._handlers.append(handler) return handler diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index a6827a8ae91a..33fa2775c52c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -14,6 +14,7 @@ from ._servicebus_session_receiver_async import ServiceBusSessionReceiver from .._common._configuration import Configuration from .._common.utils import generate_dead_letter_entity_name +from .._common.constants import SubQueue from ._async_utils import create_authentication if TYPE_CHECKING: @@ -194,6 +195,9 @@ def get_queue_receiver(self, queue_name, **kwargs): """Get ServiceBusReceiver for the specific queue. :param str queue_name: The path of specific Service Bus Queue the client connects to. + :keyword Optional[SubQueue] sub_queue: If specified, the subqueue this receiver will connect to. + This includes the DeadLetter and TransferDeadLetter queues, holds messages that can't be delivered to any + receiver or messages that can't be processed. The default is None, meaning connect to the primary queue. :keyword mode: The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the queue. Messages received with ReceiveAndDelete @@ -224,77 +228,20 @@ def get_queue_receiver(self, queue_name, **kwargs): """ # pylint: disable=protected-access + sub_queue = kwargs.get('sub_queue', None) + if sub_queue and sub_queue in SubQueue: + queue_name = generate_dead_letter_entity_name( + queue_name=queue_name, + transfer_deadletter=(sub_queue == SubQueue.TransferDeadLetter) + ) handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, - queue_name=queue_name, - credential=self._credential, - logging_enable=self._config.logging_enable, - transport_type=self._config.transport_type, - http_proxy=self._config.http_proxy, - connection=self._connection, - user_agent=self._config.user_agent, - **kwargs - ) - self._handlers.append(handler) - return handler - - def get_queue_deadletter_receiver(self, queue_name, **kwargs): - # type: (str, Any) -> ServiceBusReceiver - """Get ServiceBusReceiver for the dead-letter queue which is the secondary subqueue provided by - the specific Queue, it holds messages that can't be delivered to any receiver or messages that can't - be processed. - - :param str queue_name: The path of specific Service Bus Queue the client connects to. - :keyword mode: The mode with which messages will be retrieved from the entity. The two options - are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given - lock period before they will be removed from the queue. Messages received with ReceiveAndDelete - will be immediately removed from the queue, and cannot be subsequently rejected or re-received if - the client fails to process the message. The default mode is PeekLock. - :paramtype mode: ~azure.servicebus.ReceiveSettleMode - :keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will - automatically stop receiving. The default value is 0, meaning no timeout. - :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. - Default value is 3. - :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. - Default value is 0.8. - :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. - :keyword bool transfer_deadletter: Whether to connect to the transfer dead-letter queue, or the standard - dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in - ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint. - :keyword int prefetch: The maximum number of messages to cache with each request to the service. - This setting is only for advanced performance tuning. Increasing this value will improve message throughput - performance but increase the chance that messages will expire while they are cached if they're not - processed fast enough. - The default value is 0, meaning messages will be received from the service and processed one at a time. - In the case of prefetch being 0, `ServiceBusReceiver.receive` would try to cache `max_batch_size` (if provided) - within its request to the service. - :rtype: ~azure.servicebus.aio.ServiceBusReceiver - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START create_queue_deadletter_receiver_from_sb_client_async] - :end-before: [END create_queue_deadletter_receiver_from_sb_client_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient. - - - """ - # pylint: disable=protected-access - entity_name = generate_dead_letter_entity_name( - queue_name=queue_name, - transfer_deadletter=kwargs.get('transfer_deadletter', False) - ) - handler = ServiceBusReceiver( - fully_qualified_namespace=self.fully_qualified_namespace, - entity_name=entity_name, + entity_name=queue_name, credential=self._credential, logging_enable=self._config.logging_enable, transport_type=self._config.transport_type, http_proxy=self._config.http_proxy, connection=self._connection, - is_dead_letter_receiver=True, user_agent=self._config.user_agent, **kwargs ) @@ -344,6 +291,9 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): :param str topic_name: The name of specific Service Bus Topic the client connects to. :param str subscription_name: The name of specific Service Bus Subscription under the given Service Bus Topic. + :keyword Optional[SubQueue] sub_queue: If specified, the subqueue this receiver will connect to. + This includes the DeadLetter and TransferDeadLetter queues, holds messages that can't be delivered to any + receiver or messages that can't be processed. The default is None, meaning connect to the primary queue. :keyword mode: The mode with which messages will be retrieved from the entity. The two options are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete @@ -378,83 +328,37 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): """ # pylint: disable=protected-access - handler = ServiceBusReceiver( - fully_qualified_namespace=self.fully_qualified_namespace, - topic_name=topic_name, - subscription_name=subscription_name, - credential=self._credential, - logging_enable=self._config.logging_enable, - transport_type=self._config.transport_type, - http_proxy=self._config.http_proxy, - connection=self._connection, - user_agent=self._config.user_agent, - **kwargs - ) - self._handlers.append(handler) - return handler - - def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs): - # type: (str, str, Any) -> ServiceBusReceiver - """Get ServiceBusReceiver for the dead-letter queue which is the secondary subqueue provided by - the specific topic subscription, it holds messages that can't be delivered to any receiver or messages that - can't be processed. - - :param str topic_name: The name of specific Service Bus Topic the client connects to. - :param str subscription_name: The name of specific Service Bus Subscription - under the given Service Bus Topic. - :keyword mode: The mode with which messages will be retrieved from the entity. The two options - are PeekLock and ReceiveAndDelete. Messages received with PeekLock must be settled within a given - lock period before they will be removed from the subscription. Messages received with ReceiveAndDelete - will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if - the client fails to process the message. The default mode is PeekLock. - :paramtype mode: ~azure.servicebus.ReceiveSettleMode - :keyword float max_wait_time: The timeout in seconds between received messages after which the receiver will - automatically stop receiving. The default value is 0, meaning no timeout. - :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. - Default value is 3. - :keyword float retry_backoff_factor: Delta back-off internal in the unit of second between retries. - Default value is 0.8. - :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120. - :keyword bool transfer_deadletter: Whether to connect to the transfer dead-letter queue, or the standard - dead-letter queue. The transfer dead letter queue holds messages that have failed to be transferred in - ForwardTo or SendVia scenarios. Default is False, using the standard dead-letter endpoint. - :keyword int prefetch: The maximum number of messages to cache with each request to the service. - This setting is only for advanced performance tuning. Increasing this value will improve message throughput - performance but increase the chance that messages will expire while they are cached if they're not - processed fast enough. - The default value is 0, meaning messages will be received from the service and processed one at a time. - In the case of prefetch being 0, `ServiceBusReceiver.receive` would try to cache `max_batch_size` (if provided) - within its request to the service. - :rtype: ~azure.servicebus.aio.ServiceBusReceiver - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START create_subscription_deadletter_receiver_from_sb_client_async] - :end-before: [END create_subscription_deadletter_receiver_from_sb_client_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the ServiceBusReceiver for Dead Letter Queue from ServiceBusClient. - - - """ - entity_name = generate_dead_letter_entity_name( - topic_name=topic_name, - subscription_name=subscription_name, - transfer_deadletter=kwargs.get('transfer_deadletter', False) - ) - handler = ServiceBusReceiver( - fully_qualified_namespace=self.fully_qualified_namespace, - entity_name=entity_name, - credential=self._credential, - logging_enable=self._config.logging_enable, - transport_type=self._config.transport_type, - http_proxy=self._config.http_proxy, - connection=self._connection, - is_dead_letter_receiver=True, - user_agent=self._config.user_agent, - **kwargs - ) + sub_queue = kwargs.get('sub_queue', None) + if sub_queue and sub_queue in SubQueue: + entity_name = generate_dead_letter_entity_name( + topic_name=topic_name, + subscription_name=subscription_name, + transfer_deadletter=(sub_queue == SubQueue.TransferDeadLetter) + ) + handler = ServiceBusReceiver( + fully_qualified_namespace=self.fully_qualified_namespace, + entity_name=entity_name, + credential=self._credential, + logging_enable=self._config.logging_enable, + transport_type=self._config.transport_type, + http_proxy=self._config.http_proxy, + connection=self._connection, + user_agent=self._config.user_agent, + **kwargs + ) + else: + handler = ServiceBusReceiver( + fully_qualified_namespace=self.fully_qualified_namespace, + topic_name=topic_name, + subscription_name=subscription_name, + credential=self._credential, + logging_enable=self._config.logging_enable, + transport_type=self._config.transport_type, + http_proxy=self._config.http_proxy, + connection=self._connection, + user_agent=self._config.user_agent, + **kwargs + ) self._handlers.append(handler) return handler diff --git a/sdk/servicebus/azure-servicebus/migration_guide.md b/sdk/servicebus/azure-servicebus/migration_guide.md index deb41d4e6262..7516634f6338 100644 --- a/sdk/servicebus/azure-servicebus/migration_guide.md +++ b/sdk/servicebus/azure-servicebus/migration_guide.md @@ -43,6 +43,7 @@ semantics with the sender or receiver lifetime. |---|---|---| | `QueueClient.from_connection_string().get_receiver().fetch_next() and ServiceBusClient.from_connection_string().get_queue().get_receiver().fetch_next()`| `ServiceBusClient.from_connection_string().get_queue_receiver().receive_messages()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) | | `QueueClient.from_connection_string().get_receiver().peek() and ServiceBusClient.from_connection_string().get_queue().get_receiver().peek()`| `ServiceBusClient.from_connection_string().get_queue_receiver().peek_messages()`| [Get a receiver and receive a single batch of messages](./samples/sync_samples/receive_queue.py) | +| `QueueClient.from_connection_string().get_deadletter_receiver() and ServiceBusClient.from_connection_string().get_queue().get_deadletter_receiver()`| `ServiceBusClient.from_connection_string().get_queue_receiver(sub_queue=SubQueue.DeadLetter)`| [Get a deadletter receiver](./samples/sync_samples/receive_queue.py) | ### Sending messages diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/receive_deadlettered_messages_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/receive_deadlettered_messages_async.py index 79d9da81836c..40b443f90aba 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/receive_deadlettered_messages_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/receive_deadlettered_messages_async.py @@ -13,7 +13,7 @@ import os import asyncio -from azure.servicebus import Message +from azure.servicebus import Message, SubQueue from azure.servicebus.aio import ServiceBusClient @@ -39,7 +39,9 @@ async def main(): await msg.dead_letter() print('receiving deadlettered messages') - dlq_receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=QUEUE_NAME, prefetch=10) + dlq_receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, + sub_queue=SubQueue.DeadLetter, + prefetch=10) async with dlq_receiver: received_msgs = await dlq_receiver.receive_messages(max_batch_size=10, max_wait_time=5) for msg in received_msgs: diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_deadlettered_messages.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_deadlettered_messages.py index dcaed97c9313..03274d368fe1 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_deadlettered_messages.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_deadlettered_messages.py @@ -12,7 +12,7 @@ # pylint: disable=C0111 import os -from azure.servicebus import ServiceBusClient, Message +from azure.servicebus import ServiceBusClient, Message, SubQueue CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] @@ -34,7 +34,7 @@ msg.dead_letter() print('receiving deadlettered messages') - dlq_receiver = servicebus_client.get_queue_deadletter_receiver(queue_name=QUEUE_NAME) + dlq_receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, sub_queue=SubQueue.DeadLetter) with dlq_receiver: received_msgs = dlq_receiver.receive_messages(max_batch_size=10, max_wait_time=5) for msg in received_msgs: diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 8b0979497578..dbee1210a8a8 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -19,7 +19,7 @@ AutoLockRenew) from azure.servicebus import TransportType from azure.servicebus._common.message import Message, BatchMessage, PeekMessage -from azure.servicebus._common.constants import ReceiveSettleMode +from azure.servicebus._common.constants import ReceiveSettleMode, SubQueue from azure.servicebus._common.utils import utc_now from azure.servicebus.exceptions import ( ServiceBusConnectionError, @@ -402,7 +402,9 @@ async def test_async_queue_by_servicebus_client_iter_messages_with_retrieve_defe await message.dead_letter(reason="Testing reason", description="Testing description") count = 0 - async with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, max_wait_time=5) as receiver: + async with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5) as receiver: async for message in receiver: count += 1 print_message(_logger, message) @@ -514,8 +516,9 @@ async def test_async_queue_by_servicebus_client_receive_batch_with_deadletter(se count += 1 assert count == 0 - async with sb_client.get_queue_deadletter_receiver( + async with sb_client.get_queue_receiver( servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock) as dl_receiver: count = 0 @@ -555,8 +558,9 @@ async def test_async_queue_by_servicebus_client_receive_batch_with_retrieve_dead assert count == 10 - async with sb_client.get_queue_deadletter_receiver( + async with sb_client.get_queue_receiver( servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock ) as receiver: @@ -772,7 +776,10 @@ async def test_async_queue_message_time_to_live(self, servicebus_namespace_conne messages = await receiver.receive_messages(max_wait_time=10) assert not messages - async with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, max_wait_time=5, mode=ReceiveSettleMode.PeekLock) as receiver: + async with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5, + mode=ReceiveSettleMode.PeekLock) as receiver: count = 0 async for message in receiver: print_message(_logger, message) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 58372f2f23ef..0f6e7de70906 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -16,7 +16,7 @@ from uamqp.errors import VendorLinkDetach from azure.servicebus.aio import ServiceBusClient, ReceivedMessage, AutoLockRenew from azure.servicebus._common.message import Message, PeekMessage -from azure.servicebus._common.constants import ReceiveSettleMode, NEXT_AVAILABLE +from azure.servicebus._common.constants import ReceiveSettleMode, NEXT_AVAILABLE, SubQueue from azure.servicebus._common.utils import utc_now from azure.servicebus.exceptions import ( ServiceBusConnectionError, @@ -257,7 +257,9 @@ async def test_async_session_by_servicebus_client_iter_messages_with_retrieve_de await message.dead_letter(reason="Testing reason", description="Testing description") count = 0 - async with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, max_wait_time=5) as receiver: + async with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5) as receiver: async for message in receiver: count += 1 print_message(_logger, message) @@ -360,7 +362,9 @@ async def test_async_session_by_servicebus_client_fetch_next_with_retrieve_deadl messages = await receiver.receive_messages() assert count == 10 - async with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, max_wait_time=5) as session: + async with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5) as session: count = 0 async for message in session: print_message(_logger, message) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py index 637698ff3046..c83f87a1c9bd 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py @@ -14,6 +14,7 @@ from azure.servicebus import Message, PeekMessage, ReceiveSettleMode from azure.servicebus.aio import ServiceBusClient, ServiceBusSharedKeyCredential from azure.servicebus.exceptions import ServiceBusError +from azure.servicebus._common.constants import SubQueue from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer, CachedResourceGroupPreparer from servicebus_preparer import ( @@ -138,9 +139,10 @@ async def test_topic_by_servicebus_client_receive_batch_with_deadletter(self, se count += 1 assert count == 0 - async with sb_client.get_subscription_deadletter_receiver( + async with sb_client.get_subscription_receiver( topic_name=servicebus_topic.name, subscription_name=servicebus_subscription.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock ) as dl_receiver: diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 64412ddce7c7..46137893985e 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -17,6 +17,7 @@ from azure.servicebus import ServiceBusClient, AutoLockRenew, TransportType from azure.servicebus._common.message import Message, PeekMessage, ReceivedMessage, BatchMessage from azure.servicebus._common.constants import ( + SubQueue, ReceiveSettleMode, _X_OPT_LOCK_TOKEN, _X_OPT_PARTITION_KEY, @@ -509,8 +510,9 @@ def test_queue_by_servicebus_client_iter_messages_with_retrieve_deferred_receive message.dead_letter(reason="Testing reason", description="Testing description") count = 0 - with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, - max_wait_time=5) as receiver: + with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5) as receiver: for message in receiver: count += 1 print_message(_logger, message) @@ -632,8 +634,9 @@ def test_queue_by_servicebus_client_receive_batch_with_deadletter(self, serviceb count += 1 assert count == 0 - with sb_client.get_queue_deadletter_receiver( + with sb_client.get_queue_receiver( servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock) as dl_receiver: count = 0 @@ -679,8 +682,9 @@ def test_queue_by_servicebus_client_receive_batch_with_retrieve_deadletter(self, assert count == 10 - with sb_client.get_queue_deadletter_receiver( + with sb_client.get_queue_receiver( servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock) as dl_receiver: count = 0 @@ -961,8 +965,9 @@ def test_queue_message_time_to_live(self, servicebus_namespace_connection_string messages = receiver.receive_messages(5, max_wait_time=10) assert not messages - with sb_client.get_queue_deadletter_receiver( + with sb_client.get_queue_receiver( servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock) as dl_receiver: count = 0 diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index 20c8c646a9fe..cbee34ef2ee4 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -15,7 +15,7 @@ from azure.servicebus import ServiceBusClient, AutoLockRenew from azure.servicebus._common.message import Message, PeekMessage, ReceivedMessage -from azure.servicebus._common.constants import ReceiveSettleMode, NEXT_AVAILABLE +from azure.servicebus._common.constants import ReceiveSettleMode, NEXT_AVAILABLE, SubQueue from azure.servicebus._common.utils import utc_now from azure.servicebus.exceptions import ( ServiceBusConnectionError, @@ -332,7 +332,9 @@ def test_session_by_servicebus_client_iter_messages_with_retrieve_deferred_recei message.dead_letter(reason="Testing reason", description="Testing description") count = 0 - with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, max_wait_time=5) as receiver: + with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5) as receiver: for message in receiver: count += 1 print_message(_logger, message) @@ -444,8 +446,9 @@ def test_session_by_servicebus_client_receive_with_retrieve_deadletter(self, ser messages = receiver.receive_messages() assert count == 10 - with sb_client.get_queue_deadletter_receiver(servicebus_queue.name, - max_wait_time=5) as session: + with sb_client.get_queue_receiver(servicebus_queue.name, + sub_queue = SubQueue.DeadLetter, + max_wait_time=5) as session: count = 0 for message in session: print_message(_logger, message) diff --git a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py index c7bc6d7eac75..2855a1bf3c4b 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py @@ -13,6 +13,7 @@ from azure.servicebus import ServiceBusClient, Message, PeekMessage, ReceiveSettleMode, ServiceBusSharedKeyCredential from azure.servicebus.exceptions import ServiceBusError +from azure.servicebus._common.constants import SubQueue from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer, CachedResourceGroupPreparer from servicebus_preparer import ( @@ -158,9 +159,10 @@ def test_subscription_by_servicebus_client_receive_batch_with_deadletter(self, s count += 1 assert count == 0 - with sb_client.get_subscription_deadletter_receiver( + with sb_client.get_subscription_receiver( topic_name=servicebus_topic.name, subscription_name=servicebus_subscription.name, + sub_queue = SubQueue.DeadLetter, max_wait_time=5, mode=ReceiveSettleMode.PeekLock ) as dl_receiver: