lockstep_scheduler: optimize performance
- use a linked list instead of std::vector: insertion and removal are now
  O(1) (sketched below)
- avoid malloc by using a thread_local instance of TimedWait.
  It gets destroyed when the thread exits, so we have to add protection
  in case a thread exits too quickly. This in turn requires a fix to the
  unit tests.
bkueng authored and julianoes committed Jan 14, 2019
1 parent b6ba7b6 commit 828e31d
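For illustration only (not part of the commit): the O(1) claim rests on intrusive singly-linked-list operations. Inserting at the head is a pointer swap, and unlinking during the traversal that set_absolute_time() already performs needs only the predecessor pointer, whereas std::vector::erase() shifts every later element. A minimal sketch, with assumed names:

```cpp
// Minimal sketch (names are assumed, not PX4 code) of the two list
// operations the scheduler relies on.
struct Node {
	Node *next{nullptr};
};

// O(1): link a node at the head (no allocation, no copying).
void push_front(Node *&head, Node *node)
{
	node->next = head;
	head = node;
}

// O(1) once the traversal has reached `node`: unlink it given its
// predecessor (nullptr when `node` is the head).
void unlink(Node *&head, Node *prev, Node *node)
{
	if (prev) {
		prev->next = node->next;

	} else {
		head = node->next;
	}
}
```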
Showing 4 changed files with 118 additions and 34 deletions.
2 changes: 2 additions & 0 deletions platforms/posix/src/lockstep_scheduler/CMakeLists.txt
@@ -6,6 +6,8 @@ if(NOT PROJECT_NAME STREQUAL "px4")

set (CMAKE_CXX_STANDARD 11)

+add_definitions(-DUNIT_TESTS)
+
add_library(lockstep_scheduler
	src/lockstep_scheduler.cpp
)
platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h
@@ -10,6 +10,8 @@
class LockstepScheduler
{
public:
+	~LockstepScheduler();
+
	void set_absolute_time(uint64_t time_us);
	inline uint64_t get_absolute_time() const { return time_us_; }
	int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us);
@@ -19,13 +21,28 @@ class LockstepScheduler
	std::atomic<uint64_t> time_us_{0};

	struct TimedWait {
+		~TimedWait()
+		{
+			// If a thread quickly exits after a cond_timedwait(), the
+			// thread_local object can still be in the linked list. In that case
+			// we need to wait until it's removed.
+			while (!removed) {
+#ifndef UNIT_TESTS // unit tests don't define system_usleep and execute faster w/o sleeping here
+				system_usleep(5000);
+#endif
+			}
+		}
+
		pthread_cond_t *passed_cond{nullptr};
		pthread_mutex_t *passed_lock{nullptr};
		uint64_t time_us{0};
		bool timeout{false};
		std::atomic<bool> done{false};
+		std::atomic<bool> removed{true};
+
+		TimedWait *next{nullptr}; ///< linked list
	};
-	std::vector<std::shared_ptr<TimedWait>> timed_waits_{};
-	std::mutex timed_waits_mutex_{};
+	TimedWait *timed_waits_{nullptr}; ///< head of linked list
+	std::mutex timed_waits_mutex_;
	std::atomic<bool> setting_time_{false}; ///< true if set_absolute_time() is currently being executed
};
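The destructor above guards a subtle lifetime issue: a thread_local TimedWait may still be linked into the scheduler's list when its owning thread exits, and destroying it at that point would leave a dangling pointer in the list. A self-contained sketch of the same pattern (assumed names, not PX4 code):

```cpp
#include <atomic>
#include <mutex>
#include <thread>

struct Node {
	std::atomic<bool> removed{true};
	Node *next{nullptr};

	~Node()
	{
		// Block thread exit until the list owner has unlinked us.
		while (!removed) { std::this_thread::yield(); }
	}
};

std::mutex list_mutex;
Node *list_head = nullptr;
std::atomic<bool> inserted{false};

int main()
{
	std::thread t([] {
		static thread_local Node node; // destroyed when the thread exits
		std::lock_guard<std::mutex> guard(list_mutex);
		node.removed = false;
		node.next = list_head;
		list_head = &node;
		inserted = true;
	}); // ~Node() now spins until the node is unlinked below

	while (!inserted) { std::this_thread::yield(); }

	std::lock_guard<std::mutex> guard(list_mutex);

	while (list_head) {
		Node *tmp = list_head;
		list_head = list_head->next;
		tmp->removed = true; // releases the waiting destructor
	}

	t.join();
	return 0;
}
```

In the scheduler itself this unlinking happens inside set_absolute_time() and in ~LockstepScheduler(), which is why both mark nodes `removed`.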
73 changes: 51 additions & 22 deletions platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp
@@ -1,5 +1,16 @@
#include "lockstep_scheduler/lockstep_scheduler.h"

+LockstepScheduler::~LockstepScheduler()
+{
+	// cleanup the linked list
+	std::unique_lock<std::mutex> lock_timed_waits(timed_waits_mutex_);
+
+	while (timed_waits_) {
+		TimedWait *tmp = timed_waits_;
+		timed_waits_ = timed_waits_->next;
+		tmp->removed = true;
+	}
+}

void LockstepScheduler::set_absolute_time(uint64_t time_us)
{
@@ -9,29 +20,38 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
	std::unique_lock<std::mutex> lock_timed_waits(timed_waits_mutex_);
	setting_time_ = true;

-	auto it = std::begin(timed_waits_);
-
-	while (it != std::end(timed_waits_)) {
-
-		std::shared_ptr<TimedWait> temp_timed_wait = *it;
+	TimedWait *timed_wait = timed_waits_;
+	TimedWait *timed_wait_prev = nullptr;
+
+	while (timed_wait) {
		// Clean up the ones that are already done from last iteration.
-		if (temp_timed_wait->done) {
-			it = timed_waits_.erase(it);
+		if (timed_wait->done) {
+			// Erase from the linked list
+			if (timed_wait_prev) {
+				timed_wait_prev->next = timed_wait->next;

+			} else {
+				timed_waits_ = timed_wait->next;
+			}
+
+			TimedWait *tmp = timed_wait;
+			timed_wait = timed_wait->next;
+			tmp->removed = true;
			continue;
		}

-		if (temp_timed_wait->time_us <= time_us &&
-		    !temp_timed_wait->timeout) {
+		if (timed_wait->time_us <= time_us &&
+		    !timed_wait->timeout) {
			// We are abusing the condition here to signal that the time
			// has passed.
-			pthread_mutex_lock(temp_timed_wait->passed_lock);
-			temp_timed_wait->timeout = true;
-			pthread_cond_broadcast(temp_timed_wait->passed_cond);
-			pthread_mutex_unlock(temp_timed_wait->passed_lock);
+			pthread_mutex_lock(timed_wait->passed_lock);
+			timed_wait->timeout = true;
+			pthread_cond_broadcast(timed_wait->passed_cond);
+			pthread_mutex_unlock(timed_wait->passed_lock);
		}

-		++it;
+		timed_wait_prev = timed_wait;
+		timed_wait = timed_wait->next;
	}

	setting_time_ = false;
@@ -40,7 +60,9 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)

int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us)
{
-	std::shared_ptr<TimedWait> new_timed_wait;
+	// A TimedWait object might still be in timed_waits_ after we return, so its lifetime needs to be
+	// longer. And using thread_local is more efficient than malloc.
+	static thread_local TimedWait timed_wait;
	{
		std::lock_guard<std::mutex> lock_timed_waits(timed_waits_mutex_);

@@ -49,22 +71,29 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us)
			return ETIMEDOUT;
}

-		new_timed_wait = std::make_shared<TimedWait>();
-		new_timed_wait->time_us = time_us;
-		new_timed_wait->passed_cond = cond;
-		new_timed_wait->passed_lock = lock;
-		timed_waits_.push_back(new_timed_wait);
+		timed_wait.time_us = time_us;
+		timed_wait.passed_cond = cond;
+		timed_wait.passed_lock = lock;
+		timed_wait.timeout = false;
+		timed_wait.done = false;
+
+		// Add to the linked list if the node was already removed
+		// (otherwise it is still linked and we just re-use the object)
+		if (timed_wait.removed) {
+			timed_wait.removed = false;
+			timed_wait.next = timed_waits_;
+			timed_waits_ = &timed_wait;
+		}
	}

	int result = pthread_cond_wait(cond, lock);

-	const bool timeout = new_timed_wait->timeout;
+	const bool timeout = timed_wait.timeout;

	if (result == 0 && timeout) {
		result = ETIMEDOUT;
	}

-	new_timed_wait->done = true;
+	timed_wait.done = true;

	if (!timeout && setting_time_) {
		// This is where it gets tricky: the timeout has not been triggered yet,
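For orientation (not part of the diff): a caller treats cond_timedwait() like pthread_cond_timedwait(), except the deadline is an absolute simulated time in microseconds. A hypothetical usage sketch — `wait_for_data` and `data_ready` are invented here:

```cpp
#include <errno.h>
#include <pthread.h>
#include "lockstep_scheduler/lockstep_scheduler.h"

// Hypothetical caller: wait until data arrives or one simulated second passes.
bool wait_for_data(LockstepScheduler &scheduler, pthread_cond_t &cond,
		   pthread_mutex_t &lock, volatile bool &data_ready)
{
	pthread_mutex_lock(&lock);

	while (!data_ready) {
		// Deadline is absolute simulated time, not wall-clock time.
		if (scheduler.cond_timedwait(&cond, &lock,
					     scheduler.get_absolute_time() + 1000000) == ETIMEDOUT) {
			break;
		}
	}

	const bool got_data = data_ready;
	pthread_mutex_unlock(&lock);
	return got_data;
}
```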
platforms/posix/src/lockstep_scheduler/test/lockstep_scheduler_test.cpp
@@ -4,7 +4,43 @@
#include <atomic>
#include <random>
#include <iostream>
+#include <functional>
+#include <chrono>
+
+class TestThread
+{
+public:
+	TestThread(const std::function<void()> &f)
+		: _f(f)
+	{
+		_thread = std::thread(std::bind(&TestThread::execute, this));
+	}
+
+	void join(LockstepScheduler &ls)
+	{
+		// The unit tests do not reflect the real usage, where
+		// set_absolute_time() is called regularly and can do some
+		// cleanup tasks. We simulate that here by waiting until
+		// the given task returns (which is expected to happen quickly)
+		// and then calling set_absolute_time(), which can do the cleanup;
+		// _thread can then exit as well.
+		while (!_done) {
+			std::this_thread::yield(); // usleep is too slow here
+		}
+
+		ls.set_absolute_time(ls.get_absolute_time());
+		_thread.join();
+	}
+private:
+	void execute()
+	{
+		_f();
+		_done = true;
+	}
+	std::function<void()> _f;
+	std::atomic<bool> _done{false};
+	std::thread _thread;
+};
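The extra join(LockstepScheduler &) exists because a plain std::thread::join() could hang: the worker's thread_local TimedWait destructor spins until set_absolute_time() unlinks its node, so the harness pumps the scheduler once before joining. A minimal usage sketch (the lambda body is a placeholder):

```cpp
LockstepScheduler ls;
ls.set_absolute_time(some_time_us);

TestThread t([&ls]() {
	// typically calls ls.cond_timedwait(...) or ls.usleep_until(...)
});

t.join(ls); // pumps set_absolute_time() once so the TimedWait node is unlinked
```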

constexpr uint64_t some_time_us = 12345678;

@@ -33,7 +69,7 @@ void test_condition_timing_out()

// Use a thread to wait for condition while we already have the lock.
// This ensures the synchronization happens in the right order.
-	std::thread thread([&ls, &cond, &lock, &should_have_timed_out]() {
+	TestThread thread([&ls, &cond, &lock, &should_have_timed_out]() {
assert(ls.cond_timedwait(&cond, &lock, some_time_us + 1000) == ETIMEDOUT);
assert(should_have_timed_out);
// It should be re-locked afterwards, so we should be able to unlock it.
@@ -44,7 +80,7 @@ void test_condition_timing_out()
should_have_timed_out = true;
ls.set_absolute_time(some_time_us + 1500);

-	thread.join();
+	thread.join(ls);

pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
@@ -67,7 +103,7 @@ void test_locked_semaphore_getting_unlocked()
pthread_mutex_lock(&lock);
// Use a thread to wait for condition while we already have the lock.
// This ensures the synchronization happens in the right order.
-	std::thread thread([&ls, &cond, &lock]() {
+	TestThread thread([&ls, &cond, &lock]() {

ls.set_absolute_time(some_time_us + 500);
assert(ls.cond_timedwait(&cond, &lock, some_time_us + 1000) == 0);
@@ -79,7 +115,7 @@ void test_locked_semaphore_getting_unlocked()
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);

-	thread.join();
+	thread.join(ls);

pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
@@ -107,7 +143,7 @@ class TestCase
void run()
{
pthread_mutex_lock(&lock_);
-		thread_ = std::make_shared<std::thread>([this]() {
+		thread_ = std::make_shared<TestThread>([this]() {
result_ = ls_.cond_timedwait(&cond_, &lock_, timeout_);
pthread_mutex_unlock(&lock_);
});
@@ -131,13 +167,13 @@ class TestCase
pthread_mutex_unlock(&lock_);
is_done_ = true;
// We can be sure that this triggers.
-			thread_->join();
+			thread_->join(ls_);
assert(result_ == 0);
}

else if (timeout_reached) {
is_done_ = true;
-			thread_->join();
+			thread_->join(ls_);
assert(result_ == ETIMEDOUT);
}
}
@@ -151,7 +187,7 @@ class TestCase
LockstepScheduler &ls_;
std::atomic<bool> is_done_{false};
std::atomic<int> result_ {INITIAL_RESULT};
-	std::shared_ptr<std::thread> thread_{};
+	std::shared_ptr<TestThread> thread_{};
};

int random_number(int min, int max)
@@ -250,7 +286,7 @@ void test_usleep()

std::atomic<Step> step{Step::Init};

-	std::thread thread([&step, &ls]() {
+	TestThread thread([&step, &ls]() {
step = Step::ThreadStarted;

WAIT_FOR(step == Step::BeforeUsleep);
@@ -268,7 +304,7 @@ void test_usleep()

assert(ls.usleep_until(some_time_us + 1000) == 0);
assert(step == Step::UsleepTriggered);
-	thread.join();
+	thread.join(ls);
}

int main(int /*argc*/, char ** /*argv*/)
