-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathThreadPool.h
198 lines (167 loc) · 5.23 KB
/
ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#ifndef FATE_THREAD_POOL_H
#define FATE_THREAD_POOL_H
#include "FunctionTraits.h"
#include <vector>
#include <deque>
#include <memory>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
namespace Fate {
/* 自旋锁 */
/*class SpinLock {
public:
inline void lock() {
while (m_flag.test_and_set()) {
std::this_thread::yield();
}
}
inline void unlock() { m_flag.clear(); }
private:
std::atomic_flag m_flag = ATOMIC_FLAG_INIT;
};*/
/* 线程池 */
class ThreadPool {
public:
/* 析构 */
~ThreadPool();
/* 线程池主备类型 */
enum class ThreadPoolType {
MASTER = 0,
SLAVE
};
/* 线程状态 */
enum class ThreadState {
BLOCKING = 0,
READY,
RUNNING
};
private:
/* 当threads小于等于0时, threads为cpu + 1*/
ThreadPool(int threads = 0, ThreadPoolType type = ThreadPoolType::MASTER);
/* 禁止拷贝、赋值*/
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
/* 构建一个新的线程池 */
static std::shared_ptr<ThreadPool> create(int threads = 0, ThreadPoolType type = ThreadPoolType::MASTER);
private:
/* 备用线程数量 */
static int s_slaveThreadNum;
/* 线程状态相关同步变量 */
static int s_states;
static std::mutex s_stateMutex;
static std::condition_variable s_stateCondition;
/* 任务队列相关同步变量 */
static std::deque<std::function<void()> > s_tasks;
static std::mutex s_taskMutex;
static std::condition_variable s_taskCondition;
/* 强行停止标识符 */
static bool s_stop;
public:
/* 公共线程池 */
static std::shared_ptr<ThreadPool> instance(int threads = 0);
/* 当前线程是否属于线程池 */
static bool isWorkerThread();
/* 获取当前线程状态 */
static ThreadPool::ThreadState getCurrentThreadState();
/* 设置当前线程状态 */
static void setCurrentThreadState(ThreadPool::ThreadState state);
/* 添加任务 */
template<class F, class... Args, typename R = typename std::decay<typename function_traits<F>::return_type>::type>
std::future<R> enqueue(F&& f, Args&&... args);
/* 运行一个线程 */
template<class F, class... Args, typename R = typename std::decay<typename function_traits<F>::return_type>::type>
static std::future<R> runWithPool(std::shared_ptr<ThreadPool> pool, F&& f, Args&&... args);
template<class F, class... Args, typename R = typename std::decay<typename function_traits<F>::return_type>::type>
static std::future<R> run(F&& f, Args&&... args);
private:
/* 线程队列(消费者队列)*/
std::vector<std::thread> m_workers;
/* 主备标识 */
ThreadPoolType m_type;
/* 停止标识 */
bool m_stop;
/* 备用线程池 */
std::shared_ptr<ThreadPool> m_slaveThreadPool;
};
/* 添加任务 */
template<class F, class... Args, typename R>
std::future<R> ThreadPool::enqueue(F&& f, Args&&... args)
{
auto task = std::make_shared<std::packaged_task<R()> >(
std::bind(std::forward<F>(f), std::ref(std::forward<Args>(args))...)
);
std::future<R> ret = task->get_future();
/* 如果是从线程池中线程发起的任务,设置为BLOCKING状态,从备用线程中挑选一个开始运行 */
std::shared_ptr<ThreadPool> needDeletePool;
if (ThreadPool::isWorkerThread() && (ThreadPool::getCurrentThreadState() != ThreadState::BLOCKING))
{
ThreadPool::setCurrentThreadState(ThreadState::BLOCKING);
std::unique_lock<std::mutex> lock(s_stateMutex);
/* 如果备用线程不足,生成新的备用线程池 */
if (s_slaveThreadNum <= 0)
{
if (m_slaveThreadPool)
{
/* 旧线程池释放任务放进线程池,这是个BLOCKING任务 */
needDeletePool = m_slaveThreadPool;
++s_states;
--s_slaveThreadNum;
s_stateCondition.notify_one();
}
m_slaveThreadPool = ThreadPool::create(m_workers.size(), ThreadPoolType::SLAVE);
}
/* 当前线程转换为BLOCKING状态后,通知备用线程改变状态 */
++s_states;
--s_slaveThreadNum;
s_stateCondition.notify_one();
}
{
/* 将任务放进任务队列 */
std::unique_lock<std::mutex> lock(s_taskMutex);
if (ThreadPool::isWorkerThread())
s_tasks.emplace_front([task] { (*task)(); });
else
s_tasks.emplace_back([task] { (*task)(); });
s_taskCondition.notify_one();
/* 检查旧线程池是否需要释放 */
if (needDeletePool)
{
auto deleteTask = [needDeletePool] {
ThreadPool::setCurrentThreadState(ThreadState::BLOCKING);
((std::shared_ptr<ThreadPool>)needDeletePool).reset();
};
s_tasks.emplace_back(deleteTask);
s_taskCondition.notify_one();
}
}
return ret;
}
/* 运行一个线程 */
template<class F, class... Args, typename R>
static std::future<R> ThreadPool::runWithPool(std::shared_ptr<ThreadPool> pool, F&& f, Args&&... args)
{
std::shared_ptr<ThreadPool> ins = pool;
if (!ins)
ins = ThreadPool::instance();
return ins->enqueue(std::forward<F>(f), std::forward<Args>(args)...);
}
template<class F, class... Args, typename R>
static std::future<R> ThreadPool::run(F&& f, Args&&... args)
{
std::shared_ptr<ThreadPool> ins = ThreadPool::instance();
return ThreadPool::runWithPool(ins, std::forward<F>(f), std::forward<Args>(args)...);
}
/* 线程池内线程的TLS数据 */
class ThreadPoolTLS {
public:
/* 当前线程属于线程池标识 */
thread_local static bool t_threadPoolFlag;
/* 当前线程状态 */
thread_local static ThreadPool::ThreadState t_threadState;
};
}
#endif // FATE_THREAD_POOL_H