Implement a simple threadpool #6684
I think a simple thread pool is cool.
Maybe @tonyyang-svail should take a look at this PR; it may help with implementing the Parallel.for operator.
paddle/framework/CMakeLists.txt
Outdated
@@ -58,3 +58,5 @@ cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry
proto_desc)
cc_library(selected_rows SRCS selected_rows.cc DEPS tensor)
cc_test(selected_rows_test SRCS selected_rows_test.cc DEPS selected_rows)

cc_test(threadpool_test SRCS threadpool_test.cc )
The space before the trailing ) seems unnecessary.
Done.
paddle/framework/threadpool.h
Outdated
  condition_.notify_one();
}

// wait unitle all the function are completed
Typo "unitle".
Done.
paddle/framework/threadpool.h
Outdated
std::queue<Func> tasks_;
std::vector<std::unique_ptr<std::thread>> threads_;
std::mutex mutex_;
std::condition_variable condition_;
condition_ doesn't convey anything; it should have a more descriptive name.
condition_variable must be used together with a mutex as a pair, but mutex_ is used everywhere, so it is unclear which variable it protects.
condition_ doesn't convey anything; it should have a more descriptive name.
condition_variable must be used together with a mutex as a pair, but mutex_ is used everywhere, so it is unclear which variable it protects.
The mutex only protects the task queue, so maybe one mutex is enough for now?
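A minimal sketch of the naming concern raised above, assuming a single mutex that guards only the task queue. The class TaskQueue and the member names queue_mutex_ and task_available_ are hypothetical and are not taken from the PR; the point is only that the mutex is named after the data it protects and the condition variable after the event it signals.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical names, not taken from the PR: the mutex is named after the
// data it guards and the condition variable after the event it signals.
class TaskQueue {
 public:
  using Func = std::function<void()>;

  void Push(Func fn) {
    assert(fn != nullptr);  // reject empty callables
    {
      std::lock_guard<std::mutex> lock(queue_mutex_);
      tasks_.push(std::move(fn));
    }
    // Notify after releasing the lock so a woken thread can acquire it.
    task_available_.notify_one();
  }

  // Blocks until a task is queued, then removes and returns it.
  Func Pop() {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    task_available_.wait(lock, [this] { return !tasks_.empty(); });
    Func fn = std::move(tasks_.front());
    tasks_.pop();
    return fn;
  }

 private:
  std::queue<Func> tasks_;                  // guarded by queue_mutex_
  std::mutex queue_mutex_;                  // protects tasks_ only
  std::condition_variable task_available_;  // signaled when a task is pushed
};
```

With names like these, a reader can tell at each lock site which state is being protected.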
@reyoung @tonyyang-svail The ThreadPool is also needed for multi-GPU: each GPU thread needs a corresponding CPU thread to launch its CUDA kernel.
I also think:
Do we need to investigate the BRPC thread pool? Its implementation is similar to the Go scheduler:
paddle/framework/threadpool.h
Outdated
// push a function to the queue, and will be scheduled and
// executed if a thread is available
void Start(const Func& fn) {
Start=>Run?
Done.
paddle/framework/threadpool.h
Outdated
 * If num_threads <= 0, the thread pool will be initialized
 * with the number of concurrent threads supported.
 */
static ThreadPool* Instance(size_t num_threads) {
Maybe just bake in num_threads rather than taking it as an argument (std::thread::hardware_concurrency is probably a good choice, but it can return 0, so that case needs to be checked). Otherwise, if Instance(10) is called first and Instance(20) is called later, the second call returns the same thread pool with 10 threads rather than 20.
The user may want to specify the maximum number of CPU cores to use, like GOMAXPROCS in Go, but maybe I can remove the num_threads argument for now and support MAXPROCS in the future.
Done.
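The zero check suggested in this thread could look roughly like the sketch below. The helper names ThreadCountFromHint and DefaultNumThreads are hypothetical, and falling back to a single thread is an assumption; the PR may choose a different fallback.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: turn the hardware hint into a usable pool size.
inline std::size_t ThreadCountFromHint(unsigned int hint) {
  // A hint of 0 means the value could not be determined, so fall back
  // to a single worker (an assumed default, not the PR's choice).
  return hint == 0 ? 1 : static_cast<std::size_t>(hint);
}

// Hypothetical helper: the size a pool would use when the caller
// does not pass num_threads.
inline std::size_t DefaultNumThreads() {
  std::size_t n = ThreadCountFromHint(std::thread::hardware_concurrency());
  assert(n >= 1);  // the pool must always have at least one worker
  return n;
}
```

Because the size no longer depends on a caller-supplied argument, two calls to the singleton accessor cannot disagree about it.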
paddle/framework/threadpool.h
Outdated
 */
static ThreadPool* Instance(size_t num_threads) {
  static std::unique_ptr<ThreadPool> threadpool;
  if (threadpool.get() == nullptr) {
I believe there is a race condition here (as well as in GetNumThreads and GetAvailable). We can either protect it with a mutex or avoid lazy initialization. If it is initialized when the program starts, initialization happens before any concurrent access and is therefore thread safe.
num_threads_ is initialized on the first call to GetInstance() and never changes afterwards, so maybe it's not a race condition. I added a lock in the function GetAvailable(), thanks :)
I think if Instance has not been called before and is then called concurrently, there will be a race condition. Maybe try this: https://stackoverflow.com/a/19907903/852385
Thanks, I think call_once is a good choice, and I'm using the enhanced version in https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/platform/call_once.h
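For readers unfamiliar with the pattern, here is a minimal sketch of once-only singleton initialization. It uses the standard std::call_once as a stand-in for Paddle's call_once.h wrapper, whose interface is not shown in this thread, and the class Pool is a hypothetical placeholder for ThreadPool.

```cpp
#include <cassert>
#include <memory>
#include <mutex>

// Hypothetical stand-in for the PR's ThreadPool; only the singleton
// initialization is sketched here.
class Pool {
 public:
  static Pool* GetInstance() {
    // std::call_once runs Init() exactly once, even when GetInstance()
    // is first called from several threads at the same time.
    std::call_once(init_flag_, &Pool::Init);
    assert(instance_ != nullptr);
    return instance_.get();
  }

 private:
  Pool() = default;
  static void Init() { instance_.reset(new Pool()); }

  static std::once_flag init_flag_;
  static std::unique_ptr<Pool> instance_;
};

std::once_flag Pool::init_flag_;
std::unique_ptr<Pool> Pool::instance_;
```

Callers that race on the first GetInstance() all block until Init() has finished, so none of them can observe a half-constructed pool.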
paddle/framework/threadpool.h
Outdated
std::unique_lock<std::mutex> lock(mutex_);
++available_;
if (tasks_.empty() && available_ == num_threads_) {
  done_ = true;
What do you think about removing done_ and creating a function done() that returns tasks_.empty() && available_ == num_threads_?
Done.
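A rough sketch of the idea behind this suggestion: the "done" state is computed from the queue and the idle-worker count instead of being stored in a separate flag. The class PoolState and its methods Enqueue, Take, Finish and Done are hypothetical bookkeeping used only for illustration; the sketch starts no threads and is not the PR's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bookkeeping class; it holds only the state named in the
// review comment and runs no threads of its own.
class PoolState {
 public:
  using Func = std::function<void()>;

  explicit PoolState(std::size_t num_threads)
      : num_threads_(num_threads), available_(num_threads) {}

  void Enqueue(Func fn) {
    std::lock_guard<std::mutex> lock(mutex_);
    tasks_.push(std::move(fn));
  }

  // A worker takes the next task and becomes busy.
  Func Take() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(!tasks_.empty() && available_ > 0);
    Func fn = std::move(tasks_.front());
    tasks_.pop();
    --available_;
    return fn;
  }

  // A worker reports that it is idle again.
  void Finish() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++available_;
  }

  // Computed on demand, so there is no separate flag to keep in sync.
  bool Done() {
    std::lock_guard<std::mutex> lock(mutex_);
    return tasks_.empty() && available_ == num_threads_;
  }

 private:
  std::mutex mutex_;  // protects all members below
  std::queue<Func> tasks_;
  const std::size_t num_threads_;
  std::size_t available_;
};
```

Since Done() reads the same state the workers update, it cannot go stale the way a cached done_ flag can.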
paddle/framework/threadpool_test.cc
Outdated
TEST(ThreadPool, Start) {
  framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
  std::map<int, bool> dict;
dict is not used.
Done.
paddle/framework/threadpool_test.cc
Outdated
std::map<int, bool> dict;
int sum = 0;
for (int i = 0; i < 10; ++i) {
  pool->Run([&sum]() { sum++; });
I'm not sure the ++ operator is atomic; please check out http://www.cplusplus.com/reference/atomic/atomic/
Thanks, done. I also enhanced the unit test.
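To illustrate the concern: incrementing a plain int from several threads is a data race, while std::atomic<int> makes each increment a single atomic read-modify-write. The sketch below uses plain std::thread workers as a stand-in for the pool so that it is self-contained; the function name CountWithAtomic is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: every worker increments a shared counter, which
// is safe only because the counter is a std::atomic<int>.
int CountWithAtomic(int num_threads, int increments_per_thread) {
  assert(num_threads >= 0 && increments_per_thread >= 0);
  std::atomic<int> sum(0);
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([&sum, increments_per_thread] {
      for (int i = 0; i < increments_per_thread; ++i) {
        sum.fetch_add(1);  // atomic read-modify-write
      }
    });
  }
  // Join every worker before reading the final value.
  for (std::thread& w : workers) {
    w.join();
  }
  return sum.load();
}
```

With a plain int in place of the atomic, the final count could fall short of the expected total because concurrent increments can overwrite each other.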
paddle/framework/threadpool_test.cc
Outdated
TEST(ThreadPool, ConcurrentStart) {
  std::atomic<int> sum(0);
  int cnt1 = 10, cnt2 = 20;
  std::thread t1(do_sum, pool, std::ref(sum), 10);
We should avoid using std::thread elsewhere. Calling pool->Run directly should do the job.
Sure, we could call the Run interface the way the do_sum function defined above does: https://github.com/PaddlePaddle/Paddle/pull/6684/files#diff-196b8642754d3c84ae65f78bb0e83dfdR25 . This unit test is just for testing calls from multiple threads.
paddle/framework/threadpool_test.cc
Outdated
TEST(ThreadPool, ConcurrentStart) {
  std::atomic<int> sum(0);
  int cnt1 = 10, cnt2 = 20;
Call GetInstance before using the pool.
Done.
LGTM++
paddle/framework/threadpool_test.cc
Outdated
framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
std::atomic<int> sum(0);
int cnt1 = 10, cnt2 = 20;
std::thread t1(do_sum, pool, std::ref(sum), 10);
Change 10, 20 to cnt1, cnt2
Done.
paddle/framework/threadpool_test.cc
Outdated
framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
std::atomic<int> sum(0);
int cnt1 = 10, cnt2 = 20;
std::thread t1(do_sum, pool, std::ref(sum), 10);
Creating more threads (like 50) can expose more problems.
Done.
LGTM++
fixed #6683
Maybe the simple threadpool would be used in parallel_do Op, such as: