Skip to content

Commit

Permalink
Support add background tasks with specified interval (pingcap#2571)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored Aug 5, 2021
1 parent e36ae56 commit 1593d34
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 22 deletions.
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ class DeltaMergeStore : private boost::noncopyable
/// Compact fragment packs into bigger ones.
void compact(const Context & context, const RowKeyRange & range);

/// Apply `commands` on `table_columns`
/// Iterate over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit);

/// Apply DDL `commands` on `table_columns`
void applyAlters(const AlterCommands & commands, //
const OptionTableInfoConstRef table_info,
ColumnID & max_column_id_used,
Expand All @@ -384,8 +387,6 @@ class DeltaMergeStore : private boost::noncopyable
bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }

UInt64 onSyncGc(Int64 limit);

public:
/// Methods mainly used by region split.

Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Storages/GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ extern const int TABLE_IS_DROPPED;
bool GCManager::work()
{
auto & global_settings = global_context.getSettingsRef();
// TODO: remove this when `BackgroundProcessingPool` supports specify task running interval
if (gc_check_stop_watch.elapsedSeconds() < global_settings.dt_bg_gc_check_interval)
return false;
Int64 gc_segments_limit = global_settings.dt_bg_gc_max_segments_to_check_every_round;
// limit less than or equal to 0 means no gc
if (gc_segments_limit <= 0)
{
gc_check_stop_watch.restart();
return false;
}

LOG_INFO(log, "Start GC with table id: " << next_table_id);
// Get a storage snapshot with weak_ptrs first
// TODO: avoid gc on storages which have no data?
Expand Down Expand Up @@ -76,7 +72,6 @@ bool GCManager::work()
iter = storages.begin();
next_table_id = iter->first;
LOG_INFO(log, "End GC and next gc will start with table id: " << next_table_id);
gc_check_stop_watch.restart();
// Always return false
return false;
}
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/GCManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class GCManager

TableID next_table_id = InvalidTableID;

AtomicStopwatch gc_check_stop_watch;

Logger * log;
};
} // namespace DB
30 changes: 23 additions & 7 deletions dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/randomSeed.h>
#include <Common/setThreadName.h>
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <Poco/Timespan.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <common/logger_useful.h>

#include <pcg_random.hpp>
#include <random>
Expand Down Expand Up @@ -66,9 +67,9 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
}


BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task, const bool multi)
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task, const bool multi, const size_t interval_ms)
{
TaskHandle res = std::make_shared<TaskInfo>(*this, task, multi);
TaskHandle res = std::make_shared<TaskInfo>(*this, task, multi, interval_ms);

Poco::Timestamp current_time;

Expand Down Expand Up @@ -132,8 +133,9 @@ void BackgroundProcessingPool::threadFunction()

while (!shutdown)
{
bool done_work = false;
TaskHandle task;
// The time to sleep before running next task, `sleep_seconds` by default.
Poco::Timespan next_sleep_time_span(sleep_seconds, 0);

try
{
Expand Down Expand Up @@ -185,6 +187,7 @@ void BackgroundProcessingPool::threadFunction()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundPoolTask};

bool done_work = false;
if (!task->multi)
{
bool expected = false;
Expand All @@ -198,6 +201,19 @@ void BackgroundProcessingPool::threadFunction()
}
else
done_work = task->function();

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
if (done_work)
{
next_sleep_time_span = 0;
}
else if (task->interval_milliseconds != 0)
{
// Update `next_sleep_time_span` by user-defined interval if the latter one is non-zero
next_sleep_time_span = Poco::Timespan(0, /*microseconds=*/task->interval_milliseconds * 1000);
}
// else `sleep_seconds` by default
}
}
catch (...)
Expand All @@ -216,7 +232,7 @@ void BackgroundProcessingPool::threadFunction()

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + next_sleep_time_span;

{
std::unique_lock<std::mutex> lock(tasks_mutex);
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/MergeTree/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ class BackgroundProcessingPool
/// Wake up any thread.
void wake();

TaskInfo(BackgroundProcessingPool & pool_, const Task & function_, const bool multi_) : pool(pool_), function(function_), multi(multi_) {}
TaskInfo(BackgroundProcessingPool & pool_, const Task & function_, const bool multi_, const uint64_t interval_ms_)
: pool(pool_), function(function_), multi(multi_), interval_milliseconds(interval_ms_)
{}

private:
friend class BackgroundProcessingPool;
Expand All @@ -51,6 +53,8 @@ class BackgroundProcessingPool
const bool multi;
std::atomic_bool occupied {false};

const uint64_t interval_milliseconds;

std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
};

Expand All @@ -65,7 +69,9 @@ class BackgroundProcessingPool
}

/// If multi == false, this task can only be called by one thread at the same time.
TaskHandle addTask(const Task & task, const bool multi = true);
/// If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
/// If interval_ms is not zero, this task will be scheduled with `interval_ms`.
TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

~BackgroundProcessingPool();
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/Transaction/BackgroundService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ BackgroundService::BackgroundService(TMTContext & tmt_)
else
{
LOG_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled.");
storage_gc_handle = background_pool.addTask([this] { return tmt.getGCManager().work(); }, false);
auto & global_settings = tmt.getContext().getSettingsRef();
storage_gc_handle = background_pool.addTask(
[this] { return tmt.getGCManager().work(); }, false, /*interval_ms=*/global_settings.dt_bg_gc_check_interval * 1000);
LOG_INFO(log, "Start background storage gc worker with interval " << global_settings.dt_bg_gc_check_interval << " seconds.");
}
}

Expand Down

0 comments on commit 1593d34

Please sign in to comment.