From b46794897414bd0e5aed8f91cb9c64ad2792f0f8 Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Tue, 30 Apr 2024 22:08:15 +0800 Subject: [PATCH] bthread set concurrency by tag --- docs/cn/bthread_tagged_task_group.md | 7 +++++- example/bthread_tag_echo_c++/server.cpp | 11 +++++++-- src/brpc/server.cpp | 30 ++++++++++++++++++------- src/bthread/bthread.cpp | 8 ++----- 4 files changed, 39 insertions(+), 17 deletions(-) diff --git a/docs/cn/bthread_tagged_task_group.md b/docs/cn/bthread_tagged_task_group.md index 46e88c0878..bfdafd76e7 100644 --- a/docs/cn/bthread_tagged_task_group.md +++ b/docs/cn/bthread_tagged_task_group.md @@ -22,18 +22,23 @@ FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_mi 一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。 Q:如何动态改变分组线程的数量? -A:当前每个分组的线程数最少为4个,所以先设置FLAGS_bthread_concurrency=4*分组数,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。 + +A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。 Q:不同分组之间有什么关系吗? + A:不同分组是独立的线程池和事件驱动器,完全没有关系。 Q:可以在分组之间做bthread的同步操作吗? + A:可以的,每个bthread都有自己的tag标签,挂起后重新投入运行将继续在这个tag的线程池上执行。 Q:客户端发送和接收RPC消息是在哪个分组上执行的? + A:这取决于客户端的上下文,如果客户端不在任何tag分组上,那将使用tag0分组收发消息;否则将在当前所在的tag分组收发消息。 Q:如何将一个分组的线程绑定到指定的一些cpu上面。 + A:int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t))这个函数用于在某个分组上做一些初始化的工作,比如:可以实现绑核的代码,根据tag入参来确定不同分组绑定不同的cpu。 # 监控 diff --git a/example/bthread_tag_echo_c++/server.cpp b/example/bthread_tag_echo_c++/server.cpp index 4086c4a003..961de0a81d 100644 --- a/example/bthread_tag_echo_c++/server.cpp +++ b/example/bthread_tag_echo_c++/server.cpp @@ -29,6 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server"); DEFINE_int32(tag1, 0, "Server1 tag"); DEFINE_int32(tag2, 1, "Server2 tag"); DEFINE_int32(tag3, 2, "Background task tag"); +DEFINE_int32(num_threads1, 4, "Number of threads of server1"); +DEFINE_int32(num_threads2, 4, "Number of threads of server2"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); @@ -63,8 +65,11 @@ static void my_tagged_worker_start_fn(bthread_tag_t tag) { } static void* my_background_task(void*) { - LOG(INFO) << "run background task tag=" << bthread_self_tag(); - bthread_usleep(1000000UL); + while (true) { + LOG(INFO) << "run background task tag=" << bthread_self_tag(); + bthread_usleep(1000000UL); + } + return nullptr; } int main(int argc, char* argv[]) { @@ -102,6 +107,7 @@ int main(int argc, char* argv[]) { options1.max_concurrency = FLAGS_max_concurrency; options1.internal_port = FLAGS_internal_port1; options1.bthread_tag = FLAGS_tag1; + options1.num_threads = FLAGS_num_threads1; if (server1.Start(FLAGS_port1, &options1) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; @@ -127,6 +133,7 @@ int main(int argc, char* argv[]) { options2.max_concurrency = FLAGS_max_concurrency; options2.internal_port = FLAGS_internal_port2; options2.bthread_tag = FLAGS_tag2; + options2.num_threads = FLAGS_num_threads2; if (server2.Start(FLAGS_port2, &options2) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 3ab7562e17..84d7a83231 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -149,7 +149,7 @@ ServerOptions::ServerOptions() , health_reporter(NULL) , rtmp_service(NULL) , redis_service(NULL) - , bthread_tag(BTHREAD_TAG_DEFAULT) { + , bthread_tag(BTHREAD_TAG_INVALID) { if (s_ncore > 0) { num_threads = s_ncore + 1; } @@ -902,6 +902,17 @@ int Server::StartInternal(const butil::EndPoint& endpoint, _tl_options = ThreadLocalOptions(); } + auto original_bthread_tag = _options.bthread_tag; + if (original_bthread_tag == BTHREAD_TAG_INVALID) { + _options.bthread_tag = BTHREAD_TAG_DEFAULT; + } + if (_options.bthread_tag < BTHREAD_TAG_DEFAULT || + _options.bthread_tag >= FLAGS_task_group_ntags) { + LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is [" + << BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")"; + return -1; + } + if (_options.bthread_init_count != 0 && _options.bthread_init_fn != NULL) { // Create some special bthreads to call the init functions. The @@ -1020,7 +1031,16 @@ int Server::StartInternal(const butil::EndPoint& endpoint, if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) { _options.num_threads = BTHREAD_MIN_CONCURRENCY; } - bthread_setconcurrency(_options.num_threads); + int res; + if (original_bthread_tag == BTHREAD_TAG_INVALID) { + res = bthread_setconcurrency(_options.num_threads); + } else { + res = bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag); + } + if (res != 0) { + LOG(ERROR) << "Invalid set bthread concurrency to " << _options.num_threads; + return -1; + } } for (MethodMap::iterator it = _method_map.begin(); @@ -1085,12 +1105,6 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } _am->_use_rdma = _options.use_rdma; - if (_options.bthread_tag < BTHREAD_TAG_DEFAULT || - _options.bthread_tag >= FLAGS_task_group_ntags) { - LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is [" - << BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")"; - return -1; - } _am->_bthread_tag = _options.bthread_tag; } // Set `_status' to RUNNING before accepting connections diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index f784a658e9..14954e4ba6 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -392,18 +392,14 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { if (tag < BTHREAD_TAG_DEFAULT || tag >= FLAGS_task_group_ntags) { return EPERM; } + auto c = bthread::get_or_new_task_control(); BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex); - auto c = bthread::get_task_control(); - if (c == NULL) { - bthread::FLAGS_bthread_concurrency_by_tag = 0; - return 0; - } auto ngroup = c->concurrency(); auto tag_ngroup = c->concurrency(tag); auto add = num - tag_ngroup; if (ngroup + add > bthread::FLAGS_bthread_concurrency) { LOG(ERROR) << "Fail to set concurrency by tag " << tag - << ", Whole concurrency larger than bthread_concurrency"; + << ", Total concurrency larger than bthread_concurrency"; return EPERM; } auto added = 0;