diff --git a/include/dsn/tool-api/task_worker.h b/include/dsn/tool-api/task_worker.h index b3392c5230..e4f56c6bf9 100644 --- a/include/dsn/tool-api/task_worker.h +++ b/include/dsn/tool-api/task_worker.h @@ -89,7 +89,7 @@ class task_worker : public extensible_object int _index; int _native_tid; std::string _name; - std::thread *_thread; + std::unique_ptr _thread; bool _is_running; utils::notify_event _started; int _processed_task_count; diff --git a/include/dsn/tool-api/timer_service.h b/include/dsn/tool-api/timer_service.h index 9c7ac52394..47e88ca718 100644 --- a/include/dsn/tool-api/timer_service.h +++ b/include/dsn/tool-api/timer_service.h @@ -61,6 +61,8 @@ class timer_service public: timer_service(service_node *node, timer_service *inner_provider) { _node = node; } + virtual ~timer_service() = default; + virtual void start() = 0; // after milliseconds, the provider should call task->enqueue() diff --git a/src/core/core/task_queue.cpp b/src/core/core/task_queue.cpp index 7692aa332c..400c09641d 100644 --- a/src/core/core/task_queue.cpp +++ b/src/core/core/task_queue.cpp @@ -61,10 +61,7 @@ task_queue::task_queue(task_worker_pool *pool, int index, task_queue *inner_prov _spec = (threadpool_spec *)&pool->spec(); } -task_queue::~task_queue() -{ - perf_counters::instance().remove_counter(_queue_length_counter->full_name()); -} +task_queue::~task_queue() = default; void task_queue::enqueue_internal(task *task) { diff --git a/src/core/core/task_worker.cpp b/src/core/core/task_worker.cpp index c015b8b8dc..62a8674dff 100644 --- a/src/core/core/task_worker.cpp +++ b/src/core/core/task_worker.cpp @@ -24,34 +24,10 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #include -#include "task_engine.h" -#include -#include - -#ifdef _WIN32 - -#else -#include - -#ifdef __FreeBSD__ -#include -#endif - -#ifdef __APPLE__ -#include -#endif +#include -#endif +#include "task_engine.h" namespace dsn { @@ -77,7 +53,14 @@ task_worker::task_worker(task_worker_pool *pool, _processed_task_count = 0; } -task_worker::~task_worker() { stop(); } +task_worker::~task_worker() +{ + if (!_is_running) + return; + + // TODO(wutao1): use join, detach is not work with valgrind + _thread->detach(); +} void task_worker::start() { @@ -86,7 +69,7 @@ void task_worker::start() _is_running = true; - _thread = new std::thread(std::bind(&task_worker::run_internal, this)); + _thread = make_unique(std::bind(&task_worker::run_internal, this)); _started.wait(); } @@ -99,116 +82,39 @@ void task_worker::stop() _is_running = false; _thread->join(); - delete _thread; - _thread = nullptr; - - _is_running = false; } void task_worker::set_name(const char *name) { -#ifdef _WIN32 - -#ifndef MS_VC_EXCEPTION -#define MS_VC_EXCEPTION 0x406D1388 -#endif - - typedef struct tagTHREADNAME_INFO - { - uint32_t dwType; // Must be 0x1000. - LPCSTR szName; // Pointer to name (in user addr space). - uint32_t dwThreadID; // Thread ID (-1=caller thread). - uint32_t dwFlags; // Reserved for future use, must be zero. - } THREADNAME_INFO; - - THREADNAME_INFO info; - info.dwType = 0x1000; - info.szName = name; - info.dwThreadID = (uint32_t)-1; - info.dwFlags = 0; - - __try { - ::RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(uint32_t), (ULONG_PTR *)&info); - } __except (EXCEPTION_CONTINUE_EXECUTION) { - } - -#else std::string sname(name); - auto thread_name = sname -#ifdef __linux__ - .substr(0, (16 - 1)) -#endif - ; + auto thread_name = sname.substr(0, (16 - 1)); auto tid = pthread_self(); - int err = 0; -#ifdef __FreeBSD__ - pthread_set_name_np(tid, thread_name.c_str()); -#elif defined(__linux__) - err = pthread_setname_np(tid, thread_name.c_str()); -#elif defined(__APPLE__) - err = pthread_setname_np(thread_name.c_str()); -#endif + int err = pthread_setname_np(tid, thread_name.c_str()); if (err != 0) { dwarn("Fail to set pthread name. err = %d", err); } -#endif } void task_worker::set_priority(worker_priority_t pri) { -#ifndef _WIN32 -#ifndef __linux__ - static int policy = SCHED_OTHER; -#endif - static int prio_max = -#ifdef __linux__ - -20; -#else - sched_get_priority_max(policy); -#endif - static int prio_min = -#ifdef __linux__ - 19; -#else - sched_get_priority_min(policy); -#endif + static int prio_max = -20; + static int prio_min = 19; static int prio_middle = ((prio_min + prio_max + 1) / 2); -#endif - static int g_thread_priority_map[] = { -#ifdef _WIN32 - THREAD_PRIORITY_LOWEST, - THREAD_PRIORITY_BELOW_NORMAL, - THREAD_PRIORITY_NORMAL, - THREAD_PRIORITY_ABOVE_NORMAL, - THREAD_PRIORITY_HIGHEST -#else - prio_min, (prio_min + prio_middle) / 2, prio_middle, (prio_middle + prio_max) / 2, prio_max -#endif - }; + static int g_thread_priority_map[] = {prio_min, + (prio_min + prio_middle) / 2, + prio_middle, + (prio_middle + prio_max) / 2, + prio_max}; static_assert(ARRAYSIZE(g_thread_priority_map) == THREAD_xPRIORITY_COUNT, "ARRAYSIZE(g_thread_priority_map) != THREAD_xPRIORITY_COUNT"); int prio = g_thread_priority_map[static_cast(pri)]; bool succ = true; -#if !defined(_WIN32) && !defined(__linux__) - struct sched_param param; - memset(¶m, 0, sizeof(struct sched_param)); - param.sched_priority = prio; -#endif - -#ifdef _WIN32 - succ = (::SetThreadPriority(::GetCurrentThread(), prio) == TRUE); -#elif defined(__linux__) if ((nice(prio) == -1) && (errno != 0)) { succ = false; } -#else - succ = (pthread_setschedparam(pthread_self(), policy, ¶m) == 0); -//# error "not implemented" -#endif - if (!succ) { dwarn("You may need priviledge to set thread priority. errno = %d", errno); } @@ -226,23 +132,6 @@ void task_worker::set_affinity(uint64_t affinity) } int err = 0; -#ifdef _WIN32 - if (::SetThreadAffinityMask(::GetCurrentThread(), static_cast(affinity)) == 0) { - err = static_cast(::GetLastError()); - } -#elif defined(__APPLE__) - thread_affinity_policy_data_t policy; - policy.affinity_tag = static_cast(affinity); - err = static_cast(thread_policy_set(static_cast(::dsn::utils::get_current_tid()), - THREAD_AFFINITY_POLICY, - (thread_policy_t)&policy, - THREAD_AFFINITY_POLICY_COUNT)); -#else -#ifdef __FreeBSD__ -#ifndef cpu_set_t -#define cpu_set_t cpuset_t -#endif -#endif cpu_set_t cpuset; int nr_bits = std::min(nr_cpu, static_cast(sizeof(affinity) * 8)); @@ -253,7 +142,6 @@ void task_worker::set_affinity(uint64_t affinity) } } err = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); -#endif if (err != 0) { dwarn("Fail to set thread affinity. err = %d", err); @@ -307,7 +195,6 @@ void task_worker::loop() task_queue *q = queue(); int best_batch_size = pool_spec().dequeue_batch_size; - // try { while (_is_running) { int batch_size = best_batch_size; task *task = q->dequeue(batch_size), *next; @@ -336,11 +223,6 @@ void task_worker::loop() _processed_task_count += batch_size; } - /*} - catch (std::exception& ex) - { - dassert (false, "%s: unhandled exception '%s'", name().c_str(), ex.what()); - }*/ } const threadpool_spec &task_worker::pool_spec() const { return pool()->spec(); } diff --git a/src/core/tools/common/simple_task_queue.cpp b/src/core/tools/common/simple_task_queue.cpp index 0aca67f9de..7412150ea4 100644 --- a/src/core/tools/common/simple_task_queue.cpp +++ b/src/core/tools/common/simple_task_queue.cpp @@ -24,28 +24,19 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #include "simple_task_queue.h" namespace dsn { namespace tools { + simple_timer_service::simple_timer_service(service_node *node, timer_service *inner_provider) : timer_service(node, inner_provider) { - _worker = nullptr; } void simple_timer_service::start() { - _worker = std::shared_ptr(new std::thread([this]() { + _worker = std::thread([this]() { task::set_tls_dsn_context(node(), nullptr); char buffer[128]; @@ -55,8 +46,18 @@ void simple_timer_service::start() task_worker::set_priority(worker_priority_t::THREAD_xPRIORITY_ABOVE_NORMAL); boost::asio::io_service::work work(_ios); - _ios.run(); - })); + boost::system::error_code ec; + _ios.run(ec); + if (ec) { + dassert(false, "io_service in simple_timer_service run failed: %s", ec.message().data()); + } + }); +} + +simple_timer_service::~simple_timer_service() +{ + _ios.stop(); + _worker.join(); } void simple_timer_service::add_timer(task *task) @@ -93,5 +94,6 @@ task *simple_task_queue::dequeue(/*inout*/ int &batch_size) batch_size = 1; return t; } -} -} + +} // namespace tools +} // namespace dsn diff --git a/src/core/tools/common/simple_task_queue.h b/src/core/tools/common/simple_task_queue.h index e3851993af..7a8365b4d8 100644 --- a/src/core/tools/common/simple_task_queue.h +++ b/src/core/tools/common/simple_task_queue.h @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once #include @@ -46,6 +37,8 @@ class simple_task_queue : public task_queue public: simple_task_queue(task_worker_pool *pool, int index, task_queue *inner_provider); + ~simple_task_queue() override = default; + virtual void enqueue(task *task) override; virtual task *dequeue(/*inout*/ int &batch_size) override; @@ -59,6 +52,8 @@ class simple_timer_service : public timer_service public: simple_timer_service(service_node *node, timer_service *inner_provider); + ~simple_timer_service() override; + // after milliseconds, the provider should call task->enqueue() virtual void add_timer(task *task) override; @@ -66,7 +61,8 @@ class simple_timer_service : public timer_service private: boost::asio::io_service _ios; - std::shared_ptr _worker; + std::thread _worker; }; -} -} + +} // namespace tools +} // namespace dsn