-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathThreadPool.h
151 lines (122 loc) · 4.09 KB
/
ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// https://embeddedartistry.com/blog/2017/02/08/implementing-an-asynchronous-dispatch-queue/
#ifdef DEBUG
#include <cstdio>
#define debug printf
#else
#define debug(...)
#endif
class ThreadPool {
using Task = std::function<void()>;
public:
// Deleted operations
ThreadPool(const ThreadPool &rhs) = delete;
ThreadPool &operator=(const ThreadPool &rhs) = delete;
ThreadPool(ThreadPool &&rhs) = delete;
ThreadPool &operator=(ThreadPool &&rhs) = delete;
ThreadPool(size_t threads = 1)
: _threads(threads), _num_active_threads(threads)
{
debug("Construct ThreadPool with threads: %zu\n", threads);
// 活跃线程数加加,
// 在析构函数之前,`_num_active_threads`无法被减到0,线程不能退出
// 以此避免线程在析构函数开始执行之前退出
_num_active_threads += 1;
for (size_t i = 0; i < threads; ++i)
_threads[i] = std::thread(&ThreadPool::worker, this);
}
~ThreadPool()
{
// 活跃线程数减减,
// 此后`_num_active_threads`可以被减到0,线程能够退出
std::unique_lock<std::mutex> _lock(_mtx);
_num_active_threads -= 1;
_lock.unlock();
// 通知工作线程,可以退出
_cv.notify_all();
// join all threads
#ifdef DEBUG
int i = 0;
#endif
for (auto &t : _threads) {
if (t.joinable()) {
t.join();
debug("Destructor: thread %d joined\n", ++i);
}
}
}
// copy and submit the task
void submit(const Task &s)
{
std::unique_lock<std::mutex> _lock(_mtx);
debug("Thread %zu add new task\n", current_thread_id());
_q.push(s);
// 在通知之前手动解锁,以避免等待唤醒的线程在被唤醒之后仅仅是再次阻塞
_lock.unlock();
_cv.notify_one();
}
// move and submit the task
void submit(Task &&s)
{
std::unique_lock<std::mutex> _lock(_mtx);
debug("Thread %zu add new task\n", current_thread_id());
_q.push(std::move(s));
// 同上
_lock.unlock();
_cv.notify_one();
}
#ifdef DEBUG
size_t current_thread_id()
{
std::thread::id id = std::this_thread::get_id();
for (size_t i = 0; i < _threads.size(); ++i) {
if (_threads[i].get_id() == id)
return i + 1;
}
return 0;
}
#endif
private:
std::condition_variable _cv;
std::mutex _mtx;
std::queue<Task> _q;
std::vector<std::thread> _threads;
int _num_active_threads;
// 工作函数,运行于`工作线程`之中
void worker()
{
Task task;
for (; ;) {
{
std::unique_lock<std::mutex> _lock(_mtx);
_num_active_threads -= 1;
// 等待,直到有任务可执行,或者所有工作线程均已不再活跃
_cv.wait(_lock, [this] {
return (!_q.empty() || _num_active_threads == 0);
});
// 无任务可执行,那么进入临界区的条件就是
// `_num_active_threads == 0`
// 这时候,由于所有工作线程均处于等待任务状态,
// 就意味着,不会再有新任务被产生和提交。
// 于是跳出循环,不再继续无谓的等待。
if (_q.empty())
break;
_num_active_threads += 1;
task = std::move(_q.front());
_q.pop();
}
debug("Thread %zu run task, active threads: %d\n",
current_thread_id(), _num_active_threads);
task();
}
// 通知所有线程退出,也许可以考虑串联通知
_cv.notify_all();
debug("Thread %zu exited, active threads: %d\n",
current_thread_id(), _num_active_threads);
}
};