Skip to content

Commit

Permalink
Adds types to Lifecycle Objects (#1338)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <rmc@carlstrom.com>
  • Loading branch information
InvincibleRMC authored Oct 4, 2024
1 parent 1dd3b21 commit 2f9a771
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 56 deletions.
97 changes: 83 additions & 14 deletions rclpy/rclpy/impl/_rclpy_pybind11.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ from __future__ import annotations

from enum import Enum, IntEnum
from types import TracebackType
from typing import Any, Generic, Literal, overload, Sequence, TypedDict
from typing import Any, Generic, Literal, overload, Sequence, TypeAlias, TypedDict

from rclpy.clock import JumpHandle
from rclpy.clock_type import ClockType
Expand Down Expand Up @@ -101,21 +101,23 @@ class rcl_duration_t:
nanoseconds: int


class rcl_subscription_event_type_t(IntEnum):
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED: int
RCL_SUBSCRIPTION_LIVELINESS_CHANGED: int
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS: int
RCL_SUBSCRIPTION_MESSAGE_LOST: int
RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE: int
RCL_SUBSCRIPTION_MATCHED: int
class rcl_subscription_event_type_t(Enum):
_value_: int
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = ...
RCL_SUBSCRIPTION_LIVELINESS_CHANGED = ...
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS = ...
RCL_SUBSCRIPTION_MESSAGE_LOST = ...
RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE = ...
RCL_SUBSCRIPTION_MATCHED = ...


class rcl_publisher_event_type_t(IntEnum):
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED: int
RCL_PUBLISHER_LIVELINESS_LOST: int
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS: int
RCL_PUBLISHER_INCOMPATIBLE_TYPE: int
RCL_PUBLISHER_MATCHED: int
class rcl_publisher_event_type_t(Enum):
_value_: int
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = ...
RCL_PUBLISHER_LIVELINESS_LOST = ...
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS = ...
RCL_PUBLISHER_INCOMPATIBLE_TYPE = ...
RCL_PUBLISHER_MATCHED = ...


class EventHandle(Destroyable):
Expand All @@ -135,6 +137,73 @@ class EventHandle(Destroyable):
"""Get pending data from a ready event."""


LifecycleStateMachineState: TypeAlias = tuple[int, str]


class LifecycleStateMachine(Destroyable):

def __init__(self, node: Node, enable_com_interface: bool) -> None: ...

@property
def initialized(self) -> bool:
"""Check if state machine is initialized."""

@property
def current_state(self) -> LifecycleStateMachineState:
"""Get the current state machine state."""

@property
def available_states(self) -> list[LifecycleStateMachineState]:
"""Get the available states."""

@property
def available_transitions(self) -> list[tuple[int, str, int, str, int, str]]:
"""Get the available transitions."""

@property
def transition_graph(self) -> list[tuple[int, str, int, str, int, str]]:
"""Get the transition graph."""

def get_transition_by_label(self, label: str) -> int:
"""Get the transition id from a transition label."""

def trigger_transition_by_id(self, transition_id: int, publish_update: bool) -> None:
"""Trigger a transition by transition id."""

def trigger_transition_by_label(self, label: str, publish_update: bool) -> None:
"""Trigger a transition by label."""

@property
def service_change_state(self) -> Service:
"""Get the change state service."""

@property
def service_get_state(self) -> Service:
"""Get the get state service."""

@property
def service_get_available_states(self) -> Service:
"""Get the get available states service."""

@property
def service_get_available_transitions(self) -> Service:
"""Get the get available transitions service."""

@property
def service_get_transition_graph(self) -> Service:
"""Get the get transition graph service."""


class TransitionCallbackReturnType(Enum):
_value_: int
SUCCESS = ...
FAILURE = ...
ERROR = ...

def to_label(self) -> str:
"""Convert the transition callback return code to a transition label."""


class GuardCondition(Destroyable):

def __init__(self, context: Context) -> None: ...
Expand Down
50 changes: 36 additions & 14 deletions rclpy/rclpy/lifecycle/managed_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,85 @@
# limitations under the License.

from functools import wraps
from typing import Any, Callable, Dict, List, Optional, overload, TYPE_CHECKING, Union

from ..impl.implementation_singleton import rclpy_implementation as _rclpy

if TYPE_CHECKING:
from typing import TypeAlias
from rclpy.lifecycle.node import LifecycleState

TransitionCallbackReturn = _rclpy.TransitionCallbackReturnType

TransitionCallbackReturn: 'TypeAlias' = _rclpy.TransitionCallbackReturnType


class ManagedEntity:

def on_configure(self, state) -> TransitionCallbackReturn:
def on_configure(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle configure transition request."""
return TransitionCallbackReturn.SUCCESS

def on_cleanup(self, state) -> TransitionCallbackReturn:
def on_cleanup(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle cleanup transition request."""
return TransitionCallbackReturn.SUCCESS

def on_shutdown(self, state) -> TransitionCallbackReturn:
def on_shutdown(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle shutdown transition request."""
return TransitionCallbackReturn.SUCCESS

def on_activate(self, state) -> TransitionCallbackReturn:
def on_activate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle activate transition request."""
return TransitionCallbackReturn.SUCCESS

def on_deactivate(self, state) -> TransitionCallbackReturn:
def on_deactivate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle deactivate transition request."""
return TransitionCallbackReturn.SUCCESS

def on_error(self, state) -> TransitionCallbackReturn:
def on_error(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle error transition request."""
return TransitionCallbackReturn.SUCCESS


class SimpleManagedEntity(ManagedEntity):
"""A simple managed entity that only sets a flag when activated/deactivated."""

def __init__(self):
def __init__(self) -> None:
self._enabled = False

def on_activate(self, state) -> TransitionCallbackReturn:
def on_activate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
self._enabled = True
return TransitionCallbackReturn.SUCCESS

def on_deactivate(self, state) -> TransitionCallbackReturn:
def on_deactivate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
self._enabled = False
return TransitionCallbackReturn.SUCCESS

@property
def is_activated(self):
def is_activated(self) -> bool:
return self._enabled

@staticmethod
def when_enabled(wrapped=None, *, when_not_enabled=None):
def decorator(wrapped):
@overload
def when_enabled(wrapped: None, *,
when_not_enabled: Optional[Callable[..., None]] = None
) -> Callable[[Callable[..., None]], Callable[..., None]]: ...

@staticmethod
@overload
def when_enabled(wrapped: Callable[..., None], *,
when_not_enabled: Optional[Callable[..., None]] = None
) -> Callable[..., None]: ...

@staticmethod
def when_enabled(wrapped: Optional[Callable[..., None]] = None, *,
when_not_enabled: Optional[Callable[..., None]] = None) -> Union[
Callable[..., None],
Callable[[Callable[..., None]], Callable[..., None]]
]:
def decorator(wrapped: Callable[..., None]) -> Callable[..., None]:
@wraps(wrapped)
def only_when_enabled_wrapper(self: SimpleManagedEntity, *args, **kwargs):
def only_when_enabled_wrapper(self: SimpleManagedEntity, *args: List[Any],
**kwargs: Dict[str, Any]) -> None:
if not self._enabled:
if when_not_enabled is not None:
when_not_enabled()
Expand Down
Loading

0 comments on commit 2f9a771

Please sign in to comment.