From b1dbca980cc335742a8c4977c3ce3a02ab3d38b1 Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Thu, 1 Apr 2021 13:06:46 -0700 Subject: [PATCH 1/6] Add functions for waiting for publishers and subscribers These blocking functions are especially useful for tests where we want to wait for some number of publishers/subscribers to be available before proceeding with some other checks. Signed-off-by: Jacob Perron --- rcl/include/rcl/graph.h | 90 +++++++++++++++++++++ rcl/src/rcl/graph.c | 156 ++++++++++++++++++++++++++++++++++++ rcl/test/rcl/test_graph.cpp | 90 +++++++++++++++++++++ 3 files changed, 336 insertions(+) diff --git a/rcl/include/rcl/graph.h b/rcl/include/rcl/graph.h index 50d6fb526..17a74a6ea 100644 --- a/rcl/include/rcl/graph.h +++ b/rcl/include/rcl/graph.h @@ -26,6 +26,7 @@ extern "C" #include #include +#include "rcutils/time.h" #include "rcutils/types.h" #include "rosidl_runtime_c/service_type_support_struct.h" @@ -581,6 +582,95 @@ rcl_count_subscribers( const char * topic_name, size_t * count); +/// Wait for there to be a specified number of publishers on a given topic. +/** + * The `node` parameter must point to a valid node. + * + * The `allocator` parameter must point to a valid allocator. + * + * The `topic_name` parameter must not be `NULL`, and must not be an empty string. + * It should also follow the topic name rules. + * + * This function blocks and will return when the number of publishers for `topic_name` + * is greater than or equal to the `count` parameter, or the specificed `timeout` is reached. + * + * The `timeout` parameter is in nanoseconds. + * A negative value disables the timeout (i.e. this function to blocks until the number of + * publishers is greater than or equals to `count`). + * + * The `success` parameter must point to a valid bool. + * The `success` parameter is the output for this function and will be set. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | Yes + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Maybe [1] + * [1] implementation may need to protect the data structure with a lock + * + * \param[in] node the handle to the node being used to query the ROS graph + * \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events + * \param[in] topic_name the name of the topic in question + * \param[in] count number of publishers to wait for + * \param[in] timeout maximum duration to wait for publishers + * \param[out] success `true` if the number of publishers is equal to or greater than count, or + * `false` if a timeout occurred waiting for publishers. + * \return #RCL_RET_OK if there was no errors, or + * \return #RCL_RET_NODE_INVALID if the node is invalid, or + * \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or + * \return #RCL_RET_TIMEOUT if a timeout occurs before the number of publishers is detected, or + * \return #RCL_RET_ERROR if an unspecified error occurred. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_wait_for_publishers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t count, + rcutils_duration_value_t timeout, + bool * success); + +/// Wait for there to be a specified number of subscribers on a given topic. +/** + * \see rcl_wait_for_publishers + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | Yes + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Maybe [1] + * [1] implementation may need to protect the data structure with a lock + * + * \param[in] node the handle to the node being used to query the ROS graph + * \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events + * \param[in] topic_name the name of the topic in question + * \param[in] count number of subscribers to wait for + * \param[in] timeout maximum duration to wait for subscribers + * \param[out] success `true` if the number of subscribers is equal to or greater than count, or + * `false` if a timeout occurred waiting for subscribers. + * \return #RCL_RET_OK if there was no errors, or + * \return #RCL_RET_NODE_INVALID if the node is invalid, or + * \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or + * \return #RCL_RET_TIMEOUT if a timeout occurs before the number of subscribers is detected, or + * \return #RCL_RET_ERROR if an unspecified error occurred. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_wait_for_subscribers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t count, + rcutils_duration_value_t timeout, + bool * success); + /// Return a list of all publishers to a topic. /** * The `node` parameter must point to a valid node. diff --git a/rcl/src/rcl/graph.c b/rcl/src/rcl/graph.c index 225b938d6..42279c665 100644 --- a/rcl/src/rcl/graph.c +++ b/rcl/src/rcl/graph.c @@ -20,8 +20,12 @@ extern "C" #include "rcl/graph.h" #include "rcl/error_handling.h" +#include "rcl/guard_condition.h" +#include "rcl/wait.h" #include "rcutils/allocator.h" +#include "rcutils/error_handling.h" #include "rcutils/macros.h" +#include "rcutils/time.h" #include "rcutils/types.h" #include "rmw/error_handling.h" #include "rmw/get_node_info_and_types.h" @@ -452,6 +456,158 @@ rcl_count_subscribers( return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret); } +typedef rcl_ret_t (* count_entities_func_t)( + const rcl_node_t * node, + const char * topic_name, + size_t * count); + +rcl_ret_t +_rcl_wait_for_entities( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t expected_count, + rcutils_duration_value_t timeout, + bool * success, + count_entities_func_t count_entities_func) +{ + if (!rcl_node_is_valid(node)) { + return RCL_RET_NODE_INVALID; + } + RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT); + + *success = false; + + // We can avoid waiting if there are already the expected number of publishers + size_t count = 0u; + rcl_ret_t count_ret = count_entities_func(node, topic_name, &count); + if (count_ret != RCL_RET_OK) { + // Error message already set + return count_ret; + } + if (expected_count <= count) { + *success = true; + return RCL_RET_OK; + } + + // Create a wait set and add the node graph guard condition to it + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + rcl_ret_t ret = rcl_wait_set_init( + &wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator); + if (ret != RCL_RET_OK) { + // Error message already set + return ret; + } + + const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node); + if (!guard_condition) { + // Error message already set + return RCL_RET_ERROR; + } + + // Add it to the wait set + ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL); + if (ret != RCL_RET_OK) { + // Error message already set + return ret; + } + + // Get current time + rcutils_time_point_value_t start; + rcutils_ret_t time_ret = rcutils_steady_time_now(&start); + if (time_ret != RCUTILS_RET_OK) { + rcutils_error_string_t error = rcutils_get_error_string(); + rcutils_reset_error(); + RCL_SET_ERROR_MSG(error.str); + return RCL_RET_ERROR; + } + + // Wait for expected count or timeout + while (true) { + ret = rcl_wait(&wait_set, timeout); + if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { + // Error message already set + return ret; + } + + // Check count + count_ret = count_entities_func(node, topic_name, &count); + if (count_ret != RCL_RET_OK) { + // Error already set + return count_ret; + } + if (expected_count <= count) { + *success = true; + break; + } + + // If we're not waiting indefinitely, compute time remaining + if (timeout >= 0) { + rcutils_time_point_value_t now; + time_ret = rcutils_steady_time_now(&now); + if (time_ret != RCUTILS_RET_OK) { + rcutils_error_string_t error = rcutils_get_error_string(); + rcutils_reset_error(); + RCL_SET_ERROR_MSG(error.str); + return RCL_RET_ERROR; + } + timeout = timeout - (now - start); + if (timeout <= 0) { + return RCL_RET_TIMEOUT; + } + } + + // Clear wait set for next iteration + ret = rcl_wait_set_clear(&wait_set); + if (ret != RCL_RET_OK) { + // Error message already set + return ret; + } + } + + return RCL_RET_OK; +} + +rcl_ret_t +rcl_wait_for_publishers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t expected_count, + rcutils_duration_value_t timeout, + bool * success) +{ + return _rcl_wait_for_entities( + node, + allocator, + topic_name, + expected_count, + timeout, + success, + rcl_count_publishers); +} + +rcl_ret_t +rcl_wait_for_subscribers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t expected_count, + rcutils_duration_value_t timeout, + bool * success) +{ + return _rcl_wait_for_entities( + node, + allocator, + topic_name, + expected_count, + timeout, + success, + rcl_count_subscribers); +} + typedef rmw_ret_t (* get_topic_endpoint_info_func_t)( const rmw_node_t * node, rcutils_allocator_t * allocator, diff --git a/rcl/test/rcl/test_graph.cpp b/rcl/test/rcl/test_graph.cpp index bb49a1859..d6025e4fe 100644 --- a/rcl/test/rcl/test_graph.cpp +++ b/rcl/test/rcl/test_graph.cpp @@ -706,6 +706,96 @@ TEST_F( rcl_reset_error(); } +/* Test the rcl_wait_for_publishers function. + */ +TEST_F( + CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), + test_rcl_wait_for_publishers +) { + rcl_ret_t ret; + rcl_node_t zero_node = rcl_get_zero_initialized_node(); + rcl_allocator_t zero_allocator = static_cast( + rcutils_get_zero_initialized_allocator()); + rcl_allocator_t allocator = rcl_get_default_allocator(); + const char * topic_name = "/topic_test_rcl_wait_for_publishers"; + bool success = false; + + // Invalid node + ret = rcl_wait_for_publishers(nullptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_publishers(&zero_node, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_publishers(this->old_node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid allocator + ret = rcl_wait_for_publishers(this->node_ptr, nullptr, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + ret = rcl_wait_for_publishers(this->node_ptr, &zero_allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid topic name + ret = rcl_wait_for_publishers(this->node_ptr, &allocator, nullptr, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid output arg + ret = rcl_wait_for_publishers(this->node_ptr, &allocator, topic_name, 1u, 100, nullptr); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Valid call (expect timeout since there are no publishers) + ret = rcl_wait_for_publishers(this->node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string().str; + rcl_reset_error(); +} + +/* Test the rcl_wait_for_subscribers function. + */ +TEST_F( + CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), + test_rcl_wait_for_subscribers +) { + rcl_ret_t ret; + rcl_node_t zero_node = rcl_get_zero_initialized_node(); + rcl_allocator_t zero_allocator = static_cast( + rcutils_get_zero_initialized_allocator()); + rcl_allocator_t allocator = rcl_get_default_allocator(); + const char * topic_name = "/topic_test_rcl_wait_for_subscribers"; + bool success = false; + + // Invalid node + ret = rcl_wait_for_subscribers(nullptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_subscribers(&zero_node, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_subscribers(this->old_node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid allocator + ret = rcl_wait_for_subscribers(this->node_ptr, nullptr, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + ret = rcl_wait_for_subscribers(this->node_ptr, &zero_allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid topic name + ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, nullptr, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid output arg + ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, topic_name, 1u, 100, nullptr); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Valid call (expect timeout since there are no subscribers) + ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string().str; + rcl_reset_error(); +} + void check_graph_state( const rcl_node_t * node_ptr, From 466ee6ef7ea984a1b188ca166e0923afed24d742 Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Thu, 1 Apr 2021 13:45:20 -0700 Subject: [PATCH 2/6] Update tests to use new graph API We can simplify some tests by reusing the new graph functions. Signed-off-by: Jacob Perron --- rcl/test/rcl/test_graph.cpp | 115 ++++++----------------- rcl/test/rcl/test_info_by_topic.cpp | 12 ++- rcl/test/rcl/wait_for_entity_helpers.cpp | 60 ------------ rcl/test/rcl/wait_for_entity_helpers.hpp | 20 ---- 4 files changed, 41 insertions(+), 166 deletions(-) diff --git a/rcl/test/rcl/test_graph.cpp b/rcl/test/rcl/test_graph.cpp index d6025e4fe..dd21856de 100644 --- a/rcl/test/rcl/test_graph.cpp +++ b/rcl/test/rcl/test_graph.cpp @@ -799,13 +799,11 @@ TEST_F( void check_graph_state( const rcl_node_t * node_ptr, - rcl_wait_set_t * wait_set_ptr, - const rcl_guard_condition_t * graph_guard_condition, std::string & topic_name, size_t expected_publisher_count, size_t expected_subscriber_count, bool expected_in_tnat, - size_t number_of_tries) + const std::chrono::nanoseconds & timeout) { RCUTILS_LOG_DEBUG_NAMED( ROS_PACKAGE_NAME, @@ -814,78 +812,39 @@ check_graph_state( expected_subscriber_count, expected_in_tnat ? "" : " not" ); - size_t publisher_count = 0; - size_t subscriber_count = 0; bool is_in_tnat = false; rcl_names_and_types_t tnat {}; rcl_ret_t ret; rcl_allocator_t allocator = rcl_get_default_allocator(); - for (size_t i = 0; i < number_of_tries; ++i) { - ret = rcl_count_publishers(node_ptr, topic_name.c_str(), &publisher_count); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - rcl_reset_error(); - - ret = rcl_count_subscribers(node_ptr, topic_name.c_str(), &subscriber_count); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - rcl_reset_error(); - - tnat = rcl_get_zero_initialized_names_and_types(); - ret = rcl_get_topic_names_and_types(node_ptr, &allocator, false, &tnat); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - rcl_reset_error(); - is_in_tnat = false; - for (size_t i = 0; RCL_RET_OK == ret && i < tnat.names.size; ++i) { - if (topic_name == std::string(tnat.names.data[i])) { - ASSERT_FALSE(is_in_tnat) << "duplicates in the tnat"; // Found it more than once! - is_in_tnat = true; - } - } - if (RCL_RET_OK == ret) { - ret = rcl_names_and_types_fini(&tnat); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - rcl_reset_error(); - } - RCUTILS_LOG_INFO_NAMED( - ROS_PACKAGE_NAME, - " Try %zu: %zu publishers, %zu subscribers, and that the topic is%s in the graph.", - i + 1, - publisher_count, - subscriber_count, - is_in_tnat ? "" : " not" - ); - if ( - expected_publisher_count == publisher_count && - expected_subscriber_count == subscriber_count && - expected_in_tnat == is_in_tnat) - { - RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, " state correct!"); - break; - } - // Wait for graph change before trying again. - if ((i + 1) == number_of_tries) { - // Don't wait for the graph to change on the last loop because we won't check again. - continue; - } - ret = rcl_wait_set_clear(wait_set_ptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - ret = rcl_wait_set_add_guard_condition(wait_set_ptr, graph_guard_condition, nullptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - std::chrono::nanoseconds time_to_sleep = std::chrono::milliseconds(400); - RCUTILS_LOG_INFO_NAMED( - ROS_PACKAGE_NAME, - " state wrong, waiting up to '%s' nanoseconds for graph changes... ", - std::to_string(time_to_sleep.count()).c_str()); - ret = rcl_wait(wait_set_ptr, time_to_sleep.count()); - if (ret == RCL_RET_TIMEOUT) { - RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "timeout"); - continue; + // Wait for expected number of publishers + bool success = false; + ret = rcl_wait_for_publishers( + node_ptr, &allocator, topic_name.c_str(), expected_publisher_count, timeout.count(), &success); + ASSERT_EQ(ret, RCL_RET_OK); + EXPECT_TRUE(success); + // Wait for expected number of subscribers + success = false; + ret = rcl_wait_for_subscribers( + node_ptr, &allocator, topic_name.c_str(), expected_subscriber_count, timeout.count(), &success); + ASSERT_EQ(ret, RCL_RET_OK); + EXPECT_TRUE(success); + + tnat = rcl_get_zero_initialized_names_and_types(); + ret = rcl_get_topic_names_and_types(node_ptr, &allocator, false, &tnat); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + is_in_tnat = false; + for (size_t i = 0; RCL_RET_OK == ret && i < tnat.names.size; ++i) { + if (topic_name == std::string(tnat.names.data[i])) { + ASSERT_FALSE(is_in_tnat) << "duplicates in the tnat"; // Found it more than once! + is_in_tnat = true; } - RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "change occurred"); + } + if (RCL_RET_OK == ret) { + ret = rcl_names_and_types_fini(&tnat); ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; } - EXPECT_EQ(expected_publisher_count, publisher_count); - EXPECT_EQ(expected_subscriber_count, subscriber_count); + if (expected_in_tnat) { EXPECT_TRUE(is_in_tnat); } else { @@ -1235,18 +1194,14 @@ TEST_F(CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), test_graph_query_functio topic_name += std::to_string(now.count()); RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Using topic name: %s", topic_name.c_str()); rcl_ret_t ret; - const rcl_guard_condition_t * graph_guard_condition = - rcl_node_get_graph_guard_condition(this->node_ptr); // First assert the "topic_name" is not in use. check_graph_state( this->node_ptr, - this->wait_set_ptr, - graph_guard_condition, topic_name, 0, // expected publishers on topic 0, // expected subscribers on topic false, // topic expected in graph - 9); // number of retries + std::chrono::seconds(4)); // timeout // Now create a publisher on "topic_name" and check that it is seen. rcl_publisher_t pub = rcl_get_zero_initialized_publisher(); rcl_publisher_options_t pub_ops = rcl_publisher_get_default_options(); @@ -1257,13 +1212,11 @@ TEST_F(CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), test_graph_query_functio // Check the graph. check_graph_state( this->node_ptr, - this->wait_set_ptr, - graph_guard_condition, topic_name, 1, // expected publishers on topic 0, // expected subscribers on topic true, // topic expected in graph - 9); // number of retries + std::chrono::seconds(4)); // timeout // Now create a subscriber. rcl_subscription_t sub = rcl_get_zero_initialized_subscription(); rcl_subscription_options_t sub_ops = rcl_subscription_get_default_options(); @@ -1273,13 +1226,11 @@ TEST_F(CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), test_graph_query_functio // Check the graph again. check_graph_state( this->node_ptr, - this->wait_set_ptr, - graph_guard_condition, topic_name, 1, // expected publishers on topic 1, // expected subscribers on topic true, // topic expected in graph - 9); // number of retries + std::chrono::seconds(4)); // timeout // Destroy the publisher. ret = rcl_publisher_fini(&pub, this->node_ptr); EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; @@ -1287,13 +1238,11 @@ TEST_F(CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), test_graph_query_functio // Check the graph again. check_graph_state( this->node_ptr, - this->wait_set_ptr, - graph_guard_condition, topic_name, 0, // expected publishers on topic 1, // expected subscribers on topic true, // topic expected in graph - 9); // number of retries + std::chrono::seconds(4)); // timeout // Destroy the subscriber. ret = rcl_subscription_fini(&sub, this->node_ptr); EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; @@ -1301,13 +1250,11 @@ TEST_F(CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), test_graph_query_functio // Check the graph again. check_graph_state( this->node_ptr, - this->wait_set_ptr, - graph_guard_condition, topic_name, 0, // expected publishers on topic 0, // expected subscribers on topic false, // topic expected in graph - 9); // number of retries + std::chrono::seconds(4)); // timeout } /* Test the graph guard condition notices below changes. diff --git a/rcl/test/rcl/test_info_by_topic.cpp b/rcl/test/rcl/test_info_by_topic.cpp index e7faadb9a..3ce43854a 100644 --- a/rcl/test/rcl/test_info_by_topic.cpp +++ b/rcl/test/rcl/test_info_by_topic.cpp @@ -359,7 +359,11 @@ TEST_F( ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; const std::string fqdn = std::string("/") + this->topic_name; // Wait until GraphCache publishers are updated - ASSERT_TRUE(wait_for_graph_publication(&this->node, fqdn.c_str(), 1u, 10, 100)); + bool success = false; + ret = rcl_wait_for_publishers( + &this->node, &allocator, fqdn.c_str(), 1u, RCUTILS_S_TO_NS(1), &success); + ASSERT_EQ(ret, RCL_RET_OK); + ASSERT_TRUE(success); // Get publishers info by topic rmw_topic_endpoint_info_array_t topic_endpoint_info_array_pub = rmw_get_zero_initialized_topic_endpoint_info_array(); @@ -375,7 +379,11 @@ TEST_F( assert_qos_equality(topic_endpoint_info_pub.qos_profile, default_qos_profile, true); // Wait until GraphCache subcribers are updated - ASSERT_TRUE(wait_for_graph_subscription(&this->node, fqdn.c_str(), 1u, 10, 100)); + success = false; + ret = rcl_wait_for_subscribers( + &this->node, &allocator, fqdn.c_str(), 1u, RCUTILS_S_TO_NS(1), &success); + ASSERT_EQ(ret, RCL_RET_OK); + ASSERT_TRUE(success); // Get subscribers info by topic rmw_topic_endpoint_info_array_t topic_endpoint_info_array_sub = rmw_get_zero_initialized_topic_endpoint_info_array(); diff --git a/rcl/test/rcl/wait_for_entity_helpers.cpp b/rcl/test/rcl/wait_for_entity_helpers.cpp index bdf2f5d0d..625e1ea4a 100644 --- a/rcl/test/rcl/wait_for_entity_helpers.cpp +++ b/rcl/test/rcl/wait_for_entity_helpers.cpp @@ -236,63 +236,3 @@ wait_for_subscription_to_be_ready( } return false; } - -bool -wait_for_graph_publication( - const rcl_node_t * node, - const char * topic_name, - size_t count_to_wait, - size_t max_tries, - int64_t period_ms) -{ - if (count_to_wait == 0) { - return true; // Nothing to wait - } - size_t iteration = 0; - while (iteration < max_tries) { - ++iteration; - size_t count; - rcl_ret_t ret = rcl_count_publishers(node, topic_name, &count); - if (ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - ROS_PACKAGE_NAME, - "Error in rcl_count_publishers: %s", rcl_get_error_string().str); - return false; - } - if (count == count_to_wait) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(period_ms)); - } - return false; -} - -bool -wait_for_graph_subscription( - const rcl_node_t * node, - const char * topic_name, - size_t count_to_wait, - size_t max_tries, - int64_t period_ms) -{ - if (count_to_wait == 0) { - return true; // Nothing to wait - } - size_t iteration = 0; - while (iteration < max_tries) { - ++iteration; - size_t count; - rcl_ret_t ret = rcl_count_subscribers(node, topic_name, &count); - if (ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - ROS_PACKAGE_NAME, - "Error in rcl_count_subscribers: %s", rcl_get_error_string().str); - return false; - } - if (count == count_to_wait) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(period_ms)); - } - return false; -} diff --git a/rcl/test/rcl/wait_for_entity_helpers.hpp b/rcl/test/rcl/wait_for_entity_helpers.hpp index 9a31ff10c..35baa645b 100644 --- a/rcl/test/rcl/wait_for_entity_helpers.hpp +++ b/rcl/test/rcl/wait_for_entity_helpers.hpp @@ -63,24 +63,4 @@ wait_for_subscription_to_be_ready( size_t max_tries, int64_t period_ms); -/// Wait for specified number of publication in GraphCache -/// by trying at most `max_tries` times with a `period_ms` period. -bool -wait_for_graph_publication( - const rcl_node_t * node, - const char * topic_name, - size_t count_to_wait, - size_t max_tries, - int64_t period_ms); - -/// Wait for specified number of subcription in GraphCache -/// by trying at most `max_tries` times with a `period_ms` period. -bool -wait_for_graph_subscription( - const rcl_node_t * node, - const char * topic_name, - size_t count_to_wait, - size_t max_tries, - int64_t period_ms); - #endif // RCL__WAIT_FOR_ENTITY_HELPERS_HPP_ From 15f1e6c43fb7286024523e6ed56363e03ceb2085 Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Thu, 1 Apr 2021 13:52:23 -0700 Subject: [PATCH 3/6] Fix doc typos Signed-off-by: Jacob Perron --- rcl/include/rcl/graph.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rcl/include/rcl/graph.h b/rcl/include/rcl/graph.h index 17a74a6ea..ab35843cd 100644 --- a/rcl/include/rcl/graph.h +++ b/rcl/include/rcl/graph.h @@ -592,10 +592,10 @@ rcl_count_subscribers( * It should also follow the topic name rules. * * This function blocks and will return when the number of publishers for `topic_name` - * is greater than or equal to the `count` parameter, or the specificed `timeout` is reached. + * is greater than or equal to the `count` parameter, or the specified `timeout` is reached. * * The `timeout` parameter is in nanoseconds. - * A negative value disables the timeout (i.e. this function to blocks until the number of + * A negative value disables the timeout (i.e. this function blocks until the number of * publishers is greater than or equals to `count`). * * The `success` parameter must point to a valid bool. From 0b94ee551d7cf28b50f9e27d6bb8f9dd40a86298 Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Thu, 1 Apr 2021 14:20:58 -0700 Subject: [PATCH 4/6] Improve documentation Signed-off-by: Jacob Perron --- rcl/include/rcl/graph.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rcl/include/rcl/graph.h b/rcl/include/rcl/graph.h index ab35843cd..23da58082 100644 --- a/rcl/include/rcl/graph.h +++ b/rcl/include/rcl/graph.h @@ -585,6 +585,8 @@ rcl_count_subscribers( /// Wait for there to be a specified number of publishers on a given topic. /** * The `node` parameter must point to a valid node. + * The nodes graph guard condition is used by this function, and therefore the caller should + * take care not to use the guard condition concurrently in any other wait sets. * * The `allocator` parameter must point to a valid allocator. * @@ -595,6 +597,7 @@ rcl_count_subscribers( * is greater than or equal to the `count` parameter, or the specified `timeout` is reached. * * The `timeout` parameter is in nanoseconds. + * The timeout is based on steady time elapsed. * A negative value disables the timeout (i.e. this function blocks until the number of * publishers is greater than or equals to `count`). * From 51b3235d3e465305dd87ce58113d2c011ba4d7e2 Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Thu, 1 Apr 2021 14:49:43 -0700 Subject: [PATCH 5/6] Finalize wait set before leaving function scope Signed-off-by: Jacob Perron --- rcl/src/rcl/graph.c | 54 +++++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/rcl/src/rcl/graph.c b/rcl/src/rcl/graph.c index 42279c665..f91722091 100644 --- a/rcl/src/rcl/graph.c +++ b/rcl/src/rcl/graph.c @@ -478,14 +478,15 @@ _rcl_wait_for_entities( RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT); RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT); + rcl_ret_t ret = RCL_RET_OK; *success = false; // We can avoid waiting if there are already the expected number of publishers size_t count = 0u; - rcl_ret_t count_ret = count_entities_func(node, topic_name, &count); - if (count_ret != RCL_RET_OK) { + ret = count_entities_func(node, topic_name, &count); + if (ret != RCL_RET_OK) { // Error message already set - return count_ret; + return ret; } if (expected_count <= count) { *success = true; @@ -494,7 +495,7 @@ _rcl_wait_for_entities( // Create a wait set and add the node graph guard condition to it rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_ret_t ret = rcl_wait_set_init( + ret = rcl_wait_set_init( &wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator); if (ret != RCL_RET_OK) { // Error message already set @@ -504,14 +505,15 @@ _rcl_wait_for_entities( const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node); if (!guard_condition) { // Error message already set - return RCL_RET_ERROR; + ret = RCL_RET_ERROR; + goto cleanup; } // Add it to the wait set ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL); if (ret != RCL_RET_OK) { // Error message already set - return ret; + goto cleanup; } // Get current time @@ -521,22 +523,26 @@ _rcl_wait_for_entities( rcutils_error_string_t error = rcutils_get_error_string(); rcutils_reset_error(); RCL_SET_ERROR_MSG(error.str); - return RCL_RET_ERROR; + ret = RCL_RET_ERROR; + goto cleanup; } // Wait for expected count or timeout + rcl_ret_t wait_ret; while (true) { - ret = rcl_wait(&wait_set, timeout); - if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { + // Use separate 'wait_ret' code to avoid returning spurious TIMEOUT value + wait_ret = rcl_wait(&wait_set, timeout); + if (wait_ret != RCL_RET_OK && wait_ret != RCL_RET_TIMEOUT) { // Error message already set - return ret; + ret = wait_ret; + break; } // Check count - count_ret = count_entities_func(node, topic_name, &count); - if (count_ret != RCL_RET_OK) { + ret = count_entities_func(node, topic_name, &count); + if (ret != RCL_RET_OK) { // Error already set - return count_ret; + break; } if (expected_count <= count) { *success = true; @@ -551,11 +557,13 @@ _rcl_wait_for_entities( rcutils_error_string_t error = rcutils_get_error_string(); rcutils_reset_error(); RCL_SET_ERROR_MSG(error.str); - return RCL_RET_ERROR; + ret = RCL_RET_ERROR; + break; } timeout = timeout - (now - start); if (timeout <= 0) { - return RCL_RET_TIMEOUT; + ret = RCL_RET_TIMEOUT; + break; } } @@ -563,11 +571,23 @@ _rcl_wait_for_entities( ret = rcl_wait_set_clear(&wait_set); if (ret != RCL_RET_OK) { // Error message already set - return ret; + break; } } - return RCL_RET_OK; + rcl_ret_t cleanup_ret; +cleanup: + // Cleanup + cleanup_ret = rcl_wait_set_fini(&wait_set); + if (cleanup_ret != RCL_RET_OK) { + // If we got two unexpected errors, return the earlier error + if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { + // Error message already set + ret = cleanup_ret; + } + } + + return ret; } rcl_ret_t From af14fc45bbbc073eb2cc3cad37af1f8dad561727 Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Thu, 1 Apr 2021 14:54:32 -0700 Subject: [PATCH 6/6] Use system time Signed-off-by: Jacob Perron --- rcl/include/rcl/graph.h | 2 +- rcl/src/rcl/graph.c | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rcl/include/rcl/graph.h b/rcl/include/rcl/graph.h index 23da58082..1c17762b8 100644 --- a/rcl/include/rcl/graph.h +++ b/rcl/include/rcl/graph.h @@ -597,7 +597,7 @@ rcl_count_subscribers( * is greater than or equal to the `count` parameter, or the specified `timeout` is reached. * * The `timeout` parameter is in nanoseconds. - * The timeout is based on steady time elapsed. + * The timeout is based on system time elapsed. * A negative value disables the timeout (i.e. this function blocks until the number of * publishers is greater than or equals to `count`). * diff --git a/rcl/src/rcl/graph.c b/rcl/src/rcl/graph.c index f91722091..6095cc3d5 100644 --- a/rcl/src/rcl/graph.c +++ b/rcl/src/rcl/graph.c @@ -517,8 +517,9 @@ _rcl_wait_for_entities( } // Get current time + // We use system time to be consistent with the clock used by rcl_wait() rcutils_time_point_value_t start; - rcutils_ret_t time_ret = rcutils_steady_time_now(&start); + rcutils_ret_t time_ret = rcutils_system_time_now(&start); if (time_ret != RCUTILS_RET_OK) { rcutils_error_string_t error = rcutils_get_error_string(); rcutils_reset_error(); @@ -552,7 +553,7 @@ _rcl_wait_for_entities( // If we're not waiting indefinitely, compute time remaining if (timeout >= 0) { rcutils_time_point_value_t now; - time_ret = rcutils_steady_time_now(&now); + time_ret = rcutils_system_time_now(&now); if (time_ret != RCUTILS_RET_OK) { rcutils_error_string_t error = rcutils_get_error_string(); rcutils_reset_error();