Skip to content

Commit

Permalink
bthread set concurrency by tag
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed May 1, 2024
1 parent f97cbe5 commit b467948
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
7 changes: 6 additions & 1 deletion docs/cn/bthread_tagged_task_group.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_mi
一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。

Q:如何动态改变分组线程的数量?
A:当前每个分组的线程数最少为4个,所以先设置FLAGS_bthread_concurrency=4*分组数,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。

A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。

Q:不同分组之间有什么关系吗?

A:不同分组是独立的线程池和事件驱动器,完全没有关系。

Q:可以在分组之间做bthread的同步操作吗?

A:可以的,每个bthread都有自己的tag标签,挂起后重新投入运行将继续在这个tag的线程池上执行。

Q:客户端发送和接收RPC消息是在哪个分组上执行的?

A:这取决于客户端的上下文,如果客户端不在任何tag分组上,那将使用tag0分组收发消息;否则将在当前所在的tag分组收发消息。

Q:如何将一个分组的线程绑定到指定的一些cpu上面。

A:int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t))这个函数用于在某个分组上做一些初始化的工作,比如:可以实现绑核的代码,根据tag入参来确定不同分组绑定不同的cpu。

# 监控
Expand Down
11 changes: 9 additions & 2 deletions example/bthread_tag_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
DEFINE_int32(tag1, 0, "Server1 tag");
DEFINE_int32(tag2, 1, "Server2 tag");
DEFINE_int32(tag3, 2, "Background task tag");
DEFINE_int32(num_threads1, 4, "Number of threads of server1");
DEFINE_int32(num_threads2, 4, "Number of threads of server2");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
Expand Down Expand Up @@ -63,8 +65,11 @@ static void my_tagged_worker_start_fn(bthread_tag_t tag) {
}

static void* my_background_task(void*) {
LOG(INFO) << "run background task tag=" << bthread_self_tag();
bthread_usleep(1000000UL);
while (true) {
LOG(INFO) << "run background task tag=" << bthread_self_tag();
bthread_usleep(1000000UL);
}
return nullptr;
}

int main(int argc, char* argv[]) {
Expand Down Expand Up @@ -102,6 +107,7 @@ int main(int argc, char* argv[]) {
options1.max_concurrency = FLAGS_max_concurrency;
options1.internal_port = FLAGS_internal_port1;
options1.bthread_tag = FLAGS_tag1;
options1.num_threads = FLAGS_num_threads1;
if (server1.Start(FLAGS_port1, &options1) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand All @@ -127,6 +133,7 @@ int main(int argc, char* argv[]) {
options2.max_concurrency = FLAGS_max_concurrency;
options2.internal_port = FLAGS_internal_port2;
options2.bthread_tag = FLAGS_tag2;
options2.num_threads = FLAGS_num_threads2;
if (server2.Start(FLAGS_port2, &options2) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand Down
30 changes: 22 additions & 8 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ServerOptions::ServerOptions()
, health_reporter(NULL)
, rtmp_service(NULL)
, redis_service(NULL)
, bthread_tag(BTHREAD_TAG_DEFAULT) {
, bthread_tag(BTHREAD_TAG_INVALID) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
Expand Down Expand Up @@ -902,6 +902,17 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
_tl_options = ThreadLocalOptions();
}

auto original_bthread_tag = _options.bthread_tag;
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
_options.bthread_tag = BTHREAD_TAG_DEFAULT;
}
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}

if (_options.bthread_init_count != 0 &&
_options.bthread_init_fn != NULL) {
// Create some special bthreads to call the init functions. The
Expand Down Expand Up @@ -1020,7 +1031,16 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
bthread_setconcurrency(_options.num_threads);
int res;
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
res = bthread_setconcurrency(_options.num_threads);
} else {
res = bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}
if (res != 0) {
LOG(ERROR) << "Invalid set bthread concurrency to " << _options.num_threads;
return -1;
}
}

for (MethodMap::iterator it = _method_map.begin();
Expand Down Expand Up @@ -1085,12 +1105,6 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
return -1;
}
_am->_use_rdma = _options.use_rdma;
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}
_am->_bthread_tag = _options.bthread_tag;
}
// Set `_status' to RUNNING before accepting connections
Expand Down
8 changes: 2 additions & 6 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,18 +392,14 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
if (tag < BTHREAD_TAG_DEFAULT || tag >= FLAGS_task_group_ntags) {
return EPERM;
}
auto c = bthread::get_or_new_task_control();
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto c = bthread::get_task_control();
if (c == NULL) {
bthread::FLAGS_bthread_concurrency_by_tag = 0;
return 0;
}
auto ngroup = c->concurrency();
auto tag_ngroup = c->concurrency(tag);
auto add = num - tag_ngroup;
if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
LOG(ERROR) << "Fail to set concurrency by tag " << tag
<< ", Whole concurrency larger than bthread_concurrency";
<< ", Total concurrency larger than bthread_concurrency";
return EPERM;
}
auto added = 0;
Expand Down

0 comments on commit b467948

Please sign in to comment.