Skip to content

Commit

Permalink
[tools] Fix some multi threading issues
Browse files Browse the repository at this point in the history
  • Loading branch information
c-jimenez committed Feb 10, 2024
1 parent 2124bf8 commit 2a66753
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
9 changes: 9 additions & 0 deletions src/rpc/RpcBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ void RpcBase::processDisconnected()
// Disable queues
m_requests_queue.setEnable(false);
m_results_queue.setEnable(false);

// Check if a pool has been configured
if (m_pool)
{
// Disable owner
m_rpc_owner->lock.lock();
m_rpc_owner->is_operational = false;
m_rpc_owner->lock.unlock();
}
}

/** @brief Process received data */
Expand Down
3 changes: 2 additions & 1 deletion src/tools/helpers/Queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.
#ifndef OPENOCPP_QUEUE_H
#define OPENOCPP_QUEUE_H

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
Expand Down Expand Up @@ -208,7 +209,7 @@ class Queue
/** @brief Queue to store data */
std::queue<ItemType> m_queue;
/** @brief Indicate that the queue is enabled */
bool m_enabled;
std::atomic<bool> m_enabled;
};

} // namespace helpers
Expand Down
5 changes: 3 additions & 2 deletions src/tools/helpers/TimerPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.

#include "ITimerPool.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
Expand Down Expand Up @@ -54,9 +55,9 @@ class TimerPool : public ITimerPool

private:
/** @brief Indicate that the timers must stop */
bool m_stop;
std::atomic<bool> m_stop;
/** @brief Indicate that the next wakeup time has changed */
bool m_update_wakeup_time;
std::atomic<bool> m_update_wakeup_time;
/** @brief Mutex for wakeup condition */
std::mutex m_wakeup_mutex;
/** @brief Wakeup condition */
Expand Down
9 changes: 5 additions & 4 deletions src/tools/helpers/WorkerThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.

#include "Queue.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
Expand Down Expand Up @@ -64,7 +65,7 @@ class JobBase : public IJob
/** @brief Condition variable for end of job synchronization */
std::condition_variable end_of_job_var;
/** @brief Indicate end of job */
bool end;
std::atomic<bool> end;
/** @brief Function to execute */
std::function<ReturnType()> function;
};
Expand Down Expand Up @@ -159,7 +160,7 @@ class Waiter
{
Job<ReturnType>* job = dynamic_cast<Job<ReturnType>*>(m_job.get());
std::unique_lock<std::mutex> lock(job->end_of_job_mutex);
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; });
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); });
}

private:
Expand All @@ -186,7 +187,7 @@ class Waiter<void>
{
Job<void>* job = dynamic_cast<Job<void>*>(m_job.get());
std::unique_lock<std::mutex> lock(job->end_of_job_mutex);
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; });
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); });
}

private:
Expand Down Expand Up @@ -229,7 +230,7 @@ class WorkerThreadPool

private:
/** @brief Indicate that the threads must stop */
bool m_stop;
std::atomic<bool> m_stop;
/** @brief Worker threads */
std::vector<std::thread*> m_threads;
/** @brief Job queue */
Expand Down

0 comments on commit 2a66753

Please sign in to comment.