Skip to content

Commit

Permalink
bthread set concurrency by tag
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed Apr 30, 2024
1 parent bbd88a8 commit 3061223
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
4 changes: 4 additions & 0 deletions example/bthread_tag_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
DEFINE_int32(tag1, 0, "Server1 tag");
DEFINE_int32(tag2, 1, "Server2 tag");
DEFINE_int32(tag3, 2, "Background task tag");
DEFINE_int32(num_threads1, 4, "Number of threads of server1");
DEFINE_int32(num_threads2, 4, "Number of threads of server2");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
Expand Down Expand Up @@ -102,6 +104,7 @@ int main(int argc, char* argv[]) {
options1.max_concurrency = FLAGS_max_concurrency;
options1.internal_port = FLAGS_internal_port1;
options1.bthread_tag = FLAGS_tag1;
options1.num_threads = FLAGS_num_threads1;
if (server1.Start(FLAGS_port1, &options1) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand All @@ -127,6 +130,7 @@ int main(int argc, char* argv[]) {
options2.max_concurrency = FLAGS_max_concurrency;
options2.internal_port = FLAGS_internal_port2;
options2.bthread_tag = FLAGS_tag2;
options2.num_threads = FLAGS_num_threads2;
if (server2.Start(FLAGS_port2, &options2) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand Down
30 changes: 22 additions & 8 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ServerOptions::ServerOptions()
, health_reporter(NULL)
, rtmp_service(NULL)
, redis_service(NULL)
, bthread_tag(BTHREAD_TAG_DEFAULT) {
, bthread_tag(BTHREAD_TAG_INVALID) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
Expand Down Expand Up @@ -902,6 +902,17 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
_tl_options = ThreadLocalOptions();
}

auto original_bthread_tag = _options.bthread_tag;
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
_options.bthread_tag = BTHREAD_TAG_DEFAULT;
}
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}

if (_options.bthread_init_count != 0 &&
_options.bthread_init_fn != NULL) {
// Create some special bthreads to call the init functions. The
Expand Down Expand Up @@ -1020,7 +1031,16 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
bthread_setconcurrency(_options.num_threads);
int res;
if (original_bthread_tag == BTHREAD_TAG_INVALID) {
res = bthread_setconcurrency(_options.num_threads);
} else {
res = bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}
if (res != 0) {
LOG(ERROR) << "Invalid set bthread concurrency to " << _options.num_threads;
return -1;
}
}

for (MethodMap::iterator it = _method_map.begin();
Expand Down Expand Up @@ -1085,12 +1105,6 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
return -1;
}
_am->_use_rdma = _options.use_rdma;
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}
_am->_bthread_tag = _options.bthread_tag;
}
// Set `_status' to RUNNING before accepting connections
Expand Down
8 changes: 2 additions & 6 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,18 +389,14 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) {
bthread::never_set_bthread_concurrency_by_tag = false;
return 0;
}
auto c = bthread::get_or_new_task_control();
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
auto c = bthread::get_task_control();
if (c == NULL) {
bthread::FLAGS_bthread_concurrency_by_tag = 0;
return 0;
}
auto ngroup = c->concurrency();
auto tag_ngroup = c->concurrency(tag);
auto add = num - tag_ngroup;
if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
LOG(ERROR) << "Fail to set concurrency by tag " << tag
<< ", Whole concurrency larger than bthread_concurrency";
<< ", Total concurrency larger than bthread_concurrency";
return EPERM;
}
auto added = 0;
Expand Down

0 comments on commit 3061223

Please sign in to comment.