From 4c5eedba92a5024a0e0554f21494d7ac33c6055c Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 21 Apr 2021 17:39:56 +0800 Subject: [PATCH 1/7] Add rcl_publisher_wait_for_all_acked support Signed-off-by: Barry Xu --- rcl/include/rcl/publisher.h | 39 ++++ rcl/src/rcl/publisher.c | 36 ++++ rcl/test/CMakeLists.txt | 9 + rcl/test/rcl/test_publisher.cpp | 69 +++++++ rcl/test/rcl/test_publisher_wait_all_ack.cpp | 192 ++++++++++++++++++ .../test_profile/disable_intraprocess.xml | 3 + 6 files changed, 348 insertions(+) create mode 100644 rcl/test/rcl/test_publisher_wait_all_ack.cpp create mode 100644 rcl/test/resources/test_profile/disable_intraprocess.xml diff --git a/rcl/include/rcl/publisher.h b/rcl/include/rcl/publisher.h index 85e26bfd5..1d7c8e3e4 100644 --- a/rcl/include/rcl/publisher.h +++ b/rcl/include/rcl/publisher.h @@ -434,6 +434,45 @@ RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); +/// Wait until all published message data is acknowledged or until the specified timeout elapses. +/** + * This function waits until all published message data were acknowledged by peer node or timeout. + * + * The unit of timeout is nanoseconds. + * If the timeout is negative then this function will block indefinitely until all published message + * data were acknowledged. + * If the timeout is 0 then this function will be non-blocking; checking all published message data + * were acknowledged, but not waiting. + * If the timeout is greater than 0 then this function will return after that period of time has + * elapsed or all published message data were acknowledged. + * + * This function only works effectively while QOS profile of publisher is set to RELIABLE. + * Otherwise this function will immediately return RCL_RET_OK. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | Yes + * Uses Atomics | No + * Lock-Free | No + * + * \param[in] publisher handle to the publisher that needs to wait for all acked. + * \param[in] timeout the duration to wait for all published message data were acknowledged, in + * nanoseconds. + * \return #RCL_RET_OK if successful, or + * \return #RCL_RET_TIMEOUT if wait timed out, or + * \return #RCL_RET_PUBLISHER_INVALID if publisher is invalid, or + * \return #RCL_RET_ERROR if an unspecified error occurs, or + * \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature. + */ +RMW_PUBLIC +RMW_WARN_UNUSED +rcl_ret_t +rcl_publisher_wait_for_all_acked( + const rcl_publisher_t * publisher, + int64_t timeout); + /// Get the topic name for the publisher. /** * This function returns the publisher's internal topic name string. diff --git a/rcl/src/rcl/publisher.c b/rcl/src/rcl/publisher.c index a7c4103ce..3ea65eb3e 100644 --- a/rcl/src/rcl/publisher.c +++ b/rcl/src/rcl/publisher.c @@ -27,6 +27,8 @@ extern "C" #include "rcl/node.h" #include "rcutils/logging_macros.h" #include "rcutils/macros.h" +#include "rcl/time.h" +#include "rmw/time.h" #include "rmw/error_handling.h" #include "tracetools/tracetools.h" @@ -304,6 +306,40 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher) return RCL_RET_OK; } +rcl_ret_t +rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, int64_t timeout) +{ + if (!rcl_publisher_is_valid(publisher)) { + return RCL_RET_PUBLISHER_INVALID; // error already set + } + + rmw_time_t rmw_timeout; + if (timeout > 0) { + rmw_timeout.sec = RCL_NS_TO_S(timeout); + rmw_timeout.nsec = timeout; + } else if (timeout < 0) { + rmw_time_t infinite = RMW_DURATION_INFINITE; + rmw_timeout = infinite; + } else { + rmw_time_t zero = RMW_DURATION_UNSPECIFIED; + rmw_timeout = zero; + } + + rmw_ret_t ret = rmw_publisher_wait_for_all_acked(publisher->impl->rmw_handle, rmw_timeout); + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + if (ret == RMW_RET_TIMEOUT) { + return RCL_RET_TIMEOUT; + } else if (ret == RMW_RET_UNSUPPORTED) { + return RCL_RET_UNSUPPORTED; + } else { + return RCL_RET_ERROR; + } + } + + return RCL_RET_OK; +} + const char * rcl_publisher_get_topic_name(const rcl_publisher_t * publisher) { diff --git a/rcl/test/CMakeLists.txt b/rcl/test/CMakeLists.txt index 277a0ad52..97ba66fbe 100644 --- a/rcl/test/CMakeLists.txt +++ b/rcl/test/CMakeLists.txt @@ -212,6 +212,15 @@ function(test_target_function) AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs" ) + rcl_add_custom_gtest(test_publisher_wait_all_ack${target_suffix} + SRCS rcl/test_publisher_wait_all_ack.cpp rcl/wait_for_entity_helpers.cpp + ENV ${rmw_implementation_env_var} + APPEND_LIBRARY_DIRS ${extra_lib_dirs} + INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../src/rcl/ + LIBRARIES ${PROJECT_NAME} mimick + AMENT_DEPENDENCIES ${rmw_implementation} "osrf_testing_tools_cpp" "test_msgs" + ) + rcl_add_custom_gtest(test_service${target_suffix} SRCS rcl/test_service.cpp rcl/wait_for_entity_helpers.cpp ENV ${rmw_implementation_env_var} diff --git a/rcl/test/rcl/test_publisher.cpp b/rcl/test/rcl/test_publisher.cpp index cf28011f7..9c853ccde 100644 --- a/rcl/test/rcl/test_publisher.cpp +++ b/rcl/test/rcl/test_publisher.cpp @@ -402,6 +402,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish EXPECT_EQ(RCL_RET_OK, rcl_publisher_assert_liveliness(&publisher)); + EXPECT_EQ(RCL_RET_OK, rcl_publisher_wait_for_all_acked(&publisher, 0)); + size_t count_size; test_msgs__msg__BasicTypes msg; rcl_serialized_message_t serialized_msg = rmw_get_zero_initialized_serialized_message(); @@ -429,6 +431,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -471,6 +475,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -502,6 +508,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(&publisher)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(&publisher, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(&publisher, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -532,6 +540,8 @@ TEST_F(CLASSNAME(TestPublisherFixture, RMW_IMPLEMENTATION), test_invalid_publish rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_assert_liveliness(nullptr)); rcl_reset_error(); + EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publisher_wait_for_all_acked(nullptr, 10000000)); + rcl_reset_error(); EXPECT_EQ(RCL_RET_PUBLISHER_INVALID, rcl_publish(nullptr, &msg, null_allocation_is_valid_arg)); rcl_reset_error(); EXPECT_EQ( @@ -572,6 +582,65 @@ TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_assert rcl_reset_error(); } +// Mocking rmw_publisher_wait_for_all_acked to make +// rcl_publisher_wait_for_all_acked fail +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, ==) +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, !=) +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, <) +MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rmw_time_t, >) + +TEST_F( + CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), + test_mock_assert_wait_for_all_acked) +{ +#define CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_RESULT, EXPECT_RET) do { \ + rmw_publisher_wait_for_all_acked_return = RMW_RET_RESULT; \ + ret = rcl_publisher_wait_for_all_acked(&publisher, 1000000); \ + EXPECT_EQ(EXPECT_RET, ret); \ + rcl_reset_error(); \ +} while (0) + + rcl_ret_t ret; + rmw_ret_t rmw_publisher_wait_for_all_acked_return; + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_publisher_wait_for_all_acked, rmw_publisher_wait_for_all_acked_return); + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_TIMEOUT + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_TIMEOUT, RCL_RET_TIMEOUT); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_UNSUPPORTED + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_UNSUPPORTED, RCL_RET_UNSUPPORTED); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_INVALID_ARGUMENT + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INVALID_ARGUMENT, RCL_RET_ERROR); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_INCORRECT_RMW_IMPLEMENTATION + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_INCORRECT_RMW_IMPLEMENTATION, RCL_RET_ERROR); + } + + { + // Now normal usage of the function rcl_publisher_wait_for_all_acked returning + // unexpected RMW_RET_ERROR + SCOPED_TRACE("Check RCL return failed !"); + CHECK_PUBLISHER_WAIT_FOR_ALL_ACKED_RETURN(RMW_RET_ERROR, RCL_RET_ERROR); + } +} + // Mocking rmw_publish to make rcl_publish fail TEST_F(CLASSNAME(TestPublisherFixtureInit, RMW_IMPLEMENTATION), test_mock_publish) { auto mock = mocking_utils::patch_and_return("lib:rcl", rmw_publish, RMW_RET_ERROR); diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp new file mode 100644 index 000000000..54a0f821c --- /dev/null +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -0,0 +1,192 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rcl/publisher.h" +#include "rcl/subscription.h" +#include "rcpputils/filesystem_helper.hpp" + +#include "rcl/rcl.h" +#include "test_msgs/msg/strings.h" +#include "test_msgs/msg/basic_types.h" +#include "rosidl_runtime_c/string_functions.h" +#include "wait_for_entity_helpers.hpp" + +#include "mimick/mimick.h" +#include "osrf_testing_tools_cpp/scope_exit.hpp" +#include "rcl/error_handling.h" + +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + + +/* This class is used for test_wait_for_all_acked + */ +class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::testing::Test +{ +public: + rcl_context_t * context_ptr; + rcl_node_t * node_ptr; + + void SetUp() + { + is_fastdds = (std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") == 0); + is_cyclonedds = (std::string(rmw_get_implementation_identifier()).find("rmw_cyclonedds") == 0); + is_connextdds = (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + + if (is_fastdds) { + // By default, fastdds use intraprocess mode in this scenario. But this leads to high-speed + // data transmission. test_wait_for_all_acked need low data transmission. So disable this + // mode via fastdds profile file. + rcpputils::fs::path fastdds_profile(TEST_RESOURCES_DIRECTORY); + fastdds_profile /= "test_profile/disable_intraprocess.xml"; +#ifdef _WIN32 + _putenv(env_var.c_str()); +#else + ASSERT_EQ(setenv("FASTRTPS_DEFAULT_PROFILES_FILE", fastdds_profile.string().c_str(), 1), 0); +#endif + } + + rcl_ret_t ret; + { + rcl_init_options_t init_options = rcl_get_zero_initialized_init_options(); + ret = rcl_init_options_init(&init_options, rcl_get_default_allocator()); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ(RCL_RET_OK, rcl_init_options_fini(&init_options)) << rcl_get_error_string().str; + }); + this->context_ptr = new rcl_context_t; + *this->context_ptr = rcl_get_zero_initialized_context(); + ret = rcl_init(0, nullptr, &init_options, this->context_ptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + this->node_ptr = new rcl_node_t; + *this->node_ptr = rcl_get_zero_initialized_node(); + constexpr char name[] = "test_publisher_node2"; + rcl_node_options_t node_options = rcl_node_get_default_options(); + ret = rcl_node_init(this->node_ptr, name, "", this->context_ptr, &node_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + void TearDown() + { + unsetenv("FASTRTPS_DEFAULT_PROFILES_FILE"); + rcl_ret_t ret = rcl_node_fini(this->node_ptr); + delete this->node_ptr; + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ret = rcl_shutdown(this->context_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ret = rcl_context_fini(this->context_ptr); + delete this->context_ptr; + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + +protected: + bool is_fastdds; + bool is_cyclonedds; + bool is_connextdds; +}; + + +TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for_all_acked) { + // cyclonedds use sync publish, so the below test method cannot work. + if (is_cyclonedds) { + GTEST_SKIP(); + } + + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, BasicTypes); + constexpr char topic_name[] = "test_wait_for_all_acked"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + publisher_options.qos.depth = 10000; + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic_name, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + subscription_options.qos.depth = 1; + subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + +#define INIT_SUBSCRIPTION(idx) \ + rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \ + ret = rcl_subscription_init( \ + &subscription ## idx, \ + this->node_ptr, \ + ts, \ + topic_name, \ + &subscription_options); \ + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \ + { \ + ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \ + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + }); + + INIT_SUBSCRIPTION(1) + INIT_SUBSCRIPTION(2) + INIT_SUBSCRIPTION(3) + + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + + // Prepare 1MB message +#define ONE_MEGABYTE (1024 * 1024) + char test_string[ONE_MEGABYTE]; + memset(test_string, 'a', ONE_MEGABYTE); + test_string[ONE_MEGABYTE - 1] = '\0'; + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + + ret = rcl_publish(&publisher, &msg, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription1, context_ptr, 10, 100)); + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription2, context_ptr, 10, 100)); + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription3, context_ptr, 10, 100)); + + int i = 0; + for (; i < 500; i++) { + ret = rcl_publish(&publisher, &msg, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ret = rcl_publisher_wait_for_all_acked( + &publisher, + RCL_MS_TO_NS(500)); + EXPECT_EQ(RCL_RET_TIMEOUT, ret); + + ret = rcl_publisher_wait_for_all_acked(&publisher, -1); + EXPECT_EQ(RCL_RET_OK, ret); +} diff --git a/rcl/test/resources/test_profile/disable_intraprocess.xml b/rcl/test/resources/test_profile/disable_intraprocess.xml new file mode 100644 index 000000000..8c1a92f2e --- /dev/null +++ b/rcl/test/resources/test_profile/disable_intraprocess.xml @@ -0,0 +1,3 @@ + + OFF + From c738bd41e5853ed3fb39f79ebe3fe306c1df69ef Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Mon, 26 Apr 2021 15:05:20 +0800 Subject: [PATCH 2/7] Update codes based on review comments Signed-off-by: Barry Xu --- rcl/include/rcl/publisher.h | 9 +- rcl/src/rcl/publisher.c | 4 +- rcl/test/rcl/test_publisher_wait_all_ack.cpp | 89 ++++++++++++++++++-- 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/rcl/include/rcl/publisher.h b/rcl/include/rcl/publisher.h index 1d7c8e3e4..685e9673f 100644 --- a/rcl/include/rcl/publisher.h +++ b/rcl/include/rcl/publisher.h @@ -27,6 +27,7 @@ extern "C" #include "rcl/macros.h" #include "rcl/node.h" #include "rcl/visibility_control.h" +#include "rcl/time.h" /// Internal rcl publisher implementation struct. struct rcl_publisher_impl_t; @@ -442,9 +443,11 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); * If the timeout is negative then this function will block indefinitely until all published message * data were acknowledged. * If the timeout is 0 then this function will be non-blocking; checking all published message data - * were acknowledged, but not waiting. + * were acknowledged (If acknowledged, return RCL_RET_OK. Otherwise, return RCL_RET_TIMEOUT), but + * not waiting. * If the timeout is greater than 0 then this function will return after that period of time has - * elapsed or all published message data were acknowledged. + * elapsed (return RCL_RET_TIMEOUT) or all published message data were acknowledged (return + * RCL_RET_OK). * * This function only works effectively while QOS profile of publisher is set to RELIABLE. * Otherwise this function will immediately return RCL_RET_OK. @@ -471,7 +474,7 @@ RMW_WARN_UNUSED rcl_ret_t rcl_publisher_wait_for_all_acked( const rcl_publisher_t * publisher, - int64_t timeout); + rcl_duration_value_t timeout); /// Get the topic name for the publisher. /** diff --git a/rcl/src/rcl/publisher.c b/rcl/src/rcl/publisher.c index 3ea65eb3e..1e5bb959d 100644 --- a/rcl/src/rcl/publisher.c +++ b/rcl/src/rcl/publisher.c @@ -307,7 +307,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher) } rcl_ret_t -rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, int64_t timeout) +rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, rcl_duration_value_t timeout) { if (!rcl_publisher_is_valid(publisher)) { return RCL_RET_PUBLISHER_INVALID; // error already set @@ -316,7 +316,7 @@ rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, int64_t time rmw_time_t rmw_timeout; if (timeout > 0) { rmw_timeout.sec = RCL_NS_TO_S(timeout); - rmw_timeout.nsec = timeout; + rmw_timeout.nsec = timeout % 1000000000; } else if (timeout < 0) { rmw_time_t infinite = RMW_DURATION_INFINITE; rmw_timeout = infinite; diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp index 54a0f821c..9729d6f92 100644 --- a/rcl/test/rcl/test_publisher_wait_all_ack.cpp +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -110,11 +110,6 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for_all_acked) { - // cyclonedds use sync publish, so the below test method cannot work. - if (is_cyclonedds) { - GTEST_SKIP(); - } - rcl_ret_t ret; rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); const rosidl_message_type_support_t * ts = @@ -185,8 +180,90 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for ret = rcl_publisher_wait_for_all_acked( &publisher, RCL_MS_TO_NS(500)); - EXPECT_EQ(RCL_RET_TIMEOUT, ret); + if (is_cyclonedds) { + // cyclonedds use sync publish, so above scenario cannot lead to timeout. + EXPECT_EQ(RCL_RET_OK, ret); + } else { + EXPECT_EQ(RCL_RET_TIMEOUT, ret); + } ret = rcl_publisher_wait_for_all_acked(&publisher, -1); EXPECT_EQ(RCL_RET_OK, ret); } + +TEST_F( + CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), + test_wait_for_all_acked_with_best_effort) +{ + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, BasicTypes); + constexpr char topic_name[] = "test_wait_for_all_acked_with_best_effort"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + publisher_options.qos.depth = 10000; + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic_name, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + subscription_options.qos.depth = 1; + subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + +#define INIT_SUBSCRIPTION(idx) \ + rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \ + ret = rcl_subscription_init( \ + &subscription ## idx, \ + this->node_ptr, \ + ts, \ + topic_name, \ + &subscription_options); \ + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \ + { \ + ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \ + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + }); + + INIT_SUBSCRIPTION(1) + INIT_SUBSCRIPTION(2) + INIT_SUBSCRIPTION(3) + + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + + // Prepare 1MB message +#define ONE_MEGABYTE (1024 * 1024) + char test_string[ONE_MEGABYTE]; + memset(test_string, 'a', ONE_MEGABYTE); + test_string[ONE_MEGABYTE - 1] = '\0'; + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + + ret = rcl_publish(&publisher, &msg, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription1, context_ptr, 10, 100)); + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription2, context_ptr, 10, 100)); + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription3, context_ptr, 10, 100)); + + int i = 0; + for (; i < 500; i++) { + ret = rcl_publish(&publisher, &msg, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ret = rcl_publisher_wait_for_all_acked( + &publisher, + RCL_MS_TO_NS(500)); + EXPECT_EQ(RCL_RET_OK, ret); +} From 071b068aa2cceec803874fcbe3a1d8139530fda4 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 27 Apr 2021 10:33:35 +0800 Subject: [PATCH 3/7] Use rcutils_set_env Signed-off-by: Barry Xu --- rcl/test/rcl/test_publisher_wait_all_ack.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp index 9729d6f92..19bf2a2e6 100644 --- a/rcl/test/rcl/test_publisher_wait_all_ack.cpp +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -21,6 +21,7 @@ #include "rcl/publisher.h" #include "rcl/subscription.h" #include "rcpputils/filesystem_helper.hpp" +#include "rcutils/env.h" #include "rcl/rcl.h" #include "test_msgs/msg/strings.h" @@ -60,11 +61,9 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes // mode via fastdds profile file. rcpputils::fs::path fastdds_profile(TEST_RESOURCES_DIRECTORY); fastdds_profile /= "test_profile/disable_intraprocess.xml"; -#ifdef _WIN32 - _putenv(env_var.c_str()); -#else - ASSERT_EQ(setenv("FASTRTPS_DEFAULT_PROFILES_FILE", fastdds_profile.string().c_str(), 1), 0); -#endif + ASSERT_EQ( + rcutils_set_env("FASTRTPS_DEFAULT_PROFILES_FILE", fastdds_profile.string().c_str()), + true); } rcl_ret_t ret; From 3356151d9ec9c8c951425462001a9c35e2a5178b Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 4 Aug 2021 10:19:40 +0800 Subject: [PATCH 4/7] Update the description and test codes Signed-off-by: Barry Xu --- rcl/include/rcl/publisher.h | 6 +- rcl/src/rcl/publisher.c | 5 +- rcl/test/rcl/test_publisher_wait_all_ack.cpp | 100 ++++--------------- 3 files changed, 24 insertions(+), 87 deletions(-) diff --git a/rcl/include/rcl/publisher.h b/rcl/include/rcl/publisher.h index 685e9673f..409eeffe6 100644 --- a/rcl/include/rcl/publisher.h +++ b/rcl/include/rcl/publisher.h @@ -439,7 +439,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); /** * This function waits until all published message data were acknowledged by peer node or timeout. * - * The unit of timeout is nanoseconds. + * The timeout unit is nanoseconds. * If the timeout is negative then this function will block indefinitely until all published message * data were acknowledged. * If the timeout is 0 then this function will be non-blocking; checking all published message data @@ -449,7 +449,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); * elapsed (return RCL_RET_TIMEOUT) or all published message data were acknowledged (return * RCL_RET_OK). * - * This function only works effectively while QOS profile of publisher is set to RELIABLE. + * This function only waits for acknowledgments if the publisher's QOS profile is RELIABLE. * Otherwise this function will immediately return RCL_RET_OK. * *
@@ -464,7 +464,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); * \param[in] timeout the duration to wait for all published message data were acknowledged, in * nanoseconds. * \return #RCL_RET_OK if successful, or - * \return #RCL_RET_TIMEOUT if wait timed out, or + * \return #RCL_RET_TIMEOUT if timed out, or * \return #RCL_RET_PUBLISHER_INVALID if publisher is invalid, or * \return #RCL_RET_ERROR if an unspecified error occurs, or * \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature. diff --git a/rcl/src/rcl/publisher.c b/rcl/src/rcl/publisher.c index 1e5bb959d..a3a97471c 100644 --- a/rcl/src/rcl/publisher.c +++ b/rcl/src/rcl/publisher.c @@ -327,10 +327,11 @@ rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, rcl_duration rmw_ret_t ret = rmw_publisher_wait_for_all_acked(publisher->impl->rmw_handle, rmw_timeout); if (ret != RMW_RET_OK) { - RCL_SET_ERROR_MSG(rmw_get_error_string().str); if (ret == RMW_RET_TIMEOUT) { return RCL_RET_TIMEOUT; - } else if (ret == RMW_RET_UNSUPPORTED) { + } + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + if (ret == RMW_RET_UNSUPPORTED) { return RCL_RET_UNSUPPORTED; } else { return RCL_RET_ERROR; diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp index 19bf2a2e6..25b852a28 100644 --- a/rcl/test/rcl/test_publisher_wait_all_ack.cpp +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -51,9 +51,7 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes void SetUp() { - is_fastdds = (std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") == 0); - is_cyclonedds = (std::string(rmw_get_implementation_identifier()).find("rmw_cyclonedds") == 0); - is_connextdds = (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + bool is_fastdds = (std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") == 0); if (is_fastdds) { // By default, fastdds use intraprocess mode in this scenario. But this leads to high-speed @@ -100,13 +98,24 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes delete this->context_ptr; EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; } - -protected: - bool is_fastdds; - bool is_cyclonedds; - bool is_connextdds; }; +#define INIT_SUBSCRIPTION(idx) \ + rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \ + ret = rcl_subscription_init( \ + &subscription ## idx, \ + this->node_ptr, \ + ts, \ + topic_name, \ + &subscription_options); \ + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \ + { \ + ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \ + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ + }); + +#define ONE_MEGABYTE (1024 * 1024) TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for_all_acked) { rcl_ret_t ret; @@ -129,29 +138,12 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for subscription_options.qos.depth = 1; subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; -#define INIT_SUBSCRIPTION(idx) \ - rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \ - ret = rcl_subscription_init( \ - &subscription ## idx, \ - this->node_ptr, \ - ts, \ - topic_name, \ - &subscription_options); \ - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \ - { \ - ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \ - EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ - }); - INIT_SUBSCRIPTION(1) INIT_SUBSCRIPTION(2) INIT_SUBSCRIPTION(3) ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); - // Prepare 1MB message -#define ONE_MEGABYTE (1024 * 1024) char test_string[ONE_MEGABYTE]; memset(test_string, 'a', ONE_MEGABYTE); test_string[ONE_MEGABYTE - 1] = '\0'; @@ -179,13 +171,8 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for ret = rcl_publisher_wait_for_all_acked( &publisher, RCL_MS_TO_NS(500)); + EXPECT_TRUE(ret == RCL_RET_OK || ret == RCL_RET_TIMEOUT); - if (is_cyclonedds) { - // cyclonedds use sync publish, so above scenario cannot lead to timeout. - EXPECT_EQ(RCL_RET_OK, ret); - } else { - EXPECT_EQ(RCL_RET_TIMEOUT, ret); - } ret = rcl_publisher_wait_for_all_acked(&publisher, -1); EXPECT_EQ(RCL_RET_OK, ret); } @@ -210,57 +197,6 @@ TEST_F( EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; }); - rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); - subscription_options.qos.depth = 1; - subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - -#define INIT_SUBSCRIPTION(idx) \ - rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \ - ret = rcl_subscription_init( \ - &subscription ## idx, \ - this->node_ptr, \ - ts, \ - topic_name, \ - &subscription_options); \ - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \ - { \ - ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \ - EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \ - }); - - INIT_SUBSCRIPTION(1) - INIT_SUBSCRIPTION(2) - INIT_SUBSCRIPTION(3) - - ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); - - // Prepare 1MB message -#define ONE_MEGABYTE (1024 * 1024) - char test_string[ONE_MEGABYTE]; - memset(test_string, 'a', ONE_MEGABYTE); - test_string[ONE_MEGABYTE - 1] = '\0'; - test_msgs__msg__Strings msg; - test_msgs__msg__Strings__init(&msg); - ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - test_msgs__msg__Strings__fini(&msg); - }); - - ret = rcl_publish(&publisher, &msg, nullptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - - ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription1, context_ptr, 10, 100)); - ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription2, context_ptr, 10, 100)); - ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription3, context_ptr, 10, 100)); - - int i = 0; - for (; i < 500; i++) { - ret = rcl_publish(&publisher, &msg, nullptr); - ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - } - ret = rcl_publisher_wait_for_all_acked( &publisher, RCL_MS_TO_NS(500)); From 8cf52b34efebee41dfb81696ed5ca98d6fcf1474 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 5 Aug 2021 09:56:11 +0800 Subject: [PATCH 5/7] Fix test codes Signed-off-by: Barry Xu --- rcl/test/rcl/test_publisher_wait_all_ack.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp index 25b852a28..aec7fea58 100644 --- a/rcl/test/rcl/test_publisher_wait_all_ack.cpp +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -131,7 +131,9 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( { rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); - EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + if (ret != RCL_RET_OK) { + FAIL() << rcl_get_error_string().str; + } }); rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); @@ -191,14 +193,12 @@ TEST_F( publisher_options.qos.depth = 10000; ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic_name, &publisher_options); ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); - EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; - }); ret = rcl_publisher_wait_for_all_acked( &publisher, RCL_MS_TO_NS(500)); EXPECT_EQ(RCL_RET_OK, ret); + + ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; } From cc75ee4a73191f6a9cda74d5c057dcf0c579ed4a Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Sun, 15 Aug 2021 16:58:06 +0800 Subject: [PATCH 6/7] Fix wrong macro and build error on Windows Signed-off-by: Barry Xu --- rcl/include/rcl/publisher.h | 4 ++-- rcl/test/rcl/test_publisher_wait_all_ack.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rcl/include/rcl/publisher.h b/rcl/include/rcl/publisher.h index 409eeffe6..8a8a2ca9f 100644 --- a/rcl/include/rcl/publisher.h +++ b/rcl/include/rcl/publisher.h @@ -469,8 +469,8 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher); * \return #RCL_RET_ERROR if an unspecified error occurs, or * \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature. */ -RMW_PUBLIC -RMW_WARN_UNUSED +RCL_PUBLIC +RCL_WARN_UNUSED rcl_ret_t rcl_publisher_wait_for_all_acked( const rcl_publisher_t * publisher, diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp index aec7fea58..2d30f68bb 100644 --- a/rcl/test/rcl/test_publisher_wait_all_ack.cpp +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -88,7 +88,7 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes void TearDown() { - unsetenv("FASTRTPS_DEFAULT_PROFILES_FILE"); + rcutils_set_env("FASTRTPS_DEFAULT_PROFILES_FILE", NULL); rcl_ret_t ret = rcl_node_fini(this->node_ptr); delete this->node_ptr; EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; From 7406137c957c6d238eb5ff64c6db1b71619ccb4d Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 24 Aug 2021 13:48:58 +0800 Subject: [PATCH 7/7] Fix stack overflow error on Window platform Signed-off-by: Barry Xu --- rcl/test/rcl/test_publisher_wait_all_ack.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rcl/test/rcl/test_publisher_wait_all_ack.cpp b/rcl/test/rcl/test_publisher_wait_all_ack.cpp index 2d30f68bb..c931c09aa 100644 --- a/rcl/test/rcl/test_publisher_wait_all_ack.cpp +++ b/rcl/test/rcl/test_publisher_wait_all_ack.cpp @@ -18,6 +18,7 @@ #include #include +#include "rcl/allocator.h" #include "rcl/publisher.h" #include "rcl/subscription.h" #include "rcpputils/filesystem_helper.hpp" @@ -146,16 +147,19 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); - char test_string[ONE_MEGABYTE]; + rcl_allocator_t allocator = rcl_get_default_allocator(); + char * test_string = static_cast(allocator.allocate(ONE_MEGABYTE, allocator.state)); + ASSERT_TRUE(test_string != NULL); memset(test_string, 'a', ONE_MEGABYTE); test_string[ONE_MEGABYTE - 1] = '\0'; test_msgs__msg__Strings msg; test_msgs__msg__Strings__init(&msg); - ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( { + allocator.deallocate(test_string, allocator.state); test_msgs__msg__Strings__fini(&msg); }); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); ret = rcl_publish(&publisher, &msg, nullptr); ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;