Skip to content

Commit

Permalink
cleanup intra-process-manager (#1695)
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
  • Loading branch information
alsora authored Jun 23, 2021
1 parent 7d8b269 commit e9e398d
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp"
#include "rclcpp/intra_process_buffer_type.hpp"
#include "rclcpp/qos.hpp"

namespace rclcpp
{
Expand All @@ -37,13 +38,13 @@ template<
typename rclcpp::experimental::buffers::IntraProcessBuffer<MessageT, Alloc, Deleter>::UniquePtr
create_intra_process_buffer(
IntraProcessBufferType buffer_type,
rmw_qos_profile_t qos,
const rclcpp::QoS & qos,
std::shared_ptr<Alloc> allocator)
{
using MessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;

size_t buffer_size = qos.depth;
size_t buffer_size = qos.depth();

using rclcpp::experimental::buffers::IntraProcessBuffer;
typename IntraProcessBuffer<MessageT, Alloc, Deleter>::UniquePtr buffer;
Expand Down
31 changes: 7 additions & 24 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,36 +305,17 @@ class IntraProcessManager
get_subscription_intra_process(uint64_t intra_process_subscription_id);

private:
struct SubscriptionInfo
{
SubscriptionInfo() = default;

rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr subscription;
rmw_qos_profile_t qos;
const char * topic_name;
bool use_take_shared_method;
};

struct PublisherInfo
{
PublisherInfo() = default;

rclcpp::PublisherBase::WeakPtr publisher;
rmw_qos_profile_t qos;
const char * topic_name;
};

struct SplittedSubscriptions
{
std::vector<uint64_t> take_shared_subscriptions;
std::vector<uint64_t> take_ownership_subscriptions;
};

using SubscriptionMap =
std::unordered_map<uint64_t, SubscriptionInfo>;
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;

using PublisherMap =
std::unordered_map<uint64_t, PublisherInfo>;
std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;

using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;
Expand All @@ -350,7 +331,9 @@ class IntraProcessManager

RCLCPP_PUBLIC
bool
can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const;
can_communicate(
rclcpp::PublisherBase::SharedPtr pub,
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const;

template<
typename MessageT,
Expand All @@ -366,7 +349,7 @@ class IntraProcessManager
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription.lock();
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
Expand Down Expand Up @@ -404,7 +387,7 @@ class IntraProcessManager
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription.lock();
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -72,7 +73,7 @@ class SubscriptionIntraProcess
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(
allocator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "rcl/error_handling.h"

#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"

Expand All @@ -39,7 +40,9 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
SubscriptionIntraProcessBase(
const std::string & topic_name,
const rclcpp::QoS & qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
{}

Expand Down Expand Up @@ -71,7 +74,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
get_topic_name() const;

RCLCPP_PUBLIC
rmw_qos_profile_t
QoS
get_actual_qos() const;

protected:
Expand All @@ -83,7 +86,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
trigger_guard_condition() = 0;

std::string topic_name_;
rmw_qos_profile_t qos_profile_;
QoS qos_profile_;
};

} // namespace experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -64,7 +65,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBase(topic_name, qos_profile)
{
Expand Down
8 changes: 4 additions & 4 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,15 @@ class Publisher : public PublisherBase
// Get the intra process manager instance for this context.
auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
// Register the publisher with the intra process manager.
if (qos.get_rmw_qos_profile().history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
if (qos.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication is not allowed with keep all history qos policy");
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos.get_rmw_qos_profile().depth == 0) {
if (qos.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with a zero qos history depth value");
}
if (qos.get_rmw_qos_profile().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
if (qos.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,16 @@ class Subscription : public SubscriptionBase
using rclcpp::detail::resolve_intra_process_buffer_type;

// Check if the QoS is compatible with intra-process.
rmw_qos_profile_t qos_profile = get_actual_qos().get_rmw_qos_profile();
if (qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
auto qos_profile = get_actual_qos();
if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication is not allowed with keep all history qos policy");
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos_profile.depth == 0) {
if (qos_profile.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with 0 depth qos policy");
}
if (qos_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}
Expand Down
60 changes: 28 additions & 32 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,50 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

auto id = IntraProcessManager::get_next_unique_id();
uint64_t pub_id = IntraProcessManager::get_next_unique_id();

publishers_[id].publisher = publisher;
publishers_[id].topic_name = publisher->get_topic_name();
publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();
publishers_[pub_id] = publisher;

// Initialize the subscriptions storage for this publisher.
pub_to_subs_[id] = SplittedSubscriptions();
pub_to_subs_[pub_id] = SplittedSubscriptions();

// create an entry for the publisher id and populate with already existing subscriptions
for (auto & pair : subscriptions_) {
if (can_communicate(publishers_[id], pair.second)) {
insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method);
auto subscription = pair.second.lock();
if (!subscription) {
continue;
}
if (can_communicate(publisher, subscription)) {
uint64_t sub_id = pair.first;
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
}
}

return id;
return pub_id;
}

uint64_t
IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

auto id = IntraProcessManager::get_next_unique_id();
uint64_t sub_id = IntraProcessManager::get_next_unique_id();

subscriptions_[id].subscription = subscription;
subscriptions_[id].topic_name = subscription->get_topic_name();
subscriptions_[id].qos = subscription->get_actual_qos();
subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();
subscriptions_[sub_id] = subscription;

// adds the subscription id to all the matchable publishers
for (auto & pair : publishers_) {
if (can_communicate(pair.second, subscriptions_[id])) {
insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method);
auto publisher = pair.second.lock();
if (!publisher) {
continue;
}
if (can_communicate(publisher, subscription)) {
uint64_t pub_id = pair.first;
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
}
}

return id;
return sub_id;
}

void
Expand Down Expand Up @@ -116,7 +121,7 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.publisher.lock();
auto publisher = publisher_pair.second.lock();
if (!publisher) {
continue;
}
Expand Down Expand Up @@ -157,7 +162,7 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc
if (subscription_it == subscriptions_.end()) {
return nullptr;
} else {
auto subscription = subscription_it->second.subscription.lock();
auto subscription = subscription_it->second.lock();
if (subscription) {
return subscription;
} else {
Expand Down Expand Up @@ -204,25 +209,16 @@ IntraProcessManager::insert_sub_id_for_pub(

bool
IntraProcessManager::can_communicate(
PublisherInfo pub_info,
SubscriptionInfo sub_info) const
rclcpp::PublisherBase::SharedPtr pub,
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const
{
// publisher and subscription must be on the same topic
if (strcmp(pub_info.topic_name, sub_info.topic_name) != 0) {
return false;
}

// TODO(alsora): the following checks for qos compatibility should be provided by the RMW
// a reliable subscription can't be connected with a best effort publisher
if (
sub_info.qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE &&
pub_info.qos.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT)
{
if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) {
return false;
}

// a publisher and a subscription with different durability can't communicate
if (sub_info.qos.durability != pub_info.qos.durability) {
auto check_result = rclcpp::qos_check_compatible(pub->get_actual_qos(), sub->get_actual_qos());
if (check_result.compatibility == rclcpp::QoSCompatibility::Error) {
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ SubscriptionIntraProcessBase::get_topic_name() const
return topic_name_.c_str();
}

rmw_qos_profile_t
rclcpp::QoS
SubscriptionIntraProcessBase::get_actual_qos() const
{
return qos_profile_;
Expand Down
Loading

0 comments on commit e9e398d

Please sign in to comment.