Implement a simple threadpool #6684

Merged 11 commits into PaddlePaddle:develop from simple_threadpool on Dec 25, 2017

Conversation

@Yancey1989 (Contributor) commented Dec 18, 2017:

fixed #6683

Maybe the simple threadpool could be used in the parallel_do op, for example:

ParallelDo::Run(...) {
  BlockingCounter bc(4);  // execute the block with 4 threads in parallel
  tp = ThreadPool::Instance();
  for (int i = 0; i < 4; ++i) {
    tp.Run([&]() {
      executor::Run(block);
      bc.Dec();  // decrement the counter when this task finishes
    });
  }
  bc.Wait();  // wait for the blocking counter to reach 0
}
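
For context, a minimal sketch of the BlockingCounter helper assumed above could look like the following (the names Dec() and Wait() are illustrative; this class is not part of this PR):

#include <condition_variable>
#include <mutex>

class BlockingCounter {
 public:
  explicit BlockingCounter(int count) : count_(count) {}

  // Decrement the counter; wake all waiters once it reaches zero.
  void Dec() {
    std::lock_guard<std::mutex> guard(mu_);
    if (--count_ == 0) all_done_.notify_all();
  }

  // Block until the counter has been decremented to zero.
  void Wait() {
    std::unique_lock<std::mutex> lock(mu_);
    all_done_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  int count_;
  std::mutex mu_;
  std::condition_variable all_done_;
};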

@reyoung (Collaborator) left a comment:

I think a simple thread pool is cool.
Maybe @tonyyang-svail needs to pay attention to this PR; it may help implement the Parallel.for operator.

@@ -58,3 +58,5 @@ cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry
proto_desc)
cc_library(selected_rows SRCS selected_rows.cc DEPS tensor)
cc_test(selected_rows_test SRCS selected_rows_test.cc DEPS selected_rows)

cc_test(threadpool_test SRCS threadpool_test.cc )
Contributor:
The space before the trailing ) seems unnecessary.

Contributor (Author):
Done.

condition_.notify_one();
}

// wait unitle all the function are completed
Contributor:
Typo "unitle".

Contributor (Author):
Done.

std::queue<Func> tasks_;
std::vector<std::unique_ptr<std::thread>> threads_;
std::mutex mutex_;
std::condition_variable condition_;
Contributor:
condition_ doesn't mean anything; it should have a more descriptive name.

Contributor:
A condition_variable must be used together with a mutex as a pair; mutex_ is used everywhere, so it's unclear which variable it is protecting.

Contributor (Author):
> condition_ doesn't mean anything; it should have a more descriptive name.

> A condition_variable must be used together with a mutex as a pair; mutex_ is used everywhere, so it's unclear which variable it is protecting.

The mutex just protects the task queue, so maybe we only need one mutex for now?
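
As a hedged illustration of the pairing being discussed, one common pattern is to dedicate the mutex to the task queue and name the condition variable after the event it signals; the names below (task_mutex_, scheduled_) are illustrative, not this PR's final code:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

class TaskQueue {
 public:
  using Task = std::function<void()>;

  // Producer side: push a task under the lock, then wake one waiting worker.
  void Push(Task fn) {
    {
      std::lock_guard<std::mutex> guard(task_mutex_);
      tasks_.push(std::move(fn));
    }
    scheduled_.notify_one();
  }

  // Consumer side: block until a task is available, then pop and return it.
  Task Pop() {
    std::unique_lock<std::mutex> lock(task_mutex_);
    scheduled_.wait(lock, [this] { return !tasks_.empty(); });
    Task t = std::move(tasks_.front());
    tasks_.pop();
    return t;
  }

 private:
  std::queue<Task> tasks_;             // the only state task_mutex_ protects
  std::mutex task_mutex_;              // paired with scheduled_
  std::condition_variable scheduled_;  // signaled when tasks_ becomes non-empty
};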

@typhoonzero (Contributor) commented:
@reyoung @tonyyang-svail The ThreadPool is also needed for multi-GPU: each GPU needs a corresponding CPU thread to launch its CUDA kernels.

@gongweibao (Contributor) commented Dec 18, 2017:

I also agree:

> I think a simple thread pool is cool.

Do we need to investigate the bRPC thread pool? It's a similar implementation to the Go scheduler:
https://github.com/brpc/brpc/blob/master/docs/cn/threading_overview.md#%E5%A4%9A%E7%BA%BF%E7%A8%8Breactor
https://github.com/brpc/brpc/blob/master/docs/cn/bthread.md
http://morsmachine.dk/go-scheduler


// push a function to the queue, and will be scheduled and
// executed if a thread is available
void Start(const Func& fn) {
Contributor:
Start=>Run?

Contributor (Author):
Done.

* If num_threads <= 0, the thread pool wil be initilized
* with the number of concurrent threads supported.
*/
static ThreadPool* Instance(size_t num_threads) {
Contributor:
Maybe just bake in num_threads rather than taking it as an argument (std::thread::hardware_concurrency is probably a good choice, but it can return 0, so that case needs to be checked). Otherwise, if Instance(10) is called first and Instance(20) later, the second call still returns the one threadpool with 10 threads rather than 20.

Contributor (Author):
The user may want to specify the maximum number of CPU cores to use, like GOMAXPROCS in Go, but I can remove the num_threads argument for now and support a MAXPROCS-style setting in the future.

Contributor (Author):
Done.
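
A hedged sketch of the suggested default, guarding against hardware_concurrency() returning 0 (the fallback of 1 thread is an assumption, not something decided in this PR):

#include <thread>

static size_t DefaultNumThreads() {
  size_t n = std::thread::hardware_concurrency();  // may return 0 if unknown
  return n == 0 ? 1 : n;                           // assumed fallback
}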

*/
static ThreadPool* Instance(size_t num_threads) {
static std::unique_ptr<ThreadPool> threadpool;
if (threadpool.get() == nullptr) {
Contributor:
I believe there is a race condition here (as well as in GetNumThreads and GetAvailable). We can either use a mutex to protect it, or avoid lazy initialization: if the pool is initialized when the program starts, that happens before all concurrent access and is therefore thread safe.

Contributor (Author):
num_threads_ is initialized on the first call to GetInstance() and never changed afterwards, so maybe it's not a race condition; I also added a lock in GetAvailable(), thanks :)

@helinwang (Contributor) commented Dec 21, 2017:
I think if Instance has not been called before and is then called concurrently, it will be a race condition. Maybe try this: https://stackoverflow.com/a/19907903/852385

Contributor (Author):
Thanks, I think call_once is a good choice; I'll use the enhanced version in https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/platform/call_once.h
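
As a hedged sketch of the idea (not this PR's exact code), std::call_once makes the lazy initialization thread safe even if GetInstance() races on first use; the class and member names below are illustrative:

#include <memory>
#include <mutex>
#include <thread>

class ThreadPoolSketch {
 public:
  // The pool is constructed exactly once, no matter how many threads race here.
  static ThreadPoolSketch* GetInstance() {
    std::call_once(init_flag_, [] {
      size_t n = std::thread::hardware_concurrency();
      instance_.reset(new ThreadPoolSketch(n == 0 ? 1 : n));
    });
    return instance_.get();
  }

 private:
  explicit ThreadPoolSketch(size_t num_threads) : num_threads_(num_threads) {}

  size_t num_threads_;
  static std::once_flag init_flag_;
  static std::unique_ptr<ThreadPoolSketch> instance_;
};

std::once_flag ThreadPoolSketch::init_flag_;
std::unique_ptr<ThreadPoolSketch> ThreadPoolSketch::instance_;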

std::unique_lock<std::mutex> lock(mutex_);
++available_;
if (tasks_.empty() && available_ == num_threads_) {
done_ = true;
Contributor:
What do you think about removing done_ and adding a function done() that returns tasks_.empty() && available_ == num_threads_?

Contributor (Author):
Done.
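
A hedged sketch of that suggestion: compute the predicate on demand instead of storing a done_ flag (member names follow the surrounding snippet; this PR's final code may differ):

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

class PoolStateSketch {
 public:
  // True when no task is queued and every worker is idle.
  // Must be called with mutex_ held, since it reads tasks_ and available_.
  bool Done() const { return tasks_.empty() && available_ == num_threads_; }

  // Wait() can then block on the computed predicate rather than a stored flag.
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    completed_.wait(lock, [this] { return Done(); });
  }

 private:
  std::queue<std::function<void()>> tasks_;
  size_t available_ = 0;    // number of idle worker threads
  size_t num_threads_ = 0;  // total number of worker threads
  std::mutex mutex_;
  std::condition_variable completed_;
};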


TEST(ThreadPool, Start) {
framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
std::map<int, bool> dict;
Contributor:
dict is not used.

Contributor (Author):
Done.

std::map<int, bool> dict;
int sum = 0;
for (int i = 0; i < 10; ++i) {
pool->Run([&sum]() { sum++; });
Contributor:
I'm not sure the ++ operator is atomic here; please check out http://www.cplusplus.com/reference/atomic/atomic/

Contributor (Author):
Thanks, done. I also enhanced the unit test.
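
A hedged sketch of the kind of test being discussed, using std::atomic<int> so concurrent increments are well defined. GetInstance() and Run() appear in this PR; the Wait() call and the header path are assumptions:

#include <atomic>
#include "gtest/gtest.h"
#include "paddle/framework/threadpool.h"  // assumed header path

TEST(ThreadPool, AtomicSum) {
  framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
  std::atomic<int> sum(0);
  for (int i = 0; i < 10; ++i) {
    // fetch_add is atomic, so concurrent workers cannot lose increments.
    pool->Run([&sum]() { sum.fetch_add(1); });
  }
  pool->Wait();  // assumed: block until all queued tasks have finished
  EXPECT_EQ(sum.load(), 10);
}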

TEST(ThreadPool, ConcurrentStart) {
std::atomic<int> sum(0);
int cnt1 = 10, cnt2 = 20;
std::thread t1(do_sum, pool, std::ref(sum), 10);
Contributor:
We should avoid using std::thread in other places; calling pool->Run directly should do the job.

Contributor (Author):
Sure, we could call the Run interface the way the do_sum function defined above does: https://github.com/PaddlePaddle/Paddle/pull/6684/files#diff-196b8642754d3c84ae65f78bb0e83dfdR25 ; this unit test is specifically for exercising calls from multiple threads.


TEST(ThreadPool, ConcurrentStart) {
std::atomic<int> sum(0);
int cnt1 = 10, cnt2 = 20;
Contributor:
Call GetInstance before using the pool.

Contributor (Author):
Done.

@typhoonzero previously approved these changes on Dec 22, 2017.

@typhoonzero (Contributor) left a comment:
LGTM++

framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
std::atomic<int> sum(0);
int cnt1 = 10, cnt2 = 20;
std::thread t1(do_sum, pool, std::ref(sum), 10);
Contributor:
Change 10 and 20 to cnt1 and cnt2.

Contributor (Author):
Done.

framework::ThreadPool* pool = framework::ThreadPool::GetInstance();
std::atomic<int> sum(0);
int cnt1 = 10, cnt2 = 20;
std::thread t1(do_sum, pool, std::ref(sum), 10);
Contributor:
Creating more threads (like 50) can expose more problems.

Contributor (Author):
Done.

@typhoonzero (Contributor) left a comment:
LGTM++

@Yancey1989 merged commit 127bc2e into PaddlePaddle:develop on Dec 25, 2017.
@Yancey1989 deleted the simple_threadpool branch on December 25, 2017 at 03:15.