Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support add background tasks with specified interval #2571

Merged
merged 4 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ class DeltaMergeStore : private boost::noncopyable
/// Compact fregment packs into bigger one.
void compact(const Context & context, const RowKeyRange & range);

/// Apply `commands` on `table_columns`
/// Iterator over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit);

/// Apply DDL `commands` on `table_columns`
void applyAlters(const AlterCommands & commands, //
const OptionTableInfoConstRef table_info,
ColumnID & max_column_id_used,
Expand All @@ -384,8 +387,6 @@ class DeltaMergeStore : private boost::noncopyable
bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }

UInt64 onSyncGc(Int64 limit);

public:
/// Methods mainly used by region split.

Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Storages/GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ extern const int TABLE_IS_DROPPED;
bool GCManager::work()
{
auto & global_settings = global_context.getSettingsRef();
// TODO: remove this when `BackgroundProcessingPool` supports specify task running interval
if (gc_check_stop_watch.elapsedSeconds() < global_settings.dt_bg_gc_check_interval)
return false;
Int64 gc_segments_limit = global_settings.dt_bg_gc_max_segments_to_check_every_round;
// limit less than or equal to 0 means no gc
if (gc_segments_limit <= 0)
{
gc_check_stop_watch.restart();
return false;
}

LOG_INFO(log, "Start GC with table id: " << next_table_id);
// Get a storage snapshot with weak_ptrs first
// TODO: avoid gc on storage which have no data?
Expand Down Expand Up @@ -76,7 +72,6 @@ bool GCManager::work()
iter = storages.begin();
next_table_id = iter->first;
LOG_INFO(log, "End GC and next gc will start with table id: " << next_table_id);
gc_check_stop_watch.restart();
// Always return false
return false;
}
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/GCManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class GCManager

TableID next_table_id = InvalidTableID;

AtomicStopwatch gc_check_stop_watch;

Logger * log;
};
} // namespace DB
30 changes: 23 additions & 7 deletions dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/randomSeed.h>
#include <Common/setThreadName.h>
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <Poco/Timespan.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <common/logger_useful.h>

#include <pcg_random.hpp>
#include <random>
Expand Down Expand Up @@ -66,9 +67,9 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
}


BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task, const bool multi)
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task, const bool multi, const size_t interval_ms)
{
TaskHandle res = std::make_shared<TaskInfo>(*this, task, multi);
TaskHandle res = std::make_shared<TaskInfo>(*this, task, multi, interval_ms);

Poco::Timestamp current_time;

Expand Down Expand Up @@ -132,8 +133,9 @@ void BackgroundProcessingPool::threadFunction()

while (!shutdown)
{
bool done_work = false;
TaskHandle task;
// The time to sleep before running next task, `sleep_seconds` by default.
Poco::Timespan next_sleep_time_span(sleep_seconds, 0);

try
{
Expand Down Expand Up @@ -185,6 +187,7 @@ void BackgroundProcessingPool::threadFunction()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundPoolTask};

bool done_work = false;
if (!task->multi)
{
bool expected = false;
Expand All @@ -198,6 +201,19 @@ void BackgroundProcessingPool::threadFunction()
}
else
done_work = task->function();

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
if (done_work)
{
next_sleep_time_span = 0;
}
else if (task->interval_milliseconds != 0)
{
// Update `next_sleep_time_span` by user-defined interval if the later one is non-zero
next_sleep_time_span = Poco::Timespan(0, /*microseconds=*/task->interval_milliseconds * 1000);
}
// else `sleep_seconds` by default
}
}
catch (...)
Expand All @@ -216,7 +232,7 @@ void BackgroundProcessingPool::threadFunction()

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + (done_work ? 0 : sleep_seconds * 1000000);
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + next_sleep_time_span;

{
std::unique_lock<std::mutex> lock(tasks_mutex);
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/MergeTree/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ class BackgroundProcessingPool
/// Wake up any thread.
void wake();

TaskInfo(BackgroundProcessingPool & pool_, const Task & function_, const bool multi_) : pool(pool_), function(function_), multi(multi_) {}
TaskInfo(BackgroundProcessingPool & pool_, const Task & function_, const bool multi_, const uint64_t interval_ms_)
: pool(pool_), function(function_), multi(multi_), interval_milliseconds(interval_ms_)
{}

private:
friend class BackgroundProcessingPool;
Expand All @@ -51,6 +53,8 @@ class BackgroundProcessingPool
const bool multi;
std::atomic_bool occupied {false};

const uint64_t interval_milliseconds;

std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
};

Expand All @@ -65,7 +69,9 @@ class BackgroundProcessingPool
}

/// if multi == false, this task can only be called by one thread at same time.
TaskHandle addTask(const Task & task, const bool multi = true);
/// If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
/// If interval_ms is not zero, this task will be scheduled with `interval_ms`.
TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

~BackgroundProcessingPool();
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/Transaction/BackgroundService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ BackgroundService::BackgroundService(TMTContext & tmt_)
else
{
LOG_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled.");
storage_gc_handle = background_pool.addTask([this] { return tmt.getGCManager().work(); }, false);
auto & global_settings = tmt.getContext().getSettingsRef();
storage_gc_handle = background_pool.addTask(
[this] { return tmt.getGCManager().work(); }, false, /*interval_ms=*/global_settings.dt_bg_gc_check_interval * 1000);
LOG_INFO(log, "Start background storage gc worker with interval " << global_settings.dt_bg_gc_check_interval << " seconds.");
}
}

Expand Down