From a46a0fe7f69cdd702ff15aaca3c2a1260a9be98f Mon Sep 17 00:00:00 2001 From: Felix Ruess Date: Thu, 20 Jul 2017 00:23:24 +0200 Subject: [PATCH] Add SteadyTimer (#1014) * add SteadyTimer based on SteadyTime (which uses the CLOCK_MONOTONIC). This timer is not influenced by time jumps of the system time, so ideal for things like periodic checks of timeout/heartbeat, etc... * fix timer_manager to really use a steady clock when needed This is a bit of a hack, since up to boost version 1.61 the time of the steady clock is always converted to system clock, which is then used for the wait... which obviously doesn't help if we explicitly want the steady clock. So as a workaround, include the backported versions of the boost condition variable if boost version is not recent enough. * add tests for SteadyTimer * [test] add -pthread to make some tests compile not sure if this is only need in my case on ROS indigo... * use SteadyTime for some timeouts * add some checks to make sure the backported boost headers are included if needed * specialize TimerManager threadFunc for SteadyTimer to avoid the typeid check and make really sure the correct boost condition wait_until implementation is used * Revert "[test] add -pthread to make some tests compile" This reverts commit f62a3f2bef03efc668501084dd87f3002766f8e7. * set minimum version for rostime * mostly spaces --- clients/roscpp/CMakeLists.txt | 3 +- .../include/boost_161_condition_variable.h | 23 + .../boost_161_pthread_condition_variable.h | 431 ++++++++++++++++++ ...boost_161_pthread_condition_variable_fwd.h | 378 +++++++++++++++ clients/roscpp/include/ros/forwards.h | 18 + .../include/ros/internal_timer_manager.h | 2 +- clients/roscpp/include/ros/node_handle.h | 80 ++++ clients/roscpp/include/ros/steady_timer.h | 127 ++++++ .../roscpp/include/ros/steady_timer_options.h | 86 ++++ clients/roscpp/include/ros/timer_manager.h | 20 +- .../include/ros/transport_publisher_link.h | 6 +- clients/roscpp/package.xml | 4 +- .../src/libros/internal_timer_manager.cpp | 13 +- clients/roscpp/src/libros/master.cpp | 4 +- clients/roscpp/src/libros/node_handle.cpp | 35 +- clients/roscpp/src/libros/steady_timer.cpp | 224 +++++++++ .../src/libros/transport_publisher_link.cpp | 8 +- test/test_roscpp/test/src/timer_callbacks.cpp | 313 +++++++++++++ 18 files changed, 1757 insertions(+), 18 deletions(-) create mode 100644 clients/roscpp/include/boost_161_condition_variable.h create mode 100644 clients/roscpp/include/boost_161_pthread_condition_variable.h create mode 100644 clients/roscpp/include/boost_161_pthread_condition_variable_fwd.h create mode 100644 clients/roscpp/include/ros/steady_timer.h create mode 100644 clients/roscpp/include/ros/steady_timer_options.h create mode 100644 clients/roscpp/src/libros/steady_timer.cpp diff --git a/clients/roscpp/CMakeLists.txt b/clients/roscpp/CMakeLists.txt index 633102919e..ad6ccda1c2 100644 --- a/clients/roscpp/CMakeLists.txt +++ b/clients/roscpp/CMakeLists.txt @@ -22,7 +22,7 @@ list(GET roscpp_VERSION_LIST 2 roscpp_VERSION_PATCH) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/ros/common.h.in ${CATKIN_DEVEL_PREFIX}/${CATKIN_GLOBAL_INCLUDE_DESTINATION}/ros/common.h) -find_package(Boost REQUIRED COMPONENTS signals filesystem system) +find_package(Boost REQUIRED COMPONENTS chrono filesystem signals system) include_directories(include ${CATKIN_DEVEL_PREFIX}/${CATKIN_GLOBAL_INCLUDE_DESTINATION}/ros ${catkin_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) @@ -118,6 +118,7 @@ add_library(roscpp src/libros/poll_set.cpp src/libros/service.cpp src/libros/this_node.cpp + src/libros/steady_timer.cpp ) add_dependencies(roscpp ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS}) diff --git a/clients/roscpp/include/boost_161_condition_variable.h b/clients/roscpp/include/boost_161_condition_variable.h new file mode 100644 index 0000000000..ee2ff46188 --- /dev/null +++ b/clients/roscpp/include/boost_161_condition_variable.h @@ -0,0 +1,23 @@ +#ifndef BOOST_THREAD_CONDITION_VARIABLE_HPP +#define BOOST_THREAD_CONDITION_VARIABLE_HPP + +// condition_variable.hpp +// +// (C) Copyright 2007 Anthony Williams +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#if defined(BOOST_THREAD_PLATFORM_WIN32) +#include +#elif defined(BOOST_THREAD_PLATFORM_PTHREAD) +//#include +#include "boost_161_pthread_condition_variable.h" +#else +#error "Boost threads unavailable on this platform" +#endif + +#endif + diff --git a/clients/roscpp/include/boost_161_pthread_condition_variable.h b/clients/roscpp/include/boost_161_pthread_condition_variable.h new file mode 100644 index 0000000000..8100447bae --- /dev/null +++ b/clients/roscpp/include/boost_161_pthread_condition_variable.h @@ -0,0 +1,431 @@ +#ifndef BOOST_THREAD_CONDITION_VARIABLE_PTHREAD_HPP +#define BOOST_THREAD_CONDITION_VARIABLE_PTHREAD_HPP +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// (C) Copyright 2007-10 Anthony Williams +// (C) Copyright 2011-2012 Vicente J. Botet Escriba + +// make sure we include our backported version first!! +// (before the system version might be included via some of the other header files) +#include "boost_161_pthread_condition_variable_fwd.h" + +#include +#include +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS +#include +#endif +//#include +#ifdef BOOST_THREAD_USES_CHRONO +#include +#include +#endif +#include + +#include + +namespace boost +{ +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + namespace this_thread + { + void BOOST_THREAD_DECL interruption_point(); + } +#endif + + namespace thread_cv_detail + { + template + struct lock_on_exit + { + MutexType* m; + + lock_on_exit(): + m(0) + {} + + void activate(MutexType& m_) + { + m_.unlock(); + m=&m_; + } + ~lock_on_exit() + { + if(m) + { + m->lock(); + } + } + }; + } + + inline void condition_variable::wait(unique_lock& m) + { +#if defined BOOST_THREAD_THROW_IF_PRECONDITION_NOT_SATISFIED + if(! m.owns_lock()) + { + boost::throw_exception(condition_error(-1, "boost::condition_variable::wait() failed precondition mutex not owned")); + } +#endif + int res=0; + { +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + thread_cv_detail::lock_on_exit > guard; + detail::interruption_checker check_for_interruption(&internal_mutex,&cond); + pthread_mutex_t* the_mutex = &internal_mutex; + guard.activate(m); +#else + pthread_mutex_t* the_mutex = m.mutex()->native_handle(); +#endif + res = pthread_cond_wait(&cond,the_mutex); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + this_thread::interruption_point(); +#endif + if(res && res != EINTR) + { + boost::throw_exception(condition_error(res, "boost::condition_variable::wait failed in pthread_cond_wait")); + } + } + + inline bool condition_variable::do_wait_until( + unique_lock& m, + struct timespec const &timeout) + { +#if defined BOOST_THREAD_THROW_IF_PRECONDITION_NOT_SATISFIED + if (!m.owns_lock()) + { + boost::throw_exception(condition_error(EPERM, "boost::condition_variable::do_wait_until() failed precondition mutex not owned")); + } +#endif + int cond_res; + { +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + thread_cv_detail::lock_on_exit > guard; + detail::interruption_checker check_for_interruption(&internal_mutex,&cond); + pthread_mutex_t* the_mutex = &internal_mutex; + guard.activate(m); +#else + pthread_mutex_t* the_mutex = m.mutex()->native_handle(); +#endif + cond_res=pthread_cond_timedwait(&cond,the_mutex,&timeout); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + this_thread::interruption_point(); +#endif + if(cond_res==ETIMEDOUT) + { + return false; + } + if(cond_res) + { + boost::throw_exception(condition_error(cond_res, "boost::condition_variable::do_wait_until failed in pthread_cond_timedwait")); + } + return true; + } + + inline void condition_variable::notify_one() BOOST_NOEXCEPT + { +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); +#endif + BOOST_VERIFY(!pthread_cond_signal(&cond)); + } + + inline void condition_variable::notify_all() BOOST_NOEXCEPT + { +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); +#endif + BOOST_VERIFY(!pthread_cond_broadcast(&cond)); + } + + class condition_variable_any + { + pthread_mutex_t internal_mutex; + pthread_cond_t cond; + + public: + BOOST_THREAD_NO_COPYABLE(condition_variable_any) + condition_variable_any() + { + int const res=pthread_mutex_init(&internal_mutex,NULL); + if(res) + { + boost::throw_exception(thread_resource_error(res, "boost::condition_variable_any::condition_variable_any() failed in pthread_mutex_init")); + } + int const res2 = detail::monotonic_pthread_cond_init(cond); + if(res2) + { + BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex)); + boost::throw_exception(thread_resource_error(res2, "boost::condition_variable_any::condition_variable_any() failed in detail::monotonic_pthread_cond_init")); + } + } + ~condition_variable_any() + { + BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex)); + BOOST_VERIFY(!pthread_cond_destroy(&cond)); + } + + template + void wait(lock_type& m) + { + int res=0; + { + thread_cv_detail::lock_on_exit guard; +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + detail::interruption_checker check_for_interruption(&internal_mutex,&cond); +#else + boost::pthread::pthread_mutex_scoped_lock check_for_interruption(&internal_mutex); +#endif + guard.activate(m); + res=pthread_cond_wait(&cond,&internal_mutex); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + this_thread::interruption_point(); +#endif + if(res) + { + boost::throw_exception(condition_error(res, "boost::condition_variable_any::wait() failed in pthread_cond_wait")); + } + } + + template + void wait(lock_type& m,predicate_type pred) + { + while(!pred()) wait(m); + } + +#if defined BOOST_THREAD_USES_DATETIME + template + bool timed_wait(lock_type& m,boost::system_time const& abs_time) + { + struct timespec const timeout=detail::to_timespec(abs_time); + return do_wait_until(m, timeout); + } + template + bool timed_wait(lock_type& m,xtime const& abs_time) + { + return timed_wait(m,system_time(abs_time)); + } + + template + bool timed_wait(lock_type& m,duration_type const& wait_duration) + { + return timed_wait(m,get_system_time()+wait_duration); + } + + template + bool timed_wait(lock_type& m,boost::system_time const& abs_time, predicate_type pred) + { + while (!pred()) + { + if(!timed_wait(m, abs_time)) + return pred(); + } + return true; + } + + template + bool timed_wait(lock_type& m,xtime const& abs_time, predicate_type pred) + { + return timed_wait(m,system_time(abs_time),pred); + } + + template + bool timed_wait(lock_type& m,duration_type const& wait_duration,predicate_type pred) + { + return timed_wait(m,get_system_time()+wait_duration,pred); + } +#endif +#ifndef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + +#ifdef BOOST_THREAD_USES_CHRONO + template + cv_status + wait_until( + lock_type& lock, + const chrono::time_point& t) + { + using namespace chrono; + typedef time_point nano_sys_tmpt; + wait_until(lock, + nano_sys_tmpt(ceil(t.time_since_epoch()))); + return system_clock::now() < t ? cv_status::no_timeout : + cv_status::timeout; + } + + template + cv_status + wait_until( + lock_type& lock, + const chrono::time_point& t) + { + using namespace chrono; + system_clock::time_point s_now = system_clock::now(); + typename Clock::time_point c_now = Clock::now(); + wait_until(lock, s_now + ceil(t - c_now)); + return Clock::now() < t ? cv_status::no_timeout : cv_status::timeout; + } + + template + cv_status + wait_for( + lock_type& lock, + const chrono::duration& d) + { + using namespace chrono; + system_clock::time_point s_now = system_clock::now(); + steady_clock::time_point c_now = steady_clock::now(); + wait_until(lock, s_now + ceil(d)); + return steady_clock::now() - c_now < d ? cv_status::no_timeout : + cv_status::timeout; + + } + + template + cv_status wait_until( + lock_type& lk, + chrono::time_point tp) + { + using namespace chrono; + nanoseconds d = tp.time_since_epoch(); + timespec ts = boost::detail::to_timespec(d); + if (do_wait_until(lk, ts)) return cv_status::no_timeout; + else return cv_status::timeout; + } +#endif +#else // defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#ifdef BOOST_THREAD_USES_CHRONO + + template + cv_status + wait_until( + lock_type& lock, + const chrono::time_point& t) + { + using namespace chrono; + typedef time_point nano_sys_tmpt; + wait_until(lock, + nano_sys_tmpt(ceil(t.time_since_epoch()))); + return steady_clock::now() < t ? cv_status::no_timeout : + cv_status::timeout; + } + + template + cv_status + wait_until( + lock_type& lock, + const chrono::time_point& t) + { + using namespace chrono; + steady_clock::time_point s_now = steady_clock::now(); + typename Clock::time_point c_now = Clock::now(); + wait_until(lock, s_now + ceil(t - c_now)); + return Clock::now() < t ? cv_status::no_timeout : cv_status::timeout; + } + + template + cv_status + wait_for( + lock_type& lock, + const chrono::duration& d) + { + using namespace chrono; + steady_clock::time_point c_now = steady_clock::now(); + wait_until(lock, c_now + ceil(d)); + return steady_clock::now() - c_now < d ? cv_status::no_timeout : + cv_status::timeout; + } + + template + inline cv_status wait_until( + lock_type& lock, + chrono::time_point tp) + { + using namespace chrono; + nanoseconds d = tp.time_since_epoch(); + timespec ts = boost::detail::to_timespec(d); + if (do_wait_until(lock, ts)) return cv_status::no_timeout; + else return cv_status::timeout; + } + +#endif +#endif // defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + +#ifdef BOOST_THREAD_USES_CHRONO + template + bool + wait_until( + lock_type& lock, + const chrono::time_point& t, + Predicate pred) + { + while (!pred()) + { + if (wait_until(lock, t) == cv_status::timeout) + return pred(); + } + return true; + } + + template + bool + wait_for( + lock_type& lock, + const chrono::duration& d, + Predicate pred) + { + return wait_until(lock, chrono::steady_clock::now() + d, boost::move(pred)); + } +#endif + + void notify_one() BOOST_NOEXCEPT + { + boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); + BOOST_VERIFY(!pthread_cond_signal(&cond)); + } + + void notify_all() BOOST_NOEXCEPT + { + boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); + BOOST_VERIFY(!pthread_cond_broadcast(&cond)); + } + private: // used by boost::thread::try_join_until + + template + bool do_wait_until( + lock_type& m, + struct timespec const &timeout) + { + int res=0; + { + thread_cv_detail::lock_on_exit guard; +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + detail::interruption_checker check_for_interruption(&internal_mutex,&cond); +#else + boost::pthread::pthread_mutex_scoped_lock check_for_interruption(&internal_mutex); +#endif + guard.activate(m); + res=pthread_cond_timedwait(&cond,&internal_mutex,&timeout); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + this_thread::interruption_point(); +#endif + if(res==ETIMEDOUT) + { + return false; + } + if(res) + { + boost::throw_exception(condition_error(res, "boost::condition_variable_any::do_wait_until() failed in pthread_cond_timedwait")); + } + return true; + } + }; + +} + +#include + +#endif diff --git a/clients/roscpp/include/boost_161_pthread_condition_variable_fwd.h b/clients/roscpp/include/boost_161_pthread_condition_variable_fwd.h new file mode 100644 index 0000000000..2e4e4c6761 --- /dev/null +++ b/clients/roscpp/include/boost_161_pthread_condition_variable_fwd.h @@ -0,0 +1,378 @@ +#ifndef BOOST_THREAD_PTHREAD_CONDITION_VARIABLE_FWD_HPP +#define BOOST_THREAD_PTHREAD_CONDITION_VARIABLE_FWD_HPP +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// (C) Copyright 2007-8 Anthony Williams +// (C) Copyright 2011-2012 Vicente J. Botet Escriba + +// define to check if this backported version was included +#define USING_BACKPORTED_BOOST_CONDITION_VARIABLE 1 + +#include +#include +#include +#include +#include +#include +#include +#include +#if defined BOOST_THREAD_USES_DATETIME +#include +#endif + +#ifdef BOOST_THREAD_USES_CHRONO +#include +#include +#endif +#include +#include + +#include + +namespace boost +{ + namespace detail { + inline int monotonic_pthread_cond_init(pthread_cond_t& cond) { + +#ifdef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + pthread_condattr_t attr; + int res = pthread_condattr_init(&attr); + if (res) + { + return res; + } + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + res=pthread_cond_init(&cond,&attr); + pthread_condattr_destroy(&attr); + return res; +#else + return pthread_cond_init(&cond,NULL); +#endif + + } + } + + class condition_variable + { + private: +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + pthread_mutex_t internal_mutex; +#endif + pthread_cond_t cond; + + public: + //private: // used by boost::thread::try_join_until + + inline bool do_wait_until( + unique_lock& lock, + struct timespec const &timeout); + + bool do_wait_for( + unique_lock& lock, + struct timespec const &timeout) + { +#if ! defined BOOST_THREAD_USEFIXES_TIMESPEC + return do_wait_until(lock, boost::detail::timespec_plus(timeout, boost::detail::timespec_now())); +#elif ! defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + //using namespace chrono; + //nanoseconds ns = chrono::system_clock::now().time_since_epoch(); + + struct timespec ts = boost::detail::timespec_now_realtime(); + //ts.tv_sec = static_cast(chrono::duration_cast(ns).count()); + //ts.tv_nsec = static_cast((ns - chrono::duration_cast(ns)).count()); + return do_wait_until(lock, boost::detail::timespec_plus(timeout, ts)); +#else + // old behavior was fine for monotonic + return do_wait_until(lock, boost::detail::timespec_plus(timeout, boost::detail::timespec_now_realtime())); +#endif + } + + public: + BOOST_THREAD_NO_COPYABLE(condition_variable) + condition_variable() + { + int res; +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + res=pthread_mutex_init(&internal_mutex,NULL); + if(res) + { + boost::throw_exception(thread_resource_error(res, "boost::condition_variable::condition_variable() constructor failed in pthread_mutex_init")); + } +#endif + res = detail::monotonic_pthread_cond_init(cond); + if (res) + { +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex)); +#endif + boost::throw_exception(thread_resource_error(res, "boost::condition_variable::condition_variable() constructor failed in detail::monotonic_pthread_cond_init")); + } + } + ~condition_variable() + { + int ret; +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + do { + ret = pthread_mutex_destroy(&internal_mutex); + } while (ret == EINTR); + BOOST_ASSERT(!ret); +#endif + do { + ret = pthread_cond_destroy(&cond); + } while (ret == EINTR); + BOOST_ASSERT(!ret); + } + + void wait(unique_lock& m); + + template + void wait(unique_lock& m,predicate_type pred) + { + while(!pred()) wait(m); + } + +#if defined BOOST_THREAD_USES_DATETIME + inline bool timed_wait( + unique_lock& m, + boost::system_time const& abs_time) + { +#if defined BOOST_THREAD_WAIT_BUG + struct timespec const timeout=detail::to_timespec(abs_time + BOOST_THREAD_WAIT_BUG); + return do_wait_until(m, timeout); +#else + struct timespec const timeout=detail::to_timespec(abs_time); + return do_wait_until(m, timeout); +#endif + } + bool timed_wait( + unique_lock& m, + xtime const& abs_time) + { + return timed_wait(m,system_time(abs_time)); + } + + template + bool timed_wait( + unique_lock& m, + duration_type const& wait_duration) + { + if (wait_duration.is_pos_infinity()) + { + wait(m); // or do_wait(m,detail::timeout::sentinel()); + return true; + } + if (wait_duration.is_special()) + { + return true; + } + return timed_wait(m,get_system_time()+wait_duration); + } + + template + bool timed_wait( + unique_lock& m, + boost::system_time const& abs_time,predicate_type pred) + { + while (!pred()) + { + if(!timed_wait(m, abs_time)) + return pred(); + } + return true; + } + + template + bool timed_wait( + unique_lock& m, + xtime const& abs_time,predicate_type pred) + { + return timed_wait(m,system_time(abs_time),pred); + } + + template + bool timed_wait( + unique_lock& m, + duration_type const& wait_duration,predicate_type pred) + { + if (wait_duration.is_pos_infinity()) + { + while (!pred()) + { + wait(m); // or do_wait(m,detail::timeout::sentinel()); + } + return true; + } + if (wait_duration.is_special()) + { + return pred(); + } + return timed_wait(m,get_system_time()+wait_duration,pred); + } +#endif + +#ifndef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + +#ifdef BOOST_THREAD_USES_CHRONO + + template + cv_status + wait_until( + unique_lock& lock, + const chrono::time_point& t) + { + using namespace chrono; + typedef time_point nano_sys_tmpt; + wait_until(lock, + nano_sys_tmpt(ceil(t.time_since_epoch()))); + return system_clock::now() < t ? cv_status::no_timeout : + cv_status::timeout; + } + + template + cv_status + wait_until( + unique_lock& lock, + const chrono::time_point& t) + { + using namespace chrono; + system_clock::time_point s_now = system_clock::now(); + typename Clock::time_point c_now = Clock::now(); + wait_until(lock, s_now + ceil(t - c_now)); + return Clock::now() < t ? cv_status::no_timeout : cv_status::timeout; + } + + + + template + cv_status + wait_for( + unique_lock& lock, + const chrono::duration& d) + { + using namespace chrono; + system_clock::time_point s_now = system_clock::now(); + steady_clock::time_point c_now = steady_clock::now(); + wait_until(lock, s_now + ceil(d)); + return steady_clock::now() - c_now < d ? cv_status::no_timeout : + cv_status::timeout; + + } + + inline cv_status wait_until( + unique_lock& lk, + chrono::time_point tp) + { + using namespace chrono; + nanoseconds d = tp.time_since_epoch(); + timespec ts = boost::detail::to_timespec(d); + if (do_wait_until(lk, ts)) return cv_status::no_timeout; + else return cv_status::timeout; + } +#endif + +#else // defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#ifdef BOOST_THREAD_USES_CHRONO + + template + cv_status + wait_until( + unique_lock& lock, + const chrono::time_point& t) + { + using namespace chrono; + typedef time_point nano_sys_tmpt; + wait_until(lock, + nano_sys_tmpt(ceil(t.time_since_epoch()))); + return steady_clock::now() < t ? cv_status::no_timeout : + cv_status::timeout; + } + + template + cv_status + wait_until( + unique_lock& lock, + const chrono::time_point& t) + { + using namespace chrono; + steady_clock::time_point s_now = steady_clock::now(); + typename Clock::time_point c_now = Clock::now(); + wait_until(lock, s_now + ceil(t - c_now)); + return Clock::now() < t ? cv_status::no_timeout : cv_status::timeout; + } + + template + cv_status + wait_for( + unique_lock& lock, + const chrono::duration& d) + { + using namespace chrono; + steady_clock::time_point c_now = steady_clock::now(); + wait_until(lock, c_now + ceil(d)); + return steady_clock::now() - c_now < d ? cv_status::no_timeout : + cv_status::timeout; + } + + inline cv_status wait_until( + unique_lock& lk, + chrono::time_point tp) + { + using namespace chrono; + nanoseconds d = tp.time_since_epoch(); + timespec ts = boost::detail::to_timespec(d); + if (do_wait_until(lk, ts)) return cv_status::no_timeout; + else return cv_status::timeout; + } +#endif + +#endif // defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + +#ifdef BOOST_THREAD_USES_CHRONO + template + bool + wait_until( + unique_lock& lock, + const chrono::time_point& t, + Predicate pred) + { + while (!pred()) + { + if (wait_until(lock, t) == cv_status::timeout) + return pred(); + } + return true; + } + + template + bool + wait_for( + unique_lock& lock, + const chrono::duration& d, + Predicate pred) + { + return wait_until(lock, chrono::steady_clock::now() + d, boost::move(pred)); + } +#endif + +#define BOOST_THREAD_DEFINES_CONDITION_VARIABLE_NATIVE_HANDLE + typedef pthread_cond_t* native_handle_type; + native_handle_type native_handle() + { + return &cond; + } + + void notify_one() BOOST_NOEXCEPT; + void notify_all() BOOST_NOEXCEPT; + + + }; + + BOOST_THREAD_DECL void notify_all_at_thread_exit(condition_variable& cond, unique_lock lk); + +} + + +#include + +#endif diff --git a/clients/roscpp/include/ros/forwards.h b/clients/roscpp/include/ros/forwards.h index 9095ce55e2..8ac7406b1b 100644 --- a/clients/roscpp/include/ros/forwards.h +++ b/clients/roscpp/include/ros/forwards.h @@ -161,6 +161,24 @@ struct WallTimerEvent }; typedef boost::function WallTimerCallback; +/** + * \brief Structure passed as a parameter to the callback invoked by a ros::SteadyTimer + */ +struct SteadyTimerEvent +{ + SteadyTime last_expected; ///< In a perfect world, this is when the last callback should have happened + SteadyTime last_real; ///< When the last callback actually happened + + SteadyTime current_expected; ///< In a perfect world, this is when the current callback should be happening + SteadyTime current_real; ///< This is when the current callback was actually called (SteadyTime::now() as of the beginning of the callback) + + struct + { + WallDuration last_duration; ///< How long the last callback ran for + } profile; +}; +typedef boost::function SteadyTimerCallback; + class ServiceManager; typedef boost::shared_ptr ServiceManagerPtr; class TopicManager; diff --git a/clients/roscpp/include/ros/internal_timer_manager.h b/clients/roscpp/include/ros/internal_timer_manager.h index 5f33dfcf97..44e23c2d56 100644 --- a/clients/roscpp/include/ros/internal_timer_manager.h +++ b/clients/roscpp/include/ros/internal_timer_manager.h @@ -36,7 +36,7 @@ namespace ros { template class TimerManager; -typedef TimerManager InternalTimerManager; +typedef TimerManager InternalTimerManager; typedef boost::shared_ptr InternalTimerManagerPtr; ROSCPP_DECL void initInternalTimerManager(); diff --git a/clients/roscpp/include/ros/node_handle.h b/clients/roscpp/include/ros/node_handle.h index 638011458b..fcb6c783ab 100644 --- a/clients/roscpp/include/ros/node_handle.h +++ b/clients/roscpp/include/ros/node_handle.h @@ -36,6 +36,7 @@ #include "ros/timer.h" #include "ros/rate.h" #include "ros/wall_timer.h" +#include "ros/steady_timer.h" #include "ros/advertise_options.h" #include "ros/advertise_service_options.h" #include "ros/subscribe_options.h" @@ -1466,6 +1467,85 @@ if (service) // Enter if advertised service is valid */ WallTimer createWallTimer(WallTimerOptions& ops) const; + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Versions of createSteadyTimer() + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * \brief Create a timer which will call a callback at the specified rate, using wall time to determine + * when to call the callback instead of ROS time. + * This variant takes a class member function, and a bare pointer to the object to call the method on. + * + * When the Timer (and all copies of it) returned goes out of scope, the timer will automatically + * be stopped, and the callback will no longer be called. + * + * \param period The period at which to call the callback + * \param callback The method to call + * \param obj The object to call the method on + * \param oneshot If true, this timer will only fire once + * \param autostart If true (default), return timer that is already started + */ + template + SteadyTimer createSteadyTimer(WallDuration period, void(T::*callback)(const SteadyTimerEvent&), T* obj, + bool oneshot = false, bool autostart = true) const + { + return createSteadyTimer(period, boost::bind(callback, obj, _1), oneshot, autostart); + } + + /** + * \brief Create a timer which will call a callback at the specified rate, using wall time to determine + * when to call the callback instead of ROS time. This variant takes + * a class member function, and a shared pointer to the object to call the method on. + * + * When the Timer (and all copies of it) returned goes out of scope, the timer will automatically + * be stopped, and the callback will no longer be called. + * + * \param period The period at which to call the callback + * \param callback The method to call + * \param obj The object to call the method on. Since this is a shared pointer, the object will + * automatically be tracked with a weak_ptr so that if it is deleted before the Timer goes out of + * scope the callback will no longer be called (and therefore will not crash). + * \param oneshot If true, this timer will only fire once + */ + template + SteadyTimer createSteadyTimer(WallDuration period, void(T::*callback)(const SteadyTimerEvent&), + const boost::shared_ptr& obj, + bool oneshot = false, bool autostart = true) const + { + SteadyTimerOptions ops(period, boost::bind(callback, obj.get(), _1), 0); + ops.tracked_object = obj; + ops.oneshot = oneshot; + ops.autostart = autostart; + return createSteadyTimer(ops); + } + + /** + * \brief Create a timer which will call a callback at the specified rate, using wall time to determine + * when to call the callback instead of ROS time. This variant takes + * anything that can be bound to a Boost.Function, including a bare function + * + * When the Timer (and all copies of it) returned goes out of scope, the timer will automatically + * be stopped, and the callback will no longer be called. + * + * \param period The period at which to call the callback + * \param callback The function to call + * \param oneshot If true, this timer will only fire once + */ + SteadyTimer createSteadyTimer(WallDuration period, const SteadyTimerCallback& callback, + bool oneshot = false, bool autostart = true) const; + + /** + * \brief Create a timer which will call a callback at the specified rate, using wall time to determine + * when to call the callback instead of ROS time. This variant allows + * the full range of TimerOptions. + * + * When the Timer (and all copies of it) returned goes out of scope, the timer will automatically + * be stopped, and the callback will no longer be called. + * + * \param ops The options to use when creating the timer + */ + SteadyTimer createSteadyTimer(SteadyTimerOptions& ops) const; + /** \brief Set an arbitrary XML/RPC value on the parameter server. * * \param key The key to be used in the parameter server's dictionary diff --git a/clients/roscpp/include/ros/steady_timer.h b/clients/roscpp/include/ros/steady_timer.h new file mode 100644 index 0000000000..c16f282895 --- /dev/null +++ b/clients/roscpp/include/ros/steady_timer.h @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2017, Felix Ruess, Roboception GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef ROSCPP_STEADY_TIMER_H +#define ROSCPP_STEADY_TIMER_H + +#include "common.h" +#include "forwards.h" +#include "steady_timer_options.h" + +namespace ros +{ + +/** + * \brief Manages a steady-clock timer callback + * + * A SteadyTimer should always be created through a call to NodeHandle::createSteadyTimer(), or copied from one + * that was. Once all copies of a specific + * SteadyTimer go out of scope, the callback associated with that handle will stop + * being called. + */ +class ROSCPP_DECL SteadyTimer +{ +public: + SteadyTimer() {} + SteadyTimer(const SteadyTimer& rhs); + ~SteadyTimer(); + + /** + * \brief Start the timer. Does nothing if the timer is already started. + */ + void start(); + /** + * \brief Stop the timer. Once this call returns, no more callbacks will be called. Does + * nothing if the timer is already stopped. + */ + void stop(); + + /** + * \brief Returns whether or not the timer has any pending events to call. + */ + bool hasPending(); + + /** + * \brief Set the period of this timer + */ + void setPeriod(const WallDuration& period, bool reset=true); + + bool isValid() { return impl_ && impl_->isValid(); } + operator void*() { return isValid() ? (void *) 1 : (void *) 0; } + + bool operator<(const SteadyTimer& rhs) + { + return impl_ < rhs.impl_; + } + + bool operator==(const SteadyTimer& rhs) + { + return impl_ == rhs.impl_; + } + + bool operator!=(const SteadyTimer& rhs) + { + return impl_ != rhs.impl_; + } + +private: + SteadyTimer(const SteadyTimerOptions& ops); + + class Impl + { + public: + Impl(); + ~Impl(); + + bool isValid(); + bool hasPending(); + void setPeriod(const WallDuration &period, bool reset=true); + + void start(); + void stop(); + + bool started_; + int32_t timer_handle_; + + WallDuration period_; + SteadyTimerCallback callback_; + CallbackQueueInterface *callback_queue_; + VoidConstWPtr tracked_object_; + bool has_tracked_object_; + bool oneshot_; + }; + typedef boost::shared_ptr ImplPtr; + typedef boost::weak_ptr ImplWPtr; + + ImplPtr impl_; + + friend class NodeHandle; +}; + +} + +#endif diff --git a/clients/roscpp/include/ros/steady_timer_options.h b/clients/roscpp/include/ros/steady_timer_options.h new file mode 100644 index 0000000000..e93a973de5 --- /dev/null +++ b/clients/roscpp/include/ros/steady_timer_options.h @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2017, Felix Ruess, Roboception GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef ROSCPP_STEADY_TIMER_OPTIONS_H +#define ROSCPP_STEADY_TIMER_OPTIONS_H + +#include "common.h" +#include "ros/forwards.h" + +namespace ros +{ + +/** + * \brief Encapsulates all options available for starting a timer + */ +struct ROSCPP_DECL SteadyTimerOptions +{ + SteadyTimerOptions() + : period(0.1) + , callback_queue(0) + , oneshot(false) + , autostart(true) + { + } + + /* + * \brief Constructor + * \param + */ + SteadyTimerOptions(WallDuration _period, const SteadyTimerCallback& _callback, CallbackQueueInterface* _queue, + bool oneshot = false, bool autostart = true) + : period(_period) + , callback(_callback) + , callback_queue(_queue) + , oneshot(oneshot) + , autostart(autostart) + {} + + WallDuration period; ///< The period to call the callback at + SteadyTimerCallback callback; ///< The callback to call + + CallbackQueueInterface* callback_queue; ///< Queue to add callbacks to. If NULL, the global callback queue will be used + + /** + * A shared pointer to an object to track for these callbacks. If set, the a weak_ptr will be created to this object, + * and if the reference count goes to 0 the subscriber callbacks will not get called. + * + * \note Note that setting this will cause a new reference to be added to the object before the + * callback, and for it to go out of scope (and potentially be deleted) in the code path (and therefore + * thread) that the callback is invoked from. + */ + VoidConstPtr tracked_object; + + bool oneshot; + bool autostart; +}; + + +} + +#endif + diff --git a/clients/roscpp/include/ros/timer_manager.h b/clients/roscpp/include/ros/timer_manager.h index e5da5fccdd..27f8ca1f64 100644 --- a/clients/roscpp/include/ros/timer_manager.h +++ b/clients/roscpp/include/ros/timer_manager.h @@ -28,6 +28,21 @@ #ifndef ROSCPP_TIMER_MANAGER_H #define ROSCPP_TIMER_MANAGER_H +// check if we might need to include our own backported version boost::condition_variable +// in order to use CLOCK_MONOTONIC for the SteadyTimer +// the include order here is important! +#ifdef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#include +#if BOOST_VERSION < 106100 +// use backported version of boost condition variable, see https://svn.boost.org/trac/boost/ticket/6377 +#include "boost_161_condition_variable.h" +#else // Boost version is 1.61 or greater and has the steady clock fixes +#include +#endif +#else // !BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC +#include +#endif // BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + #include "ros/forwards.h" #include "ros/time.h" #include "ros/file_log.h" @@ -35,7 +50,6 @@ #include #include #include -#include #include "ros/assert.h" #include "ros/callback_queue_interface.h" @@ -180,9 +194,9 @@ class TimerManager event.current_real = T::now(); event.profile.last_duration = info->last_cb_duration; - WallTime cb_start = WallTime::now(); + SteadyTime cb_start = SteadyTime::now(); info->callback(event); - WallTime cb_end = WallTime::now(); + SteadyTime cb_end = SteadyTime::now(); info->last_cb_duration = cb_end - cb_start; info->last_real = event.current_real; diff --git a/clients/roscpp/include/ros/transport_publisher_link.h b/clients/roscpp/include/ros/transport_publisher_link.h index a2bfe87803..7d462920cf 100644 --- a/clients/roscpp/include/ros/transport_publisher_link.h +++ b/clients/roscpp/include/ros/transport_publisher_link.h @@ -42,7 +42,7 @@ typedef boost::weak_ptr SubscriptionWPtr; class Connection; typedef boost::shared_ptr ConnectionPtr; -struct WallTimerEvent; +struct SteadyTimerEvent; /** * \brief Handles a connection to a single publisher on a given topic. Receives messages from a publisher @@ -76,14 +76,14 @@ class ROSCPP_DECL TransportPublisherLink : public PublisherLink void onMessageLength(const ConnectionPtr& conn, const boost::shared_array& buffer, uint32_t size, bool success); void onMessage(const ConnectionPtr& conn, const boost::shared_array& buffer, uint32_t size, bool success); - void onRetryTimer(const ros::WallTimerEvent&); + void onRetryTimer(const ros::SteadyTimerEvent&); ConnectionPtr connection_; int32_t retry_timer_handle_; bool needs_retry_; WallDuration retry_period_; - WallTime next_retry_; + SteadyTime next_retry_; bool dropping_; }; typedef boost::shared_ptr TransportPublisherLinkPtr; diff --git a/clients/roscpp/package.xml b/clients/roscpp/package.xml index be7edbd48c..1d6fc23d8f 100644 --- a/clients/roscpp/package.xml +++ b/clients/roscpp/package.xml @@ -31,7 +31,7 @@ roscpp_traits rosgraph_msgs roslang - rostime + rostime std_msgs xmlrpcpp @@ -41,7 +41,7 @@ roscpp_serialization roscpp_traits rosgraph_msgs - rostime + rostime std_msgs xmlrpcpp diff --git a/clients/roscpp/src/libros/internal_timer_manager.cpp b/clients/roscpp/src/libros/internal_timer_manager.cpp index 5f3ba278d0..facb4174c8 100644 --- a/clients/roscpp/src/libros/internal_timer_manager.cpp +++ b/clients/roscpp/src/libros/internal_timer_manager.cpp @@ -25,8 +25,19 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "ros/internal_timer_manager.h" +// make sure we use CLOCK_MONOTONIC for the condition variable +#define BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + #include "ros/timer_manager.h" +#include "ros/internal_timer_manager.h" + +// check if we have really included the backported boost condition variable +// just in case someone messes with the include order... +#if BOOST_VERSION < 106100 +#ifndef USING_BACKPORTED_BOOST_CONDITION_VARIABLE +#error "steady timer needs boost version >= 1.61 or the backported headers!" +#endif +#endif namespace ros { diff --git a/clients/roscpp/src/libros/master.cpp b/clients/roscpp/src/libros/master.cpp index 04d73058f5..c0e2e10d06 100644 --- a/clients/roscpp/src/libros/master.cpp +++ b/clients/roscpp/src/libros/master.cpp @@ -178,7 +178,7 @@ boost::mutex g_xmlrpc_call_mutex; bool execute(const std::string& method, const XmlRpc::XmlRpcValue& request, XmlRpc::XmlRpcValue& response, XmlRpc::XmlRpcValue& payload, bool wait_for_master) { - ros::WallTime start_time = ros::WallTime::now(); + ros::SteadyTime start_time = ros::SteadyTime::now(); std::string master_host = getHost(); uint32_t master_port = getPort(); @@ -213,7 +213,7 @@ bool execute(const std::string& method, const XmlRpc::XmlRpcValue& request, XmlR return false; } - if (!g_retry_timeout.isZero() && (ros::WallTime::now() - start_time) >= g_retry_timeout) + if (!g_retry_timeout.isZero() && (ros::SteadyTime::now() - start_time) >= g_retry_timeout) { ROS_ERROR("[%s] Timed out trying to connect to the master after [%f] seconds", method.c_str(), g_retry_timeout.toSec()); XMLRPCManager::instance()->releaseXMLRPCClient(c); diff --git a/clients/roscpp/src/libros/node_handle.cpp b/clients/roscpp/src/libros/node_handle.cpp index 7dc079749c..67caeee7fe 100644 --- a/clients/roscpp/src/libros/node_handle.cpp +++ b/clients/roscpp/src/libros/node_handle.cpp @@ -29,10 +29,12 @@ #include "ros/this_node.h" #include "ros/service.h" #include "ros/callback_queue.h" -#include "ros/timer_manager.h" #include "ros/time.h" #include "ros/rate.h" +#include "ros/timer.h" +#include "ros/wall_timer.h" +#include "ros/steady_timer.h" #include "ros/xmlrpc_manager.h" #include "ros/topic_manager.h" @@ -449,6 +451,37 @@ WallTimer NodeHandle::createWallTimer(WallTimerOptions& ops) const return timer; } +SteadyTimer NodeHandle::createSteadyTimer(WallDuration period, const SteadyTimerCallback& callback, + bool oneshot, bool autostart) const +{ + SteadyTimerOptions ops; + ops.period = period; + ops.callback = callback; + ops.oneshot = oneshot; + ops.autostart = autostart; + return createSteadyTimer(ops); +} + +SteadyTimer NodeHandle::createSteadyTimer(SteadyTimerOptions& ops) const +{ + if (ops.callback_queue == 0) + { + if (callback_queue_) + { + ops.callback_queue = callback_queue_; + } + else + { + ops.callback_queue = getGlobalCallbackQueue(); + } + } + + SteadyTimer timer(ops); + if (ops.autostart) + timer.start(); + return timer; +} + void NodeHandle::shutdown() { { diff --git a/clients/roscpp/src/libros/steady_timer.cpp b/clients/roscpp/src/libros/steady_timer.cpp new file mode 100644 index 0000000000..7621f42b45 --- /dev/null +++ b/clients/roscpp/src/libros/steady_timer.cpp @@ -0,0 +1,224 @@ +/* + * Copyright (C) 2017, Felix Ruess, Roboception GmbH + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +// make sure we use CLOCK_MONOTONIC for the condition variable +#define BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC + +#include "ros/steady_timer.h" +#include "ros/timer_manager.h" + +// check if we have really included the backported boost condition variable +// just in case someone messes with the include order... +#if BOOST_VERSION < 106100 +#ifndef USING_BACKPORTED_BOOST_CONDITION_VARIABLE +#error "steady timer needs boost version >= 1.61 or the backported headers!" +#endif +#endif + +namespace ros +{ + +// specialization for SteadyTimer, to make sure we use a version with wait_until that uses the monotonic clock +template<> +void TimerManager::threadFunc() +{ + SteadyTime current; + while (!quit_) + { + SteadyTime sleep_end; + + boost::mutex::scoped_lock lock(timers_mutex_); + + current = SteadyTime::now(); + + { + boost::mutex::scoped_lock waitlock(waiting_mutex_); + + if (waiting_.empty()) + { + sleep_end = current + WallDuration(0.1); + } + else + { + TimerInfoPtr info = findTimer(waiting_.front()); + + while (!waiting_.empty() && info && info->next_expected <= current) + { + current = SteadyTime::now(); + + //ROS_DEBUG("Scheduling timer callback for timer [%d] of period [%f], [%f] off expected", info->handle, info->period.toSec(), (current - info->next_expected).toSec()); + CallbackInterfacePtr cb(boost::make_shared(this, info, info->last_expected, info->last_real, info->next_expected)); + info->callback_queue->addCallback(cb, (uint64_t)info.get()); + + waiting_.pop_front(); + + if (waiting_.empty()) + { + break; + } + + info = findTimer(waiting_.front()); + } + + if (info) + { + sleep_end = info->next_expected; + } + } + } + + while (!new_timer_ && SteadyTime::now() < sleep_end && !quit_) + { + current = SteadyTime::now(); + + if (current >= sleep_end) + { + break; + } + + // requires boost 1.61 for wait_until to actually use the steady clock + // see: https://svn.boost.org/trac/boost/ticket/6377 + boost::chrono::steady_clock::time_point end_tp(boost::chrono::nanoseconds(sleep_end.toNSec())); + timers_cond_.wait_until(lock, end_tp); + } + + new_timer_ = false; + } +} + +SteadyTimer::Impl::Impl() + : started_(false) + , timer_handle_(-1) +{ } + +SteadyTimer::Impl::~Impl() +{ + ROS_DEBUG("SteadyTimer deregistering callbacks."); + stop(); +} + +void SteadyTimer::Impl::start() +{ + if (!started_) + { + VoidConstPtr tracked_object; + if (has_tracked_object_) + { + tracked_object = tracked_object_.lock(); + } + timer_handle_ = TimerManager::global().add(period_, callback_, callback_queue_, tracked_object, oneshot_); + started_ = true; + } +} + +void SteadyTimer::Impl::stop() +{ + if (started_) + { + started_ = false; + TimerManager::global().remove(timer_handle_); + timer_handle_ = -1; + } +} + +bool SteadyTimer::Impl::isValid() +{ + return !period_.isZero(); +} + +bool SteadyTimer::Impl::hasPending() +{ + if (!isValid() || timer_handle_ == -1) + { + return false; + } + + return TimerManager::global().hasPending(timer_handle_); +} + +void SteadyTimer::Impl::setPeriod(const WallDuration& period, bool reset) +{ + period_ = period; + TimerManager::global().setPeriod(timer_handle_, period, reset); +} + + +SteadyTimer::SteadyTimer(const SteadyTimerOptions& ops) +: impl_(new Impl) +{ + impl_->period_ = ops.period; + impl_->callback_ = ops.callback; + impl_->callback_queue_ = ops.callback_queue; + impl_->tracked_object_ = ops.tracked_object; + impl_->has_tracked_object_ = (ops.tracked_object != NULL); + impl_->oneshot_ = ops.oneshot; +} + +SteadyTimer::SteadyTimer(const SteadyTimer& rhs) +{ + impl_ = rhs.impl_; +} + +SteadyTimer::~SteadyTimer() +{ +} + +void SteadyTimer::start() +{ + if (impl_) + { + impl_->start(); + } +} + +void SteadyTimer::stop() +{ + if (impl_) + { + impl_->stop(); + } +} + +bool SteadyTimer::hasPending() +{ + if (impl_) + { + return impl_->hasPending(); + } + + return false; +} + +void SteadyTimer::setPeriod(const WallDuration& period, bool reset) +{ + if (impl_) + { + impl_->setPeriod(period, reset); + } +} + +} diff --git a/clients/roscpp/src/libros/transport_publisher_link.cpp b/clients/roscpp/src/libros/transport_publisher_link.cpp index 6e9b8785b4..092585d3f2 100644 --- a/clients/roscpp/src/libros/transport_publisher_link.cpp +++ b/clients/roscpp/src/libros/transport_publisher_link.cpp @@ -198,14 +198,14 @@ void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::s } } -void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&) +void TransportPublisherLink::onRetryTimer(const ros::SteadyTimerEvent&) { if (dropping_) { return; } - if (needs_retry_ && WallTime::now() > next_retry_) + if (needs_retry_ && SteadyTime::now() > next_retry_) { retry_period_ = std::min(retry_period_ * 2, WallDuration(20)); needs_retry_ = false; @@ -268,12 +268,12 @@ void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Conn ROS_ASSERT(!needs_retry_); needs_retry_ = true; - next_retry_ = WallTime::now() + retry_period_; + next_retry_ = SteadyTime::now() + retry_period_; if (retry_timer_handle_ == -1) { retry_period_ = WallDuration(0.1); - next_retry_ = WallTime::now() + retry_period_; + next_retry_ = SteadyTime::now() + retry_period_; // shared_from_this() shared_ptr is used to ensure TransportPublisherLink is not // destroyed in the middle of onRetryTimer execution retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_), diff --git a/test/test_roscpp/test/src/timer_callbacks.cpp b/test/test_roscpp/test/src/timer_callbacks.cpp index 9f2bc7ec5e..4f87907e95 100644 --- a/test/test_roscpp/test/src/timer_callbacks.cpp +++ b/test/test_roscpp/test/src/timer_callbacks.cpp @@ -54,6 +54,319 @@ using namespace test_roscpp; std::string g_node_name = "test_timer_callbacks"; + +/************************* SteadyTimer tests **********************/ + +class SteadyTimerHelper +{ + public: + SteadyTimerHelper(float period, bool oneshot = false) + : expected_period_(period) + , failed_(false) + , total_calls_(0) + , testing_period_(false) + , calls_before_testing_period_(0) + { + NodeHandle n; + timer_ = n.createSteadyTimer(expected_period_, &SteadyTimerHelper::callback, this, oneshot); + } + + void callback(const SteadyTimerEvent& e) + { + bool first = last_call_.isZero(); + SteadyTime last_call = last_call_; + last_call_ = SteadyTime::now(); + SteadyTime start = last_call_; + + if (!first) + { + if (fabsf(expected_next_call_.toSec() - start.toSec()) > 0.1f) + { + ROS_ERROR("Call came at wrong time (%f vs. %f)", expected_next_call_.toSec(), start.toSec()); + failed_ = true; + } + } + + if(testing_period_) + { + + // Inside callback, less than current period, reset=false + if(total_calls_ == calls_before_testing_period_) + { + WallDuration p(0.5); + pretendWork(0.15); + setPeriod(p); + } + + // Inside callback, greater than current period, reset=false + else if(total_calls_ == (calls_before_testing_period_+1)) + { + WallDuration p(0.25); + pretendWork(0.15); + setPeriod(p); + } + + // Inside callback, less than current period, reset=true + else if(total_calls_ == (calls_before_testing_period_+2)) + { + WallDuration p(0.5); + pretendWork(0.15); + setPeriod(p, true); + } + + // Inside callback, greater than current period, reset=true + else if(total_calls_ == (calls_before_testing_period_+3)) + { + WallDuration p(0.25); + pretendWork(0.15); + setPeriod(p, true); + } + } + else + { + expected_next_call_ = e.current_expected + expected_period_; + } + + SteadyTime end = SteadyTime::now(); + last_duration_ = end - start; + + ++total_calls_; + } + + void setPeriod(const WallDuration p, bool reset=false) + { + if(reset) + { + expected_next_call_ = SteadyTime::now() + p; + } + else + { + expected_next_call_ = last_call_ + p; + } + + timer_.setPeriod(p, reset); + expected_period_ = p; + } + + + void pretendWork(const float t) + { + ros::Rate r(1. / t); + r.sleep(); + } + + SteadyTime last_call_; + SteadyTime expected_next_call_; + WallDuration expected_period_; + WallDuration last_duration_; + + bool failed_; + + SteadyTimer timer_; + int32_t total_calls_; + + bool testing_period_; + int calls_before_testing_period_; +}; + +TEST(RoscppTimerCallbacks, singleSteadyTimeCallback) +{ + NodeHandle n; + SteadyTimerHelper helper1(0.01); + + WallDuration d(0.001f); + for (int32_t i = 0; i < 1000 && n.ok(); ++i) + { + spinOnce(); + d.sleep(); + } + + if (helper1.failed_) + { + FAIL(); + } + + if (helper1.total_calls_ < 99) + { + ROS_ERROR("Total calls: %d (expected at least 100)", helper1.total_calls_); + FAIL(); + } +} + +TEST(RoscppTimerCallbacks, multipleSteadyTimeCallbacks) +{ + NodeHandle n; + const int count = 100; + typedef boost::scoped_ptr HelperPtr; + HelperPtr helpers[count]; + for (int i = 0; i < count; ++i) + { + helpers[i].reset(new SteadyTimerHelper((float)(i + 1) * 0.1f)); + } + + WallDuration d(0.01f); + const int spin_count = 1000; + for (int32_t i = 0; i < spin_count && n.ok(); ++i) + { + spinOnce(); + d.sleep(); + } + + for (int i = 0; i < count; ++i) + { + if (helpers[i]->failed_) + { + ROS_ERROR("Helper %d failed", i); + FAIL(); + } + + int32_t expected_count = (spin_count * d.toSec()) / helpers[i]->expected_period_.toSec(); + if (helpers[i]->total_calls_ < (expected_count - 1)) + { + ROS_ERROR("Helper %d total calls: %d (at least %d expected)", i, helpers[i]->total_calls_, expected_count); + FAIL(); + } + } +} + +TEST(RoscppTimerCallbacks, steadySetPeriod) +{ + NodeHandle n; + WallDuration period(0.5); + SteadyTimerHelper helper(period.toSec()); + Rate r(100); + + // Let the callback occur once before getting started + while(helper.total_calls_ < 1) + { + spinOnce(); + r.sleep(); + } + + helper.pretendWork(0.1); + + // outside callback, new period < old period, reset = false + Duration wait(0.5); + WallDuration p(0.25); + helper.setPeriod(p); + while(helper.total_calls_ < 2) + { + spinOnce(); + r.sleep(); + } + + helper.pretendWork(0.1); + + // outside callback, new period > old period, reset = false + WallDuration p2(0.5); + helper.setPeriod(p); + while(helper.total_calls_ < 3) + { + spinOnce(); + r.sleep(); + } + + helper.pretendWork(0.1); + + // outside callback, new period < old period, reset = true + WallDuration p3(0.25); + helper.setPeriod(p, true); + while(helper.total_calls_ < 4) + { + spinOnce(); + r.sleep(); + } + + helper.pretendWork(0.1); + + // outside callback, new period > old period, reset = true + WallDuration p4(0.5); + helper.setPeriod(p, true); + while(helper.total_calls_ < 5) + { + spinOnce(); + r.sleep(); + } + + // Test calling setPeriod inside callback + helper.calls_before_testing_period_ = helper.total_calls_; + int total = helper.total_calls_ + 5; + helper.testing_period_ = true; + while(helper.total_calls_ < total) + { + spinOnce(); + r.sleep(); + } + helper.testing_period_ = false; + + + if(helper.failed_) + { + ROS_ERROR("Helper failed in setPeriod"); + FAIL(); + } +} + +TEST(RoscppTimerCallbacks, stopSteadyTimer) +{ + NodeHandle n; + SteadyTimerHelper helper(0.001); + + for (int32_t i = 0; i < 1000 && n.ok(); ++i) + { + WallDuration(0.001).sleep(); + spinOnce(); + } + + ASSERT_GT(helper.total_calls_, 0); + int32_t last_count = helper.total_calls_; + helper.timer_.stop(); + + for (int32_t i = 0; i < 1000 && n.ok(); ++i) + { + WallDuration(0.001).sleep(); + spinOnce(); + } + + ASSERT_EQ(last_count, helper.total_calls_); +} + +int32_t g_steady_count = 0; +void steadyTimerCallback(const ros::SteadyTimerEvent&) +{ + ++g_steady_count; +} + +TEST(RoscppTimerCallbacks, steadyStopThenSpin) +{ + g_steady_count = 0; + NodeHandle n; + ros::SteadyTimer timer = n.createSteadyTimer(ros::WallDuration(0.001), steadyTimerCallback); + + WallDuration(0.1).sleep(); + timer.stop(); + + spinOnce(); + + ASSERT_EQ(g_steady_count, 0); +} + +TEST(RoscppTimerCallbacks, oneShotSteadyTimer) +{ + NodeHandle n; + SteadyTimerHelper helper(0.001, true); + + for (int32_t i = 0; i < 1000 && n.ok(); ++i) + { + WallDuration(0.001).sleep(); + spinOnce(); + } + + ASSERT_EQ(helper.total_calls_, 1); +} + +/************************* WallTimer tests **********************/ + class WallTimerHelper { public: