Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus&EventHubs] api fixes #22418

Merged
merged 17 commits into from
Jan 12, 2022
Merged
15 changes: 13 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import json
import logging
import uuid
from typing import (
Union,
Dict,
Expand Down Expand Up @@ -59,6 +60,16 @@
if TYPE_CHECKING:
import datetime

PrimitiveTypes = Optional[Union[
int,
swathipil marked this conversation as resolved.
Show resolved Hide resolved
float,
bytes,
str,
Dict,
List,
uuid.UUID,
]]

_LOGGER = logging.getLogger(__name__)

# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each
Expand Down Expand Up @@ -319,7 +330,7 @@ def system_properties(self):

@property
def body(self):
# type: () -> Any
# type: () -> PrimitiveTypes
"""The body of the Message. The format may vary depending on the body type:
For :class:`azure.eventhub.amqp.AmqpMessageBodyType.DATA<azure.eventhub.amqp.AmqpMessageBodyType.DATA>`,
the body could be bytes or Iterable[bytes].
Expand All @@ -328,7 +339,7 @@ def body(self):
For :class:`azure.eventhub.amqp.AmqpMessageBodyType.VALUE<azure.eventhub.amqp.AmqpMessageBodyType.VALUE>`,
the body could be any type.

:rtype: Any
:rtype: int or float or bytes or str or Dict or List or uuid.UUID
swathipil marked this conversation as resolved.
Show resolved Hide resolved
"""
try:
return self._raw_amqp_message.body
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This version and all future versions will require Python 3.7+. Python 2.7 and 3.

- Added support for fixed (linear) retry backoff:
- Sync/async `ServiceBusClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
- Added new enum class `ServiceBusSessionFilter`, which is the type of existing `NEXT_AVAILABLE_SESSION` value.

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ._common.constants import (
ServiceBusReceiveMode,
ServiceBusSubQueue,
ServiceBusSessionFilter,
NEXT_AVAILABLE_SESSION,
)
from ._common.auto_lock_renewer import AutoLockRenewer
Expand All @@ -37,6 +38,7 @@
"ServiceBusReceivedMessage",
"NEXT_AVAILABLE_SESSION",
"ServiceBusSubQueue",
"ServiceBusSessionFilter",
"ServiceBusReceiveMode",
"ServiceBusClient",
"ServiceBusReceiver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any, Union, Optional, TYPE_CHECKING
import logging
from weakref import WeakSet
from typing_extensions import Literal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned in a comment elsewhere - I think this will need to be:

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

Copy link
Member Author

@swathipil swathipil Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this causes a mypy error, same as this one: python/typeshed#3500

They mention checking python version as the solution. I'm not sure if that's what we want to do? or if we should leave this as is here since this works, though not ideal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add the setup.py amendment above (which we probably should - we don't want unnecessary dependencies) then typing_extensions wont be available in <3.8.
As linked to within the issue you linked to - Mypy should support/understand Python version checks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, thank you!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there's still an issue with mypy, but I think this is because this mypy check in the pipeline is using Python 3.7.

I can ask Scott about this.


import uamqp

Expand All @@ -25,6 +26,7 @@
from ._common.constants import (
ServiceBusSubQueue,
ServiceBusReceiveMode,
ServiceBusSessionFilter,
)

if TYPE_CHECKING:
Expand All @@ -34,6 +36,8 @@
AzureNamedKeyCredential,
)

NextAvailableSessionType = Literal[ServiceBusSessionFilter.NEXT_AVAILABLE]


_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -269,7 +273,7 @@ def get_queue_receiver(
self,
queue_name: str,
*,
session_id: Optional[str] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
Expand All @@ -285,13 +289,14 @@ def get_queue_receiver(
:keyword session_id: A specific session from which to receive. This must be specified for a
sessionful queue, otherwise it must be None. In order to receive messages from the next available
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
:paramtype session_id: str or ~azure.servicebus.NEXT_AVAILABLE_SESSION
:keyword sub_queue: If specified, the subqueue this receiver will
connect to.
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
receiver or messages that can't be processed.
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
enum or equivalent string values "deadletter" and "transferdeadletter".
:paramtype sub_queue: str or ~azure.servicebus.ServiceBusSubQueue
:keyword receive_mode: The receive_mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
Expand Down Expand Up @@ -421,7 +426,7 @@ def get_subscription_receiver(
topic_name: str,
subscription_name: str,
*,
session_id: Optional[str] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
Expand All @@ -439,13 +444,14 @@ def get_subscription_receiver(
:keyword session_id: A specific session from which to receive. This must be specified for a
sessionful subscription, otherwise it must be None. In order to receive messages from the next available
session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.
:paramtype session_id: Union[str, ~azure.servicebus.NEXT_AVAILABLE_SESSION]
:keyword Optional[Union[ServiceBusSubQueue, str]] sub_queue: If specified, the subqueue this receiver will
:paramtype session_id: str or ~azure.servicebus.NEXT_AVAILABLE_SESSION
:keyword sub_queue: If specified, the subqueue this receiver will
connect to.
This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any
receiver or messages that can't be processed.
The default is None, meaning connect to the primary queue. Can be assigned values from `ServiceBusSubQueue`
enum or equivalent string values "deadletter" and "transferdeadletter".
:paramtype sub_queue: str or ~azure.servicebus.ServiceBusSubQueue
:keyword receive_mode: The receive_mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import functools
import uuid
import datetime
import warnings
from typing import Any, List, Optional, Dict, Iterator, Union, TYPE_CHECKING

import six
Expand Down Expand Up @@ -660,7 +661,8 @@ def receive_deferred_messages(
self,
sequence_numbers: Union[int, List[int]],
*,
timeout: Optional[float] = None
timeout: Optional[float] = None,
**kwargs: Any
) -> List[ServiceBusReceivedMessage]:
"""Receive messages that have previously been deferred.

Expand All @@ -683,6 +685,8 @@ def receive_deferred_messages(
:caption: Receive deferred messages from ServiceBus.

"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the f {} formatting support in 3.6+?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is

self._check_live()
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down Expand Up @@ -734,7 +738,8 @@ def peek_messages(
max_message_count: int = 1,
*,
sequence_number: int = 0,
timeout: Optional[float] = None
timeout: Optional[float] = None,
**kwargs: Any
) -> List[ServiceBusReceivedMessage]:
"""Browse messages currently pending in the queue.

Expand All @@ -759,6 +764,8 @@ def peek_messages(
:caption: Look at pending messages in the queue.

"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
self._check_live()
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down Expand Up @@ -899,7 +906,8 @@ def renew_message_lock(
self,
message: ServiceBusReceivedMessage,
*,
timeout: Optional[float] = None
timeout: Optional[float] = None,
**kwargs: Any
) -> datetime.datetime:
# pylint: disable=protected-access,no-member
"""Renew the message lock.
Expand Down Expand Up @@ -933,6 +941,8 @@ def renew_message_lock(
:caption: Renew the lock on a received message.

"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
# type: ignore
try:
if self.session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import uuid
import datetime
import warnings
from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast

import uamqp
Expand Down Expand Up @@ -278,7 +279,8 @@ def schedule_messages(
messages: "MessageTypes",
schedule_time_utc: datetime.datetime,
*,
timeout: Optional[float] = None
timeout: Optional[float] = None,
**kwargs: Any
) -> List[int]:
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
Expand All @@ -301,6 +303,8 @@ def schedule_messages(
:dedent: 4
:caption: Schedule a message to be sent in future
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
# pylint: disable=protected-access

self._check_live()
Expand Down Expand Up @@ -332,7 +336,8 @@ def cancel_scheduled_messages(
self,
sequence_numbers: Union[int, List[int]],
*,
timeout: Optional[float] = None
timeout: Optional[float] = None,
**kwargs: Any
) -> None:
"""
Cancel one or more messages that have previously been scheduled and are still pending.
Expand All @@ -354,6 +359,8 @@ def cancel_scheduled_messages(
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
self._check_live()
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand All @@ -375,7 +382,8 @@ def send_messages(
self,
message: Union["MessageTypes", ServiceBusMessageBatch],
*,
timeout: Optional[float] = None
timeout: Optional[float] = None,
**kwargs: Any
) -> None:
"""Sends message and blocks until acknowledgement is received or operation times out.

Expand Down Expand Up @@ -407,6 +415,8 @@ def send_messages(
:caption: Send message.

"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
self._check_live()
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
# --------------------------------------------------------------------------------------------
import logging
import datetime
from typing import TYPE_CHECKING, Union, Optional
import warnings
from typing import TYPE_CHECKING, Any, Union, Optional
import six

from ._common.utils import utc_from_timestamp, utc_now
Expand Down Expand Up @@ -88,15 +89,15 @@ class ServiceBusSession(BaseSession):
:caption: Get session from a receiver
"""

def get_state(self, *, timeout: Optional[float] = None) -> bytes:
def get_state(self, *, timeout: Optional[float] = None, **kwargs: Any) -> bytes:
# pylint: disable=protected-access
"""Get the session state.

Returns None if no state has been set.

:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:rtype: str
:rtype: bytes

.. admonition:: Example:

Expand All @@ -107,6 +108,8 @@ def get_state(self, *, timeout: Optional[float] = None) -> bytes:
:dedent: 4
:caption: Get the session state
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
self._receiver._check_live() # pylint: disable=protected-access
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand All @@ -119,7 +122,7 @@ def get_state(self, *, timeout: Optional[float] = None) -> bytes:
session_state = response.get(MGMT_RESPONSE_SESSION_STATE) # type: ignore
return session_state

def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[float] = None) -> None:
def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[float] = None, **kwargs: Any) -> None:
# pylint: disable=protected-access
"""Set the session state.

Expand All @@ -137,6 +140,8 @@ def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[fl
:dedent: 4
:caption: Set the session state
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
self._receiver._check_live() # pylint: disable=protected-access
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand All @@ -153,7 +158,7 @@ def set_state(self, state: Union[str, bytes, bytearray], *, timeout: Optional[fl
timeout=timeout,
)

def renew_lock(self, *, timeout: Optional[float] = None) -> datetime.datetime:
def renew_lock(self, *, timeout: Optional[float] = None, **kwargs: Any) -> datetime.datetime:
# pylint: disable=protected-access
"""Renew the session lock.

Expand All @@ -179,6 +184,8 @@ def renew_lock(self, *, timeout: Optional[float] = None) -> datetime.datetime:
:dedent: 4
:caption: Renew the session lock before it expires
"""
if kwargs:
warnings.warn(f"Unsupported keyword args: {kwargs}")
self._receiver._check_live() # pylint: disable=protected-access
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down
Loading