diff --git a/common/utilities/time/waiting-on.h b/common/utilities/time/waiting-on.h index b34a52f9a7..85b410d2db 100644 --- a/common/utilities/time/waiting-on.h +++ b/common/utilities/time/waiting-on.h @@ -14,6 +14,10 @@ namespace time { // Helper class -- encapsulate a variable of type T that we want to wait on: another thread will set // it and signal when we can continue... +// +// We use the least amount of synchronization mechanisms: no effort is made to synchronize (usually, +// it's not needed: only one thread will be writing to T, and the owner of T will be waiting on it) +// so it's the responsibility of the user to do so if needed. // template< class T > class waiting_on @@ -54,6 +58,10 @@ class waiting_on { _cv.notify_one(); } + void signal_all() + { + _cv.notify_all(); + } }; private: std::shared_ptr< wait_state_t > _ptr; @@ -75,6 +83,16 @@ class waiting_on : _ptr( local._ptr ) { } +#if 0 // TODO this causes major slowdowns! left in here for Eran to break his head against... + ~in_thread_() + { + // We get here when the lambda we're in is destroyed -- so either we've already run + // (and signalled once) or we've never run. We signal anyway -- if anything's waiting + // they'll get woken up; otherwise nothing'll happen... + if( auto wait_state = still_alive() ) + wait_state->signal_all(); + } +#endif std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); } @@ -100,7 +118,7 @@ class waiting_on // Convert to the in-thread representation in_thread_ in_thread() const { return in_thread_( *this ); } - operator T const &() const { return _ptr->_value; } + operator T const &() const { return *_ptr; } // struct value_t { double x; int k; }; // waiting_on< value_t > output({ 1., -1 }); diff --git a/src/media/playback/playback_device.cpp b/src/media/playback/playback_device.cpp index 4788ab7113..beab1a74b4 100644 --- a/src/media/playback/playback_device.cpp +++ b/src/media/playback/playback_device.cpp @@ -512,39 +512,23 @@ void playback_device::do_loop(T action) //On failure, exit thread if(action_succeeded == false && m_is_started) { - std::vector> playback_sensors_copy; + // Stopping the sensor will call another function which will remove the sensor from the + // list of active sensors, which will cause issues -- so we copy it first + std::vector< std::shared_ptr< playback_sensor > > playback_sensors_copy; { - std::lock_guard locker(_active_sensors_mutex); - { - for (auto s : m_active_sensors) - { - playback_sensors_copy.push_back(s.second); - } - } + std::lock_guard< std::mutex > locker( _active_sensors_mutex ); + for (auto s : m_active_sensors) + playback_sensors_copy.push_back( s.second ); } - - - for (auto& psc : playback_sensors_copy) + for( auto & psc : playback_sensors_copy ) { - if (psc) + if( psc ) { psc->flush_pending_frames(); - psc->stop(false); + psc->stop( false ); } } - //Go over the sensors and stop them - //size_t active_sensors_count = m_active_sensors.size(); - //for - //for (size_t i = 0; isecond->stop(false); - //} - m_last_published_timestamp = device_serializer::nanoseconds(0); //After all sensors were stopped, stop_internal() is called and flags m_is_started as false diff --git a/unit-tests/utilities/time/test-waiting-on.cpp b/unit-tests/utilities/time/test-waiting-on.cpp index dd9dbe48d8..34d91fce21 100644 --- a/unit-tests/utilities/time/test-waiting-on.cpp +++ b/unit-tests/utilities/time/test-waiting-on.cpp @@ -11,6 +11,7 @@ INITIALIZE_EASYLOGGINGPP #include #include +#include using utilities::time::waiting_on; @@ -84,3 +85,47 @@ TEST_CASE( "Struct usage" ) output.wait_until( std::chrono::seconds( 3 ), [&]() { return false; } ); REQUIRE( output->i == 30 ); } + +TEST_CASE( "Not invoked but still notified" ) +{ + // Emulate some dispatcher + typedef std::function< void () > func; + auto dispatcher = new std::queue< func >; + + // Push some stuff onto it (not important what) + int i = 0; + dispatcher->push( [&]() { ++i; } ); + dispatcher->push( [&]() { ++i; } ); + + // Add something we'll be waiting on + utilities::time::waiting_on< bool > invoked( false ); + dispatcher->push( [invoked_in_thread = invoked.in_thread()]() { + invoked_in_thread.signal( true ); + } ); + + // Destroy the dispatcher while we're waiting on the invocation! + std::atomic_bool stopped( false ); + std::thread( [&]() { + std::this_thread::sleep_for( std::chrono::seconds( 2 )); + stopped = true; + delete dispatcher; + } ).detach(); + + // Wait for it -- we'd expect that, when 'invoked_in_thread' is destroyed, it'll wake us up and + // not wait for the timeout + auto wait_start = std::chrono::high_resolution_clock::now(); + invoked.wait_until( std::chrono::seconds( 5 ), [&]() { + return invoked || stopped; // Without stopped, invoked will be false and we'll wait again + // even after we're signalled! + } ); + auto wait_end = std::chrono::high_resolution_clock::now(); + auto waited_ms = std::chrono::duration_cast( wait_end - wait_start ).count(); +#if 1 + // TODO: the requires below depend on the commented-out ~waiting_on::in_frame_(), but it also + // causes unintended slowdowns when the playback is done + REQUIRE( waited_ms > 4990 ); +#else + REQUIRE( waited_ms > 1990 ); + REQUIRE( waited_ms < 3000 ); // Up to a second buffer +#endif +}