Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus&EventHubs] api fixes #22418

Merged
merged 17 commits into from
Jan 12, 2022
16 changes: 14 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import json
import logging
import uuid
from typing import (
Union,
Dict,
Expand Down Expand Up @@ -59,6 +60,17 @@
if TYPE_CHECKING:
import datetime

PrimitiveTypes = Union[
None,
swathipil marked this conversation as resolved.
Show resolved Hide resolved
int,
swathipil marked this conversation as resolved.
Show resolved Hide resolved
float,
bytes,
str,
Dict,
List,
uuid.UUID,
]

_LOGGER = logging.getLogger(__name__)

# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each
Expand Down Expand Up @@ -319,7 +331,7 @@ def system_properties(self):

@property
def body(self):
# type: () -> Any
# type: () -> PrimitiveTypes
"""The body of the Message. The format may vary depending on the body type:
For :class:`azure.eventhub.amqp.AmqpMessageBodyType.DATA<azure.eventhub.amqp.AmqpMessageBodyType.DATA>`,
the body could be bytes or Iterable[bytes].
Expand All @@ -328,7 +340,7 @@ def body(self):
For :class:`azure.eventhub.amqp.AmqpMessageBodyType.VALUE<azure.eventhub.amqp.AmqpMessageBodyType.VALUE>`,
the body could be any type.

:rtype: Any
:rtype: Union[None, int, float, bytes, str, Dict, List, uuid.UUID]
"""
try:
return self._raw_amqp_message.body
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, Union, Optional, TYPE_CHECKING
from typing import Any, Union, Optional, TYPE_CHECKING, Literal
import logging
from weakref import WeakSet

Expand All @@ -25,6 +25,7 @@
from ._common.constants import (
ServiceBusSubQueue,
ServiceBusReceiveMode,
NEXT_AVAILABLE_SESSION,
)

if TYPE_CHECKING:
Expand All @@ -34,6 +35,8 @@
AzureNamedKeyCredential,
)

NextAvailableSessionType = Literal[NEXT_AVAILABLE_SESSION]


_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -269,7 +272,7 @@ def get_queue_receiver(
self,
queue_name: str,
*,
session_id: Optional[str] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
Expand All @@ -285,13 +288,14 @@ def get_queue_receiver(
:keyword session_id: A specific session from which to receive. This must be specified for a
sessionful queue, otherwise it must be None. In order to receive messages from the next available
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
:paramtype session_id: Union[str, Literal[~azure.servicebus.NEXT_AVAILABLE_SESSION]]
swathipil marked this conversation as resolved.
Show resolved Hide resolved
:keyword sub_queue: If specified, the subqueue this receiver will
connect to.
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
receiver or messages that can't be processed.
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
enum or equivalent string values "deadletter" and "transferdeadletter".
:paramtype sub_queue: Optional[Union[~azure.servicebus.ServiceBusSubQueue, str]]
:keyword receive_mode: The receive_mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
Expand Down Expand Up @@ -421,7 +425,7 @@ def get_subscription_receiver(
topic_name: str,
subscription_name: str,
*,
session_id: Optional[str] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
Expand All @@ -439,13 +443,14 @@ def get_subscription_receiver(
:keyword session_id: A specific session from which to receive. This must be specified for a
sessionful subscription, otherwise it must be None. In order to receive messages from the next available
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
:paramtype session_id: Union[str, Literal[~azure.servicebus.NEXT_AVAILABLE_SESSION]]
:keyword sub_queue: If specified, the subqueue this receiver will
connect to.
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
receiver or messages that can't be processed.
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
enum or equivalent string values "deadletter" and "transferdeadletter".
:paramtype sub_queue: Optional[Union[~azure.servicebus.ServiceBusSubQueue, str]]
:keyword receive_mode: The receive_mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get_state(self, *, timeout: Optional[float] = None) -> bytes:

:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:rtype: str
:rtype: bytes

.. admonition:: Example:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, Union, Optional, TYPE_CHECKING
from typing import Any, Union, Optional, TYPE_CHECKING, Literal
import logging
from weakref import WeakSet

Expand All @@ -21,13 +21,16 @@
from .._common.utils import generate_dead_letter_entity_name, strip_protocol_from_uri
from .._common.constants import (
ServiceBusSubQueue,
ServiceBusReceiveMode
ServiceBusReceiveMode,
NEXT_AVAILABLE_SESSION,
)
from ._async_utils import create_authentication

if TYPE_CHECKING:
from azure.core.credentials_async import AsyncTokenCredential

NextAvailableSessionType = Literal[NEXT_AVAILABLE_SESSION]

_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -258,7 +261,7 @@ def get_queue_receiver(
self,
queue_name: str,
*,
session_id: Optional[str] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
Expand All @@ -274,13 +277,14 @@ def get_queue_receiver(
:keyword session_id: A specific session from which to receive. This must be specified for a
sessionful queue, otherwise it must be None. In order to receive messages from the next available
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
:paramtype session_id: Union[str, Literal[~azure.servicebus.NEXT_AVAILABLE_SESSION]]
:keyword sub_queue: If specified, the subqueue this receiver will
connect to.
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
receiver or messages that can't be processed.
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
enum or equivalent string values "deadletter" and "transferdeadletter".
:paramtype sub_queue: Optional[Union[~azure.servicebus.ServiceBusSubQueue, str]]
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
Expand Down Expand Up @@ -408,7 +412,7 @@ def get_subscription_receiver(
topic_name: str,
subscription_name: str,
*,
session_id: Optional[str] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
Expand All @@ -426,13 +430,14 @@ def get_subscription_receiver(
:keyword session_id: A specific session from which to receive. This must be specified for a
sessionful subscription, otherwise it must be None. In order to receive messages from the next available
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
:paramtype session_id: Union[str, Literal[~azure.servicebus.NEXT_AVAILABLE_SESSION]]
:keyword sub_queue: If specified, the subqueue this receiver will
connect to.
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
receiver or messages that can't be processed.
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
enum or equivalent string values "deadletter" and "transferdeadletter".
:paramtype sub_queue: Optional[Union[~azure.servicebus.ServiceBusSubQueue, str]]
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def get_state(self, *, timeout: Optional[float] = None) -> bytes:

:keyword Optional[float] timeout: The total operation timeout in seconds including all the retries.
The value must be greater than 0 if specified. The default value is None, meaning no timeout.
:rtype: str
:rtype: bytes

.. admonition:: Example:

Expand Down