-
Notifications
You must be signed in to change notification settings - Fork 6.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Background schedule pool fixed #1722
Changes from 6 commits
b24a4b2
f247967
6629b03
a2dc16a
24de8d6
23aee5e
7d6268c
31874ed
4361df9
5099284
438121e
0aa9b9e
1418e33
0a05769
f811da7
c547c5a
f1b8d4b
1b75fba
32dd455
f824112
1dd5a70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,296 @@ | ||
#include <Common/BackgroundSchedulePool.h> | ||
#include <Common/MemoryTracker.h> | ||
#include <Common/CurrentMetrics.h> | ||
#include <Common/Exception.h> | ||
#include <Common/setThreadName.h> | ||
#include <Common/Stopwatch.h> | ||
#include <common/logger_useful.h> | ||
#include <chrono> | ||
|
||
namespace CurrentMetrics | ||
{ | ||
extern const Metric BackgroundSchedulePoolTask; | ||
extern const Metric MemoryTrackingInBackgroundSchedulePool; | ||
} | ||
|
||
namespace DB | ||
{ | ||
|
||
|
||
// TaskNotification | ||
|
||
class TaskNotification final : public Poco::Notification | ||
{ | ||
public: | ||
explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {} | ||
void execute() { task->execute(); } | ||
|
||
private: | ||
BackgroundSchedulePool::TaskHandle task; | ||
}; | ||
|
||
|
||
// BackgroundSchedulePool::TaskInfo | ||
|
||
/// A task starts out idle; callers trigger it later via schedule()/scheduleAfter().
BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool_, const std::string & name_, const Task & function_)
    : name(name_)
    , pool(pool_)
    , function(function_)
{
}
|
||
|
||
bool BackgroundSchedulePool::TaskInfo::schedule() | ||
{ | ||
std::lock_guard lock(schedule_mutex); | ||
|
||
if (deactivated || scheduled) | ||
return false; | ||
|
||
scheduled = true; | ||
|
||
if(!executing) | ||
{ | ||
if (delayed) | ||
pool.cancelDelayedTask(shared_from_this(), lock); | ||
|
||
pool.queue.enqueueNotification(new TaskNotification(shared_from_this())); | ||
} | ||
|
||
return true; | ||
} | ||
|
||
|
||
bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms) | ||
{ | ||
std::lock_guard lock(schedule_mutex); | ||
|
||
if (deactivated || scheduled) | ||
return false; | ||
|
||
pool.scheduleDelayedTask(shared_from_this(), ms, lock); | ||
return true; | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::TaskInfo::deactivate() | ||
{ | ||
std::lock_guard lock_exec(exec_mutex); | ||
std::lock_guard lock_schedule(schedule_mutex); | ||
|
||
if (deactivated) | ||
return; | ||
|
||
deactivated = true; | ||
scheduled = false; | ||
|
||
if (delayed) | ||
pool.cancelDelayedTask(shared_from_this(), lock_schedule); | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::TaskInfo::activate() | ||
{ | ||
std::lock_guard lock(schedule_mutex); | ||
deactivated = false; | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::TaskInfo::execute() | ||
{ | ||
std::lock_guard lock_exec(exec_mutex); | ||
|
||
{ | ||
std::lock_guard lock_schedule(schedule_mutex); | ||
|
||
if (deactivated) | ||
return; | ||
|
||
scheduled = false; | ||
executing = true; | ||
} | ||
|
||
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask}; | ||
|
||
Stopwatch watch; | ||
function(); | ||
UInt64 milliseconds = watch.elapsedMilliseconds(); | ||
|
||
/// If the task is executed longer than specified time, it will be logged. | ||
static const int32_t slow_execution_threshold_ms = 50; | ||
|
||
if (milliseconds >= slow_execution_threshold_ms) | ||
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms."); | ||
|
||
{ | ||
std::lock_guard lock_schedule(schedule_mutex); | ||
|
||
executing = false; | ||
|
||
/// In case was scheduled while executing (including a scheduleAfter which expired) we schedule the task | ||
/// on the queue. We don't call the function again here because this way all tasks | ||
/// will have their chance to execute | ||
|
||
if(scheduled) | ||
pool.queue.enqueueNotification(new TaskNotification(shared_from_this())); | ||
} | ||
|
||
} | ||
|
||
/// Build a ZooKeeper watch callback that schedules this task exactly once.
/// The callback may fire several times (e.g. on session events), so the handle
/// is released after the first use, turning later invocations into no-ops.
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
    return [task = shared_from_this()](zkutil::ZooKeeper &, int, int, const char *) mutable
    {
        if (task)
        {
            task->schedule();
            task.reset();
        }
    };
}
|
||
|
||
// BackgroundSchedulePool | ||
|
||
/// Start `size` worker threads that drain the notification queue, plus one
/// dedicated thread that tracks delayed (scheduleAfter) tasks.
BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
    : size(size)
{
    LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");

    threads.resize(size);
    for (auto & thread : threads)
        thread = std::thread([this] { threadFunction(); });

    delayed_thread = std::thread([this] { delayExecutionThreadFunction(); });
}
|
||
|
||
BackgroundSchedulePool::~BackgroundSchedulePool()
{
    try
    {
        /// Raise the shutdown flag under the lock and wake the delay thread.
        {
            std::unique_lock lock(delayed_tasks_lock);
            shutdown = true;
            wakeup_cond.notify_all();
        }

        /// Unblock workers stuck in waitDequeueNotification(), then join everything.
        queue.wakeUpAll();
        delayed_thread.join();

        LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
        for (std::thread & thread : threads)
            thread.join();
    }
    catch (...)
    {
        /// Destructors must not throw.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
|
||
|
||
/// Create a task bound to this pool; the returned handle owns the task's lifetime.
BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task)
{
    return std::make_shared<TaskInfo>(*this, name, task);
}
|
||
|
||
void BackgroundSchedulePool::removeTask(const TaskHandle & task) | ||
{ | ||
task->deactivate(); | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> & /* schedule_mutex_lock */) | ||
{ | ||
Poco::Timestamp current_time; | ||
|
||
{ | ||
std::lock_guard lock(delayed_tasks_lock); | ||
|
||
if (task->delayed) | ||
delayed_tasks.erase(task->iterator); | ||
|
||
task->iterator = delayed_tasks.emplace(current_time + (ms * 1000), task); | ||
task->delayed = true; | ||
} | ||
|
||
wakeup_cond.notify_all(); | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> & /* schedule_mutex_lock */) | ||
{ | ||
{ | ||
std::lock_guard lock(delayed_tasks_lock); | ||
delayed_tasks.erase(task->iterator); | ||
task->delayed = false; | ||
} | ||
|
||
wakeup_cond.notify_all(); | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::threadFunction() | ||
{ | ||
setThreadName("BackgrSchedPool"); | ||
|
||
MemoryTracker memory_tracker; | ||
memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool); | ||
current_memory_tracker = &memory_tracker; | ||
|
||
while (!shutdown) | ||
{ | ||
if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification()) | ||
{ | ||
TaskNotification & task_notification = static_cast<TaskNotification &>(*notification); | ||
task_notification.execute(); | ||
} | ||
} | ||
|
||
current_memory_tracker = nullptr; | ||
} | ||
|
||
|
||
void BackgroundSchedulePool::delayExecutionThreadFunction() | ||
{ | ||
setThreadName("BckSchPoolDelay"); | ||
|
||
while (!shutdown) | ||
{ | ||
TaskHandle task; | ||
|
||
{ | ||
std::unique_lock lock(delayed_tasks_lock); | ||
|
||
if(!shutdown) | ||
break; | ||
|
||
Poco::Timestamp min_time; | ||
|
||
if (!delayed_tasks.empty()) | ||
{ | ||
auto t = delayed_tasks.begin(); | ||
min_time = t->first; | ||
task = t->second; | ||
} | ||
|
||
if (!task) | ||
{ | ||
wakeup_cond.wait(lock); | ||
continue; | ||
} | ||
|
||
Poco::Timestamp current_time; | ||
|
||
if (min_time > current_time) | ||
{ | ||
wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time)); | ||
continue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This pattern is a bit strange - after the thread is awakened, it will be granted delayed_tasks_lock, then it will release it, only to try and immediately reacquire it on line 230. But probably this doesn't matter for correctness. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right that in this case the lock is released and then reacquired. But in this case we cannot just schedule the task. Maybe was removed from the queue so for this reason we are going again through checks. I need to think if calling active() method on the task is enough or there are also other implications. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes we need to go through all the checks after returning from the wait() method (even if for the possibility of a spurious wakeup). My point was that it is not necessary to release the lock and immediately re-acquire it to check the condition - the lock is already acquired after we return from the wait() method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ztlpn I'm afraid is not clear for me on how you want this fixed : I mean how I can go through all the checks while also I'm not releasing the lock which is already acquired. Can you provide some hints ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible solution: while (!shutdown)
{
{
acquire lock;
if (shutdown)
break;
if (no tasks)
{
condition.wait();
continue;
}
}
schedule task;
} Every time we do Now what if we add a second loop: while (!shutdown)
{
{
acquire lock;
while (!shutdown)
{
if (no tasks)
{
condition.wait();
continue;
}
else
break;
}
}
schedule task;
} This way we still hold the lock after continuing and we execute the inner loop until we get a task to schedule. The lock is released only to wait on the condition. What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW there is a bug in the exit predicate (line 265) - should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in 7d6268c There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You see any issue with this implementation : void BackgroundSchedulePool::delayExecutionThreadFunction()
{
setThreadName("BckSchPoolDelay");
while (!shutdown)
{
TaskHandle task;
bool found = false;
{
std::unique_lock lock(delayed_tasks_lock);
while(!shutdown)
{
Poco::Timestamp min_time;
if (!delayed_tasks.empty())
{
auto t = delayed_tasks.begin();
min_time = t->first;
task = t->second;
}
if (!task)
{
wakeup_cond.wait(lock);
continue;
}
Poco::Timestamp current_time;
if (min_time > current_time)
{
wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
continue;
}
else
{
//we have a task ready for execution
found = true;
break;
}
}
}
if(found)
task->schedule();
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks good. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 0a05769 |
||
} | ||
} | ||
|
||
task->schedule(); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As scheduled flag is false here, we can schedule this task once more from another thread. If a thread in the pool is available, it will immediately try to execute but will be stuck waiting for the exec_mutex. Which is good because we don't want to execute the function more than once at any point in time.
But it is bad from the performance point of view because the executing task will occupy not one but two threads in the pool which are in short supply. We should try to minimize waiting in BackgroundSchedulePool::threadFunction().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any ideas on how we can do this safely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that the pool has no idea if the task is currently executing.
If we think of a task more as a state machine... Here are the current states: Idle, Scheduled, Executing, Delayed. We want to go from Executing to Scheduled and we need to do it without waiting (that was the problem with the first version). But if we transition the task to the Scheduled state it will be immediately picked up for execution by another thread that will then get stuck on the exec_mutex (the current problem).
So possible path to the solution is to add one more state ExecutingScheduled (maybe just another flag) and to check after executing the task if it was scheduled once more in the meantime. If it was, execute it again right away (possibly in the same thread!).
The same goes for the situation when we delay the currently executing task for some small amount and this delay expires before the task finishes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be fixed in 24de8d6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍