Skip to content

Commit

Permalink
CR changes; added another unit-test for waiting_on<>
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Jun 13, 2021
1 parent 2e7a30e commit ac03d9c
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 26 deletions.
20 changes: 19 additions & 1 deletion common/utilities/time/waiting-on.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ namespace time {

// Helper class -- encapsulate a variable of type T that we want to wait on: another thread will set
// it and signal when we can continue...
//
// We use the least amount of synchronization mechanisms: no effort is made to synchronize (usually,
// it's not needed: only one thread will be writing to T, and the owner of T will be waiting on it)
// so it's the responsibility of the user to do so if needed.
//
template< class T >
class waiting_on
Expand Down Expand Up @@ -54,6 +58,10 @@ class waiting_on
{
_cv.notify_one();
}
void signal_all()
{
_cv.notify_all();
}
};
private:
std::shared_ptr< wait_state_t > _ptr;
Expand All @@ -75,6 +83,16 @@ class waiting_on
: _ptr( local._ptr )
{
}
#if 0 // TODO this causes major slowdowns! left in here for Eran to break his head against...
~in_thread_()
{
// We get here when the lambda we're in is destroyed -- so either we've already run
// (and signalled once) or we've never run. We signal anyway -- if anything's waiting
// they'll get woken up; otherwise nothing'll happen...
if( auto wait_state = still_alive() )
wait_state->signal_all();
}
#endif

std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); }

Expand All @@ -100,7 +118,7 @@ class waiting_on
// Convert to the in-thread representation
in_thread_ in_thread() const { return in_thread_( *this ); }

operator T const &() const { return _ptr->_value; }
operator T const &() const { return *_ptr; }

// struct value_t { double x; int k; };
// waiting_on< value_t > output({ 1., -1 });
Expand Down
34 changes: 9 additions & 25 deletions src/media/playback/playback_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,39 +512,23 @@ void playback_device::do_loop(T action)
//On failure, exit thread
if(action_succeeded == false && m_is_started)
{
std::vector<std::shared_ptr<playback_sensor>> playback_sensors_copy;
// Stopping the sensor will call another function which will remove the sensor from the
// list of active sensors, which will cause issues -- so we copy it first
std::vector< std::shared_ptr< playback_sensor > > playback_sensors_copy;
{
std::lock_guard<std::mutex> locker(_active_sensors_mutex);
{
for (auto s : m_active_sensors)
{
playback_sensors_copy.push_back(s.second);
}
}
std::lock_guard< std::mutex > locker( _active_sensors_mutex );
for (auto s : m_active_sensors)
playback_sensors_copy.push_back( s.second );
}


for (auto& psc : playback_sensors_copy)
for( auto & psc : playback_sensors_copy )
{
if (psc)
if( psc )
{
psc->flush_pending_frames();
psc->stop(false);
psc->stop( false );
}
}

//Go over the sensors and stop them
//size_t active_sensors_count = m_active_sensors.size();
//for
//for (size_t i = 0; i<active_sensors_count; i++)
//{
// if (m_active_sensors.size() == 0)
// break;

// //NOTE: calling stop will remove the sensor from m_active_sensors
// m_active_sensors.begin()->second->stop(false);
//}

m_last_published_timestamp = device_serializer::nanoseconds(0);

//After all sensors were stopped, stop_internal() is called and flags m_is_started as false
Expand Down
45 changes: 45 additions & 0 deletions unit-tests/utilities/time/test-waiting-on.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ INITIALIZE_EASYLOGGINGPP

#include <unit-tests/catch.h>
#include <common/utilities/time/waiting-on.h>
#include <queue>

using utilities::time::waiting_on;

Expand Down Expand Up @@ -84,3 +85,47 @@ TEST_CASE( "Struct usage" )
output.wait_until( std::chrono::seconds( 3 ), [&]() { return false; } );
REQUIRE( output->i == 30 );
}

TEST_CASE( "Not invoked but still notified" )
{
// Emulate some dispatcher
typedef std::function< void () > func;
auto dispatcher = new std::queue< func >;

// Push some stuff onto it (not important what)
int i = 0;
dispatcher->push( [&]() { ++i; } );
dispatcher->push( [&]() { ++i; } );

// Add something we'll be waiting on
utilities::time::waiting_on< bool > invoked( false );
dispatcher->push( [invoked_in_thread = invoked.in_thread()]() {
invoked_in_thread.signal( true );
} );

// Destroy the dispatcher while we're waiting on the invocation!
std::atomic_bool stopped( false );
std::thread( [&]() {
std::this_thread::sleep_for( std::chrono::seconds( 2 ));
stopped = true;
delete dispatcher;
} ).detach();

// Wait for it -- we'd expect that, when 'invoked_in_thread' is destroyed, it'll wake us up and
// not wait for the timeout
auto wait_start = std::chrono::high_resolution_clock::now();
invoked.wait_until( std::chrono::seconds( 5 ), [&]() {
return invoked || stopped; // Without stopped, invoked will be false and we'll wait again
// even after we're signalled!
} );
auto wait_end = std::chrono::high_resolution_clock::now();
auto waited_ms = std::chrono::duration_cast<std::chrono::milliseconds>( wait_end - wait_start ).count();
#if 1
// TODO: the requires below depend on the commented-out ~waiting_on::in_frame_(), but it also
// causes unintended slowdowns when the playback is done
REQUIRE( waited_ms > 4990 );
#else
REQUIRE( waited_ms > 1990 );
REQUIRE( waited_ms < 3000 ); // Up to a second buffer
#endif
}

0 comments on commit ac03d9c

Please sign in to comment.