Skip to content

Commit

Permalink
use std::condition_variable instead of rcl_guard_condition_t
Browse files Browse the repository at this point in the history
  • Loading branch information
Soragna, Alberto committed Apr 2, 2020
1 parent ea3f97c commit 609dd6e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 35 deletions.
39 changes: 6 additions & 33 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,6 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
qos_profile,
allocator);

// Create the guard condition.
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options();

gc_ = rcl_get_zero_initialized_guard_condition();
rcl_ret_t ret = rcl_guard_condition_init(
&gc_, context->get_rcl_context().get(), guard_condition_options);

if (RCL_RET_OK != ret) {
throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
}

wait_set_ = rcl_get_zero_initialized_wait_set();
ret = rcl_wait_set_init(&wait_set_, 0, 1, 0, 0, 0, 0, context->get_rcl_context().get(), rcl_get_default_allocator());

if (RCL_RET_OK != ret) {
throw std::runtime_error("SubscriptionIntraProcess init error initializing wait set");
}

TRACEPOINT(
rclcpp_subscription_callback_added,
(const void *)this,
Expand Down Expand Up @@ -143,23 +124,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase

void consume_messages_task() override
{
rcl_ret_t ret;
do {
ret = rcl_wait_set_clear(&wait_set_);
ret = rcl_wait_set_add_guard_condition(&wait_set_, &gc_, NULL);
std::unique_lock<std::mutex> lock(reentrant_mutex_);

// Wait until guard condition is triggered
ret = rcl_wait(&wait_set_, -1);
gc_.wait(lock, [this]{return buffer_->has_data();});

// If guard condition is triggered, check buffer for data
while (is_ready(&wait_set_)) {
// Process the message
execute();
}
} while (rclcpp::ok());
// Process the message
execute();

ret = rcl_wait_set_fini(&wait_set_);
(void)ret;
} while (rclcpp::ok());
}

private:
Expand All @@ -168,7 +141,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
{
// Publisher pushed message into the buffer.
// Notify subscription context
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
gc_.notify_one();
(void)ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <rmw/rmw.h>

#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -75,8 +76,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
rcl_wait_set_t wait_set_;
std::condition_variable gc_;

private:
virtual void
Expand Down

0 comments on commit 609dd6e

Please sign in to comment.