Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions for waiting for publishers and subscribers #907

Merged
merged 6 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions rcl/include/rcl/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C"
#include <rmw/get_topic_names_and_types.h>
#include <rmw/topic_endpoint_info_array.h>

#include "rcutils/time.h"
#include "rcutils/types.h"

#include "rosidl_runtime_c/service_type_support_struct.h"
Expand Down Expand Up @@ -581,6 +582,98 @@ rcl_count_subscribers(
const char * topic_name,
size_t * count);

/// Wait for there to be a specified number of publishers on a given topic.
/**
* The `node` parameter must point to a valid node.
* The nodes graph guard condition is used by this function, and therefore the caller should
* take care not to use the guard condition concurrently in any other wait sets.
*
* The `allocator` parameter must point to a valid allocator.
*
* The `topic_name` parameter must not be `NULL`, and must not be an empty string.
* It should also follow the topic name rules.
*
* This function blocks and will return when the number of publishers for `topic_name`
* is greater than or equal to the `count` parameter, or the specified `timeout` is reached.
jacobperron marked this conversation as resolved.
Show resolved Hide resolved
*
* The `timeout` parameter is in nanoseconds.
* The timeout is based on system time elapsed.
* A negative value disables the timeout (i.e. this function blocks until the number of
* publishers is greater than or equals to `count`).
jacobperron marked this conversation as resolved.
Show resolved Hide resolved
*
* The `success` parameter must point to a valid bool.
* The `success` parameter is the output for this function and will be set.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Maybe [1]
* <i>[1] implementation may need to protect the data structure with a lock</i>
*
* \param[in] node the handle to the node being used to query the ROS graph
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
* \param[in] topic_name the name of the topic in question
* \param[in] count number of publishers to wait for
* \param[in] timeout maximum duration to wait for publishers
* \param[out] success `true` if the number of publishers is equal to or greater than count, or
* `false` if a timeout occurred waiting for publishers.
* \return #RCL_RET_OK if there was no errors, or
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of publishers is detected, or
* \return #RCL_RET_ERROR if an unspecified error occurred.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_wait_for_publishers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t count,
rcutils_duration_value_t timeout,
bool * success);

/// Wait for there to be a specified number of subscribers on a given topic.
/**
* \see rcl_wait_for_publishers
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Maybe [1]
* <i>[1] implementation may need to protect the data structure with a lock</i>
*
* \param[in] node the handle to the node being used to query the ROS graph
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
* \param[in] topic_name the name of the topic in question
* \param[in] count number of subscribers to wait for
* \param[in] timeout maximum duration to wait for subscribers
* \param[out] success `true` if the number of subscribers is equal to or greater than count, or
* `false` if a timeout occurred waiting for subscribers.
* \return #RCL_RET_OK if there was no errors, or
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of subscribers is detected, or
* \return #RCL_RET_ERROR if an unspecified error occurred.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_wait_for_subscribers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t count,
rcutils_duration_value_t timeout,
bool * success);

/// Return a list of all publishers to a topic.
/**
* The `node` parameter must point to a valid node.
Expand Down
177 changes: 177 additions & 0 deletions rcl/src/rcl/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ extern "C"
#include "rcl/graph.h"

#include "rcl/error_handling.h"
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rcutils/allocator.h"
#include "rcutils/error_handling.h"
#include "rcutils/macros.h"
#include "rcutils/time.h"
#include "rcutils/types.h"
#include "rmw/error_handling.h"
#include "rmw/get_node_info_and_types.h"
Expand Down Expand Up @@ -452,6 +456,179 @@ rcl_count_subscribers(
return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
}

typedef rcl_ret_t (* count_entities_func_t)(
const rcl_node_t * node,
const char * topic_name,
size_t * count);

rcl_ret_t
_rcl_wait_for_entities(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success,
count_entities_func_t count_entities_func)
{
if (!rcl_node_is_valid(node)) {
return RCL_RET_NODE_INVALID;
}
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT);

rcl_ret_t ret = RCL_RET_OK;
*success = false;

// We can avoid waiting if there are already the expected number of publishers
size_t count = 0u;
ret = count_entities_func(node, topic_name, &count);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}
if (expected_count <= count) {
*success = true;
return RCL_RET_OK;
}

// Create a wait set and add the node graph guard condition to it
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
ret = rcl_wait_set_init(
&wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}

const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node);
if (!guard_condition) {
// Error message already set
ret = RCL_RET_ERROR;
goto cleanup;
}

// Add it to the wait set
ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL);
if (ret != RCL_RET_OK) {
// Error message already set
goto cleanup;
}

// Get current time
// We use system time to be consistent with the clock used by rcl_wait()
rcutils_time_point_value_t start;
rcutils_ret_t time_ret = rcutils_system_time_now(&start);
if (time_ret != RCUTILS_RET_OK) {
rcutils_error_string_t error = rcutils_get_error_string();
rcutils_reset_error();
RCL_SET_ERROR_MSG(error.str);
ret = RCL_RET_ERROR;
goto cleanup;
}

// Wait for expected count or timeout
rcl_ret_t wait_ret;
while (true) {
// Use separate 'wait_ret' code to avoid returning spurious TIMEOUT value
wait_ret = rcl_wait(&wait_set, timeout);
if (wait_ret != RCL_RET_OK && wait_ret != RCL_RET_TIMEOUT) {
// Error message already set
ret = wait_ret;
break;
}

// Check count
ret = count_entities_func(node, topic_name, &count);
if (ret != RCL_RET_OK) {
// Error already set
break;
}
if (expected_count <= count) {
*success = true;
break;
jacobperron marked this conversation as resolved.
Show resolved Hide resolved
}

// If we're not waiting indefinitely, compute time remaining
if (timeout >= 0) {
rcutils_time_point_value_t now;
time_ret = rcutils_system_time_now(&now);
if (time_ret != RCUTILS_RET_OK) {
rcutils_error_string_t error = rcutils_get_error_string();
rcutils_reset_error();
RCL_SET_ERROR_MSG(error.str);
ret = RCL_RET_ERROR;
break;
}
timeout = timeout - (now - start);
if (timeout <= 0) {
ret = RCL_RET_TIMEOUT;
break;
}
}

// Clear wait set for next iteration
ret = rcl_wait_set_clear(&wait_set);
if (ret != RCL_RET_OK) {
// Error message already set
break;
}
}

rcl_ret_t cleanup_ret;
cleanup:
// Cleanup
cleanup_ret = rcl_wait_set_fini(&wait_set);
if (cleanup_ret != RCL_RET_OK) {
// If we got two unexpected errors, return the earlier error
if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) {
// Error message already set
ret = cleanup_ret;
}
}

return ret;
}

rcl_ret_t
rcl_wait_for_publishers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success)
{
return _rcl_wait_for_entities(
node,
allocator,
topic_name,
expected_count,
timeout,
success,
rcl_count_publishers);
}

rcl_ret_t
rcl_wait_for_subscribers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success)
{
return _rcl_wait_for_entities(
node,
allocator,
topic_name,
expected_count,
timeout,
success,
rcl_count_subscribers);
}

typedef rmw_ret_t (* get_topic_endpoint_info_func_t)(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
Expand Down
Loading