diff --git a/example/bthread_tag_echo_c++/server.cpp b/example/bthread_tag_echo_c++/server.cpp index 4086c4a003..ed453fc9d5 100644 --- a/example/bthread_tag_echo_c++/server.cpp +++ b/example/bthread_tag_echo_c++/server.cpp @@ -29,6 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server"); DEFINE_int32(tag1, 0, "Server1 tag"); DEFINE_int32(tag2, 1, "Server2 tag"); DEFINE_int32(tag3, 2, "Background task tag"); +DEFINE_int32(num_threads1, 4, "Number of threads of server1"); +DEFINE_int32(num_threads2, 4, "Number of threads of server2"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); @@ -102,6 +104,7 @@ int main(int argc, char* argv[]) { options1.max_concurrency = FLAGS_max_concurrency; options1.internal_port = FLAGS_internal_port1; options1.bthread_tag = FLAGS_tag1; + options1.num_threads = FLAGS_num_threads1; if (server1.Start(FLAGS_port1, &options1) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; @@ -127,6 +130,7 @@ int main(int argc, char* argv[]) { options2.max_concurrency = FLAGS_max_concurrency; options2.internal_port = FLAGS_internal_port2; options2.bthread_tag = FLAGS_tag2; + options2.num_threads = FLAGS_num_threads2; if (server2.Start(FLAGS_port2, &options2) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 9d938c6bc9..0b3316cbe6 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -149,7 +149,7 @@ ServerOptions::ServerOptions() , health_reporter(NULL) , rtmp_service(NULL) , redis_service(NULL) - , bthread_tag(BTHREAD_TAG_DEFAULT) { + , bthread_tag(BTHREAD_TAG_INVALID) { if (s_ncore > 0) { num_threads = s_ncore + 1; } @@ -902,6 +902,17 @@ int Server::StartInternal(const butil::EndPoint& endpoint, _tl_options = ThreadLocalOptions(); } + auto original_bthread_tag = _options.bthread_tag; + if (original_bthread_tag == BTHREAD_TAG_INVALID) { + _options.bthread_tag = BTHREAD_TAG_DEFAULT; + } + if (_options.bthread_tag < BTHREAD_TAG_DEFAULT || + _options.bthread_tag >= FLAGS_task_group_ntags) { + LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is [" + << BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")"; + return -1; + } + if (_options.bthread_init_count != 0 && _options.bthread_init_fn != NULL) { // Create some special bthreads to call the init functions. The @@ -1020,7 +1031,16 @@ int Server::StartInternal(const butil::EndPoint& endpoint, if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) { _options.num_threads = BTHREAD_MIN_CONCURRENCY; } - bthread_setconcurrency(_options.num_threads); + int res; + if (original_bthread_tag == BTHREAD_TAG_INVALID) { + res = bthread_setconcurrency(_options.num_threads); + } else { + res = bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag); + } + if (res != 0) { + LOG(ERROR) << "Invalid set bthread concurrency to " << _options.num_threads; + return -1; + } } for (MethodMap::iterator it = _method_map.begin(); @@ -1085,12 +1105,6 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } _am->_use_rdma = _options.use_rdma; - if (_options.bthread_tag < BTHREAD_TAG_DEFAULT || - _options.bthread_tag >= FLAGS_task_group_ntags) { - LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is [" - << BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")"; - return -1; - } _am->_bthread_tag = _options.bthread_tag; } // Set `_status' to RUNNING before accepting connections diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index bd8c3efdea..8b02ff892b 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -389,18 +389,14 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { bthread::never_set_bthread_concurrency_by_tag = false; return 0; } + auto c = bthread::get_or_new_task_control(); BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex); - auto c = bthread::get_task_control(); - if (c == NULL) { - bthread::FLAGS_bthread_concurrency_by_tag = 0; - return 0; - } auto ngroup = c->concurrency(); auto tag_ngroup = c->concurrency(tag); auto add = num - tag_ngroup; if (ngroup + add > bthread::FLAGS_bthread_concurrency) { LOG(ERROR) << "Fail to set concurrency by tag " << tag - << ", Whole concurrency larger than bthread_concurrency"; + << ", Total concurrency larger than bthread_concurrency"; return EPERM; } auto added = 0;