From 6ec226ae01cfbb791f7955897380ea2dc397065b Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 25 Aug 2021 21:39:22 +0800 Subject: [PATCH] Add rcl_publisher_wait_for_all_acked support (#913) Signed-off-by: Barry Xu --- rcl/include/rcl/publisher.h | 42 ++++ rcl/src/rcl/publisher.c | 37 ++++ rcl/test/CMakeLists.txt | 9 + rcl/test/rcl/test_publisher.cpp | 69 ++++++ rcl/test/rcl/test_publisher_wait_all_ack.cpp | 208 ++++++++++++++++++ .../test_profile/disable_intraprocess.xml | 3 + 6 files changed, 368 insertions(+) create mode 100644 rcl/test/rcl/test_publisher_wait_all_ack.cpp create mode 100644 rcl/test/resources/test_profile/disable_intraprocess.xml diff --git a/rcl/include/rcl/publisher.h b/rcl/include/rcl/publisher.h index 85e26bfd5..8a8a2ca9f 100644 --- a/rcl/include/rcl/publisher.h +++ b/rcl/include/rcl/publisher.h @@ -27,6 +27,7 @@ extern "C" #include "rcl/macros.h" #include "rcl/node.h" #include "rcl/visibility_control.h" +#include "rcl/time.h" /// Internal rcl publisher implementation struct. struct rcl_publisher_impl_t; @@ -434,6 +435,47 @@ RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); +/// Wait until all published message data is acknowledged or until the specified timeout elapses. +/** + * This function waits until all published message data were acknowledged by peer node or timeout. + * + * The timeout unit is nanoseconds. + * If the timeout is negative then this function will block indefinitely until all published message + * data were acknowledged. + * If the timeout is 0 then this function will be non-blocking; checking all published message data + * were acknowledged (If acknowledged, return RCL_RET_OK. Otherwise, return RCL_RET_TIMEOUT), but + * not waiting. + * If the timeout is greater than 0 then this function will return after that period of time has + * elapsed (return RCL_RET_TIMEOUT) or all published message data were acknowledged (return + * RCL_RET_OK). + * + * This function only waits for acknowledgments if the publisher's QOS profile is RELIABLE. + * Otherwise this function will immediately return RCL_RET_OK. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | Yes + * Uses Atomics | No + * Lock-Free | No + * + * \param[in] publisher handle to the publisher that needs to wait for all acked. + * \param[in] timeout the duration to wait for all published message data were acknowledged, in + * nanoseconds. + * \return #RCL_RET_OK if successful, or + * \return #RCL_RET_TIMEOUT if timed out, or + * \return #RCL_RET_PUBLISHER_INVALID if publisher is invalid, or + * \return #RCL_RET_ERROR if an unspecified error occurs, or + * \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_publisher_wait_for_all_acked( + const rcl_publisher_t * publisher, + rcl_duration_value_t timeout); + /// Get the topic name for the publisher. /** * This function returns the publisher's internal topic name string. diff --git a/rcl/src/rcl/publisher.c b/rcl/src/rcl/publisher.c index a7c4103ce..a3a97471c 100644 --- a/rcl/src/rcl/publisher.c +++ b/rcl/src/rcl/publisher.c @@ -27,6 +27,8 @@ extern "C" #include "rcl/node.h" #include "rcutils/logging_macros.h" #include "rcutils/macros.h" +#include "rcl/time.h" +#include "rmw/time.h" #include "rmw/error_handling.h" #include "tracetools/tracetools.h" @@ -304,6 +306,41 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher) return RCL_RET_OK; } +rcl_ret_t +rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, rcl_duration_value_t timeout) +{ + if (!rcl_publisher_is_valid(publisher)) { + return RCL_RET_PUBLISHER_INVALID; // error already set + } + + rmw_time_t rmw_timeout; + if (timeout > 0) { + rmw_timeout.sec = RCL_NS_TO_S(timeout); + rmw_timeout.nsec = timeout % 1000000000; + } else if (timeout < 0) { + rmw_time_t infinite = RMW_DURATION_INFINITE; + rmw_timeout = infinite; + } else { + rmw_time_t zero = RMW_DURATION_UNSPECIFIED; + rmw_timeout = zero; + } + + rmw_ret_t ret = rmw_publisher_wait_for_all_acked(publisher->impl->rmw_handle, rmw_timeout); + if (ret != RMW_RET_OK) { + if (ret == RMW_RET_TIMEOUT) { + return RCL_RET_TIMEOUT; + } + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + if (ret == RMW_RET_UNSUPPORTED) { + return RCL_RET_UNSUPPORTED; + } else { + return RCL_RET_ERROR; + } + } + + return RCL_RET_OK; +} + const char * rcl_publisher_get_topic_name(const rcl_publisher_t * publisher) { diff --git a/rcl/test/CMakeLists.txt b/rcl/test/CMakeLists.txt index 277a0ad52..97ba66fbe 100644 --- a/rcl/test/CMakeLists.txt +++ b/rcl/test/CMakeLists.txt @@ -212,6 +212,15 @@ function(test_target_function) AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs" ) + rcl_add_custom_gtest(test_publisher_wait_all_ack${target_suffix} + SRCS rcl/test_publisher_wait_all_ack.cpp rcl/wait_for_entity_helpers.cpp + ENV ${rmw_implementation_env_var} + APPEND_LIBRARY_DIRS ${extra_lib_dirs} + INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../src/rcl/ + LIBRARIES ${PROJECT_NAME} mimick + AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs" + ) + rcl_add_custom_gtest(test_service${target_suffix} SRCS rcl/test_service.cpp rcl/wait_for_entity_helpers.cpp ENV ${rmw_implementation_env_var} diff --git a/rcl/test/rcl/test_publisher.cpp b/rcl/test/rcl/test_publisher.cpp index cf28011f7..9c853ccde 100644 --- a/rcl/test/rcl/test_publisher.cpp +++ b/rcl/test/rcl/test_publisher.cpp @@ -402,6 +402,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish EXPECT_EQ(RCL_RET_OK, rcl_publisher_assert_liveliness(&publisher)); + EXPECT_EQ(RCL_RET_OK, rcl_publisher_wait_for_all_acked(&publisher, 0)); + size_t count_size; test_msgs__msg__BasicTypes msg; rcl_serialized_message_t serialized_msg = rmw_get_zero_initialized_serialized_message(); @@ -429,6 +431,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -471,6 +475,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -502,6 +508,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -532,6 +540,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(nullptr)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(nullptr, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(nullptr, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -572,6 +582,65 @@ TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_assert rcl_reset_error(); } +// Mocking rmw_publisher_wait_for_all_acked to make +// rcl_publisher_wait_for_all_acked fail +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, ==) +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, !=) +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, <) +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, >) + +TEST_F( + CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), + test_mock_assert_wait_for_all_acked) +{ +#define CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_RESULT, EXPECT_RET) do { \ + rmw_publisher_wait_for_all_acked_return = RMW_RET_RESULT; \ + ret = rcl_publisher_wait_for_all_acked(&publisher, 1000000); \ + EXPECT_EQ(EXPECT_RET, ret); \ + rcl_reset_error(); \ +} while (0) + + rcl_ret_t ret; + rmw_ret_t rmw_publisher_wait_for_all_acked_return; + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_publisher_wait_for_all_acked, rmw_publisher_wait_for_all_acked_return); + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_TIMEOUT + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_TIMEOUT, RCL_RET_TIMEOUT); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_UNSUPPORTED + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_UNSUPPORTED, RCL_RET_UNSUPPORTED); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_INVALID_ARGUMENT + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INVALID_ARGUMENT, RCL_RET_ERROR); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_INCORRECT_RMW_IMPLEMENTATION + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INCORRECT_RMW_IMPLEMENTATION, RCL_RET_ERROR); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_ERROR + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_ERROR, RCL_RET_ERROR); + } +} + // Mocking rmw_publish to make rcl_publish fail TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_publish) { auto mock = mocking_utils::patch_and_return("lib:rcl", rmw_publish, RMW_RET_ERROR); diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp new file mode 100644 index 000000000..c931c09aa --- /dev/null +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -0,0 +1,208 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rcl/allocator.h" +#include "rcl/publisher.h" +#include "rcl/subscription.h" +#include "rcpputils/filesystem_helper.hpp" +#include "rcutils/env.h" + +#include "rcl/rcl.h" +#include "test_msgs/msg/strings.h" +#include "test_msgs/msg/basic_types.h" +#include "rosidl_runtime_c/string_functions.h" +#include "wait_for_entity_helpers.hpp" + +#include "mimick/mimick.h" +#include "osrf_testing_tools_cpp/scope_exit.hpp" +#include "rcl/error_handling.h" + +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + + +/* This class is used for test_wait_for_all_acked + */ +class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::testing::Test +{ +public: + rcl_context_t * context_ptr; + rcl_node_t * node_ptr; + + void SetUp() + { + bool is_fastdds = (std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") == 0); + + if (is_fastdds) { + // By default, fastdds use intraprocess mode in this scenario. But this leads to high-speed + // data transmission. test_wait_for_all_acked need low data transmission. So disable this + // mode via fastdds profile file. + rcpputils::fs::path fastdds_profile(TEST_RESOURCES_DIRECTORY); + fastdds_profile /= "test_profile/disable_intraprocess.xml"; + ASSERT_EQ( + rcutils_set_env("FASTRTPS_DEFAULT_PROFILES_FILE", fastdds_profile.string().c_str()), + true); + } + + rcl_ret_t ret; + { + rcl_init_options_t init_options = rcl_get_zero_initialized_init_options(); + ret = rcl_init_options_init(&init_options, rcl_get_default_allocator()); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ(RCL_RET_OK, rcl_init_options_fini(&init_options)) << rcl_get_error_string().str; + }); + this->context_ptr = new rcl_context_t; + *this->context_ptr = rcl_get_zero_initialized_context(); + ret = rcl_init(0, nullptr, &init_options, this->context_ptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + this->node_ptr = new rcl_node_t; + *this->node_ptr = rcl_get_zero_initialized_node(); + constexpr char name[] = "test_publisher_node2"; + rcl_node_options_t node_options = rcl_node_get_default_options(); + ret = rcl_node_init(this->node_ptr, name, "", this->context_ptr, &node_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + void TearDown() + { + rcutils_set_env("FASTRTPS_DEFAULT_PROFILES_FILE", NULL); + rcl_ret_t ret = rcl_node_fini(this->node_ptr); + delete this->node_ptr; + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ret = rcl_shutdown(this->context_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ret = rcl_context_fini(this->context_ptr); + delete this->context_ptr; + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } +}; + +#define INIT_SUBSCRIPTION(idx) \ + rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \ + ret = rcl_subscription_init( \ + &subscription ## idx, \ + this->node_ptr, \ + ts, \ + topic_name, \ + &subscription_options); \ + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \ + { \ + ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \ + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + }); + +#define ONE_MEGABYTE (1024 * 1024) + +TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for_all_acked) { + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, BasicTypes); + constexpr char topic_name[] = "test_wait_for_all_acked"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + publisher_options.qos.depth = 10000; + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic_name, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + if (ret != RCL_RET_OK) { + FAIL() << rcl_get_error_string().str; + } + }); + + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + subscription_options.qos.depth = 1; + subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + + INIT_SUBSCRIPTION(1) + INIT_SUBSCRIPTION(2) + INIT_SUBSCRIPTION(3) + + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + + rcl_allocator_t allocator = rcl_get_default_allocator(); + char * test_string = static_cast(allocator.allocate(ONE_MEGABYTE, allocator.state)); + ASSERT_TRUE(test_string != NULL); + memset(test_string, 'a', ONE_MEGABYTE); + test_string[ONE_MEGABYTE - 1] = '\0'; + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + allocator.deallocate(test_string, allocator.state); + test_msgs__msg__Strings__fini(&msg); + }); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + + ret = rcl_publish(&publisher, &msg, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription1, context_ptr, 10, 100)); + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription2, context_ptr, 10, 100)); + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription3, context_ptr, 10, 100)); + + int i = 0; + for (; i < 500; i++) { + ret = rcl_publish(&publisher, &msg, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ret = rcl_publisher_wait_for_all_acked( + &publisher, + RCL_MS_TO_NS(500)); + EXPECT_TRUE(ret == RCL_RET_OK || ret == RCL_RET_TIMEOUT); + + ret = rcl_publisher_wait_for_all_acked(&publisher, -1); + EXPECT_EQ(RCL_RET_OK, ret); +} + +TEST_F( + CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), + test_wait_for_all_acked_with_best_effort) +{ + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, BasicTypes); + constexpr char topic_name[] = "test_wait_for_all_acked_with_best_effort"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + publisher_options.qos.depth = 10000; + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic_name, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + ret = rcl_publisher_wait_for_all_acked( + &publisher, + RCL_MS_TO_NS(500)); + EXPECT_EQ(RCL_RET_OK, ret); + + ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; +} diff --git a/rcl/test/resources/test_profile/disable_intraprocess.xml b/rcl/test/resources/test_profile/disable_intraprocess.xml new file mode 100644 index 000000000..8c1a92f2e --- /dev/null +++ b/rcl/test/resources/test_profile/disable_intraprocess.xml @@ -0,0 +1,3 @@ + + OFF +