diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index bd705be06d..d403b7f6b4 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -43,6 +43,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp + src/rclcpp/detail/resolve_intra_process_buffer_type.cpp src/rclcpp/detail/resolve_parameter_overrides.cpp src/rclcpp/detail/rmw_implementation_specific_payload.cpp src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp @@ -71,7 +72,6 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/experimental/timers_manager.cpp src/rclcpp/future_return_code.cpp src/rclcpp/generic_publisher.cpp - src/rclcpp/generic_subscription.cpp src/rclcpp/graph_listener.cpp src/rclcpp/guard_condition.cpp src/rclcpp/init_options.cpp diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 65b29d8535..c524daa3c4 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -30,6 +30,7 @@ #include "rclcpp/detail/subscription_callback_type_helper.hpp" #include "rclcpp/function_traits.hpp" #include "rclcpp/message_info.hpp" +#include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/type_adapter.hpp" @@ -158,13 +159,14 @@ struct AnySubscriptionCallbackPossibleTypes template< typename MessageT, typename AllocatorT, - bool is_adapted_type = rclcpp::TypeAdapter::is_specialized::value + bool is_adapted_type = rclcpp::TypeAdapter::is_specialized::value, + bool is_serialized_type = serialization_traits::is_serialized_message_class::value > struct AnySubscriptionCallbackHelper; /// Specialization for when MessageT is not a TypeAdapter. template -struct AnySubscriptionCallbackHelper +struct AnySubscriptionCallbackHelper { using CallbackTypes = AnySubscriptionCallbackPossibleTypes; @@ -194,7 +196,7 @@ struct AnySubscriptionCallbackHelper /// Specialization for when MessageT is a TypeAdapter. template -struct AnySubscriptionCallbackHelper +struct AnySubscriptionCallbackHelper { using CallbackTypes = AnySubscriptionCallbackPossibleTypes; @@ -232,6 +234,26 @@ struct AnySubscriptionCallbackHelper >; }; +/// Specialization for when MessageT is a SerializedMessage to exclude duplicated declarations. +template +struct AnySubscriptionCallbackHelper +{ + using CallbackTypes = AnySubscriptionCallbackPossibleTypes; + + using variant_type = std::variant< + typename CallbackTypes::ConstRefSerializedMessageCallback, + typename CallbackTypes::ConstRefSerializedMessageWithInfoCallback, + typename CallbackTypes::UniquePtrSerializedMessageCallback, + typename CallbackTypes::UniquePtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedPtrSerializedMessageCallback, + typename CallbackTypes::SharedPtrSerializedMessageWithInfoCallback + >; +}; + } // namespace detail template< @@ -487,7 +509,9 @@ class AnySubscriptionCallback } // Dispatch when input is a ros message and the output could be anything. - void + template + typename std::enable_if::value, + void>::type dispatch( std::shared_ptr message, const rclcpp::MessageInfo & message_info) @@ -589,7 +613,7 @@ class AnySubscriptionCallback // Dispatch when input is a serialized message and the output could be anything. void dispatch( - std::shared_ptr serialized_message, + std::shared_ptr serialized_message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), false); diff --git a/rclcpp/include/rclcpp/create_generic_subscription.hpp b/rclcpp/include/rclcpp/create_generic_subscription.hpp index f5281cc673..56b009e898 100644 --- a/rclcpp/include/rclcpp/create_generic_subscription.hpp +++ b/rclcpp/include/rclcpp/create_generic_subscription.hpp @@ -38,42 +38,89 @@ namespace rclcpp * * \param topics_interface NodeTopicsInterface pointer used in parts of the setup. * \param topic_name Topic name - * \param topic_type Topic type + * \param type_support_handle Topic type support * \param qos %QoS settings * \param callback Callback for new messages of serialized form * \param options %Publisher options. * Not all publisher options are currently respected, the only relevant options for this * publisher are `event_callbacks`, `use_default_callbacks`, and `%callback_group`. */ -template> +template< + typename CallbackT, + typename AllocatorT = std::allocator> std::shared_ptr create_generic_subscription( rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface, const std::string & topic_name, - const std::string & topic_type, + const rosidl_message_type_support_t & type_support_handle, const rclcpp::QoS & qos, - std::function)> callback, + CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( rclcpp::SubscriptionOptionsWithAllocator() ) ) { - auto ts_lib = rclcpp::get_typesupport_library( - topic_type, "rosidl_typesupport_cpp"); + auto allocator = options.get_allocator(); + typename GenericSubscription::MessageMemoryStrategyType::SharedPtr msg_mem_strat = ( + GenericSubscription::MessageMemoryStrategyType::create_default() + ); + + using rclcpp::AnySubscriptionCallback; + AnySubscriptionCallback + any_subscription_callback(*allocator); + any_subscription_callback.set(std::forward(callback)); auto subscription = std::make_shared( topics_interface->get_node_base_interface(), - std::move(ts_lib), + type_support_handle, topic_name, - topic_type, qos, - callback, - options); + any_subscription_callback, + options, + msg_mem_strat); topics_interface->add_subscription(subscription, options.callback_group); return subscription; } +/// Create and return a GenericSubscription. +/** + * The returned pointer will never be empty, but this function can throw various exceptions, for + * instance when the message's package can not be found on the AMENT_PREFIX_PATH. + * + * \param topics_interface NodeTopicsInterface pointer used in parts of the setup. + * \param topic_name Topic name + * \param topic_type Topic type + * \param qos %QoS settings + * \param callback Callback for new messages of serialized form + * \param options %Publisher options. + * Not all publisher options are currently respected, the only relevant options for this + * publisher are `event_callbacks`, `use_default_callbacks`, and `%callback_group`. + */ +template< + typename CallbackT, + typename AllocatorT = std::allocator> +std::shared_ptr create_generic_subscription( + rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface, + const std::string & topic_name, + const std::string & topic_type, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ) +) +{ + auto ts_lib = rclcpp::get_typesupport_library( + topic_type, "rosidl_typesupport_cpp"); + + return create_generic_subscription( + topics_interface, topic_name, + *rclcpp::get_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib), + qos, std::forward(callback), options + ); +} + } // namespace rclcpp #endif // RCLCPP__CREATE_GENERIC_SUBSCRIPTION_HPP_ diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index f6088a33c3..f87ebb1b16 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -35,6 +35,53 @@ namespace rclcpp namespace detail { +/// Create and return a publisher of the given MessageT type. +/** + * The NodeT type only needs to have a method called get_node_topics_interface() + * which returns a shared_ptr to a NodeTopicsInterface. + */ +template< + typename MessageT = SerializedMessage, + typename AllocatorT = std::allocator, + typename PublisherT = rclcpp::Publisher, + typename NodeT> +std::shared_ptr +create_publisher( + NodeT & node, + const std::string & topic_name, + const rosidl_message_type_support_t & type_support, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options = ( + rclcpp::PublisherOptionsWithAllocator() + ) +) +{ + // Extract the NodeTopicsInterface from the NodeT. + using rclcpp::node_interfaces::get_node_topics_interface; + auto node_topics_interface = get_node_topics_interface(node); + auto node_parameters = node.get_node_parameters_interface(); + const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ? + rclcpp::detail::declare_qos_parameters( + options.qos_overriding_options, node_parameters, + node_topics_interface->resolve_topic_name(topic_name), + qos, rclcpp::detail::PublisherQosParametersTraits{}) : + qos; + + // Create the publisher. + auto pub = node_topics_interface->create_publisher( + topic_name, + rclcpp::create_publisher_factory( + options, + type_support), + actual_qos + ); + + // Add the publisher to the node topics interface. + node_topics_interface->add_publisher(pub, options.callback_group); + + return std::dynamic_pointer_cast(pub); +} + /// Create and return a publisher of the given MessageT type. template< typename MessageT, diff --git a/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp b/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp index e7196a1e11..234316a8f0 100644 --- a/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp +++ b/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp @@ -47,6 +47,11 @@ resolve_intra_process_buffer_type( return resolved_buffer_type; } +RCLCPP_PUBLIC +rclcpp::IntraProcessBufferType +resolve_intra_process_buffer_type( + const rclcpp::IntraProcessBufferType buffer_type); + } // namespace detail } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp index 1e5346116a..2d3c0a02f5 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_ +#include + namespace rclcpp { namespace experimental @@ -31,6 +33,8 @@ class BufferImplementationBase virtual BufferT dequeue() = 0; virtual void enqueue(BufferT request) = 0; + virtual std::vector get_all_data() = 0; + virtual void clear() = 0; virtual bool has_data() const = 0; }; diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index 05092bb23b..744bff132a 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" @@ -65,6 +66,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase virtual MessageSharedPtr consume_shared() = 0; virtual MessageUniquePtr consume_unique() = 0; + + virtual std::vector get_all_data_shared() = 0; + virtual std::vector get_all_data_unique() = 0; }; template< @@ -128,6 +132,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer(); } + std::vector get_all_data_shared() override + { + return get_all_data_shared_impl(); + } + + std::vector get_all_data_unique() override + { + return get_all_data_unique_impl(); + } + bool has_data() const override { return buffer_->has_data(); @@ -237,6 +251,71 @@ class TypedIntraProcessBuffer : public IntraProcessBufferdequeue(); } + + // MessageSharedPtr to MessageSharedPtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_shared_impl() + { + return buffer_->get_all_data(); + } + + // MessageUniquePtr to MessageSharedPtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_shared_impl() + { + std::vector result; + auto uni_ptr_vec = buffer_->get_all_data(); + result.reserve(uni_ptr_vec.size()); + for (MessageUniquePtr & uni_ptr : uni_ptr_vec) { + result.emplace_back(std::move(uni_ptr)); + } + return result; + } + + // MessageSharedPtr to MessageUniquePtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_unique_impl() + { + std::vector result; + auto shared_ptr_vec = buffer_->get_all_data(); + result.reserve(shared_ptr_vec.size()); + for (MessageSharedPtr shared_msg : shared_ptr_vec) { + MessageUniquePtr unique_msg; + MessageDeleter * deleter = std::get_deleter(shared_msg); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg); + if (deleter) { + unique_msg = MessageUniquePtr(ptr, *deleter); + } else { + unique_msg = MessageUniquePtr(ptr); + } + result.push_back(std::move(unique_msg)); + } + return result; + } + + // MessageUniquePtr to MessageUniquePtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_unique_impl() + { + return buffer_->get_all_data(); + } }; } // namespace buffers diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index 2c06ea6cbe..e2800f493f 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_ #define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_ +#include #include #include #include @@ -110,6 +111,17 @@ class RingBufferImplementation : public BufferImplementationBase return request; } + /// Get all the elements from the ring buffer + /** + * This member function is thread-safe. + * + * \return a vector containing all the elements from the ring buffer + */ + std::vector get_all_data() override + { + return get_all_data_impl(); + } + /// Get the next index value for the ring buffer /** * This member function is thread-safe. @@ -189,6 +201,72 @@ class RingBufferImplementation : public BufferImplementationBase return size_ == capacity_; } + + /// Traits for checking if a type is std::unique_ptr + template + struct is_std_unique_ptr final : std::false_type {}; + template + struct is_std_unique_ptr> final : std::true_type + { + typedef T Ptr_type; + }; + + /// Get all the elements from the ring buffer + /** + * This member function is thread-safe. + * Two versions for the implementation of the function. + * One for buffer containing unique_ptr and the other for other types + * + * \return a vector containing all the elements from the ring buffer + */ + template::value && + std::is_copy_constructible< + typename is_std_unique_ptr::Ptr_type + >::value, + void> * = nullptr> + std::vector get_all_data_impl() + { + std::lock_guard lock(mutex_); + std::vector result_vtr; + result_vtr.reserve(size_); + for (size_t id = 0; id < size_; ++id) { + result_vtr.emplace_back( + new typename is_std_unique_ptr::Ptr_type( + *(ring_buffer_[(read_index_ + id) % capacity_]))); + } + return result_vtr; + } + + template::value, void> * = nullptr> + std::vector get_all_data_impl() + { + std::lock_guard lock(mutex_); + std::vector result_vtr; + result_vtr.reserve(size_); + for (size_t id = 0; id < size_; ++id) { + result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]); + } + return result_vtr; + } + + template::value && + !std::is_copy_constructible::value, void> * = nullptr> + std::vector get_all_data_impl() + { + throw std::logic_error("Underlined type results in invalid get_all_data_impl()"); + return {}; + } + + template::value && + !std::is_copy_constructible::Ptr_type>::value, + void> * = nullptr> + std::vector get_all_data_impl() + { + throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()"); + return {}; + } + size_t capacity_; std::vector ring_buffer_; diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index cfd82eebcf..e1478c6320 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -28,6 +28,7 @@ #include #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" @@ -36,9 +37,14 @@ #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_adapter.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/asserts.hpp" + namespace rclcpp { @@ -112,9 +118,40 @@ class IntraProcessManager * \param subscription the SubscriptionIntraProcess to register. * \return an unsigned 64-bit integer which is the subscription's unique id. */ - RCLCPP_PUBLIC + template< + typename ROSMessageType, + typename Alloc = std::allocator + > uint64_t - add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + std::unique_lock lock(mutex_); + + uint64_t sub_id = IntraProcessManager::get_next_unique_id(); + + subscriptions_[sub_id] = subscription; + + // adds the subscription id to all the matchable publishers + for (auto & pair : publishers_) { + auto publisher = pair.second.lock(); + if (!publisher) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t pub_id = pair.first; + insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method(), subscription->is_serialized()); + if (publisher->is_durability_transient_local() && + subscription->is_durability_transient_local()) + { + do_transient_local_publish( + pub_id, sub_id, + subscription->use_take_shared_method()); + } + } + } + + return sub_id; + } /// Unregister a subscription using the subscription's unique id. /** @@ -131,14 +168,21 @@ class IntraProcessManager * This method stores the publisher intra process object, together with * the information of its wrapped publisher (i.e. topic name and QoS). * + * If the publisher's durability is transient local, its buffer pointer should + * be passed and the method will store it as well. + * * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. + * \param buffer publisher's buffer to be stored if its duability is transient local. * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher); + add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer = + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr()); /// Unregister a publisher using the publisher's unique id. /** @@ -185,6 +229,56 @@ class IntraProcessManager uint64_t intra_process_publisher_id, std::unique_ptr message, typename allocator::AllocRebind::allocator_type & allocator) + { + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indices = SplitSubscriptionsIndices; + + std::shared_lock lock(mutex_); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling do_intra_process_publish for invalid or no longer existing publisher id"); + return; + } + const auto & sub_ids = publisher_it->second; + + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indices::ownership_other].size() + + sub_ids.take_subscriptions[Indices::shared_other].size() > 0) + { + do_intra_process_publish_other_type( + intra_process_publisher_id, + message.get(), allocator, + sub_ids.take_subscriptions[Indices::ownership_other], + sub_ids.take_subscriptions[Indices::shared_other] + ); + } + + do_intra_process_publish_same_type( + intra_process_publisher_id, + std::move(message), allocator, + sub_ids.take_subscriptions[Indices::ownership_same], + sub_ids.take_subscriptions[Indices::shared_same] + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete + > + void + do_intra_process_publish_same_type( + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -199,41 +293,40 @@ class IntraProcessManager "Calling do_intra_process_publish for invalid or no longer existing publisher id"); return; } - const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { + if (take_ownership_subscriptions.empty()) { // None of the buffers require ownership, so we promote the pointer std::shared_ptr msg = std::move(message); this->template add_shared_msg_to_buffers( - msg, sub_ids.take_shared_subscriptions); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() <= 1) + msg, take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) { // There is at maximum 1 buffer that does not require ownership. // So this case is equivalent to all the buffers requiring ownership // Merge the two vector of ids into a unique one - std::vector concatenated_vector(sub_ids.take_shared_subscriptions); + std::vector concatenated_vector(take_shared_subscriptions); concatenated_vector.insert( concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); this->template add_owned_msg_to_buffers( std::move(message), concatenated_vector, allocator); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() > 1) + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) { // Construct a new shared pointer from the message // for the buffers that do not require ownership auto shared_msg = std::allocate_shared(allocator, *message); this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); + shared_msg, take_shared_subscriptions); this->template add_owned_msg_to_buffers( - std::move(message), sub_ids.take_ownership_subscriptions, allocator); + std::move(message), take_ownership_subscriptions, allocator); } } @@ -248,6 +341,57 @@ class IntraProcessManager uint64_t intra_process_publisher_id, std::unique_ptr message, typename allocator::AllocRebind::allocator_type & allocator) + { + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indices = SplitSubscriptionsIndices; + + std::shared_lock lock(mutex_); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling do_intra_process_publish for invalid or no longer existing publisher id"); + return nullptr; + } + const auto & sub_ids = publisher_it->second; + + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indices::ownership_other].size() + + sub_ids.take_subscriptions[Indices::shared_other].size() > 0) + { + do_intra_process_publish_other_type( + intra_process_publisher_id, + message.get(), allocator, + sub_ids.take_subscriptions[Indices::ownership_other], + sub_ids.take_subscriptions[Indices::shared_other] + ); + } + + return do_intra_process_publish_and_return_shared_same_type( + intra_process_publisher_id, + std::move(message), allocator, + sub_ids.take_subscriptions[Indices::ownership_same], + sub_ids.take_subscriptions[Indices::shared_same] + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete + > + std::shared_ptr + do_intra_process_publish_and_return_shared_same_type( + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -262,14 +406,13 @@ class IntraProcessManager "Calling do_intra_process_publish for invalid or no longer existing publisher id"); return nullptr; } - const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { + if (take_ownership_subscriptions.empty()) { // If there are no owning, just convert to shared. std::shared_ptr shared_msg = std::move(message); - if (!sub_ids.take_shared_subscriptions.empty()) { + if (!take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); + shared_msg, take_shared_subscriptions); } return shared_msg; } else { @@ -277,21 +420,49 @@ class IntraProcessManager // do not require ownership and to return. auto shared_msg = std::allocate_shared(allocator, *message); - if (!sub_ids.take_shared_subscriptions.empty()) { + if (!take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( shared_msg, - sub_ids.take_shared_subscriptions); + take_shared_subscriptions); } - if (!sub_ids.take_ownership_subscriptions.empty()) { + if (!take_ownership_subscriptions.empty()) { this->template add_owned_msg_to_buffers( std::move(message), - sub_ids.take_ownership_subscriptions, + take_ownership_subscriptions, allocator); } return shared_msg; } } + template< + typename MessageT, + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_shared_msg_to_buffer( + std::shared_ptr message, + uint64_t subscription_id) + { + add_shared_msg_to_buffers(message, {subscription_id}); + } + + template< + typename MessageT, + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_owned_msg_to_buffer( + std::unique_ptr message, + uint64_t subscription_id, + typename allocator::AllocRebind::allocator_type & allocator) + { + add_owned_msg_to_buffers( + std::move(message), {subscription_id}, allocator); + } + /// Return true if the given rmw_gid_t matches any stored Publishers. RCLCPP_PUBLIC bool @@ -309,8 +480,39 @@ class IntraProcessManager private: struct SplittedSubscriptions { - std::vector take_shared_subscriptions; - std::vector take_ownership_subscriptions; + enum + { + IndexSharedTyped = 0u, IndexSharedSerialized = 1u, + IndexOwnershipTyped = 2u, IndexOwnershipSerialized = 3u, + IndexNum = 4u + }; + + /// get the index for "take_subscriptions" depending on shared/serialized + constexpr static uint32_t + get_index(const bool is_shared, const bool is_serialized) + { + return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) + + (is_shared ? IndexSharedTyped : IndexOwnershipTyped); + } + + std::vector take_subscriptions[IndexNum]; + }; + + template + struct SplitSubscriptionsIndices + { + constexpr static auto ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized); + constexpr static auto shared_same = SplittedSubscriptions::get_index( + true, + is_serialized); + constexpr static auto ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized); + constexpr static auto shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized); }; using SubscriptionMap = @@ -319,6 +521,9 @@ class IntraProcessManager using PublisherMap = std::unordered_map; + using PublisherBufferMap = + std::unordered_map; + using PublisherToSubscriptionIdsMap = std::unordered_map; @@ -329,7 +534,9 @@ class IntraProcessManager RCLCPP_PUBLIC void - insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + insert_sub_id_for_pub( + uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method, + bool is_serialized_subscriber); RCLCPP_PUBLIC bool @@ -337,6 +544,206 @@ class IntraProcessManager rclcpp::PublisherBase::SharedPtr pub, rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; + template< + typename ROSMessageType, + typename Alloc = std::allocator + > + void do_transient_local_publish( + const uint64_t pub_id, const uint64_t sub_id, + const bool use_take_shared_method) + { + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + auto publisher_buffer = publisher_buffers_[pub_id].lock(); + if (!publisher_buffer) { + throw std::runtime_error("publisher buffer has unexpectedly gone out of scope"); + } + auto buffer = std::dynamic_pointer_cast< + rclcpp::experimental::buffers::IntraProcessBuffer< + ROSMessageType, + ROSMessageTypeAllocator, + ROSMessageTypeDeleter + > + >(publisher_buffer); + if (!buffer) { + throw std::runtime_error( + "failed to dynamic cast publisher's IntraProcessBufferBase to " + "IntraProcessBuffer which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + if (use_take_shared_method) { + auto data_vec = buffer->get_all_data_shared(); + for (auto shared_data : data_vec) { + this->template add_shared_msg_to_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( + shared_data, sub_id); + } + } else { + auto data_vec = buffer->get_all_data_unique(); + for (auto & owned_data : data_vec) { + auto allocator = ROSMessageTypeAllocator(); + this->template add_owned_msg_to_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( + std::move(owned_data), sub_id, allocator); + } + } + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete> + typename std::enable_if_t< + std::is_same::value, + void + > + do_intra_process_publish_other_type( + uint64_t intra_process_publisher_id, + const MessageT * unsupported_message, + typename allocator::AllocRebind::allocator_type &, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + typedef std::allocator SerializedAlloc; + using SerializedAllocatorTraits = allocator::AllocRebind; + + SerializedAllocatorTraits::allocator_type serialized_allocator; + + auto ptr = SerializedAllocatorTraits::allocate(serialized_allocator, 1); + SerializedAllocatorTraits::construct(serialized_allocator, ptr); + std::unique_ptr serialized_message(ptr); + if constexpr (!std::is_same::value) { + Serialization serializer; + serializer.serialize_message(unsupported_message, serialized_message.get()); + } else { + (void)unsupported_message; + throw std::runtime_error("Serialized message cannot be serialized."); + } + + this->template do_intra_process_publish_and_return_shared_same_type( + intra_process_publisher_id, + std::move(serialized_message), + serialized_allocator, + take_ownership_subscriptions, + take_shared_subscriptions + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete> + typename std::enable_if_t< + !std::is_same::value, + void + > + do_intra_process_publish_other_type( + uint64_t intra_process_publisher_id, + const MessageT * unsupported_message, + typename allocator::AllocRebind::allocator_type &, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + typedef std::allocator SerializedAlloc; + using SerializedAllocatorTraits = allocator::AllocRebind; + + SerializedAllocatorTraits::allocator_type serialized_allocator; + + auto ptr = SerializedAllocatorTraits::allocate(serialized_allocator, 1); + SerializedAllocatorTraits::construct(serialized_allocator, ptr); + std::unique_ptr serialized_message(ptr); + Serialization serializer; + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message( + *unsupported_message, + ros_msg); + serializer.serialize_message(&ros_msg, serialized_message.get()); + + this->template do_intra_process_publish_and_return_shared_same_type( + intra_process_publisher_id, + std::move(serialized_message), + serialized_allocator, + take_ownership_subscriptions, + take_shared_subscriptions + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete> + typename std::enable_if_t< + !std::is_same::value, + void + > + do_intra_process_publish_other_type( + uint64_t intra_process_publisher_id, + const rclcpp::SerializedMessage * unsupported_message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + for (const auto id : take_ownership_subscriptions) { + auto subscription_it = subscriptions_.find(id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.lock(); + if (subscription_base == nullptr) { + subscriptions_.erase(id); + continue; + } + + if (subscription_base->serve_serialized_message( + unsupported_message, + this, + intra_process_publisher_id, + static_cast(&allocator), + take_ownership_subscriptions, + take_shared_subscriptions + )) + { + // message was successfully converted and forwarded, so stop further forwarding + return; + } + } + + for (const auto id : take_shared_subscriptions) { + auto subscription_it = subscriptions_.find(id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.lock(); + if (subscription_base == nullptr) { + subscriptions_.erase(id); + continue; + } + + if (subscription_base->serve_serialized_message( + unsupported_message, + this, + intra_process_publisher_id, + static_cast(&allocator), + take_ownership_subscriptions, + take_shared_subscriptions + )) + { + // message was successfully converted and forwarded, so stop further forwarding + return; + } + } + } + template< typename MessageT, typename Alloc, @@ -515,10 +922,45 @@ class IntraProcessManager PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; + PublisherBufferMap publisher_buffers_; mutable std::shared_timed_mutex mutex_; }; +namespace detail +{ +/** + * Helper function to expose method of IntraProcessManager + * for publishing messages with same data type. + * + * This is needed as the publisher of serialized message is not aware of the subscribed + * data type. While the subscription has all needed information (MessageT, Allocator) to + * cast and deserialize the message. The type information is forwarded by the helper function. + */ +template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete +> +void do_intra_process_publish_same_type( + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename rclcpp::allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) +{ + intraprocess_manager->template do_intra_process_publish_same_type( + intra_process_publisher_id, + std::move(message), + allocator, + take_ownership_subscriptions, + take_shared_subscriptions); +} +} // namespace detail + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 6583e74ae7..7643d98a68 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -19,13 +19,16 @@ #include #include #include +#include #include "rcl/wait.h" #include "rmw/impl/cpp/demangle.hpp" +#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/waitable.hpp" namespace rclcpp @@ -33,6 +36,25 @@ namespace rclcpp namespace experimental { +// Forward declarations to reuse methods from IntraProcessManager +class IntraProcessManager; +namespace detail +{ +template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete +> +void do_intra_process_publish_same_type( + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions); +} // namespace detail + class SubscriptionIntraProcessBase : public rclcpp::Waitable { public: @@ -62,6 +84,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable void add_to_wait_set(rcl_wait_set_t * wait_set) override; + RCLCPP_PUBLIC + bool + is_durability_transient_local() const; + bool is_ready(rcl_wait_set_t * wait_set) override = 0; @@ -171,6 +197,36 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable on_new_message_callback_ = nullptr; } + /// Check if subscription type is a serialized message type. + RCLCPP_PUBLIC + virtual bool + is_serialized() const = 0; + + /// Convert serialized message to ROS message type and serve it back to IPM. + /** + * Convert serialized message to ROS message type and serve it back to IPM. + * This is needed as the publisher of serialized message is not aware of the subscribed + * data type. While the subscription has all needed information (MessageT, Allocator) to + * cast and deserialize the message. + * + * \param serialized_message serialized message which needs to be de-serialized. + * \param intraprocess_manager intraprocess manger to which the de-serialized messaged should be forwarded. + * \param intra_process_publisher_id id of publisher. + * \param untyped_allocator pointer to allocator of message. + * \param take_ownership_subscriptions subscription ids which takes ownership. + * \param take_shared_subscriptions subscription ids with shared ownership. + * \return true for success. + */ + RCLCPP_PUBLIC + virtual bool + serve_serialized_message( + const rclcpp::SerializedMessage * serialized_message, + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + void * untyped_allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) = 0; + protected: std::recursive_mutex callback_mutex_; std::function on_new_message_callback_ {nullptr}; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index 30debed83a..882533b578 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rcl/error_handling.h" #include "rcl/guard_condition.h" @@ -169,6 +170,72 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff return buffer_->use_take_shared_method(); } + bool + is_serialized() const override + { + return serialization_traits::is_serialized_message_class::value; + } + + SubscribedTypeUniquePtr deserialize_message( + const rclcpp::SerializedMessage * serialized_message) + { + if (serialized_message == nullptr) { + // The method is only allowed to be called with valid data (serialized case). + throw std::runtime_error("nullptr provided for serialized message"); + } + + auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); + SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr); + if constexpr (std::is_same::value) { + (void)serialized_message; + throw std::runtime_error("Serialized message cannot be serialized."); + } else if constexpr (std::is_same::value) { + Serialization serializer; + serializer.deserialize_message(serialized_message, ptr); + } else { + ROSMessageType ros_msg; + Serialization serializer; + serializer.deserialize_message(serialized_message, &ros_msg); + rclcpp::TypeAdapter::convert_to_custom(ros_msg, *ptr); + } + + return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_); + } + + bool serve_serialized_message( + const rclcpp::SerializedMessage * serialized_message, + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + void * untyped_allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) override + { + if (intraprocess_manager == nullptr) { + // The method is only allowed to be called with valid data (serialized case). + throw std::runtime_error("nullptr provided for serialized inter-process manager"); + } + + typedef typename allocator::AllocRebind::allocator_type AllocType; + auto allocator = static_cast(untyped_allocator); + + if (allocator == nullptr) { + throw std::runtime_error( + "failed to cast allocator " + "which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + + detail::do_intra_process_publish_same_type( + intraprocess_manager, + intra_process_publisher_id, + deserialize_message(serialized_message), + *allocator, + take_ownership_subscriptions, + take_shared_subscriptions); + + return true; + } + protected: void trigger_guard_condition() override diff --git a/rclcpp/include/rclcpp/generic_subscription.hpp b/rclcpp/include/rclcpp/generic_subscription.hpp index 975a9d0d0d..9f0a41d994 100644 --- a/rclcpp/include/rclcpp/generic_subscription.hpp +++ b/rclcpp/include/rclcpp/generic_subscription.hpp @@ -16,145 +16,12 @@ #ifndef RCLCPP__GENERIC_SUBSCRIPTION_HPP_ #define RCLCPP__GENERIC_SUBSCRIPTION_HPP_ -#include -#include -#include - -#include "rcpputils/shared_library.hpp" - -#include "rclcpp/callback_group.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/node_interfaces/node_base_interface.hpp" -#include "rclcpp/node_interfaces/node_topics_interface.hpp" -#include "rclcpp/qos.hpp" -#include "rclcpp/serialized_message.hpp" -#include "rclcpp/subscription_base.hpp" -#include "rclcpp/typesupport_helpers.hpp" -#include "rclcpp/visibility_control.hpp" +#include "rclcpp/subscription.hpp" namespace rclcpp { -/// %Subscription for serialized messages whose type is not known at compile time. -/** - * Since the type is not known at compile time, this is not a template, and the dynamic library - * containing type support information has to be identified and loaded based on the type name. - * - * It does not support intra-process handling. - */ -class GenericSubscription : public rclcpp::SubscriptionBase -{ -public: - // cppcheck-suppress unknownMacro - RCLCPP_SMART_PTR_DEFINITIONS(GenericSubscription) - - /// Constructor. - /** - * In order to properly subscribe to a topic, this subscription needs to be added to - * the node_topic_interface of the node passed into this constructor. - * - * \sa rclcpp::Node::create_generic_subscription() or rclcpp::create_generic_subscription() for - * creating an instance of this class and adding it to the node_topic_interface. - * - * \param node_base Pointer to parent node's NodeBaseInterface - * \param ts_lib Type support library, needs to correspond to topic_type - * \param topic_name Topic name - * \param topic_type Topic type - * \param qos %QoS settings - * \param callback Callback for new messages of serialized form - * \param options %Subscription options. - * Not all subscription options are currently respected, the only relevant options for this - * subscription are `event_callbacks`, `use_default_callbacks`, `ignore_local_publications`, and - * `%callback_group`. - */ - template> - GenericSubscription( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const std::shared_ptr ts_lib, - const std::string & topic_name, - const std::string & topic_type, - const rclcpp::QoS & qos, - // TODO(nnmm): Add variant for callback with message info. See issue #1604. - std::function)> callback, - const rclcpp::SubscriptionOptionsWithAllocator & options) - : SubscriptionBase( - node_base, - *rclcpp::get_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib), - topic_name, - options.to_rcl_subscription_options(qos), - options.event_callbacks, - options.use_default_callbacks, - DeliveredMessageKind::SERIALIZED_MESSAGE), - callback_(callback), - ts_lib_(ts_lib) - {} - - RCLCPP_PUBLIC - virtual ~GenericSubscription() = default; - - // Same as create_serialized_message() as the subscription is to serialized_messages only - RCLCPP_PUBLIC - std::shared_ptr create_message() override; - - RCLCPP_PUBLIC - std::shared_ptr create_serialized_message() override; - - /// Cast the message to a rclcpp::SerializedMessage and call the callback. - RCLCPP_PUBLIC - void handle_message( - std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override; - - /// Handle dispatching rclcpp::SerializedMessage to user callback. - RCLCPP_PUBLIC - void - handle_serialized_message( - const std::shared_ptr & serialized_message, - const rclcpp::MessageInfo & message_info) override; - - /// This function is currently not implemented. - RCLCPP_PUBLIC - void handle_loaned_message( - void * loaned_message, const rclcpp::MessageInfo & message_info) override; - - // Same as return_serialized_message() as the subscription is to serialized_messages only - RCLCPP_PUBLIC - void return_message(std::shared_ptr & message) override; - - RCLCPP_PUBLIC - void return_serialized_message(std::shared_ptr & message) override; - - - // DYNAMIC TYPE ================================================================================== - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr get_shared_dynamic_message_type() - override; - - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr get_shared_dynamic_message() override; - - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr - get_shared_dynamic_serialization_support() override; - - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override; - - RCLCPP_PUBLIC - void return_dynamic_message( - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) override; - - RCLCPP_PUBLIC - void handle_dynamic_message( - const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message, - const rclcpp::MessageInfo & message_info) override; - -private: - RCLCPP_DISABLE_COPY(GenericSubscription) - - std::function)> callback_; - // The type support library should stay loaded, so it is stored in the GenericSubscription - std::shared_ptr ts_lib_; -}; +using GenericSubscription = rclcpp::Subscription; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 59863df91e..a35f31e19f 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -356,12 +356,42 @@ class Node : public std::enable_shared_from_this * `%callback_group`. * \return Shared pointer to the created generic subscription. */ - template> + template< + typename CallbackT, + typename AllocatorT = std::allocator> std::shared_ptr create_generic_subscription( const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos, - std::function)> callback, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ) + ); + + /// Create and return a GenericSubscription. + /** + * The returned pointer will never be empty, but this function can throw various exceptions, for + * instance when the message's package can not be found on the AMENT_PREFIX_PATH. + * + * \param[in] topic_name Topic name + * \param[in] type_support_handle Topic type support + * \param[in] qos %QoS settings + * \param[in] callback Callback for new messages of serialized form + * \param[in] options %Subscription options. + * Not all subscription options are currently respected, the only relevant options for this + * subscription are `event_callbacks`, `use_default_callbacks`, `ignore_local_publications`, and + * `%callback_group`. + * \return Shared pointer to the created generic subscription. + */ + template< + typename CallbackT, + typename AllocatorT = std::allocator> + std::shared_ptr create_generic_subscription( + const std::string & topic_name, + const rosidl_message_type_support_t & type_support_handle, + const rclcpp::QoS & qos, + CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( rclcpp::SubscriptionOptionsWithAllocator() ) diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index d6b090d4d1..b3e54794c2 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -219,13 +219,13 @@ Node::create_generic_publisher( ); } -template +template std::shared_ptr Node::create_generic_subscription( const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos, - std::function)> callback, + CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options) { return rclcpp::create_generic_subscription( @@ -233,7 +233,26 @@ Node::create_generic_subscription( extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), topic_type, qos, - std::move(callback), + std::forward(callback), + options + ); +} + +template +std::shared_ptr +Node::create_generic_subscription( + const std::string & topic_name, + const rosidl_message_type_support_t & type_support_handle, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options) +{ + return rclcpp::create_generic_subscription( + node_topics_, + extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), + type_support_handle, + qos, + std::forward(callback), options ); } diff --git a/rclcpp/include/rclcpp/node_options.hpp b/rclcpp/include/rclcpp/node_options.hpp index 71b0d997cf..dee2b87b7e 100644 --- a/rclcpp/include/rclcpp/node_options.hpp +++ b/rclcpp/include/rclcpp/node_options.hpp @@ -433,7 +433,7 @@ class NodeOptions bool start_parameter_services_ {true}; - bool start_parameter_event_publisher_ {true}; + bool start_parameter_event_publisher_ {false}; rcl_clock_type_t clock_type_ {RCL_ROS_TIME}; diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index f474a67192..9f221f2f09 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -32,6 +32,9 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" +#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/get_message_type_support_handle.hpp" #include "rclcpp/is_ros_compatible_type.hpp" @@ -78,7 +81,8 @@ class Publisher : public PublisherBase { public: static_assert( - rclcpp::is_ros_compatible_type::value, + rclcpp::is_ros_compatible_type::value || std::is_same::value, "given message type is not compatible with ROS and cannot be used with a Publisher"); /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. @@ -109,6 +113,12 @@ class Publisher : public PublisherBase [[deprecated("use std::shared_ptr")]] = std::shared_ptr; + using BufferSharedPtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< + ROSMessageType, + ROSMessageTypeAllocator, + ROSMessageTypeDeleter + >::SharedPtr; + RCLCPP_SMART_PTR_DEFINITIONS(Publisher) /// Default constructor. @@ -121,16 +131,18 @@ class Publisher : public PublisherBase * \param[in] topic Name of the topic to publish to. * \param[in] qos QoS profile for Subcription. * \param[in] options Options for the subscription. + * \param[in] type_support Type support for the subscribed topic. */ Publisher( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) : PublisherBase( node_base, topic, - rclcpp::get_message_type_support_handle(), + type_support, options.template to_rcl_publisher_options(qos), // NOTE(methylDragon): Passing these args separately is necessary for event binding options.event_callbacks, @@ -144,6 +156,19 @@ class Publisher : public PublisherBase // Setup continues in the post construction method, post_init_setup(). } + Publisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + : Publisher( + node_base, + topic, + qos, + options, + rclcpp::get_message_type_support_handle()) + {} + /// Called post construction, so that construction may continue after shared_from_this() works. virtual void @@ -171,11 +196,14 @@ class Publisher : public PublisherBase throw std::invalid_argument( "intraprocess communication is not allowed with a zero qos history depth value"); } - if (qos.durability() != rclcpp::DurabilityPolicy::Volatile) { - throw std::invalid_argument( - "intraprocess communication allowed only with volatile durability"); + if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + buffer_ = rclcpp::experimental::create_intra_process_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>( + rclcpp::detail::resolve_intra_process_buffer_type(options_.intra_process_buffer_type), + qos, + std::make_shared(ros_message_type_allocator_)); } - uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); + uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_); this->setup_intra_process( intra_process_publisher_id, ipm); @@ -242,8 +270,51 @@ class Publisher : public PublisherBase if (inter_process_publish_needed) { auto shared_msg = this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); + if (buffer_) { + buffer_->add_shared(shared_msg); + } this->do_inter_process_publish(*shared_msg); } else { + if (buffer_) { + auto shared_msg = + this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); + buffer_->add_shared(shared_msg); + } else { + this->do_intra_process_ros_message_publish(std::move(msg)); + } + } + } + + /// Publish a serialized message on the topic. + /** + * This signature is enabled if the element_type of the std::unique_ptr is + * a rclcpp::SerializedMessage, as opposed to the custom_type of a TypeAdapter, + * and that type matches the type given when creating the publisher. + * + * This signature allows the user to give ownership of the message to rclcpp, + * allowing for more efficient intra-process communication optimizations. + * + * \param[in] msg A unique pointer to the message to send. + */ + template + typename std::enable_if_t< + std::is_same::value && + std::is_same::value + > + publish(std::unique_ptr msg) + { + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + auto status = rcl_publish_serialized_message( + publisher_handle_.get(), &msg->get_rcl_serialized_message(), nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } + } + + if (intra_process_is_enabled_) { this->do_intra_process_ros_message_publish(std::move(msg)); } } @@ -309,14 +380,22 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - ROSMessageType ros_msg; + auto ros_msg_ptr = std::make_shared(); // TODO(clalancette): This is unnecessarily doing an additional conversion // that may have already been done in do_intra_process_publish_and_return_shared(). // We should just reuse that effort. - rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *ros_msg_ptr); this->do_intra_process_publish(std::move(msg)); - this->do_inter_process_publish(ros_msg); + this->do_inter_process_publish(*ros_msg_ptr); + if (buffer_) { + buffer_->add_shared(ros_msg_ptr); + } } else { + if (buffer_) { + auto ros_msg_ptr = std::make_shared(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *ros_msg_ptr); + buffer_->add_shared(ros_msg_ptr); + } this->do_intra_process_publish(std::move(msg)); } } @@ -580,6 +659,8 @@ class Publisher : public PublisherBase PublishedTypeDeleter published_type_deleter_; ROSMessageTypeAllocator ros_message_type_allocator_; ROSMessageTypeDeleter ros_message_type_deleter_; + + BufferSharedPtr buffer_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index d9181bea43..7137dfb364 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -139,6 +139,12 @@ class PublisherBase : public std::enable_shared_from_this size_t get_intra_process_subscription_count() const; + /// Get if durability is transient local + /** \return If durability is transient local*/ + RCLCPP_PUBLIC + bool + is_durability_transient_local() const; + /// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC). /** * If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 87def3cc17..f9ca967e4d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -60,6 +60,35 @@ struct PublisherFactory const PublisherFactoryFunction create_typed_publisher; }; +/// Return a PublisherFactory with functions setup for creating a PublisherT. +template +PublisherFactory +create_publisher_factory( + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) +{ + PublisherFactory factory { + // factory function that creates a specific PublisherT + [options, type_support]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> std::shared_ptr + { + auto publisher = std::make_shared( + node_base, topic_name, qos, options, type_support); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + publisher->post_init_setup(node_base, topic_name, qos, options); + return publisher; + } + }; + + // return the factory now that it is populated + return factory; +} + /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp index 3501834dd1..01fd314f49 100644 --- a/rclcpp/include/rclcpp/publisher_options.hpp +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -24,6 +24,7 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/detail/rmw_implementation_specific_publisher_payload.hpp" +#include "rclcpp/intra_process_buffer_type.hpp" #include "rclcpp/intra_process_setting.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/event_handler.hpp" @@ -40,6 +41,9 @@ struct PublisherOptionsBase /// Setting to explicitly set intraprocess communications. IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; + /// Setting the data-type stored in the intraprocess buffer + IntraProcessBufferType intra_process_buffer_type = IntraProcessBufferType::SharedPtr; + /// Callbacks for various events related to publishers. PublisherEventCallbacks event_callbacks; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index eb1a980dd3..8330cec00c 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -153,6 +153,14 @@ class Subscription : public SubscriptionBase if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; + if (callback.is_serialized_message_callback() && + !serialization_traits::is_serialized_message_class::value) + { + throw std::invalid_argument( + "intraprocess communication for serialized callback " + "allowed only with rclcpp::SerializedMessage subscription type"); + } + // Check if the QoS is compatible with intra-process. auto qos_profile = get_actual_qos(); if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { @@ -163,10 +171,6 @@ class Subscription : public SubscriptionBase throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } - if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { - throw std::invalid_argument( - "intraprocess communication allowed only with volatile durability"); - } using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, @@ -193,7 +197,8 @@ class Subscription : public SubscriptionBase // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; auto ipm = context->get_sub_context(); - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_); + uint64_t intra_process_subscription_id = ipm->template add_subscription< + ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_); this->setup_intra_process(intra_process_subscription_id, ipm); } @@ -326,6 +331,11 @@ class Subscription : public SubscriptionBase const rclcpp::MessageInfo & message_info) override { // TODO(wjwwood): enable topic statistics for serialized messages + if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { + // In this case, the message will be delivered via intra process and + // we should ignore this copy of the message. + return; + } any_callback_.dispatch(serialized_message, message_info); } diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a1727eab5a..82bd47fcd1 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -125,6 +125,65 @@ create_subscription_factory( // return the factory now that it is populated return factory; } +/// Return a SubscriptionFactory setup to create a SubscriptionT. +template< + typename MessageT = void, + typename CallbackT, + typename AllocatorT, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> +SubscriptionFactory +create_subscription_factory( + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options, + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + const rosidl_message_type_support_t & type_support, + std::shared_ptr> + subscription_topic_stats = nullptr) +{ + auto allocator = options.get_allocator(); + + using rclcpp::AnySubscriptionCallback; + AnySubscriptionCallback any_subscription_callback(*allocator); + any_subscription_callback.set(std::forward(callback)); + + SubscriptionFactory factory { + // factory function that creates a MessageT specific SubscriptionT + [options, msg_mem_strat, any_subscription_callback, type_support, subscription_topic_stats]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> rclcpp::SubscriptionBase::SharedPtr + { + using rclcpp::Subscription; + using rclcpp::SubscriptionBase; + + auto sub = Subscription::make_shared( + node_base, + type_support, + topic_name, + qos, + any_subscription_callback, + options, + msg_mem_strat, + subscription_topic_stats); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + sub->post_init_setup(node_base, qos, options); + auto sub_base_ptr = std::dynamic_pointer_cast(sub); + return sub_base_ptr; + } + }; + + // return the factory now that it is populated + return factory; +} } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp b/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp new file mode 100644 index 0000000000..1ca9892ac4 --- /dev/null +++ b/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp @@ -0,0 +1,37 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace rclcpp +{ + +namespace detail +{ +rclcpp::IntraProcessBufferType +resolve_intra_process_buffer_type( + const rclcpp::IntraProcessBufferType buffer_type) +{ + if (buffer_type == IntraProcessBufferType::CallbackDefault) { + throw std::invalid_argument( + "IntraProcessBufferType::CallbackDefault is not allowed " + "when there is no callback function"); + } + + return buffer_type; +} + +} // namespace detail + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/generic_subscription.cpp b/rclcpp/src/rclcpp/generic_subscription.cpp deleted file mode 100644 index e6e61add24..0000000000 --- a/rclcpp/src/rclcpp/generic_subscription.cpp +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2018, Bosch Software Innovations GmbH. -// Copyright 2021, Apex.AI Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/generic_subscription.hpp" - -#include -#include - -#include "rcl/subscription.h" - -#include "rclcpp/exceptions.hpp" - -namespace rclcpp -{ - -std::shared_ptr -GenericSubscription::create_message() -{ - return create_serialized_message(); -} - -std::shared_ptr -GenericSubscription::create_serialized_message() -{ - return std::make_shared(0); -} - -void -GenericSubscription::handle_message( - std::shared_ptr &, - const rclcpp::MessageInfo &) -{ - throw rclcpp::exceptions::UnimplementedError( - "handle_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::handle_serialized_message( - const std::shared_ptr & message, - const rclcpp::MessageInfo &) -{ - callback_(message); -} - -void -GenericSubscription::handle_loaned_message( - void * message, const rclcpp::MessageInfo & message_info) -{ - (void) message; - (void) message_info; - throw rclcpp::exceptions::UnimplementedError( - "handle_loaned_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::return_message(std::shared_ptr & message) -{ - auto typed_message = std::static_pointer_cast(message); - return_serialized_message(typed_message); -} - -void -GenericSubscription::return_serialized_message( - std::shared_ptr & message) -{ - message.reset(); -} - - -// DYNAMIC TYPE ==================================================================================== -// TODO(methylDragon): Reorder later -rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr -GenericSubscription::get_shared_dynamic_message_type() -{ - throw rclcpp::exceptions::UnimplementedError( - "get_shared_dynamic_message_type is not implemented for GenericSubscription"); -} - -rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr -GenericSubscription::get_shared_dynamic_message() -{ - throw rclcpp::exceptions::UnimplementedError( - "get_shared_dynamic_message is not implemented for GenericSubscription"); -} - -rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr -GenericSubscription::get_shared_dynamic_serialization_support() -{ - throw rclcpp::exceptions::UnimplementedError( - "get_shared_dynamic_serialization_support is not implemented for GenericSubscription"); -} - -rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr -GenericSubscription::create_dynamic_message() -{ - throw rclcpp::exceptions::UnimplementedError( - "create_dynamic_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::return_dynamic_message( - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) -{ - (void) message; - throw rclcpp::exceptions::UnimplementedError( - "return_dynamic_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::handle_dynamic_message( - const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message, - const rclcpp::MessageInfo & message_info) -{ - (void) message; - (void) message_info; - throw rclcpp::exceptions::UnimplementedError( - "handle_dynamic_message is not implemented for GenericSubscription"); -} - -} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index efce4afeaf..c00ce79297 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -32,13 +32,24 @@ IntraProcessManager::~IntraProcessManager() {} uint64_t -IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer) { std::unique_lock lock(mutex_); uint64_t pub_id = IntraProcessManager::get_next_unique_id(); publishers_[pub_id] = publisher; + if (publisher->is_durability_transient_local()) { + if (buffer) { + publisher_buffers_[pub_id] = buffer; + } else { + throw std::runtime_error( + "transient_local publisher needs to pass" + "a valid publisher buffer ptr when calling add_publisher()"); + } + } // Initialize the subscriptions storage for this publisher. pub_to_subs_[pub_id] = SplittedSubscriptions(); @@ -51,37 +62,15 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) } if (can_communicate(publisher, subscription)) { uint64_t sub_id = pair.first; - insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + insert_sub_id_for_pub( + sub_id, pub_id, + subscription->use_take_shared_method(), subscription->is_serialized()); } } return pub_id; } -uint64_t -IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) -{ - std::unique_lock lock(mutex_); - - uint64_t sub_id = IntraProcessManager::get_next_unique_id(); - - subscriptions_[sub_id] = subscription; - - // adds the subscription id to all the matchable publishers - for (auto & pair : publishers_) { - auto publisher = pair.second.lock(); - if (!publisher) { - continue; - } - if (can_communicate(publisher, subscription)) { - uint64_t pub_id = pair.first; - insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); - } - } - - return sub_id; -} - void IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) { @@ -90,19 +79,14 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) subscriptions_.erase(intra_process_subscription_id); for (auto & pair : pub_to_subs_) { - pair.second.take_shared_subscriptions.erase( - std::remove( - pair.second.take_shared_subscriptions.begin(), - pair.second.take_shared_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_shared_subscriptions.end()); - - pair.second.take_ownership_subscriptions.erase( - std::remove( - pair.second.take_ownership_subscriptions.begin(), - pair.second.take_ownership_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_ownership_subscriptions.end()); + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + pair.second.take_subscriptions[i].erase( + std::remove( + pair.second.take_subscriptions[i].begin(), + pair.second.take_subscriptions[i].end(), + intra_process_subscription_id), + pair.second.take_subscriptions[i].end()); + } } } @@ -112,6 +96,7 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) std::unique_lock lock(mutex_); publishers_.erase(intra_process_publisher_id); + publisher_buffers_.erase(intra_process_publisher_id); pub_to_subs_.erase(intra_process_publisher_id); } @@ -146,9 +131,10 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) return 0; } - auto count = - publisher_it->second.take_shared_subscriptions.size() + - publisher_it->second.take_ownership_subscriptions.size(); + size_t count = 0u; + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + count += publisher_it->second.take_subscriptions[i].size(); + } return count; } @@ -198,13 +184,12 @@ void IntraProcessManager::insert_sub_id_for_pub( uint64_t sub_id, uint64_t pub_id, - bool use_take_shared_method) + bool use_take_shared_method, + bool is_serialized_subscriber) { - if (use_take_shared_method) { - pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id); - } else { - pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id); - } + pub_to_subs_[pub_id].take_subscriptions[SplittedSubscriptions::get_index( + use_take_shared_method, + is_serialized_subscriber)].push_back(sub_id); } bool diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 698db2d559..156c6e26e9 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -270,6 +270,13 @@ PublisherBase::get_intra_process_subscription_count() const return ipm->get_subscription_count(intra_process_publisher_id_); } +bool +PublisherBase::is_durability_transient_local() const +{ + return rcl_publisher_get_actual_qos(publisher_handle_.get())->durability == + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; +} + rclcpp::QoS PublisherBase::get_actual_qos() const { diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index b13123b65b..6a10f88dcd 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -20,7 +20,7 @@ using rclcpp::experimental::SubscriptionIntraProcessBase; void SubscriptionIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) { - detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); + rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); } const char * @@ -34,3 +34,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; } + +bool +SubscriptionIntraProcessBase::is_durability_transient_local() const +{ + return qos_profile_.durability() == rclcpp::DurabilityPolicy::TransientLocal; +} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 8c31a95415..c93509ffd0 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -178,6 +178,17 @@ if(TARGET test_intra_process_buffer) ) target_link_libraries(test_intra_process_buffer ${PROJECT_NAME}) endif() +ament_add_gtest(test_intra_process_subscriber test_intra_process_subscriber.cpp) +if(TARGET test_intra_process_subscriber) + ament_target_dependencies(test_intra_process_subscriber + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_intra_process_subscriber ${PROJECT_NAME}) +endif() ament_add_gtest(test_loaned_message test_loaned_message.cpp) ament_target_dependencies(test_loaned_message diff --git a/rclcpp/test/rclcpp/test_generic_pubsub.cpp b/rclcpp/test/rclcpp/test_generic_pubsub.cpp index f4cef0b757..3be3b414c3 100644 --- a/rclcpp/test/rclcpp/test_generic_pubsub.cpp +++ b/rclcpp/test/rclcpp/test_generic_pubsub.cpp @@ -33,6 +33,7 @@ using namespace ::testing; // NOLINT using namespace rclcpp; // NOLINT +using namespace std::chrono_literals; class RclcppGenericNodeFixture : public Test { @@ -61,21 +62,24 @@ class RclcppGenericNodeFixture : public Test publishers_.push_back(publisher); } - template + template> std::vector subscribe_raw_messages( - size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type) + size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + )) { std::vector messages; size_t counter = 0; auto subscription = node_->create_generic_subscription( topic_name, type, rclcpp::QoS(1), - [&counter, &messages, this](std::shared_ptr message) { + [&counter, &messages, this](const std::shared_ptr message) { T2 deserialized_message; rclcpp::Serialization serializer; serializer.deserialize_message(message.get(), &deserialized_message); messages.push_back(this->get_data_from_msg(deserialized_message)); counter++; - }); + }, options); while (counter < expected_recv_msg_count) { rclcpp::spin_some(node_); @@ -84,14 +88,14 @@ class RclcppGenericNodeFixture : public Test } template - rclcpp::SerializedMessage serialize_message(const T1 & data) + std::unique_ptr serialize_message(const T1 & data) { T2 message; write_message(data, message); rclcpp::Serialization ser; - SerializedMessage result; - ser.serialize_message(&message, &result); + auto result = std::make_unique(); + ser.serialize_message(&message, result.get()); return result; } @@ -171,7 +175,7 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work) ASSERT_TRUE(success); for (const auto & message : test_messages) { - publisher->publish(serialize_message(message)); + publisher->publish(*serialize_message(message)); } auto subscribed_messages = subscriber_future_.get(); @@ -209,7 +213,7 @@ TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work) for (const auto & message : test_messages) { publisher->publish_as_loaned_msg( - serialize_message(message)); + *serialize_message(message)); } auto subscribed_messages = subscriber_future_.get(); @@ -219,7 +223,7 @@ TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work) ASSERT_THROW( { publisher->publish_as_loaned_msg( - serialize_message(test_messages[0])); + *serialize_message(test_messages[0])); }, rclcpp::exceptions::RCLError); } } @@ -236,7 +240,7 @@ TEST_F(RclcppGenericNodeFixture, generic_subscription_uses_qos) auto publisher = node_->create_publisher(topic_name, qos); auto subscription = node_->create_generic_subscription( topic_name, topic_type, qos, - [](std::shared_ptr/* message */) {}); + [](std::shared_ptr/* message */) {}); auto connected = [publisher, subscription]() -> bool { return publisher->get_subscription_count() && subscription->get_publisher_count(); }; @@ -263,3 +267,111 @@ TEST_F(RclcppGenericNodeFixture, generic_publisher_uses_qos) // It normally takes < 20ms, 5s chosen as "a very long time" ASSERT_TRUE(wait_for(connected, 5s)); } + +TEST_F(RclcppGenericNodeFixture, generic_subscription_different_callbacks) +{ + // If the GenericSubscription does not use the provided QoS profile, + // its request will be incompatible with the Publisher's offer and no messages will be passed. + using namespace std::chrono_literals; + std::string topic_name = "string_topic"; + std::string topic_type = "test_msgs/msg/Strings"; + rclcpp::QoS qos = rclcpp::QoS(1); + + auto publisher = node_->create_publisher(topic_name, qos); + + // Test shared_ptr for const messages + { + auto subscription = node_->create_generic_subscription( + topic_name, topic_type, qos, + [](const std::shared_ptr/* message */) {}); + auto connected = [publisher, subscription]() -> bool { + return publisher->get_subscription_count() && subscription->get_publisher_count(); + }; + // It normally takes < 20ms, 5s chosen as "a very long time" + ASSERT_TRUE(wait_for(connected, 5s)); + } + + // Test unique_ptr + { + auto subscription = node_->create_generic_subscription( + topic_name, topic_type, qos, + [](std::unique_ptr/* message */) {}); + auto connected = [publisher, subscription]() -> bool { + return publisher->get_subscription_count() && subscription->get_publisher_count(); + }; + // It normally takes < 20ms, 5s chosen as "a very long time" + ASSERT_TRUE(wait_for(connected, 5s)); + } + + // Test message callback + { + auto subscription = node_->create_generic_subscription( + topic_name, topic_type, qos, + [](rclcpp::SerializedMessage /* message */) {}); + auto connected = [publisher, subscription]() -> bool { + return publisher->get_subscription_count() && subscription->get_publisher_count(); + }; + // It normally takes < 20ms, 5s chosen as "a very long time" + ASSERT_TRUE(wait_for(connected, 5s)); + } +} + +TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_with_intraprocess) +{ + // We currently publish more messages because they can get lost + std::vector test_messages = {"Hello World"}; + std::string topic_name = "/string_topic"; + std::string type = "test_msgs/msg/Strings"; + + // add a dummy subscriber without ipm + auto node = std::make_shared("test_string_msg_listener_node"); + auto string_msgs_sub = node->create_subscription( + topic_name, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {}); + + auto publisher_options = rclcpp::PublisherOptionsWithAllocator>(); + publisher_options.use_intra_process_comm = IntraProcessSetting::Enable; + auto ts_lib = rclcpp::get_typesupport_library( + type, "rosidl_typesupport_cpp"); + auto type_support = *rclcpp::get_typesupport_handle(type, "rosidl_typesupport_cpp", *ts_lib); + + auto publisher = + rclcpp::detail::create_publisher( + *node, topic_name, + type_support, + rclcpp::QoS(1), + publisher_options + ); + + auto subscriber_future_ = std::async( + std::launch::async, [this, topic_name, type] { + auto subscriber_options = rclcpp::SubscriptionOptionsWithAllocator>(); + subscriber_options.use_intra_process_comm = IntraProcessSetting::Enable; + return subscribe_raw_messages( + 1, topic_name, type, + subscriber_options); + }); + + // TODO(karsten1987): Port 'wait_for_sub' to rclcpp + auto allocator = node_->get_node_options().allocator(); + auto success = false; + auto ret = rcl_wait_for_subscribers( + node_->get_node_base_interface()->get_rcl_node_handle(), + &allocator, + topic_name.c_str(), + 1u, + static_cast(1e9), + &success); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(success); + + std::this_thread::sleep_for(100ms); + + for (const auto & message : test_messages) { + publisher->publish(std::move(serialize_message(message))); + std::this_thread::sleep_for(100ms); + } + + auto subscribed_messages = subscriber_future_.get(); + EXPECT_THAT(subscribed_messages, SizeIs(1)); + EXPECT_THAT(subscribed_messages[0], StrEq("Hello World")); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp index 16c457c96f..be6d1fc35e 100644 --- a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp @@ -190,6 +190,33 @@ TEST(TestIntraProcessBuffer, shared_buffer_consume) { EXPECT_EQ(1L, original_shared_msg.use_count()); EXPECT_EQ(*original_shared_msg, *popped_unique_msg); EXPECT_NE(original_message_pointer, popped_message_pointer); + + original_shared_msg = std::make_shared('c'); + original_message_pointer = reinterpret_cast(original_shared_msg.get()); + auto original_shared_msg_2 = std::make_shared('d'); + auto original_message_pointer_2 = reinterpret_cast(original_shared_msg_2.get()); + intra_process_buffer.add_shared(original_shared_msg); + intra_process_buffer.add_shared(original_shared_msg_2); + + auto shared_data_vec = intra_process_buffer.get_all_data_shared(); + EXPECT_EQ(2L, shared_data_vec.size()); + EXPECT_EQ(3L, original_shared_msg.use_count()); + EXPECT_EQ(original_shared_msg.use_count(), shared_data_vec[0].use_count()); + EXPECT_EQ(*original_shared_msg, *shared_data_vec[0]); + EXPECT_EQ(original_message_pointer, reinterpret_cast(shared_data_vec[0].get())); + EXPECT_EQ(3L, original_shared_msg_2.use_count()); + EXPECT_EQ(original_shared_msg_2.use_count(), shared_data_vec[1].use_count()); + EXPECT_EQ(*original_shared_msg_2, *shared_data_vec[1]); + EXPECT_EQ(original_message_pointer_2, reinterpret_cast(shared_data_vec[1].get())); + + auto unique_data_vec = intra_process_buffer.get_all_data_unique(); + EXPECT_EQ(2L, unique_data_vec.size()); + EXPECT_EQ(3L, original_shared_msg.use_count()); + EXPECT_EQ(*original_shared_msg, *unique_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(unique_data_vec[0].get())); + EXPECT_EQ(3L, original_shared_msg_2.use_count()); + EXPECT_EQ(*original_shared_msg_2, *unique_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } /* @@ -237,4 +264,31 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) { EXPECT_EQ(original_value, *popped_unique_msg); EXPECT_EQ(original_message_pointer, popped_message_pointer); + + original_unique_msg = std::make_unique('c'); + original_message_pointer = reinterpret_cast(original_unique_msg.get()); + original_value = *original_unique_msg; + auto original_unique_msg_2 = std::make_unique('d'); + auto original_message_pointer_2 = reinterpret_cast(original_unique_msg.get()); + auto original_value_2 = *original_unique_msg_2; + intra_process_buffer.add_unique(std::move(original_unique_msg)); + intra_process_buffer.add_unique(std::move(original_unique_msg_2)); + + auto shared_data_vec = intra_process_buffer.get_all_data_shared(); + EXPECT_EQ(2L, shared_data_vec.size()); + EXPECT_EQ(1L, shared_data_vec[0].use_count()); + EXPECT_EQ(original_value, *shared_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(shared_data_vec[0].get())); + EXPECT_EQ(1L, shared_data_vec[1].use_count()); + EXPECT_EQ(original_value_2, *shared_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(shared_data_vec[1].get())); + + auto unique_data_vec = intra_process_buffer.get_all_data_unique(); + EXPECT_EQ(2L, unique_data_vec.size()); + EXPECT_EQ(1L, shared_data_vec[0].use_count()); + EXPECT_EQ(original_value, *unique_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(unique_data_vec[0].get())); + EXPECT_EQ(1L, shared_data_vec[1].use_count()); + EXPECT_EQ(original_value_2, *unique_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 45d916b004..f2f72501e4 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -24,6 +24,7 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/context.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/qos.hpp" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -31,6 +32,111 @@ // NOLINTNEXTLINE(build/include_order) #include +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ +namespace mock +{ + +class IntraProcessBufferBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBufferBase) + + virtual ~IntraProcessBufferBase() {} +}; + +} // namespace mock +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ +namespace mock +{ +template< + typename MessageT, + typename Alloc = std::allocator, + typename MessageDeleter = std::default_delete> +class IntraProcessBuffer : public IntraProcessBufferBase +{ +public: + using ConstMessageSharedPtr = std::shared_ptr; + using MessageUniquePtr = std::unique_ptr; + + RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessBuffer) + + IntraProcessBuffer() + {} + + void add(ConstMessageSharedPtr msg) + { + message_ptr = reinterpret_cast(msg.get()); + shared_msg = msg; + ++num_msgs; + } + + void add(MessageUniquePtr msg) + { + message_ptr = reinterpret_cast(msg.get()); + unique_msg = std::move(msg); + ++num_msgs; + } + + void pop(std::uintptr_t & msg_ptr) + { + msg_ptr = message_ptr; + message_ptr = 0; + --num_msgs; + } + + size_t size() const + { + return num_msgs; + } + + std::vector get_all_data_shared() + { + if (shared_msg) { + return {shared_msg}; + } else if (unique_msg) { + return {std::make_shared(*unique_msg)}; + } + return {}; + } + + std::vector get_all_data_unique() + { + std::vector result; + if (shared_msg) { + result.push_back(std::make_unique(*shared_msg)); + } else if (unique_msg) { + result.push_back(std::make_unique(*unique_msg)); + } + return result; + } + + // need to store the messages somewhere otherwise the memory address will be reused + ConstMessageSharedPtr shared_msg; + MessageUniquePtr unique_msg; + + std::uintptr_t message_ptr; + // count add and pop + size_t num_msgs = 0u; +}; + +} // namespace mock +} // namespace buffers +} // namespace experimental +} // namespace rclcpp namespace rclcpp { // forward declaration @@ -80,6 +186,12 @@ class PublisherBase return qos_profile; } + bool + is_durability_transient_local() const + { + return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; + } + bool operator==(const rmw_gid_t & gid) const { @@ -117,6 +229,9 @@ class Publisher : public PublisherBase { auto allocator = std::make_shared(); message_allocator_ = std::make_shared(*allocator.get()); + if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + buffer = std::make_shared>(); + } } // The following functions use the IntraProcessManager @@ -124,65 +239,12 @@ class Publisher : public PublisherBase void publish(MessageUniquePtr msg); std::shared_ptr message_allocator_; + typename rclcpp::experimental::buffers::mock::IntraProcessBuffer::SharedPtr buffer{nullptr}; }; } // namespace mock } // namespace rclcpp -namespace rclcpp -{ -namespace experimental -{ -namespace buffers -{ -namespace mock -{ -template< - typename MessageT, - typename Alloc = std::allocator, - typename MessageDeleter = std::default_delete> -class IntraProcessBuffer -{ -public: - using ConstMessageSharedPtr = std::shared_ptr; - using MessageUniquePtr = std::unique_ptr; - - RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessBuffer) - - IntraProcessBuffer() - {} - - void add(ConstMessageSharedPtr msg) - { - message_ptr = reinterpret_cast(msg.get()); - shared_msg = msg; - } - - void add(MessageUniquePtr msg) - { - message_ptr = reinterpret_cast(msg.get()); - unique_msg = std::move(msg); - } - - void pop(std::uintptr_t & msg_ptr) - { - msg_ptr = message_ptr; - message_ptr = 0; - } - - // need to store the messages somewhere otherwise the memory address will be reused - ConstMessageSharedPtr shared_msg; - MessageUniquePtr unique_msg; - - std::uintptr_t message_ptr; -}; - -} // namespace mock -} // namespace buffers -} // namespace experimental -} // namespace rclcpp - - namespace rclcpp { namespace experimental @@ -221,6 +283,29 @@ class SubscriptionIntraProcessBase return topic_name.c_str(); } + bool + is_durability_transient_local() const + { + return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; + } + + is_serialized() const + { + return false; + } + + bool + serve_serialized_message( + const rclcpp::SerializedMessage * /*serialized_message*/, + IntraProcessManager * /*intraprocess_manager*/, + uint64_t /*intra_process_publisher_id*/, + void * /*untyped_allocator*/, + const std::vector & /*take_ownership_subscriptions*/, + const std::vector & /*take_shared_subscriptions*/) + { + return true; + } + rclcpp::QoS qos_profile; std::string topic_name; }; @@ -311,12 +396,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< // Prevent the header files of the mocked classes to be included #define RCLCPP__PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase +#define IntraProcessBufferBase mock::IntraProcessBufferBase #define IntraProcessBuffer mock::IntraProcessBuffer #define SubscriptionIntraProcessBase mock::SubscriptionIntraProcessBase #define SubscriptionIntraProcessBuffer mock::SubscriptionIntraProcessBuffer @@ -348,28 +435,36 @@ void Publisher::publish(MessageUniquePtr msg) throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( - intra_process_publisher_id_, - std::move(msg), - *message_allocator_); + if (buffer) { + auto shared_msg = ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id_, + std::move(msg), + *message_allocator_); + buffer->add(shared_msg); + } else { + ipm->template do_intra_process_publish( + intra_process_publisher_id_, + std::move(msg), + *message_allocator_); + } } } // namespace mock } // namespace rclcpp /* - This tests how the class connects and disconnects publishers and subscriptions: - - Creates 2 publishers on different topics and a subscription to one of them. - Add everything to the intra-process manager. - - All the entities are expected to have different ids. - - Check the subscriptions count for each publisher. - - One of the publishers is expected to have 1 subscription, while the other 0. - - Check the subscription count for a non existing publisher id, should return 0. - - Add a new publisher and a new subscription both with reliable QoS. - - The subscriptions count of the previous publisher is expected to remain unchanged, - while the new publisher is expected to have 2 subscriptions (it's compatible with both QoS). - - Remove the just added subscriptions. - - The count for the last publisher is expected to decrease to 1. + * This tests how the class connects and disconnects publishers and subscriptions: + * - Creates 2 publishers on different topics and a subscription to one of them. + * Add everything to the intra-process manager. + * - All the entities are expected to have different ids. + * - Check the subscriptions count for each publisher. + * - One of the publishers is expected to have 1 subscription, while the other 0. + * - Check the subscription count for a non existing publisher id, should return 0. + * - Add a new publisher and a new subscription both with reliable QoS. + * - The subscriptions count of the previous publisher is expected to remain unchanged, + * while the new publisher is expected to have 2 subscriptions (it's compatible with both QoS). + * - Remove the just added subscriptions. + * - The count for the last publisher is expected to decrease to 1. */ TEST(TestIntraProcessManager, add_pub_sub) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -388,7 +483,7 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto p1_id = ipm->add_publisher(p1); auto p2_id = ipm->add_publisher(p2); - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); bool unique_ids = p1_id != p2_id && p2_id != s1_id; ASSERT_TRUE(unique_ids); @@ -404,7 +499,7 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto s2 = std::make_shared(rclcpp::QoS(10).reliable()); - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto p3_id = ipm->add_publisher(p3); p1_subs = ipm->get_subscription_count(p1_id); @@ -424,14 +519,14 @@ TEST(TestIntraProcessManager, add_pub_sub) { } /* - This tests the minimal usage of the class where there is a single subscription per publisher: - - Publishes a unique_ptr message with a subscription requesting ownership. - - The received message is expected to be the same. - - Remove the first subscription from ipm and add a new one. - - Publishes a unique_ptr message with a subscription not requesting ownership. - - The received message is expected to be the same, the first subscription do not receive it. - - Publishes a shared_ptr message with a subscription not requesting ownership. - - The received message is expected to be the same. + * This tests the minimal usage of the class where there is a single subscription per publisher: + * - Publishes a unique_ptr message with a subscription requesting ownership. + * - The received message is expected to be the same. + * - Remove the first subscription from ipm and add a new one. + * - Publishes a unique_ptr message with a subscription not requesting ownership. + * - The received message is expected to be the same, the first subscription do not receive it. + * - Publishes a shared_ptr message with a subscription not requesting ownership. + * - The received message is expected to be the same. */ TEST(TestIntraProcessManager, single_subscription) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -447,7 +542,7 @@ TEST(TestIntraProcessManager, single_subscription) { auto s1 = std::make_shared(); s1->take_shared_method = false; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -458,7 +553,7 @@ TEST(TestIntraProcessManager, single_subscription) { ipm->remove_subscription(s1_id); auto s2 = std::make_shared(); s2->take_shared_method = true; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); (void)s2_id; unique_msg = std::make_unique(); @@ -477,15 +572,15 @@ TEST(TestIntraProcessManager, single_subscription) { } /* - This tests the usage of the class where there are multiple subscriptions of the same type: - - Publishes a unique_ptr message with 2 subscriptions requesting ownership. - - One is expected to receive the published message, while the other will receive a copy. - - Publishes a unique_ptr message with 2 subscriptions not requesting ownership. - - Both received messages are expected to be the same as the published one. - - Publishes a shared_ptr message with 2 subscriptions requesting ownership. - - Both received messages are expected to be a copy of the published one. - - Publishes a shared_ptr message with 2 subscriptions not requesting ownership. - - Both received messages are expected to be the same as the published one. + * This tests the usage of the class where there are multiple subscriptions of the same type: + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership. + * - One is expected to receive the published message, while the other will receive a copy. + * - Publishes a unique_ptr message with 2 subscriptions not requesting ownership. + * - Both received messages are expected to be the same as the published one. + * - Publishes a shared_ptr message with 2 subscriptions requesting ownership. + * - Both received messages are expected to be a copy of the published one. + * - Publishes a shared_ptr message with 2 subscriptions not requesting ownership. + * - Both received messages are expected to be the same as the published one. */ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -501,11 +596,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s1 = std::make_shared(); s1->take_shared_method = false; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto s2 = std::make_shared(); s2->take_shared_method = false; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -521,11 +616,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s3 = std::make_shared(); s3->take_shared_method = true; - auto s3_id = ipm->add_subscription(s3); + auto s3_id = ipm->template add_subscription(s3); auto s4 = std::make_shared(); s4->take_shared_method = true; - auto s4_id = ipm->add_subscription(s4); + auto s4_id = ipm->template add_subscription(s4); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -540,11 +635,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s5 = std::make_shared(); s5->take_shared_method = false; - auto s5_id = ipm->add_subscription(s5); + auto s5_id = ipm->template add_subscription(s5); auto s6 = std::make_shared(); s6->take_shared_method = false; - auto s6_id = ipm->add_subscription(s6); + auto s6_id = ipm->template add_subscription(s6); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -560,12 +655,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s7 = std::make_shared(); s7->take_shared_method = true; - auto s7_id = ipm->add_subscription(s7); + auto s7_id = ipm->template add_subscription(s7); (void)s7_id; auto s8 = std::make_shared(); s8->take_shared_method = true; - auto s8_id = ipm->add_subscription(s8); + auto s8_id = ipm->template add_subscription(s8); (void)s8_id; unique_msg = std::make_unique(); @@ -578,20 +673,20 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { } /* - This tests the usage of the class where there are multiple subscriptions of different types: - - Publishes a unique_ptr message with 1 subscription requesting ownership and 1 not. - - The one requesting ownership is expected to receive the published message, - while the other is expected to receive a copy. - - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 1 not. - - One of the subscriptions requesting ownership is expected to receive the published message, - while both other subscriptions are expected to receive different copies. - - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 2 not. - - The 2 subscriptions not requesting ownership are expected to both receive the same copy - of the message, one of the subscription requesting ownership is expected to receive a - different copy, while the last is expected to receive the published message. - - Publishes a shared_ptr message with 1 subscription requesting ownership and 1 not. - - The subscription requesting ownership is expected to receive a copy of the message, while - the other is expected to receive the published message + * This tests the usage of the class where there are multiple subscriptions of different types: + * - Publishes a unique_ptr message with 1 subscription requesting ownership and 1 not. + * - The one requesting ownership is expected to receive the published message, + * while the other is expected to receive a copy. + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 1 not. + * - One of the subscriptions requesting ownership is expected to receive the published message, + * while both other subscriptions are expected to receive different copies. + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 2 not. + * - The 2 subscriptions not requesting ownership are expected to both receive the same copy + * of the message, one of the subscription requesting ownership is expected to receive a + * different copy, while the last is expected to receive the published message. + * - Publishes a shared_ptr message with 1 subscription requesting ownership and 1 not. + * - The subscription requesting ownership is expected to receive a copy of the message, while + * the other is expected to receive the published message */ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -607,11 +702,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s1 = std::make_shared(); s1->take_shared_method = true; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto s2 = std::make_shared(); s2->take_shared_method = false; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -626,15 +721,15 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s3 = std::make_shared(); s3->take_shared_method = false; - auto s3_id = ipm->add_subscription(s3); + auto s3_id = ipm->template add_subscription(s3); auto s4 = std::make_shared(); s4->take_shared_method = false; - auto s4_id = ipm->add_subscription(s4); + auto s4_id = ipm->template add_subscription(s4); auto s5 = std::make_shared(); s5->take_shared_method = true; - auto s5_id = ipm->add_subscription(s5); + auto s5_id = ipm->template add_subscription(s5); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -658,19 +753,19 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s6 = std::make_shared(); s6->take_shared_method = true; - auto s6_id = ipm->add_subscription(s6); + auto s6_id = ipm->template add_subscription(s6); auto s7 = std::make_shared(); s7->take_shared_method = true; - auto s7_id = ipm->add_subscription(s7); + auto s7_id = ipm->template add_subscription(s7); auto s8 = std::make_shared(); s8->take_shared_method = false; - auto s8_id = ipm->add_subscription(s8); + auto s8_id = ipm->template add_subscription(s8); auto s9 = std::make_shared(); s9->take_shared_method = false; - auto s9_id = ipm->add_subscription(s9); + auto s9_id = ipm->template add_subscription(s9); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -696,12 +791,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s10 = std::make_shared(); s10->take_shared_method = false; - auto s10_id = ipm->add_subscription(s10); + auto s10_id = ipm->template add_subscription(s10); (void)s10_id; auto s11 = std::make_shared(); s11->take_shared_method = true; - auto s11_id = ipm->add_subscription(s11); + auto s11_id = ipm->template add_subscription(s11); (void)s11_id; unique_msg = std::make_unique(); @@ -712,3 +807,82 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { EXPECT_EQ(original_message_pointer, received_message_pointer_10); EXPECT_NE(original_message_pointer, received_message_pointer_11); } + +/* + * This tests the check inside add_publisher for transient_local + * durability publishers + * - add_publisher should throw runtime_error when no valid buffer ptr + * is passed with a transient_local publisher + */ +TEST(TestIntraProcessManager, transient_local_invalid_buffer) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + ASSERT_THROW( + { + ipm->add_publisher(p1, nullptr); + }, std::runtime_error); +} + +/* + * This tests publishing function for transient_local durability publihers + * - A message is published before three transient_local subscriptions are added to + * ipm. Two of the subscriptions use take_shared method. Delivery of the message is verified + * along with the contents and pointer addresses from the subscriptions. + */ +TEST(TestIntraProcessManager, transient_local) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess; + + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + auto s1 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s2 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s3 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + s1->take_shared_method = false; + s2->take_shared_method = true; + s3->take_shared_method = true; + + auto p1_id = ipm->add_publisher(p1, p1->buffer); + + p1->set_intra_process_manager(p1_id, ipm); + + auto unique_msg = std::make_unique(); + unique_msg->msg = "Test"; + p1->publish(std::move(unique_msg)); + + ipm->template add_subscription(s1); + ipm->template add_subscription(s2); + ipm->template add_subscription(s3); + + auto received_message_pointer_1 = s1->pop(); + auto received_message_pointer_2 = s2->pop(); + auto received_message_pointer_3 = s3->pop(); + ASSERT_NE(0u, received_message_pointer_1); + ASSERT_NE(0u, received_message_pointer_2); + ASSERT_NE(0u, received_message_pointer_3); + ASSERT_EQ(received_message_pointer_3, received_message_pointer_2); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_2)->msg); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_3)->msg); + ASSERT_EQ("Test", reinterpret_cast(received_message_pointer_1)->msg); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_subscriber.cpp b/rclcpp/test/rclcpp/test_intra_process_subscriber.cpp new file mode 100755 index 0000000000..5beb6f270a --- /dev/null +++ b/rclcpp/test/rclcpp/test_intra_process_subscriber.cpp @@ -0,0 +1,236 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include +#include + +#include "gtest/gtest.h" + +#include "rclcpp/rclcpp.hpp" +#include "test_msgs/msg/empty.hpp" + +class TestIntraProcessSubscriber : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions(); + node = std::make_shared("my_node", "/ns", node_options); + } + + void TearDown() + { + node.reset(); + rclcpp::shutdown(); + } + + rclcpp::Node::SharedPtr node; +}; + +/* + Construtctor + */ +TEST_F(TestIntraProcessSubscriber, constructor) { + using MessageT = test_msgs::msg::Empty; + using Alloc = std::allocator; + + rclcpp::AnySubscriptionCallback callback; + + auto allocator = std::make_shared(); + auto subscriber = std::make_shared>( + callback, + allocator, + rclcpp::contexts::get_global_default_context(), + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::SharedPtr); + + EXPECT_NE(nullptr, subscriber); +} + +/* + Test methods of SubscriptionIntraProcess with shared callback: + * is_serialized + * deserialize_message + * serve_serialized_message + */ +TEST_F(TestIntraProcessSubscriber, methods_shared) { + using MessageT = test_msgs::msg::Empty; + using Alloc = std::allocator; + + // create IPM subscriber + uint32_t counter = 0u; + rclcpp::AnySubscriptionCallback callback; + callback.set( + [&counter](const std::shared_ptr &) { + ++counter; + }); + + auto allocator = std::make_shared(); + auto context = rclcpp::contexts::get_global_default_context(); + auto subscriber = std::make_shared>( + callback, + allocator, + context, + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::SharedPtr); + + EXPECT_NE(nullptr, subscriber); + EXPECT_EQ(false, subscriber->is_serialized()); + + // generate a serialized message + MessageT msg; + rclcpp::SerializedMessage serialized_message; + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized_message); + + // test deserialization + auto deserialized_msg = subscriber->deserialize_message(&serialized_message); + EXPECT_EQ(msg, *deserialized_msg); + + // Get the intra process manager instance for this context. + auto ipm = context->get_sub_context(); + const auto sub_id = ipm->add_subscription(subscriber); + auto publisher = node->create_publisher("topic", 42); + const auto intra_process_publisher_id = ipm->add_publisher(publisher); + + std::vector take_ownership_subscriptions; + std::vector take_shared_subscriptions = {sub_id}; + subscriber->serve_serialized_message( + &serialized_message, + ipm.get(), + intra_process_publisher_id, + reinterpret_cast(allocator.get()), + take_ownership_subscriptions, + take_shared_subscriptions); + + // execute subscriber callback + { + auto data = subscriber->take_data(); + subscriber->execute(data); + } + + // check if message was received + EXPECT_EQ(1u, counter); + + // add a 2nd shared subscriber to cover more code + auto subscriber2 = std::make_shared>( + callback, + allocator, + context, + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::SharedPtr); + const auto sub2_id = ipm->add_subscription(subscriber2); + take_shared_subscriptions.push_back(sub2_id); + + subscriber->serve_serialized_message( + &serialized_message, + ipm.get(), + intra_process_publisher_id, + reinterpret_cast(allocator.get()), + take_ownership_subscriptions, + take_shared_subscriptions); + + // execute subscribers callback + { + auto data = subscriber->take_data(); + subscriber->execute(data); + } + + // check if message was received + EXPECT_EQ(2u, counter); + + { + auto data = subscriber2->take_data(); + subscriber2->execute(data); + } + + // check if message was received + EXPECT_EQ(3u, counter); +} + +/* + Test methods of SubscriptionIntraProcess with unique callback: + * is_serialized + * deserialize_message + * serve_serialized_message + */ +TEST_F(TestIntraProcessSubscriber, methods_unique) { + using MessageT = test_msgs::msg::Empty; + using Alloc = std::allocator; + + // create IPM subscriber + uint32_t counter = 0u; + rclcpp::AnySubscriptionCallback callback; + callback.set( + [&counter](std::unique_ptr) { + ++counter; + }); + + auto allocator = std::make_shared(); + auto context = rclcpp::contexts::get_global_default_context(); + auto subscriber = std::make_shared>( + callback, + allocator, + context, + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::UniquePtr); + + EXPECT_NE(nullptr, subscriber); + EXPECT_EQ(false, subscriber->is_serialized()); + + // generate a serialized message + MessageT msg; + rclcpp::SerializedMessage serialized_message; + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized_message); + + // test deserialization + auto deserialized_msg = subscriber->deserialize_message(&serialized_message); + EXPECT_EQ(msg, *deserialized_msg); + + // Get the intra process manager instance for this context. + auto ipm = context->get_sub_context(); + const auto sub_id = ipm->add_subscription(subscriber); + auto publisher = node->create_publisher("topic", 42); + const auto intra_process_publisher_id = ipm->add_publisher(publisher); + + std::vector take_ownership_subscriptions = {sub_id}; + std::vector take_shared_subscriptions; + subscriber->serve_serialized_message( + &serialized_message, + ipm.get(), + intra_process_publisher_id, + reinterpret_cast(allocator.get()), + take_ownership_subscriptions, + take_shared_subscriptions); + + // execute subscriber callback + { + auto data = subscriber->take_data(); + subscriber->execute(data); + } + + // check if message was received + EXPECT_EQ(1u, counter); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 20a46194fc..15d2698aed 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -196,11 +196,7 @@ static std::vector invalid_qos_profiles() { std::vector parameters; - parameters.reserve(2); - parameters.push_back( - TestParameters( - rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(), - "transient_local_qos")); + parameters.reserve(1); parameters.push_back( TestParameters( rclcpp::QoS(rclcpp::KeepAll()), @@ -312,6 +308,23 @@ TEST_F(TestPublisher, serialized_message_publish) { EXPECT_NO_THROW(publisher->publish(serialized_msg.get_rcl_serialized_message())); } +TEST_F(TestPublisher, serialized_message_intra_process_publish) { + initialize(); + rclcpp::PublisherOptionsWithAllocator> options; + options.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + auto publisher = rclcpp::detail::create_publisher( + *node, "topic", + rclcpp::get_message_type_support_handle(), 10, options); + + rclcpp::SerializedMessage serialized_msg; + RCLCPP_EXPECT_THROW_EQ( + publisher->publish(serialized_msg), + std::runtime_error("storing serialized messages in intra process is not supported yet")); + + auto serialized_unique_msg_ptr = std::make_unique(); + EXPECT_NO_THROW(publisher->publish(std::move(serialized_unique_msg_ptr))); +} + TEST_F(TestPublisher, rcl_publisher_init_error) { initialize(); auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_publisher_init, RCL_RET_ERROR); @@ -639,3 +652,88 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::QoS(1).best_effort(), rclcpp::QoS(1).best_effort()), std::pair( rclcpp::QoS(1).reliable(), rclcpp::QoS(1).best_effort()))); + +TEST_F(TestPublisher, intra_process_transient_local) { + constexpr auto history_depth = 10u; + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + rclcpp::PublisherOptionsWithAllocator> pub_options_ipm_disabled; + pub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + + rclcpp::PublisherOptionsWithAllocator> pub_options_ipm_enabled; + pub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + + auto pub_ipm_enabled_transient_local_enabled = node->create_publisher( + "topic1", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_enabled); + auto pub_ipm_disabled_transient_local_enabled = node->create_publisher( + "topic2", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_disabled); + auto pub_ipm_enabled_transient_local_disabled = node->create_publisher( + "topic3", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), pub_options_ipm_enabled); + auto pub_ipm_disabled_transient_local_disabled = node->create_publisher( + "topic4", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), pub_options_ipm_disabled); + + test_msgs::msg::Empty msg; + pub_ipm_enabled_transient_local_enabled->publish(msg); + pub_ipm_disabled_transient_local_enabled->publish(msg); + pub_ipm_enabled_transient_local_disabled->publish(msg); + pub_ipm_disabled_transient_local_disabled->publish(msg); + + auto do_nothing = [](std::shared_ptr) {}; + struct IntraProcessCallback + { + void callback_fun(size_t s) + { + (void) s; + called = true; + } + bool called = false; + }; + rclcpp::SubscriptionOptions sub_options_ipm_disabled; + sub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + rclcpp::SubscriptionOptions sub_options_ipm_enabled; + sub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + IntraProcessCallback callback1, callback2, callback3, callback4; + auto sub_ipm_enabled_transient_local_enabled = node->create_subscription( + "topic1", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), + do_nothing, sub_options_ipm_enabled); + sub_ipm_enabled_transient_local_enabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback1, std::placeholders::_1)); + auto sub_ipm_disabled_transient_local_enabled = node->create_subscription( + "topic2", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), + do_nothing, sub_options_ipm_disabled); + sub_ipm_disabled_transient_local_enabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback2, std::placeholders::_1)); + auto sub_ipm_enabled_transient_local_disabled = node->create_subscription( + "topic3", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), + do_nothing, sub_options_ipm_enabled); + sub_ipm_enabled_transient_local_disabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback3, std::placeholders::_1)); + auto sub_ipm_disabled_transient_local_disabled = node->create_subscription( + "topic4", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), + do_nothing, sub_options_ipm_disabled); + sub_ipm_disabled_transient_local_disabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback4, std::placeholders::_1)); + + EXPECT_TRUE(pub_ipm_enabled_transient_local_enabled->is_durability_transient_local()); + EXPECT_TRUE(pub_ipm_disabled_transient_local_enabled->is_durability_transient_local()); + EXPECT_FALSE(pub_ipm_enabled_transient_local_disabled->is_durability_transient_local()); + EXPECT_FALSE(pub_ipm_disabled_transient_local_disabled->is_durability_transient_local()); + + EXPECT_EQ(1, pub_ipm_enabled_transient_local_enabled->get_intra_process_subscription_count()); + EXPECT_EQ(0, pub_ipm_disabled_transient_local_enabled->get_intra_process_subscription_count()); + EXPECT_EQ(1, pub_ipm_enabled_transient_local_disabled->get_intra_process_subscription_count()); + EXPECT_EQ(0, pub_ipm_disabled_transient_local_disabled->get_intra_process_subscription_count()); + + + EXPECT_TRUE(callback1.called); + EXPECT_FALSE(callback2.called); + EXPECT_FALSE(callback3.called); + EXPECT_FALSE(callback4.called); +} diff --git a/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp b/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp index 07dfd8d584..0abd9b1a89 100644 --- a/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp +++ b/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp @@ -22,7 +22,7 @@ #include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp" /* - Construtctor + * Construtctor */ TEST(TestRingBufferImplementation, constructor) { // Cannot create a buffer of size zero. @@ -37,10 +37,11 @@ TEST(TestRingBufferImplementation, constructor) { } /* - Basic usage - - insert data and check that it has data - - extract data - - overwrite old data writing over the buffer capacity + * Basic usage + * - insert data and check that it has data + * - get all data + * - extract data + * - overwrite old data writing over the buffer capacity */ TEST(TestRingBufferImplementation, basic_usage) { rclcpp::experimental::buffers::RingBufferImplementation rb(2); @@ -64,6 +65,12 @@ TEST(TestRingBufferImplementation, basic_usage) { rb.enqueue('d'); + const auto all_data_vec = rb.get_all_data(); + + EXPECT_EQ(2u, all_data_vec.size()); + EXPECT_EQ('c', all_data_vec[0]); + EXPECT_EQ('d', all_data_vec[1]); + EXPECT_EQ(true, rb.has_data()); EXPECT_EQ(true, rb.is_full()); @@ -79,3 +86,56 @@ TEST(TestRingBufferImplementation, basic_usage) { EXPECT_EQ(false, rb.has_data()); EXPECT_EQ(false, rb.is_full()); } + +/* + * Basic usage with unique_ptr + * - insert unique_ptr data and check that it has data + * - get all data + * - extract data + * - overwrite old data writing over the buffer capacity + */ +TEST(TestRingBufferImplementation, basic_usage_unique_ptr) { + rclcpp::experimental::buffers::RingBufferImplementation> rb(2); + + auto a = std::make_unique('a'); + auto b = std::make_unique('b'); + auto original_b_pointer = reinterpret_cast(b.get()); + auto c = std::make_unique('c'); + auto original_c_pointer = reinterpret_cast(c.get()); + + rb.enqueue(std::move(a)); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); + + rb.enqueue(std::move(b)); + rb.enqueue(std::move(c)); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(true, rb.is_full()); + + const auto all_data_vec = rb.get_all_data(); + + EXPECT_EQ(2u, all_data_vec.size()); + EXPECT_EQ('b', *all_data_vec[0]); + EXPECT_EQ('c', *all_data_vec[1]); + EXPECT_NE(original_b_pointer, reinterpret_cast(all_data_vec[0].get())); + EXPECT_NE(original_c_pointer, reinterpret_cast(all_data_vec[1].get())); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(true, rb.is_full()); + + auto uni_ptr = rb.dequeue(); + + EXPECT_EQ('b', *uni_ptr); + EXPECT_EQ(original_b_pointer, reinterpret_cast(uni_ptr.get())); + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); + + uni_ptr = rb.dequeue(); + + EXPECT_EQ('c', *uni_ptr); + EXPECT_EQ(original_c_pointer, reinterpret_cast(uni_ptr.get())); + EXPECT_EQ(false, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); +} diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 6facada4dd..0142cf9cc3 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -680,3 +680,72 @@ TEST_F(TestSubscription, get_network_flow_endpoints_errors) { EXPECT_NO_THROW(subscription->get_network_flow_endpoints()); } } +/* + Testing on_new_intra_process_message callbacks. + */ +TEST_F(TestSubscription, on_new_intra_process_serialized_message_callback) { + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + + auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto sub = + node_->create_generic_subscription("~/test_take", "test_msgs/msg/Empty", 10, do_nothing); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c1_cb); + + auto pub = node_->create_publisher("~/test_take", 1); + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + auto start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c2_cb); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + sub->clear_on_new_intra_process_message_callback(); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + pub->publish(msg); + pub->publish(msg); + } + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(sub->set_on_new_intra_process_message_callback(invalid_cb), std::invalid_argument); +} diff --git a/rclcpp_components/CMakeLists.txt b/rclcpp_components/CMakeLists.txt index 121ef434c2..95842f9e1e 100644 --- a/rclcpp_components/CMakeLists.txt +++ b/rclcpp_components/CMakeLists.txt @@ -14,6 +14,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") ) endif() +find_package(backward_ros REQUIRED) find_package(ament_cmake_ros REQUIRED) find_package(ament_index_cpp REQUIRED) find_package(class_loader REQUIRED) diff --git a/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp b/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp index e2061e4da2..5dc5728779 100644 --- a/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp +++ b/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp @@ -24,12 +24,15 @@ #include #include "rclcpp_components/component_manager.hpp" +#include + +using rclcpp::experimental::executors::EventsExecutor; namespace rclcpp_components { /// ComponentManagerIsolated uses dedicated single-threaded executors for each components. -template +template class ComponentManagerIsolated : public rclcpp_components::ComponentManager { using rclcpp_components::ComponentManager::ComponentManager; diff --git a/rclcpp_components/package.xml b/rclcpp_components/package.xml index 87cf22f849..23fb219275 100644 --- a/rclcpp_components/package.xml +++ b/rclcpp_components/package.xml @@ -16,6 +16,7 @@ ament_cmake_ros + backward_ros ament_index_cpp class_loader composition_interfaces diff --git a/rclcpp_components/src/component_container_isolated.cpp b/rclcpp_components/src/component_container_isolated.cpp index 96ba8b1a03..33da6fd03c 100644 --- a/rclcpp_components/src/component_container_isolated.cpp +++ b/rclcpp_components/src/component_container_isolated.cpp @@ -20,6 +20,10 @@ #include "rclcpp/utilities.hpp" #include "rclcpp_components/component_manager_isolated.hpp" +#include + +using rclcpp::experimental::executors::EventsExecutor; + int main(int argc, char * argv[]) { /// Component container with dedicated single-threaded executors for each components. @@ -33,7 +37,7 @@ int main(int argc, char * argv[]) } } // create executor and component manager - auto exec = std::make_shared(); + auto exec = std::make_shared(); rclcpp::Node::SharedPtr node; if (use_multi_threaded_executor) { using ComponentManagerIsolated = @@ -41,7 +45,7 @@ int main(int argc, char * argv[]) node = std::make_shared(exec); } else { using ComponentManagerIsolated = - rclcpp_components::ComponentManagerIsolated; + rclcpp_components::ComponentManagerIsolated; node = std::make_shared(exec); } exec->add_node(node);