
Background schedule pool fixed #1722

Merged
Changes from 6 commits
21 commits
b24a4b2
Add back the buggy BackgroundSchedulePool
Dec 29, 2017
f247967
Fix BackgroundSchedulePool
Jan 1, 2018
6629b03
Fixed few of the observation of the code review
Mar 22, 2018
a2dc16a
Remove the link between TaskHandles and ZooKeeper
Mar 22, 2018
24de8d6
Fix for corner case where executing task will occupy not one but two …
Mar 22, 2018
23aee5e
Task cannot be scheduled and deactivated in the same time.
Mar 23, 2018
7d6268c
Fix bug in the exit predicate
Mar 26, 2018
31874ed
Use consistent names for the task variables
Mar 26, 2018
4361df9
Remove exists and use existsWatch instead for WatchCallbacks
Mar 26, 2018
5099284
Use consistent names for the task variables (change from next_update_…
Mar 26, 2018
438121e
Renamed Zookeeper get method with getWatch for consistency
Mar 26, 2018
0aa9b9e
Use consistent names for the task variables (change from next_update_…
Mar 26, 2018
1418e33
Rename merge_selecting_handle with merge_selecting_task_handle for co…
Mar 26, 2018
0a05769
Reduce the number of lock releases and reacquires in the BckSchPoolDe…
Mar 27, 2018
f811da7
Move merge selection state into ReplicatedMergeTreeMergeSelectingThread
Apr 2, 2018
c547c5a
Merge remote-tracking branch 'origin/background-schedule-pool-fix'
Apr 10, 2018
f1b8d4b
Merge remote-tracking branch 'github/master' into background-schedule…
Apr 10, 2018
1b75fba
Merge remote-tracking branch 'github/master'
Apr 19, 2018
32dd455
Merge remote-tracking branch 'github/master' into background-schedule…
Apr 19, 2018
f824112
Merge remote-tracking branch 'github/master'
Apr 24, 2018
1dd5a70
Merge remote-tracking branch 'origin/master' into background-schedule…
Apr 24, 2018
296 changes: 296 additions & 0 deletions dbms/src/Common/BackgroundSchedulePool.cpp
@@ -0,0 +1,296 @@
#include <Common/BackgroundSchedulePool.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>
#include <chrono>

namespace CurrentMetrics
{
    extern const Metric BackgroundSchedulePoolTask;
    extern const Metric MemoryTrackingInBackgroundSchedulePool;
}

namespace DB
{


// TaskNotification

class TaskNotification final : public Poco::Notification
{
public:
    explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {}
    void execute() { task->execute(); }

private:
    BackgroundSchedulePool::TaskHandle task;
};


// BackgroundSchedulePool::TaskInfo

BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function):
    name(name),
    pool(pool),
    function(function)
{
}


bool BackgroundSchedulePool::TaskInfo::schedule()
{
    std::lock_guard lock(schedule_mutex);

    if (deactivated || scheduled)
        return false;

    scheduled = true;

    if (!executing)
    {
        if (delayed)
            pool.cancelDelayedTask(shared_from_this(), lock);

        pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
    }

    return true;
}


bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
{
    std::lock_guard lock(schedule_mutex);

    if (deactivated || scheduled)
        return false;

    pool.scheduleDelayedTask(shared_from_this(), ms, lock);
    return true;
}


void BackgroundSchedulePool::TaskInfo::deactivate()
{
    std::lock_guard lock_exec(exec_mutex);
    std::lock_guard lock_schedule(schedule_mutex);

    if (deactivated)
        return;

    deactivated = true;
    scheduled = false;

    if (delayed)
        pool.cancelDelayedTask(shared_from_this(), lock_schedule);
}


void BackgroundSchedulePool::TaskInfo::activate()
{
    std::lock_guard lock(schedule_mutex);
    deactivated = false;
}


void BackgroundSchedulePool::TaskInfo::execute()
{
    std::lock_guard lock_exec(exec_mutex);

    {
        std::lock_guard lock_schedule(schedule_mutex);

        if (deactivated)
            return;

        scheduled = false;
        executing = true;
    }

    CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};

    Stopwatch watch;
    function();

Contributor:
As the scheduled flag is false here, this task can be scheduled once more from another thread. If a thread in the pool is available, it will immediately try to execute the task but will get stuck waiting for the exec_mutex. That is good, because we don't want to execute the function more than once at any point in time.

But it is bad from a performance point of view, because the executing task will then occupy not one but two threads in the pool, and those are in short supply. We should try to minimize waiting in BackgroundSchedulePool::threadFunction().

Contributor (author):
Any idea how we can do this safely?

Contributor:
The problem is that the pool has no idea whether the task is currently executing.

If we think of a task as a state machine, the current states are: Idle, Scheduled, Executing, Delayed. We want to go from Executing to Scheduled, and we need to do it without waiting (that was the problem with the first version). But if we transition the task to the Scheduled state, it will immediately be picked up for execution by another thread, which will then get stuck on the exec_mutex (the current problem).

So a possible path to a solution is to add one more state, ExecutingScheduled (maybe just another flag), and to check after executing the task whether it was scheduled again in the meantime. If it was, execute it again right away (possibly in the same thread!).

The same applies when we delay the currently executing task by some small amount and the delay expires before the task finishes.

Contributor (author):
Should be fixed in 24de8d6.

Contributor:
👍
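The ExecutingScheduled idea discussed above can be sketched as a minimal single-task model. This is illustrative only (MiniTask is not the ClickHouse API, and it follows the reviewer's "run again in the same thread" variant, whereas the merged code re-enqueues onto the pool queue): clear the scheduled flag before running, then re-check it after the function returns.

```cpp
#include <cassert>
#include <functional>
#include <mutex>

// Hypothetical single-task sketch of the "re-check the scheduled flag after
// executing" pattern. Not the real BackgroundSchedulePool interface.
class MiniTask
{
public:
    explicit MiniTask(std::function<void()> fn) : function(std::move(fn)) {}

    // Returns false if the task was already pending.
    bool schedule()
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (scheduled)
            return false;
        scheduled = true;
        return true;
    }

    // Runs the task; if schedule() was called again while the function ran,
    // runs it once more in the same thread instead of tying up a second one.
    void execute()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (!scheduled)
                return;
            scheduled = false;
            executing = true; // models the Executing state; a real pool would
                              // use it to avoid enqueueing a second runner
        }

        while (true)
        {
            function();

            std::lock_guard<std::mutex> lock(mutex);
            if (!scheduled)
            {
                executing = false;
                return;
            }
            scheduled = false; // re-scheduled mid-execution: run again
        }
    }

private:
    std::function<void()> function;
    std::mutex mutex;
    bool scheduled = false;
    bool executing = false;
};
```

If the task schedules itself during its first run, execute() performs exactly two runs and then goes idle, without a second thread ever blocking on a mutex.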

    UInt64 milliseconds = watch.elapsedMilliseconds();

    /// If the task takes longer than the specified threshold to execute, log it.
    static const int32_t slow_execution_threshold_ms = 50;

    if (milliseconds >= slow_execution_threshold_ms)
        LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms.");

    {
        std::lock_guard lock_schedule(schedule_mutex);

        executing = false;

        /// If the task was scheduled while executing (including a scheduleAfter that expired),
        /// re-enqueue it on the queue. We don't call the function again here, so that
        /// all tasks get their chance to execute.

        if (scheduled)
            pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
    }
}

zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
    return [t = shared_from_this()](zkutil::ZooKeeper &, int, int, const char *) mutable
    {
        if (t)
        {
            t->schedule();
            t.reset(); /// The event is set only once, even though the callback can fire multiple times due to session events.
        }
    };
}
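The one-shot shape of getWatchCallback() above is a reusable pattern: the lambda owns a shared_ptr by value and drops it after the first effective call, so repeated invocations (e.g. spurious session events from ZooKeeper) become no-ops and the pointee can be freed. A hedged standalone sketch, with illustrative names (Counter, makeOneShot) that are not part of the real code:

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical target object standing in for the TaskInfo handle.
struct Counter
{
    int fired = 0;
};

// Returns a callback that acts exactly once: the first call increments the
// counter and releases the captured shared_ptr; later calls see an empty
// pointer and do nothing.
std::function<void()> makeOneShot(std::shared_ptr<Counter> target)
{
    return [t = std::move(target)]() mutable
    {
        if (t)
        {
            ++t->fired;
            t.reset(); // subsequent invocations are no-ops
        }
    };
}
```

Because the callback holds its own shared_ptr, the target stays alive at least until the first fire, even if the original owner has already let go of it.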


// BackgroundSchedulePool

BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
    : size(size)
{
    LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");

    threads.resize(size);
    for (auto & thread : threads)
        thread = std::thread([this] { threadFunction(); });

    delayed_thread = std::thread([this] { delayExecutionThreadFunction(); });
}


BackgroundSchedulePool::~BackgroundSchedulePool()
{
    try
    {
        {
            std::unique_lock lock(delayed_tasks_lock);
            shutdown = true;
            wakeup_cond.notify_all();
        }

        queue.wakeUpAll();
        delayed_thread.join();

        LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
        for (std::thread & thread : threads)
            thread.join();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}


BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task)
{
    return std::make_shared<TaskInfo>(*this, name, task);
}


void BackgroundSchedulePool::removeTask(const TaskHandle & task)
{
    task->deactivate();
}


void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
{
    Poco::Timestamp current_time;

    {
        std::lock_guard lock(delayed_tasks_lock);

        if (task->delayed)
            delayed_tasks.erase(task->iterator);

        /// Poco::Timestamp has microsecond resolution; the delay is given in milliseconds.
        task->iterator = delayed_tasks.emplace(current_time + (ms * 1000), task);
        task->delayed = true;
    }

    wakeup_cond.notify_all();
}


void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
{
    {
        std::lock_guard lock(delayed_tasks_lock);
        delayed_tasks.erase(task->iterator);
        task->delayed = false;
    }

    wakeup_cond.notify_all();
}


void BackgroundSchedulePool::threadFunction()
{
    setThreadName("BackgrSchedPool");

    MemoryTracker memory_tracker;
    memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
    current_memory_tracker = &memory_tracker;

    while (!shutdown)
    {
        if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification())
        {
            TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
            task_notification.execute();
        }
    }

    current_memory_tracker = nullptr;
}


void BackgroundSchedulePool::delayExecutionThreadFunction()
{
    setThreadName("BckSchPoolDelay");

    while (!shutdown)
    {
        TaskHandle task;

        {
            std::unique_lock lock(delayed_tasks_lock);

            if (!shutdown)
                break;

            Poco::Timestamp min_time;

            if (!delayed_tasks.empty())
            {
                auto t = delayed_tasks.begin();
                min_time = t->first;
                task = t->second;
            }

            if (!task)
            {
                wakeup_cond.wait(lock);
                continue;
            }

            Poco::Timestamp current_time;

            if (min_time > current_time)
            {
                wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
                continue;

Contributor (@ztlpn, Mar 16, 2018):
This pattern is a bit strange: after the thread is awakened, it is granted delayed_tasks_lock, then releases it, only to try to immediately reacquire it on line 230. But this probably doesn't matter for correctness.

Contributor (author):
You are right that in this case the lock is released and then reacquired. But we cannot simply schedule the task here: it may have been removed from the queue in the meantime, so we go through all the checks again.

I need to think about whether calling activate() on the task is enough, or whether there are other implications.

Contributor (@ztlpn, Mar 16, 2018):
Yes, we need to go through all the checks after returning from wait() (if only because of the possibility of a spurious wakeup).

My point was that it is not necessary to release the lock and immediately re-acquire it to check the condition - the lock is already held after we return from wait().

Contributor (author):
@ztlpn I'm afraid it's not clear to me how you want this fixed: how can I go through all the checks without releasing the lock that is already acquired? Can you provide some hints?

Contributor (@ztlpn, Mar 26, 2018):
Possible solution. Here is the current code (in pseudocode):

while (!shutdown)
{
    {
        acquire lock;
        
        if (shutdown)
            break;

        if (no tasks)
        {
            condition.wait();
            continue;
        }
    }

    schedule task;
}

Every time we do continue; we release the lock only to immediately re-acquire it.

Now what if we add a second loop:

while (!shutdown)
{
    {
        acquire lock;

        while (!shutdown)
        {
            if (no tasks)
            {
                condition.wait();
                continue;
            }
            else
                break;
        }
    }

    schedule task;
}

This way we still hold the lock after continuing and we execute the inner loop until we get a task to schedule. The lock is released only to wait on the condition.

What do you think?
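The inner loop proposed above is the same re-check-under-the-lock idiom that the predicate overload of std::condition_variable::wait performs internally: it keeps the mutex held between checks and releases it only while actually waiting. A minimal sketch under that assumption (SimpleQueue is an illustrative name, unrelated to the PR's code):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical blocking queue showing the predicate-wait idiom: the lock is
// released only inside wait(), never between a wakeup and the re-check.
class SimpleQueue
{
public:
    void push(int value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            items.push(value);
        }
        cond.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    int pop()
    {
        std::unique_lock<std::mutex> lock(mutex);
        // Equivalent to: while (items.empty()) cond.wait(lock);
        cond.wait(lock, [this] { return !items.empty(); });
        int value = items.front();
        items.pop();
        return value;
    }

private:
    std::mutex mutex;
    std::condition_variable cond;
    std::queue<int> items;
};
```

For the delayed-task thread a plain predicate is not quite enough, since the wait deadline depends on the earliest queued task; so an explicit inner loop with wait_for, as proposed above, remains the natural shape there.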

Contributor:
By the way, there is a bug in the exit predicate (line 265): it should be if (shutdown), not if (!shutdown)...

Contributor (author):
Fixed in 7d6268c.

Contributor (author):
Do you see any issue with this implementation?

void BackgroundSchedulePool::delayExecutionThreadFunction()
{
    setThreadName("BckSchPoolDelay");

    while (!shutdown)
    {
        TaskHandle task;
        bool found = false;

        {
            std::unique_lock lock(delayed_tasks_lock);

            while (!shutdown)
            {
                Poco::Timestamp min_time;

                if (!delayed_tasks.empty())
                {
                    auto t = delayed_tasks.begin();
                    min_time = t->first;
                    task = t->second;
                }

                if (!task)
                {
                    wakeup_cond.wait(lock);
                    continue;
                }

                Poco::Timestamp current_time;

                if (min_time > current_time)
                {
                    wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
                    continue;
                }
                else
                {
                    //we have a task ready for execution
                    found = true;
                    break;
                }
            }
        }

        if (found)
            task->schedule();
    }
}

Contributor:
Looks good.

Contributor (author):
Fixed in 0a05769.

            }
        }

        task->schedule();
    }
}

}