From 0d972bce373edcd57653ab11a2925fb9bef7c61c Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Thu, 14 Mar 2024 14:27:47 -0400 Subject: [PATCH 01/12] Add types to qos.py Signed-off-by: Michael Carlstrom --- rclpy/rclpy/duration.py | 6 +- rclpy/rclpy/qos.py | 183 ++++++++++++++++++++++++---------------- 2 files changed, 113 insertions(+), 76 deletions(-) diff --git a/rclpy/rclpy/duration.py b/rclpy/rclpy/duration.py index 1a93b4d88..c3356cb07 100644 --- a/rclpy/rclpy/duration.py +++ b/rclpy/rclpy/duration.py @@ -19,7 +19,7 @@ from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy -class DurationType(Protocol): +class DurationHandle(Protocol): """Type alias of _rclpy.rcl_duration_t.""" nanoseconds: int @@ -41,7 +41,7 @@ def __init__(self, *, seconds: Union[int, float] = 0, nanoseconds: int = 0): # pybind11 would raise TypeError, but we want OverflowError raise OverflowError( 'Total nanoseconds value is too large to store in C duration.') - self._duration_handle: DurationType = _rclpy.rcl_duration_t(total_nanoseconds) + self._duration_handle: DurationHandle = _rclpy.rcl_duration_t(total_nanoseconds) @property def nanoseconds(self) -> int: @@ -106,7 +106,7 @@ def from_msg(cls, msg: builtin_interfaces.msg.Duration) -> 'Duration': raise TypeError('Must pass a builtin_interfaces.msg.Duration object') return cls(seconds=msg.sec, nanoseconds=msg.nanosec) - def get_c_duration(self) -> DurationType: + def get_c_duration(self) -> DurationHandle: return self._duration_handle diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index a63523e2c..40faf8af8 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from enum import Enum -from enum import IntEnum -from typing import Union - +from enum import Enum, IntEnum +from typing import (Callable, Iterable, List, Optional, Protocol, Tuple, Type, + TypeAlias, TypeVar, Union) import warnings -from rclpy.duration import Duration +from rclpy.duration import Duration, DurationHandle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy @@ -42,7 +41,7 @@ class QoSPolicyKind(IntEnum): AVOID_ROS_NAMESPACE_CONVENTIONS = 1 << 9, -def qos_policy_name_from_kind(policy_kind: Union[QoSPolicyKind, int]): +def qos_policy_name_from_kind(policy_kind: Union[QoSPolicyKind, int]) -> str: """Get QoS policy name from QoSPolicyKind enum.""" return QoSPolicyKind(policy_kind).name @@ -50,8 +49,20 @@ def qos_policy_name_from_kind(policy_kind: Union[QoSPolicyKind, int]): class InvalidQoSProfileException(Exception): """Raised when constructing a QoSProfile with invalid arguments.""" - def __init__(self, *args): - Exception.__init__(self, 'Invalid QoSProfile', *args) + def __init__(self, message: str) -> None: + Exception.__init__(self, 'Invalid QoSProfile', *message) + + +class QoSProfileHandle(Protocol): + qos_history: Union['QoSHistoryPolicy', int] + qos_depth: int + qos_reliability: Union['QoSReliabilityPolicy', int] + qos_durability: Union['QoSDurabilityPolicy', int] + pyqos_lifespan: DurationHandle + pyqos_deadline: DurationHandle + qos_liveliness: Union['QoSLivelinessPolicy', int] + pyqos_liveliness_lease_duration: DurationHandle + avoid_ros_namespace_conventions: bool class QoSProfile: @@ -73,96 +84,121 @@ class QoSProfile: '_avoid_ros_namespace_conventions', ] - def __init__(self, **kwargs): - assert all('_' + key in self.__slots__ for key in kwargs.keys()), \ - 'Invalid arguments passed to constructor: %r' % kwargs.keys() + def __init__(self, history: Optional['QoSHistoryPolicy'], depth: Optional[int] = None, + reliability: Optional['QoSReliabilityPolicy'] = None, + durability: Optional['QoSDurabilityPolicy'] = None, + lifespan: Optional[Duration] = None, deadline: Optional[Duration] = None, + liveliness: Optional['QoSLivelinessPolicy'] = None, + liveliness_lease_duration: Optional[Duration] = None, + avoid_ros_namespace_conventions: Optional[bool] = None) -> None: - if 'history' not in kwargs: - if 'depth' not in kwargs: + if history is None: + if depth is None: raise InvalidQoSProfileException('History and/or depth settings are required.') - kwargs['history'] = QoSHistoryPolicy.KEEP_LAST + history = QoSHistoryPolicy.KEEP_LAST - self.history = kwargs.get('history') + self.history = history if ( - QoSHistoryPolicy.KEEP_LAST == self.history and - 'depth' not in kwargs + QoSHistoryPolicy.KEEP_LAST == self.history and depth is None ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') - self.depth = kwargs.get('depth', QoSProfile.__qos_profile_default_dict['depth']) - self.reliability = kwargs.get( - 'reliability', QoSProfile.__qos_profile_default_dict['reliability']) - self.durability = kwargs.get( - 'durability', QoSProfile.__qos_profile_default_dict['durability']) - self.lifespan = kwargs.get('lifespan', QoSProfile.__qos_profile_default_dict['lifespan']) - self.deadline = kwargs.get('deadline', QoSProfile.__qos_profile_default_dict['deadline']) - self.liveliness = kwargs.get( - 'liveliness', QoSProfile.__qos_profile_default_dict['liveliness']) - self.liveliness_lease_duration = kwargs.get( - 'liveliness_lease_duration', - QoSProfile.__qos_profile_default_dict['liveliness_lease_duration']) - self.avoid_ros_namespace_conventions = kwargs.get( - 'avoid_ros_namespace_conventions', - QoSProfile.__qos_profile_default_dict['avoid_ros_namespace_conventions']) + if depth is None: + self.depth = QoSProfile.__qos_profile_default_dict['depth'] + else: + self.depth = depth + + if reliability is None: + self.reliability = QoSProfile.__qos_profile_default_dict['reliability'] + else: + self.reliability = reliability + + if durability is None: + self.durability = QoSProfile.__qos_profile_default_dict['durability'] + else: + self.durability = durability + + if lifespan is None: + self.lifespan = QoSProfile.__qos_profile_default_dict['lifespan'] + else: + self.lifespan = lifespan + + if deadline is None: + self.deadline = QoSProfile.__qos_profile_default_dict['deadline'] + else: + self.deadline = deadline + + if liveliness is None: + self.liveliness = QoSProfile.__qos_profile_default_dict['liveliness'] + else: + self.liveliness = liveliness + + if liveliness_lease_duration is None: + self.liveliness_lease_duration = \ + QoSProfile.__qos_profile_default_dict['liveliness_lease_duration'] + else: + self.liveliness_lease_duration = liveliness_lease_duration + + if avoid_ros_namespace_conventions is None: + self.avoid_ros_namespace_conventions = \ + QoSProfile.__qos_profile_default_dict['avoid_ros_namespace_conventions'] + else: + self.avoid_ros_namespace_conventions = avoid_ros_namespace_conventions @property - def history(self): + def history(self) -> 'QoSHistoryPolicy': """ Get field 'history'. :returns: history attribute - :rtype: QoSHistoryPolicy """ return self._history @history.setter - def history(self, value): + def history(self, value: Union['QoSHistoryPolicy', int]) -> None: assert isinstance(value, QoSHistoryPolicy) or isinstance(value, int) self._history = QoSHistoryPolicy(value) @property - def reliability(self): + def reliability(self) -> 'QoSReliabilityPolicy': """ Get field 'reliability'. :returns: reliability attribute - :rtype: QoSReliabilityPolicy """ return self._reliability @reliability.setter - def reliability(self, value): + def reliability(self, value: Union['QoSReliabilityPolicy', int]) -> None: assert isinstance(value, QoSReliabilityPolicy) or isinstance(value, int) self._reliability = QoSReliabilityPolicy(value) @property - def durability(self): + def durability(self) -> 'QoSDurabilityPolicy': """ Get field 'durability'. :returns: durability attribute - :rtype: QoSDurabilityPolicy """ return self._durability @durability.setter - def durability(self, value): + def durability(self, value: Union['QoSDurabilityPolicy', int]) -> None: assert isinstance(value, QoSDurabilityPolicy) or isinstance(value, int) self._durability = QoSDurabilityPolicy(value) @property - def depth(self): + def depth(self) -> int: """ Get field 'depth'. :returns: depth attribute - :rtype: int """ return self._depth @depth.setter - def depth(self, value): + def depth(self, value: int) -> None: assert isinstance(value, int) if self.history == QoSHistoryPolicy.KEEP_LAST and value == 0: @@ -173,81 +209,76 @@ def depth(self, value): self._depth = value @property - def lifespan(self): + def lifespan(self) -> Duration: """ Get field 'lifespan'. :returns: lifespan attribute - :rtype: Duration """ return self._lifespan @lifespan.setter - def lifespan(self, value): + def lifespan(self, value: Duration) -> None: assert isinstance(value, Duration) self._lifespan = value @property - def deadline(self): + def deadline(self) -> Duration: """ Get field 'deadline'. :returns: deadline attribute. - :rtype: Duration """ return self._deadline @deadline.setter - def deadline(self, value): + def deadline(self, value: Duration) -> None: assert isinstance(value, Duration) self._deadline = value @property - def liveliness(self): + def liveliness(self) -> 'QoSLivelinessPolicy': """ Get field 'liveliness'. :returns: liveliness attribute - :rtype: QoSLivelinessPolicy """ return self._liveliness @liveliness.setter - def liveliness(self, value): + def liveliness(self, value: Union['QoSLivelinessPolicy', int]) -> None: assert isinstance(value, (QoSLivelinessPolicy, int)) self._liveliness = QoSLivelinessPolicy(value) @property - def liveliness_lease_duration(self): + def liveliness_lease_duration(self) -> Duration: """ Get field 'liveliness_lease_duration'. :returns: liveliness_lease_duration attribute. - :rtype: Duration """ return self._liveliness_lease_duration @liveliness_lease_duration.setter - def liveliness_lease_duration(self, value): + def liveliness_lease_duration(self, value: Duration) -> None: assert isinstance(value, Duration) self._liveliness_lease_duration = value @property - def avoid_ros_namespace_conventions(self): + def avoid_ros_namespace_conventions(self) -> bool: """ Get field 'avoid_ros_namespace_conventions'. :returns: avoid_ros_namespace_conventions attribute - :rtype: bool """ return self._avoid_ros_namespace_conventions @avoid_ros_namespace_conventions.setter - def avoid_ros_namespace_conventions(self, value): + def avoid_ros_namespace_conventions(self, value: bool) -> None: assert isinstance(value, bool) self._avoid_ros_namespace_conventions = value - def get_c_qos_profile(self): + def get_c_qos_profile(self) -> QoSProfileHandle: return _rclpy.rmw_qos_profile_t( self.history, self.depth, @@ -260,14 +291,14 @@ def get_c_qos_profile(self): self.avoid_ros_namespace_conventions, ) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: if not isinstance(other, QoSProfile): return False return all( self.__getattribute__(slot) == other.__getattribute__(slot) for slot in self.__slots__) - def __str__(self): + def __str__(self) -> str: return f'{type(self).__name__}(%s)' % ( ', '.join(f'{slot[1:]}=%s' % getattr(self, slot) for slot in self.__slots__) ) @@ -281,17 +312,17 @@ class QoSPolicyEnum(IntEnum): """ @classmethod - def short_keys(cls): + def short_keys(cls) -> List[str]: """Return a list of shortened typing-friendly enum values.""" return [k.lower() for k in cls.__members__.keys() if not k.startswith('RMW')] @classmethod - def get_from_short_key(cls, name): + def get_from_short_key(cls, name: str) -> int: """Retrieve a policy type from a short name, case-insensitive.""" return cls[name.upper()].value @property - def short_key(self): + def short_key(self) -> str: for k, v in self.__class__.__members__.items(): if k.startswith('RMW'): continue @@ -302,14 +333,18 @@ def short_key(self): (self.value, self.__class__.__name__)) +_T = TypeVar('_T', bound=QoSPolicyEnum) + + class _DeprecatedPolicyValueAlias: """Helper to deprecate a policy value.""" - def __init__(self, replacement_name, deprecated_name): + def __init__(self, replacement_name: str, deprecated_name: str) -> None: self.replacement_name = replacement_name self.deprecated_name = deprecated_name - def __get__(self, obj, policy_cls): + def __get__(self, obj: object, + policy_cls: Type[_T]) -> _T: warnings.warn( f'{policy_cls.__name__}.{self.deprecated_name} is deprecated. ' f'Use {policy_cls.__name__}.{self.replacement_name} instead.' @@ -317,8 +352,9 @@ def __get__(self, obj, policy_cls): return policy_cls[self.replacement_name] -def _deprecated_policy_value_aliases(pairs): - def decorator(policy_cls): +def _deprecated_policy_value_aliases(pairs: Iterable[Tuple[str, str]]) \ + -> Callable[[Type[_T]], Type[_T]]: + def decorator(policy_cls: Type[_T]) -> Type[_T]: for deprecated_name, replacement_name in pairs: setattr( policy_cls, @@ -496,20 +532,21 @@ class QoSPresetProfiles(Enum): Our supported version of Python3 (3.5) doesn't have a fix that allows mixins on Enum. """ @classmethod - def short_keys(cls): + def short_keys(cls) -> List[str]: """Return a list of shortened typing-friendly enum values.""" return [k.lower() for k in cls.__members__.keys() if not k.startswith('RMW')] @classmethod - def get_from_short_key(cls, name): + def get_from_short_key(cls, name: str) -> QoSProfile: """Retrieve a policy type from a short name, case-insensitive.""" return cls[name.upper()].value -QoSCompatibility = _rclpy.QoSCompatibility +QoSCompatibility: TypeAlias = _rclpy.QoSCompatibility -def qos_check_compatible(publisher_qos: QoSProfile, subscription_qos: QoSProfile): +def qos_check_compatible(publisher_qos: QoSProfile, + subscription_qos: QoSProfile) -> Tuple[QoSCompatibility, str]: """ Check if two QoS profiles are compatible. From ddc5a3c89fd2a043502472b571115ee97b69519a Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Thu, 14 Mar 2024 14:39:02 -0400 Subject: [PATCH 02/12] Add missing default value Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 40faf8af8..25979252e 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -84,7 +84,7 @@ class QoSProfile: '_avoid_ros_namespace_conventions', ] - def __init__(self, history: Optional['QoSHistoryPolicy'], depth: Optional[int] = None, + def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional[int] = None, reliability: Optional['QoSReliabilityPolicy'] = None, durability: Optional['QoSDurabilityPolicy'] = None, lifespan: Optional[Duration] = None, deadline: Optional[Duration] = None, From 8946545e5757c1d0aacb041113fee40ae13805f6 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Thu, 14 Mar 2024 17:01:29 -0700 Subject: [PATCH 03/12] TypeAlias 3.8 Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 25979252e..39124fccf 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -14,12 +14,14 @@ from enum import Enum, IntEnum from typing import (Callable, Iterable, List, Optional, Protocol, Tuple, Type, - TypeAlias, TypeVar, Union) + TypeVar, TYPE_CHECKING, Union) import warnings from rclpy.duration import Duration, DurationHandle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +if TYPE_CHECKING: + from typing import TypeAlias class QoSPolicyKind(IntEnum): """ @@ -542,7 +544,7 @@ def get_from_short_key(cls, name: str) -> QoSProfile: return cls[name.upper()].value -QoSCompatibility: TypeAlias = _rclpy.QoSCompatibility +QoSCompatibility: 'TypeAlias' = _rclpy.QoSCompatibility def qos_check_compatible(publisher_qos: QoSProfile, From 58e2917292d02f51beaa6a8cbb446964a2570373 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 09:33:07 -0400 Subject: [PATCH 04/12] Flake8 fixes and simplification Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 63 +++++++++++++++------------------------------- 1 file changed, 20 insertions(+), 43 deletions(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 39124fccf..5a9d792ae 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -13,8 +13,8 @@ # limitations under the License. from enum import Enum, IntEnum -from typing import (Callable, Iterable, List, Optional, Protocol, Tuple, Type, - TypeVar, TYPE_CHECKING, Union) +from typing import (Callable, Iterable, List, Optional, Protocol, Tuple, Type, TYPE_CHECKING, + TypeVar, Union) import warnings from rclpy.duration import Duration, DurationHandle @@ -23,6 +23,7 @@ if TYPE_CHECKING: from typing import TypeAlias + class QoSPolicyKind(IntEnum): """ Enum for types of QoS policies that a Publisher or Subscription can set. @@ -106,47 +107,23 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') - if depth is None: - self.depth = QoSProfile.__qos_profile_default_dict['depth'] - else: - self.depth = depth - - if reliability is None: - self.reliability = QoSProfile.__qos_profile_default_dict['reliability'] - else: - self.reliability = reliability - - if durability is None: - self.durability = QoSProfile.__qos_profile_default_dict['durability'] - else: - self.durability = durability - - if lifespan is None: - self.lifespan = QoSProfile.__qos_profile_default_dict['lifespan'] - else: - self.lifespan = lifespan - - if deadline is None: - self.deadline = QoSProfile.__qos_profile_default_dict['deadline'] - else: - self.deadline = deadline - - if liveliness is None: - self.liveliness = QoSProfile.__qos_profile_default_dict['liveliness'] - else: - self.liveliness = liveliness - - if liveliness_lease_duration is None: - self.liveliness_lease_duration = \ - QoSProfile.__qos_profile_default_dict['liveliness_lease_duration'] - else: - self.liveliness_lease_duration = liveliness_lease_duration - - if avoid_ros_namespace_conventions is None: - self.avoid_ros_namespace_conventions = \ - QoSProfile.__qos_profile_default_dict['avoid_ros_namespace_conventions'] - else: - self.avoid_ros_namespace_conventions = avoid_ros_namespace_conventions + self.depth = depth or QoSProfile.__qos_profile_default_dict['depth'] + + self.reliability = reliability or QoSProfile.__qos_profile_default_dict['reliability'] + + self.durability = durability or QoSProfile.__qos_profile_default_dict['durability'] + + self.lifespan = lifespan or QoSProfile.__qos_profile_default_dict['lifespan'] + + self.deadline = deadline or QoSProfile.__qos_profile_default_dict['deadline'] + + self.liveliness = liveliness or QoSProfile.__qos_profile_default_dict['liveliness'] + + self.liveliness_lease_duration = liveliness_lease_duration or \ + QoSProfile.__qos_profile_default_dict['liveliness_lease_duration'] + + self.avoid_ros_namespace_conventions = avoid_ros_namespace_conventions or \ + QoSProfile.__qos_profile_default_dict['avoid_ros_namespace_conventions'] @property def history(self) -> 'QoSHistoryPolicy': From 3e28663a7488b44e8bcd1cb1eacbf2c5b0716b52 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 09:43:20 -0400 Subject: [PATCH 05/12] Fix depth=0 error Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 5a9d792ae..6462d307c 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -87,7 +87,7 @@ class QoSProfile: '_avoid_ros_namespace_conventions', ] - def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional[int] = None, + def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: int = 0, reliability: Optional['QoSReliabilityPolicy'] = None, durability: Optional['QoSDurabilityPolicy'] = None, lifespan: Optional[Duration] = None, deadline: Optional[Duration] = None, @@ -96,7 +96,7 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional avoid_ros_namespace_conventions: Optional[bool] = None) -> None: if history is None: - if depth is None: + if depth == 0: raise InvalidQoSProfileException('History and/or depth settings are required.') history = QoSHistoryPolicy.KEEP_LAST From 2fdf0ec0066f941ea2c7fea8edcc13521e0aa764 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 09:51:13 -0400 Subject: [PATCH 06/12] Fix depth=0 error Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 6462d307c..59ed62114 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -87,7 +87,7 @@ class QoSProfile: '_avoid_ros_namespace_conventions', ] - def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: int = 0, + def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional[int] = None, reliability: Optional['QoSReliabilityPolicy'] = None, durability: Optional['QoSDurabilityPolicy'] = None, lifespan: Optional[Duration] = None, deadline: Optional[Duration] = None, @@ -96,14 +96,14 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: int = 0, avoid_ros_namespace_conventions: Optional[bool] = None) -> None: if history is None: - if depth == 0: + if depth is None: raise InvalidQoSProfileException('History and/or depth settings are required.') history = QoSHistoryPolicy.KEEP_LAST self.history = history if ( - QoSHistoryPolicy.KEEP_LAST == self.history and depth is None + QoSHistoryPolicy.KEEP_LAST == self.history and depth == 0 ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') From b205e770e773a5334df33b0db52138250c689a2f Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 10:01:22 -0400 Subject: [PATCH 07/12] combine error message Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 59ed62114..8143bf891 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -53,7 +53,7 @@ class InvalidQoSProfileException(Exception): """Raised when constructing a QoSProfile with invalid arguments.""" def __init__(self, message: str) -> None: - Exception.__init__(self, 'Invalid QoSProfile', *message) + Exception.__init__(self, f'Invalid QoSProfile: {message}') class QoSProfileHandle(Protocol): From b13be6161072ac9a4baaaa6301146d324d6e735d Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 10:02:54 -0400 Subject: [PATCH 08/12] revert to is None Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 8143bf891..bec2116b0 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -103,7 +103,7 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional self.history = history if ( - QoSHistoryPolicy.KEEP_LAST == self.history and depth == 0 + QoSHistoryPolicy.KEEP_LAST == self.history and depth is None ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') From f7ec33a495be1403d731bfccc580dfdcbb850718 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 10:03:01 -0400 Subject: [PATCH 09/12] revert to is None Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index bec2116b0..1be26570e 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -53,7 +53,7 @@ class InvalidQoSProfileException(Exception): """Raised when constructing a QoSProfile with invalid arguments.""" def __init__(self, message: str) -> None: - Exception.__init__(self, f'Invalid QoSProfile: {message}') + Exception(self, f'Invalid QoSProfile: {message}') class QoSProfileHandle(Protocol): From d2b59d731dcbbbe042fb28f69ca34d2dc823d953 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 10:14:48 -0400 Subject: [PATCH 10/12] Fix 0 falsey Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 1be26570e..8824307d4 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -107,7 +107,10 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') - self.depth = depth or QoSProfile.__qos_profile_default_dict['depth'] + if depth: + self.depth = depth + else: + self.depth = QoSProfile.__qos_profile_default_dict['depth'] self.reliability = reliability or QoSProfile.__qos_profile_default_dict['reliability'] From 15fb2d34ae983b81e984bfeb6cd43cded875cd68 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Wed, 31 Jul 2024 10:21:27 -0400 Subject: [PATCH 11/12] flip self.depth Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 8824307d4..05612c66c 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -107,10 +107,10 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') - if depth: - self.depth = depth - else: + if depth is None: self.depth = QoSProfile.__qos_profile_default_dict['depth'] + else: + self.depth = depth self.reliability = reliability or QoSProfile.__qos_profile_default_dict['reliability'] From 973ad8c06f7b2ae7eef062d552717afa87ee796f Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Fri, 2 Aug 2024 18:48:36 -0400 Subject: [PATCH 12/12] replace or with ternary Signed-off-by: Michael Carlstrom --- rclpy/rclpy/qos.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/rclpy/rclpy/qos.py b/rclpy/rclpy/qos.py index 05612c66c..01fa80a55 100644 --- a/rclpy/rclpy/qos.py +++ b/rclpy/rclpy/qos.py @@ -107,26 +107,30 @@ def __init__(self, history: Optional['QoSHistoryPolicy'] = None, depth: Optional ): raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.') - if depth is None: - self.depth = QoSProfile.__qos_profile_default_dict['depth'] - else: - self.depth = depth + self.depth = QoSProfile.__qos_profile_default_dict['depth'] if depth is None else depth - self.reliability = reliability or QoSProfile.__qos_profile_default_dict['reliability'] + self.reliability = QoSProfile.__qos_profile_default_dict['reliability'] \ + if reliability is None else reliability - self.durability = durability or QoSProfile.__qos_profile_default_dict['durability'] + self.durability = QoSProfile.__qos_profile_default_dict['durability'] \ + if durability is None else durability - self.lifespan = lifespan or QoSProfile.__qos_profile_default_dict['lifespan'] + self.lifespan = QoSProfile.__qos_profile_default_dict['lifespan'] \ + if lifespan is None else lifespan - self.deadline = deadline or QoSProfile.__qos_profile_default_dict['deadline'] + self.deadline = QoSProfile.__qos_profile_default_dict['deadline'] \ + if deadline is None else deadline - self.liveliness = liveliness or QoSProfile.__qos_profile_default_dict['liveliness'] + self.liveliness = QoSProfile.__qos_profile_default_dict['liveliness'] \ + if liveliness is None else liveliness - self.liveliness_lease_duration = liveliness_lease_duration or \ - QoSProfile.__qos_profile_default_dict['liveliness_lease_duration'] + self.liveliness_lease_duration = \ + QoSProfile.__qos_profile_default_dict['liveliness_lease_duration'] \ + if liveliness_lease_duration is None else liveliness_lease_duration - self.avoid_ros_namespace_conventions = avoid_ros_namespace_conventions or \ - QoSProfile.__qos_profile_default_dict['avoid_ros_namespace_conventions'] + self.avoid_ros_namespace_conventions = \ + QoSProfile.__qos_profile_default_dict['avoid_ros_namespace_conventions'] \ + if avoid_ros_namespace_conventions is None else avoid_ros_namespace_conventions @property def history(self) -> 'QoSHistoryPolicy':