-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_pool.cpp
139 lines (119 loc) · 3.17 KB
/
thread_pool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <iostream>
#include <thread>
#include <array>
#include <functional>
#include <vector>
#include <random>
#include <chrono>
#include <atomic>
using func_type = std::function<int(int,int)>;
using args_ary = std::array<int,2>;
struct ThreadPool{
bool enqueue(func_type fn, args_ary args){
size_t current_head_idx = head_idx.load(std::memory_order_acquire);
size_t next_idx = (current_head_idx + 1) % max_size;
Work w{fn, args};
work[next_idx] = w;
head_idx.store(next_idx);
return true;
}
bool process(int remainder, int total_threads){
size_t next_idx = (tail_idx + 1) % max_size;
Work w = work[next_idx];
w.set_processed();
if(!w.invalid()){
int output = w.process();
std::cout << output << '\n';
}
tail_idx.store(next_idx);
return true;
}
private:
struct Work{
private:
func_type _work;
args_ary _args;
bool is_processed = false;
public:
Work():is_processed(true){};
Work(func_type work, args_ary args):_work{work},_args{args},is_processed(false){};
bool processed(){
return is_processed;
}
int process(){
return _work(_args[0], _args[1]);
}
bool invalid(){
return _work == nullptr;
}
void set_processed(){
is_processed = true;
}
};
static const size_t max_size = 1000;
std::array<Work,max_size> work;
std::atomic<size_t> head_idx = 0, tail_idx = 0;
};
int random_num(){
std::mt19937 rng;
rng.seed(std::random_device()());
std::uniform_int_distribution<std::mt19937::result_type> dist(1,10000); // distribution in range [1, 6]
return dist(rng);
}
int main(){
ThreadPool tp;
auto adder = [](int a, int b){ std::cout << "Adding \t\t" << a << '\t'<< b << '\t'; return a+b;};
auto subtractor = [](int a, int b){ std::cout << "Subtracting \t" << a << '\t'<< b << '\t'; return a-b;};
auto multiplicator = [](int a, int b){std::cout << "Multiplying \t" << a << '\t'<< b << '\t'; return a*b;};
std::thread producer1([&](){
while(true){
tp.enqueue(adder, args_ary{random_num(),random_num()});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
std::thread producer2([&](){
while(true){
tp.enqueue(subtractor, args_ary{random_num(),random_num()});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
std::thread producer3([&](){
while(true){
tp.enqueue(multiplicator, args_ary{random_num(),random_num()});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
std::thread consumer1([&](){
while(true){
tp.process(0, 3);
}
});
std::thread consumer2([&](){
while(true){
tp.process(1, 3);
}
});
std::thread consumer3([&](){
while(true){
tp.process(2, 3);
}
});
if(producer1.joinable()){
producer1.join();
}
if(producer2.joinable()){
producer2.join();
}
if(producer3.joinable()){
producer3.join();
}
if(consumer1.joinable()){
consumer1.join();
}
if(consumer2.joinable()){
consumer2.join();
}
if(consumer3.joinable()){
consumer3.join();
}
}