Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

valgrind: fix leaks of task_worker #181

Merged
merged 4 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/dsn/tool-api/task_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class task_worker : public extensible_object<task_worker, 4>
int _index;
int _native_tid;
std::string _name;
std::thread *_thread;
std::unique_ptr<std::thread> _thread;
bool _is_running;
utils::notify_event _started;
int _processed_task_count;
Expand Down
2 changes: 2 additions & 0 deletions include/dsn/tool-api/timer_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class timer_service
public:
timer_service(service_node *node, timer_service *inner_provider) { _node = node; }

virtual ~timer_service() = default;

virtual void start() = 0;

// after milliseconds, the provider should call task->enqueue()
Expand Down
5 changes: 1 addition & 4 deletions src/core/core/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ task_queue::task_queue(task_worker_pool *pool, int index, task_queue *inner_prov
_spec = (threadpool_spec *)&pool->spec();
}

task_queue::~task_queue()
{
perf_counters::instance().remove_counter(_queue_length_counter->full_name());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这段代码解释下,_queue_length_counter 本身是一个 perf_counter_wrapper,它析构时会调用

    perf_counters::instance().remove_counter(this->full_name());

所以执行顺序是

  1. task_queue 析构
  2. _queue_length_counter 被 removed
  3. _queue_length_counter 析构
  4. _queue_length_counter 又被 removed,这里是第二次 remove,valgrind 报错 invalid read.

}
task_queue::~task_queue() = default;

void task_queue::enqueue_internal(task *task)
{
Expand Down
158 changes: 20 additions & 138 deletions src/core/core/task_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,10 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <dsn/tool-api/task_worker.h>
#include "task_engine.h"
#include <sstream>
#include <errno.h>

#ifdef _WIN32

#else
#include <pthread.h>

#ifdef __FreeBSD__
#include <pthread_np.h>
#endif

#ifdef __APPLE__
#include <mach/thread_policy.h>
#endif
#include <dsn/utility/smart_pointers.h>

#endif
#include "task_engine.h"

namespace dsn {

Expand All @@ -77,7 +53,14 @@ task_worker::task_worker(task_worker_pool *pool,
_processed_task_count = 0;
}

task_worker::~task_worker() { stop(); }
task_worker::~task_worker()
{
if (!_is_running)
return;

// TODO(wutao1): use join, detach is not work with valgrind
_thread->detach();
}

void task_worker::start()
{
Expand All @@ -86,7 +69,7 @@ void task_worker::start()

_is_running = true;

_thread = new std::thread(std::bind(&task_worker::run_internal, this));
_thread = make_unique<std::thread>(std::bind(&task_worker::run_internal, this));

_started.wait();
}
Expand All @@ -99,116 +82,39 @@ void task_worker::stop()
_is_running = false;

_thread->join();
delete _thread;
_thread = nullptr;

_is_running = false;
}

void task_worker::set_name(const char *name)
{
#ifdef _WIN32

#ifndef MS_VC_EXCEPTION
#define MS_VC_EXCEPTION 0x406D1388
#endif

typedef struct tagTHREADNAME_INFO
{
uint32_t dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
uint32_t dwThreadID; // Thread ID (-1=caller thread).
uint32_t dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;

THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = name;
info.dwThreadID = (uint32_t)-1;
info.dwFlags = 0;

__try {
::RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(uint32_t), (ULONG_PTR *)&info);
} __except (EXCEPTION_CONTINUE_EXECUTION) {
}

#else
std::string sname(name);
auto thread_name = sname
#ifdef __linux__
.substr(0, (16 - 1))
#endif
;
auto thread_name = sname.substr(0, (16 - 1));
auto tid = pthread_self();
int err = 0;
#ifdef __FreeBSD__
pthread_set_name_np(tid, thread_name.c_str());
#elif defined(__linux__)
err = pthread_setname_np(tid, thread_name.c_str());
#elif defined(__APPLE__)
err = pthread_setname_np(thread_name.c_str());
#endif
int err = pthread_setname_np(tid, thread_name.c_str());
if (err != 0) {
dwarn("Fail to set pthread name. err = %d", err);
}
#endif
}

void task_worker::set_priority(worker_priority_t pri)
{
#ifndef _WIN32
#ifndef __linux__
static int policy = SCHED_OTHER;
#endif
static int prio_max =
#ifdef __linux__
-20;
#else
sched_get_priority_max(policy);
#endif
static int prio_min =
#ifdef __linux__
19;
#else
sched_get_priority_min(policy);
#endif
static int prio_max = -20;
static int prio_min = 19;
static int prio_middle = ((prio_min + prio_max + 1) / 2);
#endif

static int g_thread_priority_map[] = {
#ifdef _WIN32
THREAD_PRIORITY_LOWEST,
THREAD_PRIORITY_BELOW_NORMAL,
THREAD_PRIORITY_NORMAL,
THREAD_PRIORITY_ABOVE_NORMAL,
THREAD_PRIORITY_HIGHEST
#else
prio_min, (prio_min + prio_middle) / 2, prio_middle, (prio_middle + prio_max) / 2, prio_max
#endif
};
static int g_thread_priority_map[] = {prio_min,
(prio_min + prio_middle) / 2,
prio_middle,
(prio_middle + prio_max) / 2,
prio_max};

static_assert(ARRAYSIZE(g_thread_priority_map) == THREAD_xPRIORITY_COUNT,
"ARRAYSIZE(g_thread_priority_map) != THREAD_xPRIORITY_COUNT");

int prio = g_thread_priority_map[static_cast<int>(pri)];
bool succ = true;
#if !defined(_WIN32) && !defined(__linux__)
struct sched_param param;
memset(&param, 0, sizeof(struct sched_param));
param.sched_priority = prio;
#endif

#ifdef _WIN32
succ = (::SetThreadPriority(::GetCurrentThread(), prio) == TRUE);
#elif defined(__linux__)
if ((nice(prio) == -1) && (errno != 0)) {
succ = false;
}
#else
succ = (pthread_setschedparam(pthread_self(), policy, &param) == 0);
//# error "not implemented"
#endif

if (!succ) {
dwarn("You may need priviledge to set thread priority. errno = %d", errno);
}
Expand All @@ -226,23 +132,6 @@ void task_worker::set_affinity(uint64_t affinity)
}

int err = 0;
#ifdef _WIN32
if (::SetThreadAffinityMask(::GetCurrentThread(), static_cast<DWORD_PTR>(affinity)) == 0) {
err = static_cast<int>(::GetLastError());
}
#elif defined(__APPLE__)
thread_affinity_policy_data_t policy;
policy.affinity_tag = static_cast<integer_t>(affinity);
err = static_cast<int>(thread_policy_set(static_cast<thread_t>(::dsn::utils::get_current_tid()),
THREAD_AFFINITY_POLICY,
(thread_policy_t)&policy,
THREAD_AFFINITY_POLICY_COUNT));
#else
#ifdef __FreeBSD__
#ifndef cpu_set_t
#define cpu_set_t cpuset_t
#endif
#endif
cpu_set_t cpuset;
int nr_bits = std::min(nr_cpu, static_cast<int>(sizeof(affinity) * 8));

Expand All @@ -253,7 +142,6 @@ void task_worker::set_affinity(uint64_t affinity)
}
}
err = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
#endif

if (err != 0) {
dwarn("Fail to set thread affinity. err = %d", err);
Expand Down Expand Up @@ -307,7 +195,6 @@ void task_worker::loop()
task_queue *q = queue();
int best_batch_size = pool_spec().dequeue_batch_size;

// try {
while (_is_running) {
int batch_size = best_batch_size;
task *task = q->dequeue(batch_size), *next;
Expand Down Expand Up @@ -336,11 +223,6 @@ void task_worker::loop()

_processed_task_count += batch_size;
}
/*}
catch (std::exception& ex)
{
dassert (false, "%s: unhandled exception '%s'", name().c_str(), ex.what());
}*/
}

const threadpool_spec &task_worker::pool_spec() const { return pool()->spec(); }
Expand Down
32 changes: 17 additions & 15 deletions src/core/tools/common/simple_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,19 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "simple_task_queue.h"

namespace dsn {
namespace tools {

simple_timer_service::simple_timer_service(service_node *node, timer_service *inner_provider)
: timer_service(node, inner_provider)
{
_worker = nullptr;
}

void simple_timer_service::start()
{
_worker = std::shared_ptr<std::thread>(new std::thread([this]() {
_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);

char buffer[128];
Expand All @@ -55,8 +46,18 @@ void simple_timer_service::start()
task_worker::set_priority(worker_priority_t::THREAD_xPRIORITY_ABOVE_NORMAL);

boost::asio::io_service::work work(_ios);
_ios.run();
}));
boost::system::error_code ec;
_ios.run(ec);
if (ec) {
dassert(false, "io_service in simple_timer_service run failed: %s", ec.message().data());
}
});
}

simple_timer_service::~simple_timer_service()
{
_ios.stop();
_worker.join();
}

void simple_timer_service::add_timer(task *task)
Expand Down Expand Up @@ -93,5 +94,6 @@ task *simple_task_queue::dequeue(/*inout*/ int &batch_size)
batch_size = 1;
return t;
}
}
}

} // namespace tools
} // namespace dsn
20 changes: 8 additions & 12 deletions src/core/tools/common/simple_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool_api.h>
Expand All @@ -46,6 +37,8 @@ class simple_task_queue : public task_queue
public:
simple_task_queue(task_worker_pool *pool, int index, task_queue *inner_provider);

~simple_task_queue() override = default;

virtual void enqueue(task *task) override;
virtual task *dequeue(/*inout*/ int &batch_size) override;

Expand All @@ -59,14 +52,17 @@ class simple_timer_service : public timer_service
public:
simple_timer_service(service_node *node, timer_service *inner_provider);

~simple_timer_service() override;

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) override;

virtual void start() override;

private:
boost::asio::io_service _ios;
std::shared_ptr<std::thread> _worker;
std::thread _worker;
};
}
}

} // namespace tools
} // namespace dsn