Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use events exec instaed of single threded one #1

Open
wants to merge 22 commits into
base: iron
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
03bd491
use events exec instaed of single threded one
jplapp Dec 26, 2023
4459c7d
disabe parameter events publishing
Jan 3, 2024
ee36bc8
add backward_ros
Jan 3, 2024
018eb9b
Declare rclcpp callbacks before the rcl entities (#2024)
mauropasse May 15, 2023
4120b79
add mutex to protect events_executor current entity collection (#2187)
alsora May 18, 2023
74f5221
Remove an unused variable from the events executor tests. (#2270)
clalancette Aug 9, 2023
a1a356c
fix(ClientGoalHandle): Made mutex recursive to prevent deadlocks (#2267)
jmachowinski Sep 7, 2023
07e90fd
Add locking to protect the TimeSource::NodeState::node_base_ (#2320)
clalancette Oct 3, 2023
9e4aaed
Revert "Add locking to protect the TimeSource::NodeState::node_base_ …
jplapp Jan 25, 2024
b0a956b
Revert "fix(ClientGoalHandle): Made mutex recursive to prevent deadlo…
jplapp Jan 25, 2024
6698620
Revert "Remove an unused variable from the events executor tests. (#2…
jplapp Jan 25, 2024
16aa7a1
Revert "add mutex to protect events_executor current entity collectio…
jplapp Jan 25, 2024
0e7c782
Revert "Declare rclcpp callbacks before the rcl entities (#2024)"
jplapp Jan 25, 2024
f6dd91d
Add transient local durability support to publisher and subscriptions…
jefferyyjhsu Jan 18, 2024
463fc84
.
HovorunB Jan 29, 2024
eb55410
.
HovorunB Jan 31, 2024
cb248c3
Merge pull request #2 from pixel-robotics/AMRNAV-6258_use_ipc
HovorunB Jan 31, 2024
0b8e250
Updated GenericSubscription to AnySubscriptionCallback (#1928)
DensoADAS Dec 19, 2023
c5cc2c1
Squashed commit of the following:
HovorunB Feb 6, 2024
8ef1ae3
.
HovorunB Feb 8, 2024
4506162
fix
HovorunB Feb 8, 2024
24be5b2
Merge pull request #3 from pixel-robotics/AMRNAV-6339
HovorunB Feb 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down Expand Up @@ -71,7 +72,6 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/experimental/timers_manager.cpp
src/rclcpp/future_return_code.cpp
src/rclcpp/generic_publisher.cpp
src/rclcpp/generic_subscription.cpp
src/rclcpp/graph_listener.cpp
src/rclcpp/guard_condition.cpp
src/rclcpp/init_options.cpp
Expand Down
34 changes: 29 additions & 5 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/detail/subscription_callback_type_helper.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/message_info.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_adapter.hpp"

Expand Down Expand Up @@ -158,13 +159,14 @@ struct AnySubscriptionCallbackPossibleTypes
template<
typename MessageT,
typename AllocatorT,
bool is_adapted_type = rclcpp::TypeAdapter<MessageT>::is_specialized::value
bool is_adapted_type = rclcpp::TypeAdapter<MessageT>::is_specialized::value,
bool is_serialized_type = serialization_traits::is_serialized_message_class<MessageT>::value
>
struct AnySubscriptionCallbackHelper;

/// Specialization for when MessageT is not a TypeAdapter.
template<typename MessageT, typename AllocatorT>
struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, false>
struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, false, false>
{
using CallbackTypes = AnySubscriptionCallbackPossibleTypes<MessageT, AllocatorT>;

Expand Down Expand Up @@ -194,7 +196,7 @@ struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, false>

/// Specialization for when MessageT is a TypeAdapter.
template<typename MessageT, typename AllocatorT>
struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, true>
struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, true, false>
{
using CallbackTypes = AnySubscriptionCallbackPossibleTypes<MessageT, AllocatorT>;

Expand Down Expand Up @@ -232,6 +234,26 @@ struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, true>
>;
};

/// Specialization for when MessageT is a SerializedMessage to exclude duplicated declarations.
template<typename MessageT, typename AllocatorT>
struct AnySubscriptionCallbackHelper<MessageT, AllocatorT, false, true>
{
using CallbackTypes = AnySubscriptionCallbackPossibleTypes<MessageT, AllocatorT>;

using variant_type = std::variant<
typename CallbackTypes::ConstRefSerializedMessageCallback,
typename CallbackTypes::ConstRefSerializedMessageWithInfoCallback,
typename CallbackTypes::UniquePtrSerializedMessageCallback,
typename CallbackTypes::UniquePtrSerializedMessageWithInfoCallback,
typename CallbackTypes::SharedConstPtrSerializedMessageCallback,
typename CallbackTypes::SharedConstPtrSerializedMessageWithInfoCallback,
typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageCallback,
typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageWithInfoCallback,
typename CallbackTypes::SharedPtrSerializedMessageCallback,
typename CallbackTypes::SharedPtrSerializedMessageWithInfoCallback
>;
};

} // namespace detail

template<
Expand Down Expand Up @@ -487,7 +509,9 @@ class AnySubscriptionCallback
}

// Dispatch when input is a ros message and the output could be anything.
void
template<typename TMsg = ROSMessageType>
typename std::enable_if<!serialization_traits::is_serialized_message_class<TMsg>::value,
void>::type
dispatch(
std::shared_ptr<ROSMessageType> message,
const rclcpp::MessageInfo & message_info)
Expand Down Expand Up @@ -589,7 +613,7 @@ class AnySubscriptionCallback
// Dispatch when input is a serialized message and the output could be anything.
void
dispatch(
std::shared_ptr<rclcpp::SerializedMessage> serialized_message,
std::shared_ptr<const rclcpp::SerializedMessage> serialized_message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), false);
Expand Down
67 changes: 57 additions & 10 deletions rclcpp/include/rclcpp/create_generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,89 @@ namespace rclcpp
*
* \param topics_interface NodeTopicsInterface pointer used in parts of the setup.
* \param topic_name Topic name
* \param topic_type Topic type
* \param type_support_handle Topic type support
* \param qos %QoS settings
* \param callback Callback for new messages of serialized form
* \param options %Publisher options.
* Not all publisher options are currently respected, the only relevant options for this
* publisher are `event_callbacks`, `use_default_callbacks`, and `%callback_group`.
*/
template<typename AllocatorT = std::allocator<void>>
template<
typename CallbackT,
typename AllocatorT = std::allocator<void>>
std::shared_ptr<GenericSubscription> create_generic_subscription(
rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface,
const std::string & topic_name,
const std::string & topic_type,
const rosidl_message_type_support_t & type_support_handle,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>)> callback,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
)
)
{
auto ts_lib = rclcpp::get_typesupport_library(
topic_type, "rosidl_typesupport_cpp");
auto allocator = options.get_allocator();
typename GenericSubscription::MessageMemoryStrategyType::SharedPtr msg_mem_strat = (
GenericSubscription::MessageMemoryStrategyType::create_default()
);

using rclcpp::AnySubscriptionCallback;
AnySubscriptionCallback<rclcpp::SerializedMessage, AllocatorT>
any_subscription_callback(*allocator);
any_subscription_callback.set(std::forward<CallbackT>(callback));

auto subscription = std::make_shared<GenericSubscription>(
topics_interface->get_node_base_interface(),
std::move(ts_lib),
type_support_handle,
topic_name,
topic_type,
qos,
callback,
options);
any_subscription_callback,
options,
msg_mem_strat);

topics_interface->add_subscription(subscription, options.callback_group);

return subscription;
}

/// Create and return a GenericSubscription.
/**
* The returned pointer will never be empty, but this function can throw various exceptions, for
* instance when the message's package can not be found on the AMENT_PREFIX_PATH.
*
* \param topics_interface NodeTopicsInterface pointer used in parts of the setup.
* \param topic_name Topic name
* \param topic_type Topic type
* \param qos %QoS settings
* \param callback Callback for new messages of serialized form
* \param options %Publisher options.
* Not all publisher options are currently respected, the only relevant options for this
* publisher are `event_callbacks`, `use_default_callbacks`, and `%callback_group`.
*/
template<
typename CallbackT,
typename AllocatorT = std::allocator<void>>
std::shared_ptr<GenericSubscription> create_generic_subscription(
rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface,
const std::string & topic_name,
const std::string & topic_type,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
)
)
{
auto ts_lib = rclcpp::get_typesupport_library(
topic_type, "rosidl_typesupport_cpp");

return create_generic_subscription<CallbackT, AllocatorT>(
topics_interface, topic_name,
*rclcpp::get_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib),
qos, std::forward<CallbackT>(callback), options
);
}

} // namespace rclcpp

#endif // RCLCPP__CREATE_GENERIC_SUBSCRIPTION_HPP_
47 changes: 47 additions & 0 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,53 @@ namespace rclcpp

namespace detail
{
/// Create and return a publisher of the given MessageT type.
/**
* The NodeT type only needs to have a method called get_node_topics_interface()
* which returns a shared_ptr to a NodeTopicsInterface.
*/
template<
typename MessageT = SerializedMessage,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
NodeT & node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
)
)
{
// Extract the NodeTopicsInterface from the NodeT.
using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics_interface = get_node_topics_interface(node);
auto node_parameters = node.get_node_parameters_interface();
const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ?
rclcpp::detail::declare_qos_parameters(
options.qos_overriding_options, node_parameters,
node_topics_interface->resolve_topic_name(topic_name),
qos, rclcpp::detail::PublisherQosParametersTraits{}) :
qos;

// Create the publisher.
auto pub = node_topics_interface->create_publisher(
topic_name,
rclcpp::create_publisher_factory<AllocatorT, PublisherT>(
options,
type_support),
actual_qos
);

// Add the publisher to the node topics interface.
node_topics_interface->add_publisher(pub, options.callback_group);

return std::dynamic_pointer_cast<PublisherT>(pub);
}

/// Create and return a publisher of the given MessageT type.
template<
typename MessageT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ resolve_intra_process_buffer_type(
return resolved_buffer_type;
}

RCLCPP_PUBLIC
rclcpp::IntraProcessBufferType
resolve_intra_process_buffer_type(
const rclcpp::IntraProcessBufferType buffer_type);

} // namespace detail

} // namespace rclcpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_

#include <vector>

namespace rclcpp
{
namespace experimental
Expand All @@ -31,6 +33,8 @@ class BufferImplementationBase
virtual BufferT dequeue() = 0;
virtual void enqueue(BufferT request) = 0;

virtual std::vector<BufferT> get_all_data() = 0;

virtual void clear() = 0;
virtual bool has_data() const = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
Expand Down Expand Up @@ -65,6 +66,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase

virtual MessageSharedPtr consume_shared() = 0;
virtual MessageUniquePtr consume_unique() = 0;

virtual std::vector<MessageSharedPtr> get_all_data_shared() = 0;
virtual std::vector<MessageUniquePtr> get_all_data_unique() = 0;
};

template<
Expand Down Expand Up @@ -128,6 +132,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
return consume_unique_impl<BufferT>();
}

std::vector<MessageSharedPtr> get_all_data_shared() override
{
return get_all_data_shared_impl();
}

std::vector<MessageUniquePtr> get_all_data_unique() override
{
return get_all_data_unique_impl();
}

bool has_data() const override
{
return buffer_->has_data();
Expand Down Expand Up @@ -237,6 +251,71 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
{
return buffer_->dequeue();
}

// MessageSharedPtr to MessageSharedPtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageSharedPtr>::value,
std::vector<MessageSharedPtr>
>::type
get_all_data_shared_impl()
{
return buffer_->get_all_data();
}

// MessageUniquePtr to MessageSharedPtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageUniquePtr>::value,
std::vector<MessageSharedPtr>
>::type
get_all_data_shared_impl()
{
std::vector<MessageSharedPtr> result;
auto uni_ptr_vec = buffer_->get_all_data();
result.reserve(uni_ptr_vec.size());
for (MessageUniquePtr & uni_ptr : uni_ptr_vec) {
result.emplace_back(std::move(uni_ptr));
}
return result;
}

// MessageSharedPtr to MessageUniquePtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageSharedPtr>::value,
std::vector<MessageUniquePtr>
>::type
get_all_data_unique_impl()
{
std::vector<MessageUniquePtr> result;
auto shared_ptr_vec = buffer_->get_all_data();
result.reserve(shared_ptr_vec.size());
for (MessageSharedPtr shared_msg : shared_ptr_vec) {
MessageUniquePtr unique_msg;
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
if (deleter) {
unique_msg = MessageUniquePtr(ptr, *deleter);
} else {
unique_msg = MessageUniquePtr(ptr);
}
result.push_back(std::move(unique_msg));
}
return result;
}

// MessageUniquePtr to MessageUniquePtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageUniquePtr>::value,
std::vector<MessageUniquePtr>
>::type
get_all_data_unique_impl()
{
return buffer_->get_all_data();
}
};

} // namespace buffers
Expand Down
Loading