Skip to content

Commit

Permalink
Merge branch 'master' into agg-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliqi authored Jul 21, 2022
2 parents 80455a7 + 71a298a commit a32a616
Show file tree
Hide file tree
Showing 42 changed files with 2,742 additions and 84 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser)
add_headers_and_sources(dbms src/Storages/DeltaMerge/File)
add_headers_and_sources(dbms src/Storages/DeltaMerge/ColumnFile)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Delta)
add_headers_and_sources(dbms src/Storages/DeltaMerge/ReadThread)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/Transaction)
add_headers_and_sources(dbms src/Storages/Page/V1)
Expand Down
19 changes: 17 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,23 @@ namespace DB
F(type_thread_hard_limit, {"type", "thread_hard_limit"}), \
F(type_hard_limit_exceeded_count, {"type", "hard_limit_exceeded_count"})) \
M(tiflash_task_scheduler_waiting_duration_seconds, "Bucketed histogram of task waiting for scheduling duration", Histogram, \
F(type_task_scheduler_waiting_duration, {{"type", "task_waiting_duration"}}, ExpBuckets{0.001, 2, 20}))

F(type_task_scheduler_waiting_duration, {{"type", "task_waiting_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_read_thread_counter, "The counter of storage read thread", Counter, \
F(type_sche_no_pool, {"type", "sche_no_pool"}), \
F(type_sche_no_slot, {"type", "sche_no_slot"}), \
F(type_sche_no_segment, {"type", "sche_no_segment"}), \
F(type_sche_from_cache, {"type", "sche_from_cache"}), \
F(type_sche_new_task, {"type", "sche_new_task"}), \
F(type_add_cache_succ, {"type", "add_cache_succ"}), \
F(type_add_cache_stale, {"type", "add_cache_stale"}), \
F(type_get_cache_miss, {"type", "get_cache_miss"}), \
F(type_get_cache_part, {"type", "get_cache_part"}), \
F(type_get_cache_hit, {"type", "get_cache_hit"}), \
F(type_get_cache_copy, {"type", "add_cache_copy"})) \
M(tiflash_storage_read_thread_gauge, "The gauge of storage read thread", Gauge, \
F(type_merged_task, {"type", "merged_task"})) \
M(tiflash_storage_read_thread_seconds, "Bucketed histogram of read thread", Histogram, \
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20}))
// clang-format on

struct ExpBuckets
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id);
query_info.keep_order = table_scan.keepOrder();
return query_info;
};
if (table_scan.isPartitionTableScan())
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ TiDBTableScan::TiDBTableScan(
, executor_id(executor_id_)
, is_partition_table_scan(table_scan->tp() == tipb::TypePartitionTableScan)
, columns(is_partition_table_scan ? table_scan->partition_table_scan().columns() : table_scan->tbl_scan().columns())
// Only No-partition table need keep order when tablescan executor required keep order.
// If keep_order is not set, keep order for safety.
, keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order()))
{
if (is_partition_table_scan)
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class TiDBTableScan
{
return executor_id;
}
bool keepOrder() const
{
return keep_order;
}

private:
const tipb::Executor * table_scan;
Expand All @@ -66,6 +70,7 @@ class TiDBTableScan
/// physical_table_ids contains the table ids of its partitions
std::vector<Int64> physical_table_ids;
Int64 logical_table_id;
bool keep_order;
};

} // namespace DB
29 changes: 29 additions & 0 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class ExecutorAggTestRunner : public DB::tests::ExecutorTest
toNullableVec<String>(col_name[1], col_gender),
toNullableVec<String>(col_name[2], col_country),
toNullableVec<Float64>(col_name[3], col_salary)});

context.addMockTable({"aggnull_test", "t1"},
{{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}},
{toNullableVec<String>("s1", {"banana", {}, "banana"}),
toNullableVec<String>("s2", {"apple", {}, "banana"})});
}

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(std::pair<String, String> src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj)
Expand Down Expand Up @@ -303,6 +308,30 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggNull)
try
{
auto request = context
.scan("aggnull_test", "t1")
.aggregation({Max(col("s1"))}, {})
.build(context);
{
ASSERT_COLUMNS_EQ_R(executeStreams(request),
createColumns({toNullableVec<String>({"banana"})}));
}

request = context
.scan("aggnull_test", "t1")
.aggregation({}, {col("s1")})
.build(context);
{
ASSERT_COLUMNS_EQ_R(executeStreams(request),
createColumns({toNullableVec<String>("s1", {{}, "banana"})}));
}
}
CATCH


// TODO support more type of min, max, count.
// support more aggregation functions: sum, forst_row, group_concat

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ struct Settings
\
M(SettingDouble, dt_storage_blob_heavy_gc_valid_rate, 0.2, "Max valid rate of deciding a blob can be compact") \
M(SettingDouble, dt_storage_blob_block_alignment_bytes, 0, "Blob IO alignment size") \
M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not") \
M(SettingDouble, dt_block_slots_scale, 1.0, "Block slots limit of a read request") \
M(SettingDouble, dt_active_segments_scale, 1.0, "Acitve segments limit of a read request") \
\
M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/DeltaMerge/ReadThread/ColumnSharingCache.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
#include <Storages/PathCapacityMetrics.h>
Expand Down Expand Up @@ -1335,6 +1338,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getTMTContext().reloadConfig(config());
}

// Initialize the thread pool of storage before the storage engine is initialized.
LOG_FMT_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread);
DM::SegmentReaderPoolManager::instance().init(server_info);
DM::SegmentReadTaskScheduler::instance();
DM::DMFileReaderPool::instance();

{
// Note that this must do before initialize schema sync service.
do
Expand Down
150 changes: 110 additions & 40 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
#include <Storages/DeltaMerge/SchemaUpdate.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
Expand Down Expand Up @@ -1135,13 +1137,15 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
size_t num_streams,
bool keep_order,
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
{
SegmentReadTasks tasks;

auto dm_context = newDMContext(db_context, db_settings, fmt::format("read_raw_{}", db_context.getCurrentQueryId()));

// If keep order is required, disable read thread.
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
{
std::shared_lock lock(read_write_mutex);

Expand Down Expand Up @@ -1173,29 +1177,56 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read);
};
size_t final_num_stream = std::min(num_streams, tasks.size());
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(std::move(tasks));
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
dm_context,
columns_to_read,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw = */ true,
/* do_delete_mark_filter_for_raw = */ false,
std::move(tasks),
after_segment_read);

String req_info;
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
req_info = db_context.getDAGContext()->getMPPTaskId().toString();
BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
{
BlockInputStreamPtr stream = std::make_shared<DMSegmentThreadInputStream>(
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw_ */ true,
/* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1
extra_table_id_index,
physical_table_id,
req_info);
BlockInputStreamPtr stream;
if (enable_read_thread)
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
columns_to_read,
extra_table_id_index,
physical_table_id,
req_info);
}
else
{
stream = std::make_shared<DMSegmentThreadInputStream>(
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
EMPTY_FILTER,
std::numeric_limits<UInt64>::max(),
DEFAULT_BLOCK_SIZE,
/* is_raw_ */ true,
/* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1
extra_table_id_index,
physical_table_id,
req_info);
}
res.push_back(stream);
}
if (enable_read_thread)
{
SegmentReadTaskScheduler::instance().add(read_task_pool);
}
return res;
}

Expand All @@ -1207,18 +1238,27 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
UInt64 max_version,
const RSOperatorPtr & filter,
const String & tracing_id,
bool keep_order,
bool is_fast_mode,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
{
// Use the id from MPP/Coprocessor level as tracing_id
auto dm_context = newDMContext(db_context, db_settings, tracing_id);

SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments);
// If keep order is required, disable read thread.
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
// SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled.
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
// Also, too many read tasks of a segment with different samll ranges is not good for data sharing cache.
SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /*try_split_task =*/!enable_read_thread);

auto tracing_logger = Logger::get(log->name(), dm_context->tracing_id);
LOG_FMT_DEBUG(tracing_logger, "Read create segment snapshot done");
LOG_FMT_DEBUG(tracing_logger,
"Read create segment snapshot done keep_order {} dt_enable_read_thread {} => enable_read_thread {}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread);

auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) {
// TODO: Update the tracing_id before checkSegmentUpdate?
Expand All @@ -1227,30 +1267,56 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,

GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size());
size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size()));
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(std::move(tasks));
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
dm_context,
columns_to_read,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_mode,
/* do_delete_mark_filter_for_raw = */ is_fast_mode,
std::move(tasks),
after_segment_read);

String req_info;
if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
req_info = db_context.getDAGContext()->getMPPTaskId().toString();
BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
{
BlockInputStreamPtr stream = std::make_shared<DMSegmentThreadInputStream>(
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
filter,
max_version,
expected_block_size,
/* is_raw_ */ is_fast_mode,
/* do_delete_mark_filter_for_raw_ */ is_fast_mode,
extra_table_id_index,
physical_table_id,
req_info);
BlockInputStreamPtr stream;
if (enable_read_thread)
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
columns_to_read,
extra_table_id_index,
physical_table_id,
req_info);
}
else
{
stream = std::make_shared<DMSegmentThreadInputStream>(
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
filter,
max_version,
expected_block_size,
/* is_raw_ */ is_fast_mode,
/* do_delete_mark_filter_for_raw_ */ is_fast_mode,
extra_table_id_index,
physical_table_id,
req_info);
}
res.push_back(stream);
}

if (enable_read_thread)
{
SegmentReadTaskScheduler::instance().add(read_task_pool);
}
LOG_FMT_DEBUG(tracing_logger, "Read create stream done");

return res;
Expand Down Expand Up @@ -2632,7 +2698,8 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
DMContext & dm_context,
const RowKeyRanges & sorted_ranges,
size_t expected_tasks_count,
const SegmentIdSet & read_segments)
const SegmentIdSet & read_segments,
bool try_split_task)
{
SegmentReadTasks tasks;

Expand Down Expand Up @@ -2687,12 +2754,15 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
++seg_it;
}
}

/// Try to make task number larger or equal to expected_tasks_count.
auto result_tasks = SegmentReadTask::trySplitReadTasks(tasks, expected_tasks_count);
auto tasks_before_split = tasks.size();
if (try_split_task)
{
/// Try to make task number larger or equal to expected_tasks_count.
tasks = SegmentReadTask::trySplitReadTasks(tasks, expected_tasks_count);
}

size_t total_ranges = 0;
for (auto & task : result_tasks)
for (auto & task : tasks)
{
/// Merge continuously ranges.
task->mergeRanges();
Expand All @@ -2704,11 +2774,11 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(
tracing_logger,
"[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]",
sorted_ranges.size(),
tasks_before_split,
tasks.size(),
result_tasks.size(),
total_ranges);

return result_tasks;
return tasks;
}

} // namespace DM
Expand Down
Loading

0 comments on commit a32a616

Please sign in to comment.