Skip to content

Commit

Permalink
API and implementation for Liveliness and Deadline QoS event callbacks
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Apr 25, 2019
1 parent 616e853 commit 636a454
Show file tree
Hide file tree
Showing 10 changed files with 889 additions and 21 deletions.
1 change: 1 addition & 0 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ def _wait_for_ready_callbacks(
entity_count.num_timers,
entity_count.num_clients,
entity_count.num_services,
entity_count.num_events,
self._context.handle)

entities = {
Expand Down
36 changes: 30 additions & 6 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from rclpy.qos import qos_profile_parameter_events
from rclpy.qos import qos_profile_services_default
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.service import Service
from rclpy.subscription import Subscription
from rclpy.time_source import TimeSource
Expand Down Expand Up @@ -349,16 +351,23 @@ def create_publisher(
msg_type,
topic: str,
*,
qos_profile: QoSProfile = qos_profile_default
qos_profile: QoSProfile = qos_profile_default,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[PublisherEventCallbacks] = None,
) -> Publisher:
"""
Create a new publisher.
:param msg_type: The type of ROS messages the publisher will publish.
:param topic: The name of the topic the publisher will publish to.
:param qos_profile: The quality of service profile to apply to the publisher.
:param callback_group: The callback group for the publisher's event handlers.
If ``None``, then the node's default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:return: The new publisher.
"""
callback_group = callback_group or self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -369,8 +378,16 @@ def create_publisher(
failed = True
if failed:
self._validate_topic_or_service_name(topic)
publisher = Publisher(publisher_handle, msg_type, topic, qos_profile, self.handle)

publisher = Publisher(
publisher_handle, msg_type, topic, qos_profile, self.handle,
event_callbacks=event_callbacks or PublisherEventCallbacks(),
callback_group=callback_group)
self.publishers.append(publisher)

for event_callback in publisher.event_handlers:
self.add_waitable(event_callback)

return publisher

def create_subscription(
Expand All @@ -380,7 +397,8 @@ def create_subscription(
callback: Callable[[MsgType], None],
*,
qos_profile: QoSProfile = qos_profile_default,
callback_group: CallbackGroup = None,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
raw: bool = False
) -> Subscription:
"""
Expand All @@ -393,11 +411,12 @@ def create_subscription(
:param qos_profile: The quality of service profile to apply to the subscription.
:param callback_group: The callback group for the subscription. If ``None``, then the
nodes default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:param raw: If ``True``, then received messages will be stored in raw binary
representation.
"""
if callback_group is None:
callback_group = self.default_callback_group
callback_group = callback_group or self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -413,9 +432,14 @@ def create_subscription(

subscription = Subscription(
subscription_handle, msg_type,
topic, callback, callback_group, qos_profile, self.handle, raw)
topic, callback, callback_group, qos_profile, self.handle, raw,
event_callbacks=event_callbacks or SubscriptionEventCallbacks())
self.subscriptions.append(subscription)
callback_group.add_entity(subscription)

for event_handler in subscription.event_handlers:
self.add_waitable(event_handler)

return subscription

def create_client(
Expand Down
11 changes: 10 additions & 1 deletion rclpy/rclpy/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

from typing import TypeVar

from rclpy.callback_groups import CallbackGroup
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks

MsgType = TypeVar('MsgType')

Expand All @@ -28,7 +30,9 @@ def __init__(
msg_type: MsgType,
topic: str,
qos_profile: QoSProfile,
node_handle
node_handle,
event_callbacks: PublisherEventCallbacks,
callback_group: CallbackGroup,
) -> None:
"""
Create a container for a ROS publisher.
Expand All @@ -51,6 +55,11 @@ def __init__(
self.topic = topic
self.qos_profile = qos_profile
self.node_handle = node_handle
self.callback_group = callback_group

self.event_callbacks = event_callbacks
self.event_handlers = event_callbacks.create_event_handlers(
callback_group, publisher_handle)

def publish(self, msg: MsgType) -> None:
"""
Expand Down
224 changes: 224 additions & 0 deletions rclpy/rclpy/qos_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import IntEnum
from typing import Callable
from typing import List
from typing import NamedTuple

from rclpy.callback_groups import CallbackGroup
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.waitable import NumberOfEntities
from rclpy.waitable import Waitable


class QoSPublisherEventType(IntEnum):
"""
Enum for types of QoS events that a Publisher can receive.
This enum matches the one defined in rcl/event.h
"""

RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = 0
RCL_PUBLISHER_LIVELINESS_LOST = 1


class QoSSubscriptionEventType(IntEnum):
"""
Enum for types of QoS events that a Subscription can receive.
This enum matches the one defined in rcl/event.h
"""

RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = 0
RCL_SUBSCRIPTION_LIVELINESS_CHANGED = 1


"""
Payload type for Subscription Deadline callback.
Mirrors rmw_requested_deadline_missed_status_t from rmw/types.h
"""
QoSRequestedDeadlineMissedInfo = NamedTuple(
'QoSRequestedDeadlineMissedInfo', [
('total_count', 'int'),
('total_count_change', 'int'),
])

"""
Payload type for Subscription Liveliness callback.
Mirrors rmw_liveliness_changed_status_t from rmw/types.h
"""
QoSLivelinessChangedInfo = NamedTuple(
'QoSLivelinessChangedInfo', [
('alive_count', 'int'),
('not_alive_count', 'int'),
('alive_count_change', 'int'),
('not_alive_count_change', 'int'),
])

"""
Payload type for Publisher Deadline callback.
Mirrors rmw_offered_deadline_missed_status_t from rmw/types.h
"""
QoSOfferedDeadlineMissedInfo = NamedTuple(
'QoSOfferedDeadlineMissedInfo', [
('total_count', 'int'),
('total_count_change', 'int'),
])

"""
Payload type for Publisher Liveliness callback.
Mirrors rmw_liveliness_lost_status_t from rmw/types.h
"""
QoSLivelinessLostInfo = NamedTuple(
'QoSLivelinessLostInfo', [
('total_count', 'int'),
('total_count_change', 'int'),
])


class QoSEventHandler(Waitable):
"""Waitable type to handle QoS events."""

def __init__(
self,
*,
callback_group,
callback,
event_type,
parent_handle,
):
# Waitable init adds self to callback_group
super().__init__(callback_group)
self.event_type = event_type
self.callback = callback

self._parent_handle = parent_handle
self._event_handle = _rclpy.rclpy_create_event(event_type, parent_handle)
self._ready_to_take_data = False
self._event_index = None

# Start Waitable API
def is_ready(self, wait_set):
"""Return True if entities are ready in the wait set."""
if self._event_index is None:
return False
if _rclpy.rclpy_wait_set_is_ready('event', wait_set, self._event_index):
self._ready_to_take_data = True
return self._ready_to_take_data

def take_data(self):
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
if self._ready_to_take_data:
self._ready_to_take_data = False
return _rclpy.rclpy_take_event(
self._event_handle, self._parent_handle, self.event_type)
return None

async def execute(self, taken_data):
"""Execute work after data has been taken from a ready wait set."""
if not taken_data:
return
self.callback(taken_data)

def get_num_entities(self):
"""Return number of each type of entity used."""
return NumberOfEntities(num_events=1)

def add_to_wait_set(self, wait_set):
"""Add entites to wait set."""
self._event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, self._event_handle)
# End Waitable API


class SubscriptionEventCallbacks:
"""Container to provide middleware event callbacks for a Subscription."""

def __init__(
self,
*,
deadline: Callable[[QoSRequestedDeadlineMissedInfo], None] = None,
liveliness: Callable[[QoSLivelinessChangedInfo], None] = None,
) -> None:
"""
Constructor.
:param deadline: A user-defined callback that is called when a topic misses our
requested Deadline.
:param liveliness: A user-defined callback that is called when the Liveliness of
a Publisher on subscribed topic changes.
"""
self.deadline = deadline
self.liveliness = liveliness

def create_event_handlers(
self, callback_group: CallbackGroup, subscription_handle
) -> List[QoSEventHandler]:
event_handlers = []
if self.deadline:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.deadline,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED,
parent_handle=subscription_handle))
if self.liveliness:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.liveliness,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED,
parent_handle=subscription_handle))
return event_handlers


class PublisherEventCallbacks:
"""Container to provide middleware event callbacks for a Publisher."""

def __init__(
self,
*,
deadline: Callable[[QoSOfferedDeadlineMissedInfo], None] = None,
liveliness: Callable[[QoSLivelinessLostInfo], None] = None
) -> None:
"""
Constructor.
:param deadline: A user-defined callback that is called when the Publisher misses
its offered Deadline.
:param liveliness: A user-defined callback that is called when this Publisher
fails to signal its Liveliness and is reported as not-alive.
"""
self.deadline = deadline
self.liveliness = liveliness

def create_event_handlers(
self, callback_group: CallbackGroup, publisher_handle
) -> List[QoSEventHandler]:
event_handlers = []
if self.deadline:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.deadline,
event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED,
parent_handle=publisher_handle))
if self.liveliness:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.liveliness,
event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST,
parent_handle=publisher_handle))
return event_handlers
9 changes: 8 additions & 1 deletion rclpy/rclpy/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from rclpy.callback_groups import CallbackGroup
from rclpy.qos import QoSProfile
from rclpy.qos_event import SubscriptionEventCallbacks


# For documentation only
MsgType = TypeVar('MsgType')
Expand All @@ -33,7 +35,8 @@ def __init__(
callback_group: CallbackGroup,
qos_profile: QoSProfile,
node_handle,
raw: bool
raw: bool,
event_callbacks: SubscriptionEventCallbacks,
) -> None:
"""
Create a container for a ROS subscription.
Expand Down Expand Up @@ -66,6 +69,10 @@ def __init__(
self.qos_profile = qos_profile
self.raw = raw

self.event_callbacks = event_callbacks
self.event_handlers = event_callbacks.create_event_handlers(
callback_group, subscription_handle)

@property
def handle(self):
return self.__handle
Expand Down
Loading

0 comments on commit 636a454

Please sign in to comment.