-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadPoolExecutor.h
111 lines (84 loc) · 3.1 KB
/
ThreadPoolExecutor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#ifndef CPPEXECUTOR_THREADPOOLEXECUTOR_H
#define CPPEXECUTOR_THREADPOOLEXECUTOR_H
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <memory>
#include "BlockingQueue.h"
#if _WIN32
#include <Windows.h>
#endif
class RejectedExecutionException : public std::runtime_error {
public:
explicit RejectedExecutionException(const std::string&);
};
class ThreadPoolExecutor;
class RejectedExecutionHandler {
public:
RejectedExecutionHandler() = default;
virtual void rejectedExecution(const std::function<void()>& func, ThreadPoolExecutor* e) = 0;
virtual ~RejectedExecutionHandler() = default;
};
class ThreadPoolExecutor {
friend class ThreadPoolTest;
private:
const int corePoolSize, maximumPoolSize;
const long keepAliveTime;
std::unique_ptr<BlockingQueue<std::function<void()> > > workQueue;
std::unique_ptr<RejectedExecutionHandler> rejectHandler;
std::atomic<int> thread_cnt;
int finished_cnt;
std::condition_variable complete_condition;
std::mutex finish_mutex;
std::vector<std::thread> threads_;
std::mutex thread_lock;
volatile bool stop_;
const bool enableWaitComplete;
public:
class AbortPolicy : public RejectedExecutionHandler {
public:
void rejectedExecution(const std::function<void()>& func, ThreadPoolExecutor* e) override;
};
class DiscardPolicy : public RejectedExecutionHandler {
public:
void rejectedExecution(const std::function<void()>& func, ThreadPoolExecutor* e) override;
};
class DiscardOldestPolicy : public RejectedExecutionHandler {
public:
void rejectedExecution(const std::function<void()>& func, ThreadPoolExecutor* e) override;
};
class CallerRunsPolicy : public RejectedExecutionHandler {
public:
void rejectedExecution(const std::function<void()>& func, ThreadPoolExecutor* e) override;
};
private:
std::function<void()> createCoreThread(const std::function<void()>& firstTask);
std::function<void()> createTempThread(const std::function<void()>& firstTask);
void enqueue(const std::function<void()>& task);
bool addWorker(bool core, const std::function<void()>& firstTask = nullptr);
void reject(const std::function<void()>& task);
void increaseFinishedCount();
public:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
std::unique_ptr<BlockingQueue<std::function<void()> > > workQueue,
std::unique_ptr<RejectedExecutionHandler> rejectHandler,
bool enableWaitComplete = false);
~ThreadPoolExecutor();
template<class F, class... Args>
void execute(F&& f, Args&& ... args) {
std::function<void()> cfunc = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
enqueue(cfunc);
}
bool isShutdown() const;
void waitForTaskComplete(int task_cnt);
void resetFinishedCount() { finished_cnt = 0; }
#if _WIN32
bool insidePool(DWORD);
#endif
};
#endif //CPPEXECUTOR_THREADPOOLEXECUTOR_H