Skip to content

Commit

Permalink
Add types to timer.py
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <rmc@carlstrom.com>
  • Loading branch information
InvincibleRMC committed Apr 5, 2024
1 parent 7d52cfa commit bac0ce3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
2 changes: 1 addition & 1 deletion rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,7 @@ def create_service(
def create_timer(
self,
timer_period_sec: float,
callback: Callable,
callback: Optional[Callable[[], None]],
callback_group: Optional[CallbackGroup] = None,
clock: Optional[Clock] = None,
autostart: bool = True,
Expand Down
72 changes: 53 additions & 19 deletions rclpy/rclpy/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,52 @@

from typing import Callable
from typing import Optional
from typing import Protocol

from rclpy.callback_groups import CallbackGroup
from rclpy.clock import Clock
from rclpy.context import Context
from rclpy.destroyable import DestroyableType
from rclpy.exceptions import InvalidHandle, ROSInterruptException
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.utilities import get_default_context


class TimerHandle(DestroyableType, Protocol):

def reset_timer(self) -> None:
...

def is_timer_ready(self) -> bool:
...

def call_timer(self) -> None:
...

def change_timer_period(self, period_nsec: int) -> None:
...

def time_until_next_call(self) -> Optional[int]:
...

def time_since_last_call(self) -> int:
...

def get_timer_period(self) -> int:
...

def cancel_timer(self) -> None:
...

def is_timer_canceled(self) -> bool:
...


class Timer:

def __init__(
self,
callback: Callable,
callback: Optional[Callable[[], None]],
callback_group: CallbackGroup,
timer_period_ns: int,
clock: Clock,
Expand All @@ -56,8 +88,10 @@ def __init__(
"""
self._context = get_default_context() if context is None else context
self._clock = clock
if self._context.handle is None:
raise RuntimeError('Context must be initialized before create a TimerHandle.')
with self._clock.handle, self._context.handle:
self.__timer = _rclpy.Timer(
self.__timer: TimerHandle = _rclpy.Timer(
self._clock.handle, self._context.handle, timer_period_ns, autostart)
self.timer_period_ns = timer_period_ns
self.callback = callback
Expand All @@ -66,59 +100,59 @@ def __init__(
self._executor_event = False

@property
def handle(self):
def handle(self) -> TimerHandle:
return self.__timer

def destroy(self):
def destroy(self) -> None:
self.__timer.destroy_when_not_in_use()

@property
def clock(self):
def clock(self) -> Clock:
return self._clock

@property
def timer_period_ns(self):
def timer_period_ns(self) -> int:
with self.__timer:
val = self.__timer.get_timer_period()
self.__timer_period_ns = val
return val

@timer_period_ns.setter
def timer_period_ns(self, value):
def timer_period_ns(self, value: int) -> None:
val = int(value)
with self.__timer:
self.__timer.change_timer_period(val)
self.__timer_period_ns = val

def is_ready(self):
def is_ready(self) -> bool:
with self.__timer:
return self.__timer.is_timer_ready()

def is_canceled(self):
def is_canceled(self) -> bool:
with self.__timer:
return self.__timer.is_timer_canceled()

def cancel(self):
def cancel(self) -> None:
with self.__timer:
self.__timer.cancel_timer()

def reset(self):
def reset(self) -> None:
with self.__timer:
self.__timer.reset_timer()

def time_since_last_call(self):
def time_since_last_call(self) -> int:
with self.__timer:
return self.__timer.time_since_last_call()

def time_until_next_call(self):
def time_until_next_call(self) -> Optional[int]:
with self.__timer:
return self.__timer.time_until_next_call()


class Rate:
"""A utility for sleeping at a fixed rate."""

def __init__(self, timer: Timer, *, context):
def __init__(self, timer: Timer, *, context: Context):
# Rate is a wrapper around a timer
self._timer = timer
self._is_shutdown = False
Expand All @@ -135,15 +169,15 @@ def __init__(self, timer: Timer, *, context):
# Set event when ROS is shutdown
context.on_shutdown(self._on_shutdown)

def _on_shutdown(self):
def _on_shutdown(self) -> None:
self._is_shutdown = True
self.destroy()

def destroy(self):
def destroy(self) -> None:
self._is_destroyed = True
self._event.set()

def _presleep(self):
def _presleep(self) -> None:
if self._is_shutdown:
raise ROSInterruptException()
if self._is_destroyed:
Expand All @@ -154,7 +188,7 @@ def _presleep(self):
with self._lock:
self._num_sleepers += 1

def _postsleep(self):
def _postsleep(self) -> None:
with self._lock:
self._num_sleepers -= 1
if self._num_sleepers == 0:
Expand All @@ -163,7 +197,7 @@ def _postsleep(self):
self.destroy()
raise ROSInterruptException()

def sleep(self):
def sleep(self) -> None:
"""
Block until timer triggers.
Expand Down

0 comments on commit bac0ce3

Please sign in to comment.