Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for services #7

Merged
merged 9 commits into from
Jan 17, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <rclcpp/subscription.hpp>
#include <rclcpp/timer.hpp>
#include <rclcpp/service.hpp>

namespace rclcpp
{
Expand Down Expand Up @@ -62,9 +63,16 @@ class CallbackGroup
timer_ptrs_.push_back(timer_ptr);
}

void
add_service(const service::ServiceBase::SharedPtr &service_ptr)
{
service_ptrs_.push_back(service_ptr);
}

CallbackGroupType type_;
std::vector<subscription::SubscriptionBase::SharedPtr> subscription_ptrs_;
std::vector<timer::TimerBase::SharedPtr> timer_ptrs_;
std::vector<service::ServiceBase::SharedPtr> service_ptrs_;
std::atomic_bool can_be_taken_from_;

};
Expand Down
70 changes: 70 additions & 0 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/* Copyright 2014 Open Source Robotics Foundation, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef RCLCPP_RCLCPP_CLIENT_HPP_
#define RCLCPP_RCLCPP_CLIENT_HPP_

#include <memory>

#include <ros_middleware_interface/functions.h>
#include <ros_middleware_interface/handles.h>

#include <rclcpp/macros.hpp>

namespace rclcpp
{

// Forward declaration for friend statement
namespace node {class Node;}

namespace client
{

template<typename ServiceT>
class Client
{
public:
RCLCPP_MAKE_SHARED_DEFINITIONS(Client);

Client(ros_middleware_interface::ClientHandle client_handle,
std::string& service_name)
: client_handle_(client_handle), service_name_(service_name)
{}

std::shared_ptr<typename ServiceT::Response>
send_request(std::shared_ptr<typename ServiceT::Request> &req, long timeout=10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use std::chrono for the timeout.

{
::ros_middleware_interface::send_request(client_handle_, req.get());

std::shared_ptr<typename ServiceT::Response> res = std::make_shared<typename ServiceT::Response>();
bool received = ::ros_middleware_interface::receive_response(client_handle_, res.get(), timeout);
if(!received)
{
// TODO: use custom exception
throw std::runtime_error("Timed out while waiting for response");
}
return res;
}

private:
ros_middleware_interface::ClientHandle client_handle_;
std::string service_name_;

};

} /* namespace client */
} /* namespace rclcpp */

#endif /* RCLCPP_RCLCPP_CLIENT_HPP_ */
172 changes: 172 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class Executor
// Either the subscription or the timer will be set, but not both
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
rclcpp::node::Node::SharedPtr node;
Expand All @@ -126,6 +127,10 @@ class Executor
{
execute_subscription(any_exec->subscription);
}
if (any_exec->service)
{
execute_service(any_exec->service);
}
// Reset the callback_group, regardless of type
any_exec->callback_group->can_be_taken_from_.store(true);
// Wake the wait, because it may need to be recalculated or work that
Expand Down Expand Up @@ -161,6 +166,28 @@ class Executor
timer->callback_();
}

static void
execute_service(
rclcpp::service::ServiceBase::SharedPtr &service)
{
std::shared_ptr<void> request = service->create_request();
std::shared_ptr<void> request_header = service->create_request_header();
bool taken = ros_middleware_interface::take_request(
service->service_handle_,
request.get(),
request_header.get());
if (taken)
{
service->handle_request(request, request_header);
}
else
{
std::cout << "[rclcpp::error] take failed for service on service: "
<< service->get_service_name()
<< std::endl;
}
}

/*** Populate class storage from stored weak node pointers and wait. ***/

void
Expand All @@ -170,6 +197,7 @@ class Executor
bool has_invalid_weak_nodes = false;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
std::vector<rclcpp::timer::TimerBase::SharedPtr> timers;
std::vector<rclcpp::service::ServiceBase::SharedPtr> services;
for (auto &weak_node : weak_nodes_)
{
auto node = weak_node.lock();
Expand All @@ -193,6 +221,10 @@ class Executor
{
timers.push_back(timer);
}
for (auto &service : group->service_ptrs_)
{
services.push_back(service);
}
}
}
// Clean up any invalid nodes, if they were detected
Expand Down Expand Up @@ -224,6 +256,28 @@ class Executor
subscription->subscription_handle_.data_;
subscriber_handle_index += 1;
}

// Use the number of services to allocate memory in the handles
size_t number_of_services = services.size();
ros_middleware_interface::ServiceHandles service_handles;
service_handles.service_count_ = number_of_services;
// TODO: Avoid redundant malloc's
service_handles.services_ = static_cast<void **>(
std::malloc(sizeof(void *) * number_of_services));
if (service_handles.services_ == NULL)
{
// TODO: Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for service pointers.");
}
// Then fill the ServiceHandles with ready services
size_t service_handle_index = 0;
for (auto &service : services)
{
service_handles.services_[service_handle_index] = \
service->service_handle_.data_;
service_handle_index += 1;
}

// Use the number of guard conditions to allocate memory in the handles
// Add 2 to the number for the ctrl-c guard cond and the executor's
size_t start_of_timer_guard_conds = 2;
Expand Down Expand Up @@ -254,16 +308,19 @@ class Executor
timer->guard_condition_.data_;
guard_cond_handle_index += 1;
}

// Now wait on the waitable subscriptions and timers
ros_middleware_interface::wait(subscriber_handles,
guard_condition_handles,
service_handles,
nonblocking);
// If ctrl-c guard condition, return directly
if (guard_condition_handles.guard_conditions_[0] != 0)
{
// Make sure to free memory
// TODO: Remove theses when "Avoid redundant malloc's" todo is addressed
std::free(subscriber_handles.subscribers_);
std::free(service_handles.services_);
std::free(guard_condition_handles.guard_conditions_);
return;
}
Expand All @@ -286,9 +343,20 @@ class Executor
guard_condition_handles_.push_back(handle);
}
}
// Then the services
for (size_t i = 0; i < number_of_services; ++i)
{
void *handle = service_handles.services_[i];
if (handle)
{
service_handles_.push_back(handle);
}
}

// Make sure to free memory
// TODO: Remove theses when "Avoid redundant malloc's" todo is addressed
std::free(subscriber_handles.subscribers_);
std::free(service_handles.services_);
std::free(guard_condition_handles.guard_conditions_);
}

Expand Down Expand Up @@ -352,6 +420,35 @@ class Executor
return rclcpp::timer::TimerBase::SharedPtr();
}

rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle)
{
for (auto weak_node : weak_nodes_)
{
auto node = weak_node.lock();
if (!node)
{
continue;
}
for (auto weak_group : node->callback_groups_)
{
auto group = weak_group.lock();
if (!group)
{
continue;
}
for (auto service : group->service_ptrs_)
{
if (service->service_handle_.data_ == service_handle)
{
return service;
}
}
}
}
return rclcpp::service::ServiceBase::SharedPtr();
}

void
remove_subscriber_handle_from_subscriber_handles(void *handle)
{
Expand All @@ -364,6 +461,12 @@ class Executor
guard_condition_handles_.remove(handle);
}

void
remove_service_handle_from_service_handles(void *handle)
{
service_handles_.remove(handle);
}

rclcpp::node::Node::SharedPtr
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group)
{
Expand Down Expand Up @@ -516,6 +619,67 @@ class Executor
}
}

rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service(
rclcpp::service::ServiceBase::SharedPtr &service)
{
for (auto &weak_node : weak_nodes_)
{
auto node = weak_node.lock();
if (!node)
{
continue;
}
for (auto &weak_group : node->callback_groups_)
{
auto group = weak_group.lock();
for (auto &serv : group->service_ptrs_)
{
if (serv == service)
{
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}

void
get_next_service(std::shared_ptr<AnyExecutable> &any_exec)
{
for (auto handle : service_handles_)
{
auto service = get_service_by_handle(handle);
if (service)
{
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_service(service);
if (!group)
{
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
remove_service_handle_from_service_handles(handle);
continue;
}
if (!group->can_be_taken_from_.load())
{
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
remove_service_handle_from_service_handles(handle);
return;
}
// Else, the service is no longer valid, remove it and continue
remove_service_handle_from_service_handles(handle);
}
}

std::shared_ptr<AnyExecutable>
get_next_ready_executable()
{
Expand All @@ -532,6 +696,12 @@ class Executor
{
return any_exec;
}
// Check the services to see if there are any that are ready
get_next_service(any_exec);
if (any_exec->service)
{
return any_exec;
}
// If there is neither a ready timer nor subscription, return a null ptr
any_exec.reset();
return any_exec;
Expand Down Expand Up @@ -580,6 +750,8 @@ class Executor
SubscriberHandles subscriber_handles_;
typedef std::list<void*> GuardConditionHandles;
GuardConditionHandles guard_condition_handles_;
typedef std::list<void*> ServiceHandles;
ServiceHandles service_handles_;

};

Expand Down
Loading