Skip to content

Commit

Permalink
fix bug in MinTSOScheduler that estimated_thread_usage/`waiting_t…
Browse files Browse the repository at this point in the history
…asks_count`/`active_tasks_count` are not 0 even if there is no queries running (#5557)

close #5556
  • Loading branch information
windtalker authored Aug 8, 2022
1 parent d8fae61 commit 89e7df8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,17 @@ void MPPTask::scheduleOrWait()
}
}

void MPPTask::scheduleThisTask(ScheduleState state)
bool MPPTask::scheduleThisTask(ScheduleState state)
{
std::unique_lock lock(schedule_mu);
if (schedule_state == ScheduleState::WAITING)
{
LOG_FMT_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
schedule_state = state;
schedule_cv.notify_one();
return true;
}
return false;
}

int MPPTask::estimateCountOfNewThreads()
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
COMPLETED
};

void scheduleThisTask(ScheduleState state);
bool scheduleThisTask(ScheduleState state);

bool isScheduled();

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error))
{
if (has_error)
{
query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors.
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
return;
}
query_task_set->waiting_tasks.pop();
Expand All @@ -189,11 +192,13 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
updateMinTSO(tso, false, isWaiting ? "from the waiting set" : "when directly schedule it");
active_set.insert(tso);
estimated_thread_usage += needed_threads;
task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED);
if (task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED))
{
estimated_thread_usage += needed_threads;
GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment();
}
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());
GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage);
GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment();
LOG_FMT_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", task->getId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit);
return true;
}
Expand Down

0 comments on commit 89e7df8

Please sign in to comment.