diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 0df79f89a84..3e38c2a9fdb 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -80,6 +80,7 @@ add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser) add_headers_and_sources(dbms src/Storages/DeltaMerge/File) add_headers_and_sources(dbms src/Storages/DeltaMerge/ColumnFile) add_headers_and_sources(dbms src/Storages/DeltaMerge/Delta) +add_headers_and_sources(dbms src/Storages/DeltaMerge/ReadThread) add_headers_and_sources(dbms src/Storages/Distributed) add_headers_and_sources(dbms src/Storages/Transaction) add_headers_and_sources(dbms src/Storages/Page/V1) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index c0ce60af01e..5b627783544 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -205,8 +205,23 @@ namespace DB F(type_thread_hard_limit, {"type", "thread_hard_limit"}), \ F(type_hard_limit_exceeded_count, {"type", "hard_limit_exceeded_count"})) \ M(tiflash_task_scheduler_waiting_duration_seconds, "Bucketed histogram of task waiting for scheduling duration", Histogram, \ - F(type_task_scheduler_waiting_duration, {{"type", "task_waiting_duration"}}, ExpBuckets{0.001, 2, 20})) - + F(type_task_scheduler_waiting_duration, {{"type", "task_waiting_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_storage_read_thread_counter, "The counter of storage read thread", Counter, \ + F(type_sche_no_pool, {"type", "sche_no_pool"}), \ + F(type_sche_no_slot, {"type", "sche_no_slot"}), \ + F(type_sche_no_segment, {"type", "sche_no_segment"}), \ + F(type_sche_from_cache, {"type", "sche_from_cache"}), \ + F(type_sche_new_task, {"type", "sche_new_task"}), \ + F(type_add_cache_succ, {"type", "add_cache_succ"}), \ + F(type_add_cache_stale, {"type", "add_cache_stale"}), \ + F(type_get_cache_miss, {"type", "get_cache_miss"}), \ + F(type_get_cache_part, {"type", "get_cache_part"}), \ + F(type_get_cache_hit, {"type", "get_cache_hit"}), \ + F(type_get_cache_copy, {"type", "add_cache_copy"})) \ + M(tiflash_storage_read_thread_gauge, "The gauge of storage read thread", Gauge, \ + F(type_merged_task, {"type", "merged_task"})) \ + M(tiflash_storage_read_thread_seconds, "Bucketed histogram of read thread", Histogram, \ + F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) // clang-format on struct ExpBuckets diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 390ce7b9948..fb4a03f0999 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -599,6 +599,7 @@ std::unordered_map DAGStorageInterpreter::generateSele analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id); + query_info.keep_order = table_scan.keepOrder(); return query_info; }; if (table_scan.isPartitionTableScan()) diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp index 7d7ad2f2b57..dca738a2e6c 100644 --- a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp +++ b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp @@ -24,6 +24,9 @@ TiDBTableScan::TiDBTableScan( , executor_id(executor_id_) , is_partition_table_scan(table_scan->tp() == tipb::TypePartitionTableScan) , columns(is_partition_table_scan ? 
table_scan->partition_table_scan().columns() : table_scan->tbl_scan().columns()) + // Only No-partition table need keep order when tablescan executor required keep order. + // If keep_order is not set, keep order for safety. + , keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order())) { if (is_partition_table_scan) { diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.h b/dbms/src/Flash/Coprocessor/TiDBTableScan.h index 6ac07d326f6..a5a463a8ff2 100644 --- a/dbms/src/Flash/Coprocessor/TiDBTableScan.h +++ b/dbms/src/Flash/Coprocessor/TiDBTableScan.h @@ -51,6 +51,10 @@ class TiDBTableScan { return executor_id; } + bool keepOrder() const + { + return keep_order; + } private: const tipb::Executor * table_scan; @@ -66,6 +70,7 @@ class TiDBTableScan /// physical_table_ids contains the table ids of its partitions std::vector physical_table_ids; Int64 logical_table_id; + bool keep_order; }; } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index 53a6591ac96..9cbf0b80f89 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -100,6 +100,11 @@ class ExecutorAggTestRunner : public DB::tests::ExecutorTest toNullableVec(col_name[1], col_gender), toNullableVec(col_name[2], col_country), toNullableVec(col_name[3], col_salary)}); + + context.addMockTable({"aggnull_test", "t1"}, + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana"})}); } std::shared_ptr buildDAGRequest(std::pair src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj) @@ -303,6 +308,30 @@ try } CATCH +TEST_F(ExecutorAggTestRunner, AggNull) +try +{ + auto request = context + .scan("aggnull_test", "t1") + .aggregation({Max(col("s1"))}, {}) + .build(context); + { + ASSERT_COLUMNS_EQ_R(executeStreams(request), + createColumns({toNullableVec({"banana"})})); + } + + request = context + .scan("aggnull_test", "t1") + .aggregation({}, {col("s1")}) + .build(context); + { + ASSERT_COLUMNS_EQ_R(executeStreams(request), + createColumns({toNullableVec("s1", {{}, "banana"})})); + } +} +CATCH + + // TODO support more type of min, max, count. 
// support more aggregation functions: sum, forst_row, group_concat diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 6d2a6cc47ef..904a88e9a6b 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -317,6 +317,9 @@ struct Settings \ M(SettingDouble, dt_storage_blob_heavy_gc_valid_rate, 0.2, "Max valid rate of deciding a blob can be compact") \ M(SettingDouble, dt_storage_blob_block_alignment_bytes, 0, "Blob IO alignment size") \ + M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not") \ + M(SettingDouble, dt_block_slots_scale, 1.0, "Block slots limit of a read request") \ + M(SettingDouble, dt_active_segments_scale, 1.0, "Acitve segments limit of a read request") \ \ M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \ M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index aabca11cf9c..fa34400d5e3 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -64,6 +64,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -1335,6 +1338,12 @@ int Server::main(const std::vector & /*args*/) global_context->getTMTContext().reloadConfig(config()); } + // Initialize the thread pool of storage before the storage engine is initialized. + LOG_FMT_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread); + DM::SegmentReaderPoolManager::instance().init(server_info); + DM::SegmentReadTaskScheduler::instance(); + DM::DMFileReaderPool::instance(); + { // Note that this must do before initialize schema sync service. do diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index b28bb84f76e..27062c2c108 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -26,6 +26,8 @@ #include #include #include +#include +#include #include #include #include @@ -1135,13 +1137,15 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, size_t num_streams, + bool keep_order, const SegmentIdSet & read_segments, size_t extra_table_id_index) { SegmentReadTasks tasks; auto dm_context = newDMContext(db_context, db_settings, fmt::format("read_raw_{}", db_context.getCurrentQueryId())); - + // If keep order is required, disable read thread. 
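+ // (The read-thread path returns blocks through UnorderedInputStream in whatever order the segments finish,
+ // so a scan that must keep the handle order has to stay on the per-stream DeltaMergeBlockInputStream path.)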
+ auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order; { std::shared_lock lock(read_write_mutex); @@ -1173,7 +1177,17 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); }; size_t final_num_stream = std::min(num_streams, tasks.size()); - auto read_task_pool = std::make_shared(std::move(tasks)); + auto read_task_pool = std::make_shared( + physical_table_id, + dm_context, + columns_to_read, + EMPTY_FILTER, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE, + /* is_raw = */ true, + /* do_delete_mark_filter_for_raw = */ false, + std::move(tasks), + after_segment_read); String req_info; if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask()) @@ -1181,21 +1195,38 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, BlockInputStreams res; for (size_t i = 0; i < final_num_stream; ++i) { - BlockInputStreamPtr stream = std::make_shared( - dm_context, - read_task_pool, - after_segment_read, - columns_to_read, - EMPTY_FILTER, - std::numeric_limits::max(), - DEFAULT_BLOCK_SIZE, - /* is_raw_ */ true, - /* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1 - extra_table_id_index, - physical_table_id, - req_info); + BlockInputStreamPtr stream; + if (enable_read_thread) + { + stream = std::make_shared( + read_task_pool, + columns_to_read, + extra_table_id_index, + physical_table_id, + req_info); + } + else + { + stream = std::make_shared( + dm_context, + read_task_pool, + after_segment_read, + columns_to_read, + EMPTY_FILTER, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE, + /* is_raw_ */ true, + /* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1 + extra_table_id_index, + physical_table_id, + req_info); + } res.push_back(stream); } + if (enable_read_thread) + { + SegmentReadTaskScheduler::instance().add(read_task_pool); + } return res; } @@ -1207,6 +1238,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, UInt64 max_version, const RSOperatorPtr & filter, const String & tracing_id, + bool keep_order, bool is_fast_mode, size_t expected_block_size, const SegmentIdSet & read_segments, @@ -1214,11 +1246,19 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, { // Use the id from MPP/Coprocessor level as tracing_id auto dm_context = newDMContext(db_context, db_settings, tracing_id); - - SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments); + // If keep order is required, disable read thread. + auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order; + // SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled. + // 'try_split_task' can result in several read tasks with the same id that can cause some trouble. + // Also, too many read tasks of a segment with different samll ranges is not good for data sharing cache. 
+ SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /*try_split_task =*/!enable_read_thread); auto tracing_logger = Logger::get(log->name(), dm_context->tracing_id); - LOG_FMT_DEBUG(tracing_logger, "Read create segment snapshot done"); + LOG_FMT_DEBUG(tracing_logger, + "Read create segment snapshot done keep_order {} dt_enable_read_thread {} => enable_read_thread {}", + keep_order, + db_context.getSettingsRef().dt_enable_read_thread, + enable_read_thread); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { // TODO: Update the tracing_id before checkSegmentUpdate? @@ -1227,7 +1267,17 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size()); size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size())); - auto read_task_pool = std::make_shared(std::move(tasks)); + auto read_task_pool = std::make_shared( + physical_table_id, + dm_context, + columns_to_read, + filter, + max_version, + expected_block_size, + /* is_raw = */ is_fast_mode, + /* do_delete_mark_filter_for_raw = */ is_fast_mode, + std::move(tasks), + after_segment_read); String req_info; if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask()) @@ -1235,22 +1285,38 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, BlockInputStreams res; for (size_t i = 0; i < final_num_stream; ++i) { - BlockInputStreamPtr stream = std::make_shared( - dm_context, - read_task_pool, - after_segment_read, - columns_to_read, - filter, - max_version, - expected_block_size, - /* is_raw_ */ is_fast_mode, - /* do_delete_mark_filter_for_raw_ */ is_fast_mode, - extra_table_id_index, - physical_table_id, - req_info); + BlockInputStreamPtr stream; + if (enable_read_thread) + { + stream = std::make_shared( + read_task_pool, + columns_to_read, + extra_table_id_index, + physical_table_id, + req_info); + } + else + { + stream = std::make_shared( + dm_context, + read_task_pool, + after_segment_read, + columns_to_read, + filter, + max_version, + expected_block_size, + /* is_raw_ */ is_fast_mode, + /* do_delete_mark_filter_for_raw_ */ is_fast_mode, + extra_table_id_index, + physical_table_id, + req_info); + } res.push_back(stream); } - + if (enable_read_thread) + { + SegmentReadTaskScheduler::instance().add(read_task_pool); + } LOG_FMT_DEBUG(tracing_logger, "Read create stream done"); return res; @@ -2632,7 +2698,8 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( DMContext & dm_context, const RowKeyRanges & sorted_ranges, size_t expected_tasks_count, - const SegmentIdSet & read_segments) + const SegmentIdSet & read_segments, + bool try_split_task) { SegmentReadTasks tasks; @@ -2687,12 +2754,15 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( ++seg_it; } } - - /// Try to make task number larger or equal to expected_tasks_count. - auto result_tasks = SegmentReadTask::trySplitReadTasks(tasks, expected_tasks_count); + auto tasks_before_split = tasks.size(); + if (try_split_task) + { + /// Try to make task number larger or equal to expected_tasks_count. + tasks = SegmentReadTask::trySplitReadTasks(tasks, expected_tasks_count); + } size_t total_ranges = 0; - for (auto & task : result_tasks) + for (auto & task : tasks) { /// Merge continuously ranges. 
task->mergeRanges(); @@ -2704,11 +2774,11 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( tracing_logger, "[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]", sorted_ranges.size(), + tasks_before_split, tasks.size(), - result_tasks.size(), total_ranges); - return result_tasks; + return tasks; } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index bd7a5bb542a..62d28e4743b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -350,6 +350,7 @@ class DeltaMergeStore : private boost::noncopyable const DB::Settings & db_settings, const ColumnDefines & columns_to_read, size_t num_streams, + bool keep_order, const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); @@ -366,6 +367,7 @@ class DeltaMergeStore : private boost::noncopyable UInt64 max_version, const RSOperatorPtr & filter, const String & tracing_id, + bool keep_order, bool is_fast_mode = false, // set true when read in fast mode size_t expected_block_size = DEFAULT_BLOCK_SIZE, const SegmentIdSet & read_segments = {}, @@ -493,7 +495,8 @@ class DeltaMergeStore : private boost::noncopyable SegmentReadTasks getReadTasksByRanges(DMContext & dm_context, const RowKeyRanges & sorted_ranges, size_t expected_tasks_count = 1, - const SegmentIdSet & read_segments = {}); + const SegmentIdSet & read_segments = {}, + bool try_split_task = true); private: void dropAllSegments(bool keep_first_segment); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 56b11b975a1..196a300688b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -56,6 +56,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_limiter, tracing_id); + bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader(); + DMFileReader reader( dmfile, read_columns, @@ -73,8 +75,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_limiter, rows_threshold_per_read, read_one_pack_every_time, - tracing_id); + tracing_id, + enable_read_thread); - return std::make_shared(std::move(reader)); + return std::make_shared(std::move(reader), enable_read_thread); } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index a7f2fe9d556..e1b5fa835c3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -16,9 +16,9 @@ #include #include +#include #include #include - namespace DB { class Context; @@ -29,11 +29,23 @@ namespace DM class DMFileBlockInputStream : public SkippableBlockInputStream { public: - explicit DMFileBlockInputStream(DMFileReader && reader_) + explicit DMFileBlockInputStream(DMFileReader && reader_, bool enable_read_thread_) : reader(std::move(reader_)) - {} + , enable_read_thread(enable_read_thread_) + { + if (enable_read_thread) + { + DMFileReaderPool::instance().add(reader); + } + } - ~DMFileBlockInputStream() = default; + ~DMFileBlockInputStream() + { + if (enable_read_thread) + { + DMFileReaderPool::instance().del(reader); + } + } String getName() const override { return "DMFile"; } @@ -45,6 +57,7 @@ class 
DMFileBlockInputStream : public SkippableBlockInputStream private: DMFileReader reader; + bool enable_read_thread; }; using DMFileBlockInputStreamPtr = std::shared_ptr; @@ -129,6 +142,7 @@ class DMFileBlockInputStreamBuilder enable_column_cache = settings.dt_enable_stable_column_cache; aio_threshold = settings.min_bytes_to_use_direct_io; max_read_buffer_size = settings.max_read_buffer_size; + enable_read_thread = settings.dt_enable_read_thread; return *this; } DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_) @@ -159,7 +173,7 @@ class DMFileBlockInputStreamBuilder size_t max_read_buffer_size{}; size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD; bool read_one_pack_every_time = false; - + bool enable_read_thread = false; String tracing_id; }; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 779de36da63..b00e3cc5c10 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -226,7 +226,8 @@ DMFileReader::DMFileReader( const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, bool read_one_pack_every_time_, - const String & tracing_id_) + const String & tracing_id_, + bool enable_col_sharing_cache) : dmfile(dmfile_) , read_columns(read_columns_) , is_common_handle(is_common_handle_) @@ -266,6 +267,14 @@ DMFileReader::DMFileReader( const auto data_type = dmfile->getColumnStat(cd.id).type; data_type->enumerateStreams(callback, {}); } + if (enable_col_sharing_cache) + { + col_data_cache = std::make_unique(path(), read_columns, log); + for (const auto & cd : read_columns) + { + last_read_from_cache[cd.id] = false; + } + } } bool DMFileReader::shouldSeek(size_t pack_id) @@ -305,7 +314,6 @@ Block DMFileReader::read() const auto & use_packs = pack_filter.getUsePacks(); if (next_pack_id >= use_packs.size()) return {}; - // Find max continuing rows we can read. size_t start_pack_id = next_pack_id; // When single_file_mode is true, or read_one_pack_every_time is true, we can just read one pack every time. 
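The `col_data_cache` created in the constructor above is a per-reader map that keeps one `ColumnSharingCache` per column, and each of those caches follows a small add/get/del contract keyed by start_pack_id. Below is a minimal, self-contained sketch of that contract, using toy types of my own rather than the real TiFlash classes; the real cache additionally distinguishes hit/partial/copy results and records metrics.

#include <cassert>
#include <map>
#include <memory>
#include <vector>

// Toy stand-in for one column's shared pack cache, keyed by start_pack_id.
struct ToyColumnCache
{
    struct Entry { size_t pack_count = 0; std::shared_ptr<std::vector<int>> col; };
    std::map<size_t, Entry> packs;

    void add(size_t start_pack_id, size_t pack_count, std::shared_ptr<std::vector<int>> col)
    {
        auto & e = packs[start_pack_id];
        if (e.pack_count < pack_count) { e.pack_count = pack_count; e.col = std::move(col); } // keep the larger range
    }
    bool get(size_t start_pack_id, size_t pack_count, std::shared_ptr<std::vector<int>> & out)
    {
        auto it = packs.find(start_pack_id);
        if (it == packs.end() || it->second.pack_count < pack_count)
            return false; // miss, or only a partial range is cached
        out = it->second.col; // hit (the real cache copies a prefix when more packs are cached than asked for)
        return true;
    }
    void del(size_t next_pack_id) // ranges that end at or before the read cursor are no longer needed
    {
        for (auto it = packs.begin(); it != packs.end() && it->first + it->second.pack_count <= next_pack_id;)
            it = packs.erase(it);
    }
};

int main()
{
    ToyColumnCache cache;
    cache.add(/*start_pack_id=*/4, /*pack_count=*/2, std::make_shared<std::vector<int>>(16, 0));
    std::shared_ptr<std::vector<int>> col;
    assert(cache.get(4, 2, col));  // another reader of the same DMFile reuses the decoded packs
    cache.del(/*next_pack_id=*/6); // the read cursor has reached pack 6, drop everything before it
    assert(!cache.get(4, 2, col));
}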
@@ -440,7 +448,9 @@ Block DMFileReader::read() { rows_count += pack_stats[cursor].rows; } - readFromDisk(cd, column, range.first, rows_count, skip_packs_by_column[i], single_file_mode); + ColumnPtr col; + readColumn(cd, col, range.first, range.second - range.first, rows_count, skip_packs_by_column[i], single_file_mode); + column->insertRangeFrom(*col, 0, col->size()); skip_packs_by_column[i] = 0; } else @@ -462,8 +472,8 @@ Block DMFileReader::read() else { auto data_type = dmfile->getColumnStat(cd.id).type; - auto column = data_type->createColumn(); - readFromDisk(cd, column, start_pack_id, read_rows, skip_packs_by_column[i], single_file_mode); + ColumnPtr column; + readColumn(cd, column, start_pack_id, read_packs, read_rows, skip_packs_by_column[i], single_file_mode); auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id}); @@ -534,5 +544,58 @@ void DMFileReader::readFromDisk( } } +void DMFileReader::readColumn(ColumnDefine & column_define, + ColumnPtr & column, + size_t start_pack_id, + size_t pack_count, + size_t read_rows, + size_t skip_packs, + bool force_seek) +{ + if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column)) + { + auto data_type = dmfile->getColumnStat(column_define.id).type; + auto col = data_type->createColumn(); + readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, force_seek || last_read_from_cache[column_define.id]); + column = std::move(col); + last_read_from_cache[column_define.id] = false; + } + else + { + last_read_from_cache[column_define.id] = true; + } + + if (col_data_cache != nullptr) + { + DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column); + } +} + +void DMFileReader::addCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, ColumnPtr & col) +{ + if (col_data_cache == nullptr) + { + return; + } + if (next_pack_id >= start_pack_id + pack_count) + { + col_data_cache->addStale(); + } + else + { + col_data_cache->add(col_id, start_pack_id, pack_count, col); + } +} + +bool DMFileReader::getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col) +{ + if (col_data_cache == nullptr) + { + return false; + } + auto found = col_data_cache->get(col_id, start_pack_id, pack_count, read_rows, col, dmfile->getColumnStat(col_id).type); + col_data_cache->del(col_id, next_pack_id); + return found; +} } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index c04c93871a7..71f6df84bf9 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -91,7 +92,8 @@ class DMFileReader const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, bool read_one_pack_every_time_, - const String & tracing_id_); + const String & tracing_id_, + bool enable_col_sharing_cache); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -99,6 +101,15 @@ class DMFileReader /// Return false if it is the end of stream. 
bool getSkippedRows(size_t & skip_rows); Block read(); + UInt64 fileId() const + { + return dmfile->fileId(); + } + std::string path() const + { + return dmfile->path(); + } + void addCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, ColumnPtr & col); private: bool shouldSeek(size_t pack_id); @@ -109,6 +120,14 @@ class DMFileReader size_t read_rows, size_t skip_packs, bool force_seek); + void readColumn(ColumnDefine & column_define, + ColumnPtr & column, + size_t start_pack_id, + size_t pack_count, + size_t read_rows, + size_t skip_packs, + bool force_seek); + bool getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col); private: DMFilePtr dmfile; @@ -146,6 +165,9 @@ class DMFileReader FileProviderPtr file_provider; LoggerPtr log; + + std::unique_ptr col_data_cache; + std::unordered_map last_read_from_cache; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp new file mode 100644 index 00000000000..fbecd79c6d8 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp @@ -0,0 +1,125 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB::DM +{ +// In Linux a numa node is represented by a device directory, such as '/sys/devices/system/node/node0', '/sys/devices/system/node/node01'. +static inline bool isNodeDir(const std::string & name) +{ + return name.size() > 4 && name.substr(0, 4) == "node" && std::all_of(name.begin() + 4, name.end(), [](unsigned char c) { return std::isdigit(c); }); +} + +// Under a numa node directory is CPU cores and memory, such as '/sys/devices/system/node/node0/cpu0' and '/sys/devices/system/node/node0/memory0'. +static inline bool isCPU(const std::string & name) +{ + return name.size() > 3 && name.substr(0, 3) == "cpu" && std::all_of(name.begin() + 3, name.end(), [](unsigned char c) { return std::isdigit(c); }); +} + +static inline int parseCPUNumber(const std::string & name) +{ + return std::stoi(name.substr(3)); +} + +// Scan the numa node directory and parse the CPU numbers. +static inline std::vector getCPUs(const std::string & dir_name) +{ + std::vector cpus; + Poco::File dir(dir_name); + Poco::DirectoryIterator end; + for (auto iter = Poco::DirectoryIterator(dir); iter != end; ++iter) + { + if (isCPU(iter.name())) + { + cpus.push_back(parseCPUNumber(iter.name())); + } + } + return cpus; +} + +// TODO: What if the process running in the container and the CPU is limited. + +// Scan the device directory and parse the CPU information. 
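+// For example, on a two-socket host with /sys/devices/system/node/node0 (cpu0-cpu15) and node1 (cpu16-cpu31),
+// this is expected to return {{0, 1, ..., 15}, {16, 17, ..., 31}}; on hosts without the node directory it falls
+// back to a single group containing every CPU found under /sys/devices/system/cpu.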
+std::vector> getLinuxNumaNodes() +{ + static const std::string nodes_dir_name{"/sys/devices/system/node"}; + static const std::string cpus_dir_name{"/sys/devices/system/cpu"}; + + std::vector> numa_nodes; + Poco::File nodes(nodes_dir_name); + if (!nodes.exists() || !nodes.isDirectory()) + { + auto cpus = getCPUs(cpus_dir_name); + if (cpus.empty()) + { + throw Exception("Not recognize CPU: " + cpus_dir_name); + } + numa_nodes.push_back(std::move(cpus)); + } + else + { + Poco::DirectoryIterator end; + for (Poco::DirectoryIterator iter(nodes); iter != end; ++iter) + { + if (!isNodeDir(iter.name())) + { + continue; + } + auto dir_name = nodes_dir_name + "/" + iter.name(); + auto cpus = getCPUs(dir_name); + if (cpus.empty()) + { + throw Exception("Not recognize CPU: " + nodes_dir_name); + } + numa_nodes.push_back(std::move(cpus)); + } + } + if (numa_nodes.empty()) + { + throw Exception("Not recognize CPU"); + } + return numa_nodes; +} + +std::vector> getNumaNodes(Poco::Logger * log) +{ + try + { + return getLinuxNumaNodes(); + } + catch (Exception & e) + { + LOG_FMT_WARNING(log, "{}", e.message()); + } + catch (std::exception & e) + { + LOG_FMT_WARNING(log, "{}", e.what()); + } + catch (...) + { + LOG_FMT_WARNING(log, "Unknow Error"); + } + LOG_FMT_WARNING(log, "Cannot recognize the CPU NUMA infomation, use the CPU as 'one numa node'"); + std::vector> numa_nodes(1); // "One numa node" + return numa_nodes; +} +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/CPU.h b/dbms/src/Storages/DeltaMerge/ReadThread/CPU.h new file mode 100644 index 00000000000..2d1f564a43a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/CPU.h @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include + +#include + +namespace DB::DM +{ +// `getNumaNodes` returns cpus of each Numa node. +std::vector> getNumaNodes(Poco::Logger * log); +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h b/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h new file mode 100644 index 00000000000..a9a976f2f30 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h @@ -0,0 +1,104 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include + +namespace DB::DM +{ +// CircularScanList is a special circular list. 
+// It remembers the location of the last iteration and will check whether the object is expired. +template +class CircularScanList +{ +public: + using Value = std::shared_ptr; + + CircularScanList() + : last_itr(l.end()) + {} + + void add(const Value & ptr) + { + l.push_back(ptr); + } + + Value next() + { + last_itr = nextItr(last_itr); + while (!l.empty()) + { + auto ptr = *last_itr; + if (ptr->valid()) + { + return ptr; + } + else + { + last_itr = l.erase(last_itr); + if (last_itr == l.end()) + { + last_itr = l.begin(); + } + } + } + return nullptr; + } + + std::pair count(int64_t table_id) const + { + int64_t valid = 0; + int64_t invalid = 0; + for (const auto & p : l) + { + if (table_id == 0 || p->tableId() == table_id) + { + p->valid() ? valid++ : invalid++; + } + } + return {valid, invalid}; + } + + Value get(uint64_t pool_id) const + { + for (const auto & p : l) + { + if (p->poolId() == pool_id) + { + return p; + } + } + return nullptr; + } + +private: + using Iter = typename std::list::iterator; + Iter nextItr(Iter itr) + { + if (itr == l.end() || std::next(itr) == l.end()) + { + return l.begin(); + } + else + { + return std::next(itr); + } + } + + std::list l; + Iter last_itr; +}; + +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp new file mode 100644 index 00000000000..13b187167b6 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp @@ -0,0 +1,63 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include + +namespace DB::DM +{ +DMFileReaderPool & DMFileReaderPool::instance() +{ + static DMFileReaderPool reader_pool; + return reader_pool; +} + +void DMFileReaderPool::add(DMFileReader & reader) +{ + std::lock_guard lock(mtx); + readers[reader.path()].insert(&reader); +} + +void DMFileReaderPool::del(DMFileReader & reader) +{ + std::lock_guard lock(mtx); + auto itr = readers.find(reader.path()); + if (itr == readers.end()) + { + return; + } + itr->second.erase(&reader); + if (itr->second.empty()) + { + readers.erase(itr); + } +} + +void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col) +{ + std::lock_guard lock(mtx); + auto itr = readers.find(from_reader.path()); + if (itr == readers.end()) + { + return; + } + for (auto * r : itr->second) + { + if (&from_reader == r) + { + continue; + } + r->addCachedPacks(col_id, start, count, col); + } +} +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h new file mode 100644 index 00000000000..446e6777338 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -0,0 +1,234 @@ +// Copyright 2022 PingCAP, Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB::DM +{ +using PackId = size_t; + +enum class ColumnCacheStatus +{ + ADD_COUNT = 0, + ADD_STALE, + GET_MISS, + GET_PART, + GET_HIT, + GET_COPY, + + _TOTAL_COUNT, +}; + +class ColumnSharingCache +{ +public: + struct ColumnData + { + size_t pack_count{0}; + ColumnPtr col_data; + }; + + void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment(); + std::lock_guard lock(mtx); + auto & value = packs[start_pack_id]; + if (value.pack_count < pack_count) + { + value.pack_count = pack_count; + value.col_data = col_data; + } + } + + ColumnCacheStatus get(size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type) + { + ColumnCacheStatus status; + std::lock_guard lock(mtx); + auto target = packs.find(start_pack_id); + if (target == packs.end()) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_get_cache_miss).Increment(); + status = ColumnCacheStatus::GET_MISS; + } + else if (target->second.pack_count < pack_count) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_get_cache_part).Increment(); + status = ColumnCacheStatus::GET_PART; + } + else if (target->second.pack_count == pack_count) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_get_cache_hit).Increment(); + status = ColumnCacheStatus::GET_HIT; + col_data = target->second.col_data; + } + else + { + GET_METRIC(tiflash_storage_read_thread_counter, type_get_cache_copy).Increment(); + status = ColumnCacheStatus::GET_COPY; + auto column = data_type->createColumn(); + column->insertRangeFrom(*(target->second.col_data), 0, read_rows); + col_data = std::move(column); + } + return status; + } + + void del(size_t upper_start_pack_id) + { + std::lock_guard lock(mtx); + for (auto itr = packs.begin(); itr != packs.end();) + { + if (itr->first + itr->second.pack_count <= upper_start_pack_id) + { + itr = packs.erase(itr); + } + else + { + break; + } + } + } + +private: + std::mutex mtx; + // start_pack_id -> + std::map packs; +}; + +// `ColumnSharingCacheMap` is a per DMFileReader cache. +// It store a ColumnSharingCache(std::map) for each column. +// When read threads are enable, each DMFileReader will be add to DMFileReaderPool, +// so we can find DMFileReaders of the same DMFile easily. +// Each DMFileReader will add the block that they read to other DMFileReaders' cache if the start_pack_id of the block +// is greater or equal than the next_pack_id of the DMFileReader —— This means that the DMFileReader maybe read the block later. 
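+// For example: if r2.next_pack_id is 8 and r1 has just read packs [10, 12) of some column, the packs are
+// inserted into r2's cache because r2 may still need them (10 + 2 > 8); if r1 had read packs [4, 6) instead,
+// r2 has already passed them, so addCachedPacks only counts it as `addStale`.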
+class ColumnSharingCacheMap +{ +public: + ColumnSharingCacheMap(const std::string & dmfile_name_, const ColumnDefines & cds, LoggerPtr & log_) + : dmfile_name(dmfile_name_) + , stats(static_cast(ColumnCacheStatus::_TOTAL_COUNT)) + , log(log_) + { + for (const auto & cd : cds) + { + addColumn(cd.id); + } + } + + ~ColumnSharingCacheMap() + { + LOG_FMT_DEBUG(log, "dmfile {} stat {}", dmfile_name, statString()); + } + + // `addStale` just do some statistics. + void addStale() + { + GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_stale).Increment(); + stats[static_cast(ColumnCacheStatus::ADD_STALE)].fetch_add(1, std::memory_order_relaxed); + } + + void add(int64_t col_id, size_t start_pack_id, size_t pack_count, ColumnPtr & col_data) + { + stats[static_cast(ColumnCacheStatus::ADD_COUNT)].fetch_add(1, std::memory_order_relaxed); + auto itr = cols.find(col_id); + if (itr == cols.end()) + { + return; + } + itr->second.add(start_pack_id, pack_count, col_data); + } + + bool get(int64_t col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type) + { + auto status = ColumnCacheStatus::GET_MISS; + auto itr = cols.find(col_id); + if (itr != cols.end()) + { + status = itr->second.get(start_pack_id, pack_count, read_rows, col_data, data_type); + } + stats[static_cast(status)].fetch_add(1, std::memory_order_relaxed); + return status == ColumnCacheStatus::GET_HIT || status == ColumnCacheStatus::GET_COPY; + } + + // Each read operator of DMFileReader will advance the next_pack_id. + // So we can delete packs if their pack_id is less than next_pack_id to save memory. + void del(int64_t col_id, size_t upper_start_pack_id) + { + auto itr = cols.find(col_id); + if (itr != cols.end()) + { + itr->second.del(upper_start_pack_id); + } + } + +private: + void addColumn(int64_t col_id) + { + cols[col_id]; + } + std::string statString() const + { + auto add_count = stats[static_cast(ColumnCacheStatus::ADD_COUNT)].load(std::memory_order_relaxed); + auto add_stale = stats[static_cast(ColumnCacheStatus::ADD_STALE)].load(std::memory_order_relaxed); + auto get_miss = stats[static_cast(ColumnCacheStatus::GET_MISS)].load(std::memory_order_relaxed); + auto get_part = stats[static_cast(ColumnCacheStatus::GET_PART)].load(std::memory_order_relaxed); + auto get_hit = stats[static_cast(ColumnCacheStatus::GET_HIT)].load(std::memory_order_relaxed); + auto get_copy = stats[static_cast(ColumnCacheStatus::GET_COPY)].load(std::memory_order_relaxed); + auto add_total = add_count + add_stale; + auto get_cached = get_hit + get_copy; + auto get_total = get_miss + get_part + get_hit + get_copy; + return fmt::format("add_count {} add_stale {} add_ratio {} get_miss {} get_part {} get_hit {} get_copy {} cached_ratio {}", + add_count, + add_stale, + add_total > 0 ? add_count * 1.0 / add_total : 0, + get_miss, + get_part, + get_hit, + get_copy, + get_total > 0 ? get_cached * 1.0 / get_total : 0); + } + std::string dmfile_name; + std::unordered_map cols; + std::vector> stats; + LoggerPtr log; +}; + +class DMFileReader; + +// DMFileReaderPool holds all the DMFileReader objects, so we can easily find DMFileReader objects with the same DMFile ID. +// When a DMFileReader object successfully reads a column's packs, it will try to put these packs into other DMFileReader objects' cache. 
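+// A rough sketch of the flow for two readers r1 and r2 that scan the same DMFile on different read threads
+// (method names refer to this patch, error handling omitted):
+//   DMFileReaderPool::instance().add(r1);  // done in DMFileBlockInputStream's constructor
+//   DMFileReaderPool::instance().add(r2);
+//   // r1 reads packs [start, start + count) of column col_id from disk, then fans the decoded column out:
+//   DMFileReaderPool::instance().set(r1, col_id, start, count, col);  // calls r2.addCachedPacks(...) internally
+//   // when r2 later reaches the same packs, DMFileReader::getCachedPacks() serves them from its own cache
+//   DMFileReaderPool::instance().del(r1);  // done in DMFileBlockInputStream's destructor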
+class DMFileReaderPool +{ +public: + static DMFileReaderPool & instance(); + DMFileReaderPool() = default; + ~DMFileReaderPool() = default; + DISALLOW_COPY_AND_MOVE(DMFileReaderPool); + + void add(DMFileReader & reader); + void del(DMFileReader & reader); + void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col); + +private: + std::mutex mtx; + std::unordered_map> readers; +}; + +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp new file mode 100644 index 00000000000..c81fcfe8ccd --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp @@ -0,0 +1,123 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +namespace DB::DM +{ +int MergedTask::readBlock() +{ + initOnce(); + return readOneBlock(); +} + +void MergedTask::initOnce() +{ + if (inited) + { + return; + } + + for (cur_idx = 0; cur_idx < static_cast(units.size()); cur_idx++) + { + auto & [pool, task, stream] = units[cur_idx]; + if (!pool->valid()) + { + setStreamFinished(cur_idx); + continue; + } + stream = pool->buildInputStream(task); + } + + inited = true; +} + +int MergedTask::readOneBlock() +{ + int read_block_count = 0; + for (cur_idx = 0; cur_idx < static_cast(units.size()); cur_idx++) + { + if (isStreamFinished(cur_idx)) + { + continue; + } + + auto & [pool, task, stream] = units[cur_idx]; + + if (!pool->valid()) + { + setStreamFinished(cur_idx); + continue; + } + + if (pool->getFreeBlockSlots() <= 0) + { + continue; + } + + if (pool->readOneBlock(stream, task->segment)) + { + read_block_count++; + } + else + { + setStreamFinished(cur_idx); + } + } + return read_block_count; +} + +void MergedTask::setException(const DB::Exception & e) +{ + if (cur_idx >= 0 && cur_idx < static_cast(units.size())) + { + auto & pool = units[cur_idx].pool; + if (pool != nullptr) + { + pool->setException(e); + } + } + else + { + for (auto & unit : units) + { + if (unit.pool != nullptr) + { + unit.pool->setException(e); + } + } + } +} + +MergedTaskPtr MergedTaskPool::pop(uint64_t pool_id) +{ + std::lock_guard lock(mtx); + MergedTaskPtr target; + for (auto itr = merged_task_pool.begin(); itr != merged_task_pool.end(); ++itr) + { + if ((*itr)->containPool(pool_id)) + { + target = *itr; + merged_task_pool.erase(itr); + break; + } + } + return target; +} + +void MergedTaskPool::push(const MergedTaskPtr & t) +{ + std::lock_guard lock(mtx); + merged_task_pool.push_back(t); +} +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h new file mode 100644 index 00000000000..8233cd2e431 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h @@ -0,0 +1,156 @@ +// Copyright 2022 PingCAP, Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once +#include +#include +#include + +namespace DB::DM +{ +struct MergedUnit +{ + MergedUnit(const SegmentReadTaskPoolPtr & pool_, const SegmentReadTaskPtr & task_) + : pool(pool_) + , task(task_) + {} + + SegmentReadTaskPoolPtr pool; // The information of a read request. + SegmentReadTaskPtr task; // The information of a segment that want to read. + BlockInputStreamPtr stream; // BlockInputStream of a segment, will be created by read threads. +}; + +// MergedTask merges the same segment of different SegmentReadTaskPools. +// Read segment input streams of different SegmentReadTaskPools sequentially to improve cache sharing. +// MergedTask is NOT thread-safe. +class MergedTask +{ +public: + static int64_t getPassiveMergedSegments() + { + return passive_merged_segments.load(std::memory_order_relaxed); + } + + MergedTask(uint64_t seg_id_, std::vector && units_) + : seg_id(seg_id_) + , units(std::move(units_)) + , inited(false) + , cur_idx(-1) + , finished_count(0) + , log(&Poco::Logger::get("MergedTask")) + { + passive_merged_segments.fetch_add(units.size() - 1, std::memory_order_relaxed); + GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Increment(); + } + ~MergedTask() + { + passive_merged_segments.fetch_sub(units.size() - 1, std::memory_order_relaxed); + GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Decrement(); + GET_METRIC(tiflash_storage_read_thread_seconds, type_merged_task).Observe(sw.elapsedSeconds()); + } + + int readBlock(); + + bool allStreamsFinished() const + { + return finished_count >= units.size(); + } + + uint64_t getSegmentId() const + { + return seg_id; + } + + size_t getPoolCount() const + { + return units.size(); + } + + std::vector getPoolIds() const + { + std::vector ids; + ids.reserve(units.size()); + for (const auto & unit : units) + { + if (unit.pool != nullptr) + { + ids.push_back(unit.pool->poolId()); + } + } + return ids; + } + + bool containPool(uint64_t pool_id) const + { + for (const auto & unit : units) + { + if (unit.pool != nullptr && unit.pool->poolId() == pool_id) + { + return true; + } + } + return false; + } + void setException(const DB::Exception & e); + +private: + void initOnce(); + int readOneBlock(); + + bool isStreamFinished(size_t i) const + { + return units[i].pool == nullptr && units[i].task == nullptr && units[i].stream == nullptr; + } + + void setStreamFinished(size_t i) + { + if (!isStreamFinished(i)) + { + units[i].pool = nullptr; + units[i].task = nullptr; + units[i].stream = nullptr; + finished_count++; + } + } + + uint64_t seg_id; + std::vector units; + bool inited; + int cur_idx; + size_t finished_count; + Poco::Logger * log; + Stopwatch sw; + inline static std::atomic passive_merged_segments{0}; +}; + +using MergedTaskPtr = std::shared_ptr; + +// MergedTaskPool is a MergedTask list. +// When SegmentReadTaskPool's block queue reaching limit, read thread will push MergedTask into it. 
+// The scheduler thread will try to pop a MergedTask of a related pool_id before build a new MergedTask object. +class MergedTaskPool +{ +public: + MergedTaskPool() + : log(&Poco::Logger::get("MergedTaskPool")) + {} + + MergedTaskPtr pop(uint64_t pool_id); + void push(const MergedTaskPtr & t); + +private: + std::mutex mtx; + std::list merged_task_pool; + Poco::Logger * log; +}; +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp new file mode 100644 index 00000000000..9bec9e18afa --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -0,0 +1,211 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include + +namespace DB::DM +{ +SegmentReadTaskScheduler::SegmentReadTaskScheduler() + : stop(false) + , log(&Poco::Logger::get("SegmentReadTaskScheduler")) +{ + sched_thread = std::thread(&SegmentReadTaskScheduler::schedThread, this); +} + +SegmentReadTaskScheduler::~SegmentReadTaskScheduler() +{ + setStop(); + sched_thread.join(); +} + +void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool) +{ + std::lock_guard lock(mtx); + + read_pools.add(pool); + + std::unordered_set seg_ids; + for (const auto & task : pool->getTasks()) + { + auto seg_id = task->segment->segmentId(); + merging_segments[pool->tableId()][seg_id].push_back(pool->poolId()); + if (!seg_ids.insert(seg_id).second) + { + throw DB::Exception(fmt::format("Not support split segment task. seg_ids {} => seg_id {} already exist.", seg_ids, seg_id)); + } + } + auto block_slots = pool->getFreeBlockSlots(); + auto [unexpired, expired] = read_pools.count(pool->tableId()); + LOG_FMT_DEBUG(log, "add pool {} table {} block_slots {} segment count {} segments {} unexpired pool {} expired pool {}", // + pool->poolId(), + pool->tableId(), + block_slots, + seg_ids.size(), + seg_ids, + unexpired, + expired); +} + +std::pair SegmentReadTaskScheduler::scheduleMergedTask() +{ + std::lock_guard lock(mtx); + auto pool = scheduleSegmentReadTaskPoolUnlock(); + if (pool == nullptr) + { + // No SegmentReadTaskPool to schedule. Maybe no read request or + // block queue of each SegmentReadTaskPool reaching the limit. + return {nullptr, false}; + } + + auto merged_task = merged_task_pool.pop(pool->poolId()); + if (merged_task != nullptr) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_sche_from_cache).Increment(); + return {merged_task, true}; + } + + auto segment = scheduleSegmentUnlock(pool); + if (!segment) + { + // The number of active segments reaches the limit. + GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment(); + return {nullptr, true}; + } + auto pools = getPoolsUnlock(segment->second); + if (pools.empty()) + { + // Maybe SegmentReadTaskPools are expired because of upper threads finish the request. 
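+ // (e.g. the upper layer has finished or cancelled the query), so just skip this segment in this round.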
+ return {nullptr, true}; + } + + std::vector units; + units.reserve(pools.size()); + for (auto & pool : pools) + { + units.emplace_back(pool, pool->getTask(segment->first)); + } + GET_METRIC(tiflash_storage_read_thread_counter, type_sche_new_task).Increment(); + + return {std::make_shared(segment->first, std::move(units)), true}; +} + +SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector & pool_ids) +{ + SegmentReadTaskPools pools; + pools.reserve(pool_ids.size()); + for (uint64_t id : pool_ids) + { + auto p = read_pools.get(id); + if (p != nullptr) + { + pools.push_back(p); + } + } + return pools; +} + +SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock() +{ + auto [unexpired, expired] = read_pools.count(0); + for (int64_t i = 0; i < unexpired; i++) + { + auto pool = read_pools.next(); + if (pool != nullptr && pool->getFreeBlockSlots() > 0) + { + return pool; + } + } + if (unexpired == 0) + { + GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment(); + } + else + { + GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment(); + } + return nullptr; +} + +std::optional>> SegmentReadTaskScheduler::scheduleSegmentUnlock(const SegmentReadTaskPoolPtr & pool) +{ + auto [unexpired, expired] = read_pools.count(pool->tableId()); + auto expected_merge_seg_count = std::min(unexpired, 2); + auto itr = merging_segments.find(pool->tableId()); + if (itr == merging_segments.end()) + { + // No segment of tableId left. + return std::nullopt; + } + std::optional>> result; + auto & segments = itr->second; + auto target = pool->scheduleSegment(segments, expected_merge_seg_count); + if (target != segments.end()) + { + if (MergedTask::getPassiveMergedSegments() < 100 || target->second.size() == 1) + { + result = *target; + segments.erase(target); + if (segments.empty()) + { + merging_segments.erase(itr); + } + } + else + { + result = std::pair{target->first, std::vector(1, pool->poolId())}; + auto mutable_target = segments.find(target->first); + auto itr = std::find(mutable_target->second.begin(), mutable_target->second.end(), pool->poolId()); + *itr = mutable_target->second.back(); // SegmentReadTaskPool::scheduleSegment ensures `pool->poolId` must exists in `target`. 
+ mutable_target->second.resize(mutable_target->second.size() - 1); + } + } + return result; +} + +void SegmentReadTaskScheduler::setStop() +{ + stop.store(true, std::memory_order_relaxed); +} + +bool SegmentReadTaskScheduler::isStop() const +{ + return stop.load(std::memory_order_relaxed); +} + +bool SegmentReadTaskScheduler::schedule() +{ + Stopwatch sw; + auto [merged_task, run_sche] = scheduleMergedTask(); + if (merged_task != nullptr) + { + LOG_FMT_DEBUG(log, "scheduleMergedTask seg_id {} pools {} => {} ms", merged_task->getSegmentId(), merged_task->getPoolIds(), sw.elapsedMilliseconds()); + SegmentReaderPoolManager::instance().addTask(std::move(merged_task)); + } + return run_sche; +} + +void SegmentReadTaskScheduler::schedThread() +{ + while (!isStop()) + { + if (!schedule()) + { + using namespace std::chrono_literals; + std::this_thread::sleep_for(2ms); + } + } +} + +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h new file mode 100644 index 00000000000..51aef4d176c --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h @@ -0,0 +1,83 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include + +#include +namespace DB::DM +{ +using SegmentReadTaskPoolList = CircularScanList; + +// SegmentReadTaskScheduler is a global singleton. +// All SegmentReadTaskPool will be added to it and be scheduled by it. + +// 1. DeltaMergeStore::read/readRaw will call SegmentReadTaskScheduler::add to add a SegmentReadTaskPool object to the `read_pools` list and +// index segments information into `merging_segments`. +// 2. A schedule-thread will scheduling read tasks: +// a. It scans the read_pools list and choosing a SegmentReadTaskPool. +// b. Chooses a segment of the SegmentReadTaskPool and build a MergedTask. +// c. Sends the MergedTask to read threads(SegmentReader). +class SegmentReadTaskScheduler +{ +public: + static SegmentReadTaskScheduler & instance() + { + static SegmentReadTaskScheduler scheduler; + return scheduler; + } + + ~SegmentReadTaskScheduler(); + DISALLOW_COPY_AND_MOVE(SegmentReadTaskScheduler); + + // Add SegmentReadTaskPool to `read_pools` and index segments into merging_segments. + void add(const SegmentReadTaskPoolPtr & pool); + + void pushMergedTask(const MergedTaskPtr & p) + { + merged_task_pool.push(p); + } + +private: + SegmentReadTaskScheduler(); + + // Choose segment to read. 
+ // Returns + std::pair scheduleMergedTask(); + + void setStop(); + bool isStop() const; + bool schedule(); + void schedThread(); + + SegmentReadTaskPools getPoolsUnlock(const std::vector & pool_ids); + // + std::optional>> scheduleSegmentUnlock(const SegmentReadTaskPoolPtr & pool); + SegmentReadTaskPoolPtr scheduleSegmentReadTaskPoolUnlock(); + + std::mutex mtx; + SegmentReadTaskPoolList read_pools; + // table_id -> {seg_id -> pool_ids, seg_id -> pool_ids, ...} + std::unordered_map>> merging_segments; + + MergedTaskPool merged_task_pool; + + std::atomic stop; + std::thread sched_thread; + + Poco::Logger * log; +}; +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp new file mode 100644 index 00000000000..77806fdef72 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp @@ -0,0 +1,233 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include +#include + +#include + +namespace DB::DM +{ +class SegmentReader +{ + inline static const std::string name{"SegmentReader"}; + +public: + SegmentReader(WorkQueue & task_queue_, const std::vector & cpus_) + : task_queue(task_queue_) + , stop(false) + , log(&Poco::Logger::get(name)) + , cpus(cpus_) + { + t = std::thread(&SegmentReader::run, this); + } + + void setStop() + { + stop.store(true, std::memory_order_relaxed); + } + + ~SegmentReader() + { + LOG_FMT_DEBUG(log, "SegmentReader stop begin"); + t.join(); + LOG_FMT_DEBUG(log, "SegmentReader stop end"); + } + + std::thread::id getId() const + { + return t.get_id(); + } + +private: + void setCPUAffinity() + { + if (cpus.empty()) + { + return; + } +#ifdef __linux__ + cpu_set_t cpu_set; + CPU_ZERO(&cpu_set); + for (int i : cpus) + { + CPU_SET(i, &cpu_set); + } + int ret = sched_setaffinity(0, sizeof(cpu_set), &cpu_set); + if (ret != 0) + { + LOG_FMT_ERROR(log, "sched_setaffinity fail: {}", std::strerror(errno)); + throw Exception(fmt::format("sched_setaffinity fail: {}", std::strerror(errno))); + } + LOG_FMT_DEBUG(log, "sched_setaffinity cpus {} succ", cpus); +#endif + } + + bool isStop() + { + return stop.load(std::memory_order_relaxed); + } + + void readSegments() + { + MergedTaskPtr merged_task; + try + { + if (!task_queue.pop(merged_task)) + { + LOG_FMT_INFO(log, "pop fail, stop {}", isStop()); + return; + } + + SCOPE_EXIT({ + if (!merged_task->allStreamsFinished()) + { + SegmentReadTaskScheduler::instance().pushMergedTask(merged_task); + } + }); + + int read_count = 0; + while (!merged_task->allStreamsFinished() && !isStop()) + { + auto c = merged_task->readBlock(); + read_count += c; + if (c <= 0) + { + break; + } + } + if (read_count <= 0) + { + LOG_FMT_DEBUG(log, "pool {} seg_id {} read_count {}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count); + } + } + catch (DB::Exception & e) + { + LOG_FMT_ERROR(log, "ErrMsg: {} StackTrace {}", e.message(), 
e.getStackTrace().toString()); + if (merged_task != nullptr) + { + merged_task->setException(e); + } + } + catch (std::exception & e) + { + LOG_FMT_ERROR(log, "ErrMsg: {}", e.what()); + if (merged_task != nullptr) + { + merged_task->setException(DB::Exception(e.what())); + } + } + catch (...) + { + tryLogCurrentException("exception thrown in SegmentReader"); + if (merged_task != nullptr) + { + merged_task->setException(DB::Exception("unknown exception thrown in SegmentReader")); + } + } + } + + void run() + { + setCPUAffinity(); + setThreadName(name.c_str()); + while (!isStop()) + { + readSegments(); + } + } + + WorkQueue & task_queue; + std::atomic stop; + Poco::Logger * log; + std::thread t; + std::vector cpus; +}; + +void SegmentReaderPool::addTask(MergedTaskPtr && task) +{ + if (!task_queue.push(std::forward(task), nullptr)) + { + throw Exception("addTask fail"); + } +} + +SegmentReaderPool::SegmentReaderPool(int thread_count, const std::vector & cpus) + : log(&Poco::Logger::get("SegmentReaderPool")) +{ + LOG_FMT_INFO(log, "Create SegmentReaderPool thread_count {} cpus {} start", thread_count, cpus); + for (int i = 0; i < thread_count; i++) + { + readers.push_back(std::make_unique(task_queue, cpus)); + } + LOG_FMT_INFO(log, "Create SegmentReaderPool thread_count {} cpus {} end", thread_count, cpus); +} + +SegmentReaderPool::~SegmentReaderPool() +{ + for (auto & reader : readers) + { + reader->setStop(); + } + task_queue.finish(); +} + +std::vector SegmentReaderPool::getReaderIds() const +{ + std::vector ids; + for (const auto & r : readers) + { + ids.push_back(r->getId()); + } + return ids; +} + +SegmentReaderPoolManager::SegmentReaderPoolManager() + : log(&Poco::Logger::get("SegmentReaderPoolManager")) +{} + +SegmentReaderPoolManager::~SegmentReaderPoolManager() = default; + +void SegmentReaderPoolManager::init(const ServerInfo & server_info) +{ + auto numa_nodes = getNumaNodes(log); + LOG_FMT_INFO(log, "numa_nodes {} => {}", numa_nodes.size(), numa_nodes); + for (const auto & node : numa_nodes) + { + int thread_count = node.empty() ? server_info.cpu_info.logical_cores : node.size(); + reader_pools.push_back(std::make_unique(thread_count, node)); + auto ids = reader_pools.back()->getReaderIds(); + reader_ids.insert(ids.begin(), ids.end()); + } + LOG_FMT_INFO(log, "readers count {}", reader_ids.size()); +} + +void SegmentReaderPoolManager::addTask(MergedTaskPtr && task) +{ + static std::hash hash_func; + auto idx = hash_func(task->getSegmentId()) % reader_pools.size(); + reader_pools[idx]->addTask(std::move(task)); +} + +// `isSegmentReader` checks whether this thread is a `SegmentReader`. +// Use this function in DMFileBlockInputStream to check whether the read thread is enabled for this read request. +// Maybe we could pass the argument from DeltaMerge -> SegmentReadTaskPool -> ... -> DMFileBlockInputStream, +// but that is a long code path and would affect a lot of code. +bool SegmentReaderPoolManager::isSegmentReader() const +{ + return reader_ids.find(std::this_thread::get_id()) != reader_ids.end(); +} +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h new file mode 100644 index 00000000000..5408dcb4fd6 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
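SegmentReader::setCPUAffinity above pins each reader thread to the CPUs of its NUMA node with sched_setaffinity. A stand-alone sketch of that pinning pattern, assuming a Linux target; the helper name pinToCpus is illustrative and not part of the patch.

// Illustrative sketch of pinning the calling thread to a set of CPUs, as
// SegmentReader::setCPUAffinity does above. Linux-specific; a no-op elsewhere.
#include <cerrno>
#include <cstring>
#include <iostream>
#include <vector>
#ifdef __linux__
#include <sched.h>
#endif

bool pinToCpus(const std::vector<int> & cpus) // hypothetical helper
{
#ifdef __linux__
    if (cpus.empty())
        return true; // nothing to pin, keep the default affinity
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    for (int cpu : cpus)
        CPU_SET(cpu, &cpu_set);
    // pid 0 means "the calling thread"; sched_setaffinity returns 0 on success.
    if (sched_setaffinity(0, sizeof(cpu_set), &cpu_set) != 0)
    {
        std::cerr << "sched_setaffinity failed: " << std::strerror(errno) << '\n';
        return false;
    }
#endif
    return true;
}

int main()
{
    // Pin the main thread to CPUs 0 and 1 (if present on this machine).
    pinToCpus({0, 1});
}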
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include + +namespace DB::DM +{ +class MergedTask; +using MergedTaskPtr = std::shared_ptr; + +class SegmentReader; +using SegmentReaderUPtr = std::unique_ptr; + +class SegmentReaderPool +{ +public: + SegmentReaderPool(int thread_count, const std::vector & cpus); + ~SegmentReaderPool(); + SegmentReaderPool(const SegmentReaderPool &) = delete; + SegmentReaderPool & operator=(const SegmentReaderPool &) = delete; + SegmentReaderPool(SegmentReaderPool &&) = delete; + SegmentReaderPool & operator=(SegmentReaderPool &&) = delete; + + void addTask(MergedTaskPtr && task); + std::vector getReaderIds() const; + +private: + void init(int thread_count, const std::vector & cpus); + + WorkQueue task_queue; + std::vector readers; + Poco::Logger * log; +}; + +// SegmentReaderPoolManager is a NUMA-aware singleton that manages several SegmentReaderPool objects. +// The number of SegmentReaderPool objects equals the number of CPU NUMA nodes. +// The thread count of a SegmentReaderPool equals the number of logical CPU cores of its NUMA node. +// `addTask` dispatches a MergedTask to a SegmentReaderPool by its segment id, so a segment read task +// won't be processed across NUMA nodes. +class SegmentReaderPoolManager +{ +public: + static SegmentReaderPoolManager & instance() + { + static SegmentReaderPoolManager pool_manager; + return pool_manager; + } + void init(const ServerInfo & server_info); + ~SegmentReaderPoolManager(); + DISALLOW_COPY_AND_MOVE(SegmentReaderPoolManager); + + void addTask(MergedTaskPtr && task); + bool isSegmentReader() const; + +private: + SegmentReaderPoolManager(); + std::vector> reader_pools; + std::unordered_set reader_ids; + Poco::Logger * log; +}; + +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h new file mode 100644 index 00000000000..5bdbc7ed59d --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -0,0 +1,131 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
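A small illustration of the dispatch rule described above: hashing the segment id onto one of the per-NUMA-node pools keeps a given segment on a single node. FakePool is a placeholder for SegmentReaderPool; only the `hash(segment_id) % pool_count` routing is taken from the patch.

// Illustrative only: route a task to one of N per-NUMA-node pools by segment id,
// so the same segment always lands on the same node's readers.
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

struct FakePool // stand-in for SegmentReaderPool
{
    int node_id;
    void addTask(uint64_t seg_id) const
    {
        std::cout << "segment " << seg_id << " -> NUMA node " << node_id << '\n';
    }
};

void dispatch(const std::vector<FakePool> & pools, uint64_t seg_id)
{
    static std::hash<uint64_t> hash_func;
    auto idx = hash_func(seg_id) % pools.size();
    pools[idx].addTask(seg_id);
}

int main()
{
    std::vector<FakePool> pools{{0}, {1}}; // e.g. two NUMA nodes
    for (uint64_t seg_id : {42u, 43u, 42u})
        dispatch(pools, seg_id); // segment 42 always goes to the same pool
}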
+ +#pragma once + +#include +#include +#include + +namespace DB::FailPoints +{ +extern const char pause_when_reading_from_dt_stream[]; +} + +namespace DB::DM +{ +class UnorderedInputStream : public IProfilingBlockInputStream +{ + static constexpr auto NAME = "UnorderedInputStream"; + +public: + UnorderedInputStream( + const SegmentReadTaskPoolPtr & task_pool_, + const ColumnDefines & columns_to_read_, + const int extra_table_id_index, + const TableID physical_table_id, + const String & req_id) + : task_pool(task_pool_) + , header(toEmptyBlock(columns_to_read_)) + , extra_table_id_index(extra_table_id_index) + , physical_table_id(physical_table_id) + , log(Logger::get(NAME, req_id)) + , ref_no(0) + { + if (extra_table_id_index != InvalidColumnID) + { + ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine(); + ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value}; + header.insert(extra_table_id_index, col); + } + ref_no = task_pool->increaseUnorderedInputStreamRefCount(); + LOG_FMT_DEBUG(log, "pool {} ref {} created", task_pool->poolId(), ref_no); + } + + ~UnorderedInputStream() + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_FMT_DEBUG(log, "pool {} ref {} destroy", task_pool->poolId(), ref_no); + } + + String getName() const override { return NAME; } + + Block getHeader() const override { return header; } + +protected: + Block readImpl() override + { + FilterPtr filter_ignored; + return readImpl(filter_ignored, false); + } + + // Currently, res_fiter and return_filter is unused. + Block readImpl(FilterPtr & /*res_filter*/, bool /*return_filter*/) override + { + if (done) + { + return {}; + } + while (true) + { + FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream); + + Block res; + task_pool->popBlock(res); + if (res) + { + if (extra_table_id_index != InvalidColumnID) + { + ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine(); + ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id}; + size_t row_number = res.rows(); + auto col_data = col.type->createColumnConst(row_number, Field(physical_table_id)); + col.column = std::move(col_data); + res.insert(extra_table_id_index, std::move(col)); + } + if (!res.rows()) + { + continue; + } + else + { + total_rows += res.rows(); + return res; + } + } + else + { + done = true; + return {}; + } + } + } + + void readSuffixImpl() override + { + LOG_FMT_DEBUG(log, "pool {} ref {} finish read {} rows from storage", task_pool->poolId(), ref_no, total_rows); + } + +private: + SegmentReadTaskPoolPtr task_pool; + Block header; + // position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function. + const int extra_table_id_index; + bool done = false; + TableID physical_table_id; + LoggerPtr log; + int64_t ref_no; + size_t total_rows = 0; +}; +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h new file mode 100644 index 00000000000..7726617ebef --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h @@ -0,0 +1,174 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
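The UnorderedInputStream above fills the ExtraPhysTblID column itself: when extra_table_id_index is valid, every returned block gets a constant column holding the physical table id inserted at that position. A toy version of that per-block insertion, with a plain vector-of-vectors standing in for DB::Block and createColumnConst:

// Illustrative only: insert a "physical table id" column at a fixed position of
// each block, mirroring what UnorderedInputStream::readImpl does above.
#include <cstdint>
#include <iostream>
#include <vector>

using Column = std::vector<int64_t>; // toy column
using Block = std::vector<Column>;   // toy block: a list of columns

void insertExtraTableIdColumn(Block & block, size_t extra_table_id_index, int64_t physical_table_id)
{
    if (block.empty())
        return;
    size_t rows = block.front().size();
    Column extra(rows, physical_table_id); // "constant" column: same value on every row
    block.insert(block.begin() + extra_table_id_index, std::move(extra));
}

int main()
{
    Block block{{1, 2, 3}, {10, 20, 30}}; // two data columns, three rows
    insertExtraTableIdColumn(block, /*extra_table_id_index=*/1, /*physical_table_id=*/77);
    std::cout << "columns after insert: " << block.size() << '\n'; // prints 3
}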
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include + +#include +#include +#include + +namespace DB::DM +{ +template +class WorkQueue +{ + // Protects all member variable access + std::mutex mutex_; + std::condition_variable readerCv_; + std::condition_variable writerCv_; + std::condition_variable finishCv_; + std::queue queue_; + bool done_; + std::size_t maxSize_; + + std::size_t peak_queue_size; + int64_t pop_times; + int64_t pop_empty_times; + // Must have lock to call this function + bool full() const + { + if (maxSize_ == 0) + { + return false; + } + return queue_.size() >= maxSize_; + } + +public: + /** + * Constructs an empty work queue with an optional max size. + * If `maxSize == 0` the queue size is unbounded. + * + * @param maxSize The maximum allowed size of the work queue. + */ + WorkQueue(std::size_t maxSize = 0) + : done_(false) + , maxSize_(maxSize) + , peak_queue_size(0) + , pop_times(0) + , pop_empty_times(0) + {} + /** + * Push an item onto the work queue. Notify a single thread that work is + * available. If `finish()` has been called, do nothing and return false. + * If `push()` returns false, then `item` has not been copied from. + * + * @param item Item to push onto the queue. + * @returns True upon success, false if `finish()` has been called. An + * item was pushed iff `push()` returns true. + */ + template + bool push(U && item, size_t * size) + { + { + std::unique_lock lock(mutex_); + while (full() && !done_) + { + writerCv_.wait(lock); + } + if (done_) + { + return false; + } + queue_.push(std::forward(item)); + peak_queue_size = std::max(queue_.size(), peak_queue_size); + if (size != nullptr) + { + *size = queue_.size(); + } + } + readerCv_.notify_one(); + return true; + } + /** + * Attempts to pop an item off the work queue. It will block until data is + * available or `finish()` has been called. + * + * @param[out] item If `pop` returns `true`, it contains the popped item. + * If `pop` returns `false`, it is unmodified. + * @returns True upon success. False if the queue is empty and + * `finish()` has been called. + */ + bool pop(T & item) + { + { + std::unique_lock lock(mutex_); + pop_times++; + while (queue_.empty() && !done_) + { + pop_empty_times++; + readerCv_.wait(lock); + } + if (queue_.empty()) + { + assert(done_); + return false; + } + item = std::move(queue_.front()); + queue_.pop(); + } + writerCv_.notify_one(); + return true; + } + /** + * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. + * + * @param maxSize The new maximum queue size. + */ + void setMaxSize(std::size_t maxSize) + { + { + std::lock_guard lock(mutex_); + maxSize_ = maxSize; + } + writerCv_.notify_all(); + } + /** + * Promise that `push()` won't be called again, so once the queue is empty + * there will never any more work. + */ + void finish() + { + { + std::lock_guard lock(mutex_); + assert(!done_); + done_ = true; + } + readerCv_.notify_all(); + writerCv_.notify_all(); + finishCv_.notify_all(); + } + /// Blocks until `finish()` has been called (but the queue may not be empty). 
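A usage sketch for the WorkQueue declared above: a producer pushes items and then calls finish(); a consumer pops until pop() returns false. It assumes only the interface shown in this header, and the include path is an assumption following the location of the file added by this patch.

// Usage sketch for DB::DM::WorkQueue as declared above: pop() blocks until an item
// arrives or finish() is called; after finish() and an empty queue it returns false.
#include <Storages/DeltaMerge/ReadThread/WorkQueue.h> // assumed include path

#include <iostream>
#include <thread>

int main()
{
    DB::DM::WorkQueue<int> queue(/*maxSize=*/4); // bounded: push() blocks while 4 items are pending

    std::thread consumer([&] {
        int value;
        while (queue.pop(value)) // false only after finish() and the queue has drained
            std::cout << "got " << value << '\n';
    });

    for (int i = 0; i < 10; ++i)
        queue.push(i, /*size=*/nullptr); // the second argument can report the queue size

    queue.finish(); // no more pushes; wakes the consumer so it can exit
    consumer.join();
}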
+ void waitUntilFinished() + { + std::unique_lock lock(mutex_); + while (!done_) + { + finishCv_.wait(lock); + } + } + + size_t size() + { + std::lock_guard lock(mutex_); + return queue_.size(); + } + + std::tuple getStat() const + { + return std::tuple{pop_times, pop_empty_times, peak_queue_size}; + } +}; +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 0e61a01b01a..80ff8487c21 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -61,7 +61,7 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas return a->ranges.size() < b->ranges.size(); }; std::priority_queue, decltype(cmp)> largest_ranges_first(cmp); - for (auto & task : tasks) + for (const auto & task : tasks) largest_ranges_first.push(task); // Split the top task. @@ -95,4 +95,171 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas return result_tasks; } +BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) +{ + MemoryTrackerSetter setter(true, mem_tracker); + auto seg = t->segment; + BlockInputStreamPtr stream; + if (is_raw) + { + stream = seg->getInputStreamRaw(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, do_range_filter_for_raw); + } + else + { + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + stream = seg->getInputStream(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size); + } + LOG_FMT_DEBUG(log, "getInputStream pool {} seg_id {} succ", pool_id, seg->segmentId()); + return stream; +} + +void SegmentReadTaskPool::finishSegment(const SegmentPtr & seg) +{ + after_segment_read(dm_context, seg); + bool pool_finished = false; + { + std::lock_guard lock(mutex); + active_segment_ids.erase(seg->segmentId()); + pool_finished = active_segment_ids.empty() && tasks.empty(); + } + LOG_FMT_DEBUG(log, "finishSegment pool {} seg_id {} pool_finished {}", pool_id, seg->segmentId(), pool_finished); + if (pool_finished) + { + q.finish(); + } +} + +SegmentReadTaskPtr SegmentReadTaskPool::getTask(uint64_t seg_id) +{ + std::lock_guard lock(mutex); + // TODO(jinhelin): use unordered_map + auto itr = std::find_if(tasks.begin(), tasks.end(), [seg_id](const SegmentReadTaskPtr & task) { return task->segment->segmentId() == seg_id; }); + if (itr == tasks.end()) + { + throw Exception(fmt::format("pool {} seg_id {} not found", pool_id, seg_id)); + } + auto t = *(itr); + tasks.erase(itr); + active_segment_ids.insert(seg_id); + return t; +} + +// Choose a segment to read. +// Returns . 
+std::unordered_map>::const_iterator SegmentReadTaskPool::scheduleSegment(const std::unordered_map> & segments, uint64_t expected_merge_count) +{ + auto target = segments.end(); + std::lock_guard lock(mutex); + if (getFreeActiveSegmentCountUnlock() <= 0) + { + return target; + } + for (const auto & task : tasks) + { + auto itr = segments.find(task->segment->segmentId()); + if (itr == segments.end()) + { + throw DB::Exception(fmt::format("seg_id {} not found from merging segments", task->segment->segmentId())); + } + if (std::find(itr->second.begin(), itr->second.end(), poolId()) == itr->second.end()) + { + throw DB::Exception(fmt::format("pool {} not found from merging segment {}=>{}", poolId(), itr->first, itr->second)); + } + if (target == segments.end() || itr->second.size() > target->second.size()) + { + target = itr; + } + if (target->second.size() >= expected_merge_count) + { + break; + } + } + return target; +} + +bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const SegmentPtr & seg) +{ + MemoryTrackerSetter setter(true, mem_tracker); + auto block = stream->read(); + if (block) + { + pushBlock(std::move(block)); + return true; + } + else + { + finishSegment(seg); + return false; + } +} + +void SegmentReadTaskPool::popBlock(Block & block) +{ + q.pop(block); + blk_stat.pop(block); + global_blk_stat.pop(block); + if (exceptionHappened()) + { + throw exception; + } +} + +void SegmentReadTaskPool::pushBlock(Block && block) +{ + blk_stat.push(block); + global_blk_stat.push(block); + q.push(std::move(block), nullptr); +} + +int64_t SegmentReadTaskPool::increaseUnorderedInputStreamRefCount() +{ + return unordered_input_stream_ref_count.fetch_add(1, std::memory_order_relaxed); +} +int64_t SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount() +{ + return unordered_input_stream_ref_count.fetch_sub(1, std::memory_order_relaxed); +} + +int64_t SegmentReadTaskPool::getFreeBlockSlots() const +{ + double block_slots_scale = dm_context->db_context.getSettingsRef().dt_block_slots_scale; + auto block_slots = static_cast(std::ceil(unordered_input_stream_ref_count.load(std::memory_order_relaxed) * block_slots_scale)); + if (block_slots < 3) + { + block_slots = 3; + } + return block_slots - blk_stat.pendingCount(); +} + +int64_t SegmentReadTaskPool::getFreeActiveSegmentCountUnlock() +{ + double active_segments_scale = dm_context->db_context.getSettingsRef().dt_active_segments_scale; + auto active_segment_limit = static_cast(std::ceil(unordered_input_stream_ref_count.load(std::memory_order_relaxed) * active_segments_scale)); + if (active_segment_limit < 2) + { + active_segment_limit = 2; + } + return active_segment_limit - static_cast(active_segment_ids.size()); +} + +bool SegmentReadTaskPool::exceptionHappened() const +{ + return exception_happened.load(std::memory_order_relaxed); +} + +bool SegmentReadTaskPool::valid() const +{ + return !exceptionHappened() && unordered_input_stream_ref_count.load(std::memory_order_relaxed) > 0; +} +void SegmentReadTaskPool::setException(const DB::Exception & e) +{ + std::lock_guard lock(mutex); + if (!exceptionHappened()) + { + exception = e; + exception_happened.store(true, std::memory_order_relaxed); + q.finish(); + } +} + } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index dc14716af80..e87099a22b4 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -13,23 +13,22 @@ // 
limitations under the License. #pragma once - +#include +#include +#include +#include #include -#include - namespace DB { namespace DM { -struct DMContext; struct SegmentReadTask; class Segment; using SegmentPtr = std::shared_ptr; struct SegmentSnapshot; using SegmentSnapshotPtr = std::shared_ptr; -using DMContextPtr = std::shared_ptr; using SegmentReadTaskPtr = std::shared_ptr; using SegmentReadTasks = std::list; using AfterSegmentRead = std::function; @@ -57,13 +56,113 @@ struct SegmentReadTask static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size); }; +class BlockStat +{ +public: + BlockStat() + : pending_count(0) + , pending_bytes(0) + , total_count(0) + , total_bytes(0) + {} + + void push(const Block & blk) + { + pending_count.fetch_add(1, std::memory_order_relaxed); + total_count.fetch_add(1, std::memory_order_relaxed); + + auto b = blk.bytes(); + pending_bytes.fetch_add(b, std::memory_order_relaxed); + total_bytes.fetch_add(b, std::memory_order_relaxed); + } + + void pop(const Block & blk) + { + if (likely(blk)) + { + pending_count.fetch_sub(1, std::memory_order_relaxed); + pending_bytes.fetch_sub(blk.bytes(), std::memory_order_relaxed); + } + } + + int64_t pendingCount() const + { + return pending_count.load(std::memory_order_relaxed); + } + + int64_t pendingBytes() const + { + return pending_bytes.load(std::memory_order_relaxed); + } + + int64_t totalCount() const + { + return total_count.load(std::memory_order_relaxed); + } + int64_t totalBytes() const + { + return total_bytes.load(std::memory_order_relaxed); + } + +private: + std::atomic pending_count; + std::atomic pending_bytes; + std::atomic total_count; + std::atomic total_bytes; +}; + class SegmentReadTaskPool : private boost::noncopyable { public: - explicit SegmentReadTaskPool(SegmentReadTasks && tasks_) - : tasks(std::move(tasks_)) + explicit SegmentReadTaskPool( + int64_t table_id_, + const DMContextPtr & dm_context_, + const ColumnDefines & columns_to_read_, + const RSOperatorPtr & filter_, + uint64_t max_version_, + size_t expected_block_size_, + bool is_raw_, + bool do_range_filter_for_raw_, + SegmentReadTasks && tasks_, + AfterSegmentRead after_segment_read_) + : pool_id(nextPoolId()) + , table_id(table_id_) + , dm_context(dm_context_) + , columns_to_read(columns_to_read_) + , filter(filter_) + , max_version(max_version_) + , expected_block_size(expected_block_size_) + , is_raw(is_raw_) + , do_range_filter_for_raw(do_range_filter_for_raw_) + , tasks(std::move(tasks_)) + , after_segment_read(after_segment_read_) + , log(&Poco::Logger::get("SegmentReadTaskPool")) + , unordered_input_stream_ref_count(0) + , exception_happened(false) + , mem_tracker(current_memory_tracker) {} + ~SegmentReadTaskPool() + { + auto [pop_times, pop_empty_times, max_queue_size] = q.getStat(); + auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; + auto total_count = blk_stat.totalCount(); + auto total_bytes = blk_stat.totalBytes(); + auto blk_avg_bytes = total_count > 0 ? 
total_bytes / total_count : 0; + auto approximate_max_pending_block_bytes = blk_avg_bytes * max_queue_size; + LOG_FMT_DEBUG(log, "pool {} table {} pop {} pop_empty {} pop_empty_ratio {} max_queue_size {} blk_avg_bytes {} approximate_max_pending_block_bytes {} MB total_count {} total_bytes {} MB", // + pool_id, + table_id, + pop_times, + pop_empty_times, + pop_empty_ratio, + max_queue_size, + blk_avg_bytes, + approximate_max_pending_block_bytes / 1024.0 / 1024.0, + total_count, + total_bytes / 1024.0 / 1024.0); + } + SegmentReadTaskPtr nextTask() { std::lock_guard lock(mutex); @@ -74,13 +173,68 @@ class SegmentReadTaskPool : private boost::noncopyable return task; } + uint64_t poolId() const { return pool_id; } + + int64_t tableId() const { return table_id; } + + const SegmentReadTasks & getTasks() const { return tasks; } + + BlockInputStreamPtr buildInputStream(SegmentReadTaskPtr & t); + + bool readOneBlock(BlockInputStreamPtr & stream, const SegmentPtr & seg); + void popBlock(Block & block); + + std::unordered_map>::const_iterator scheduleSegment( + const std::unordered_map> & segments, + uint64_t expected_merge_count); + + int64_t increaseUnorderedInputStreamRefCount(); + int64_t decreaseUnorderedInputStreamRefCount(); + int64_t getFreeBlockSlots() const; + bool valid() const; + void setException(const DB::Exception & e); + SegmentReadTaskPtr getTask(uint64_t seg_id); + private: + int64_t getFreeActiveSegmentCountUnlock(); + bool exceptionHappened() const; + void finishSegment(const SegmentPtr & seg); + void pushBlock(Block && block); + + const uint64_t pool_id; + const int64_t table_id; + DMContextPtr dm_context; + ColumnDefines columns_to_read; + RSOperatorPtr filter; + const uint64_t max_version; + const size_t expected_block_size; + const bool is_raw; + const bool do_range_filter_for_raw; SegmentReadTasks tasks; - + AfterSegmentRead after_segment_read; std::mutex mutex; + std::unordered_set active_segment_ids; + WorkQueue q; + BlockStat blk_stat; + Poco::Logger * log; + + std::atomic unordered_input_stream_ref_count; + + std::atomic exception_happened; + DB::Exception exception; + + MemoryTracker * mem_tracker; + + inline static std::atomic pool_id_gen{1}; + inline static BlockStat global_blk_stat; + static uint64_t nextPoolId() + { + return pool_id_gen.fetch_add(1, std::memory_order_relaxed); + } }; using SegmentReadTaskPoolPtr = std::shared_ptr; +using SegmentReadTaskPools = std::vector; } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp new file mode 100644 index 00000000000..ab44b3b2ec2 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_circular_scan_list.cpp @@ -0,0 +1,136 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
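The pool's flow control seen earlier (getFreeBlockSlots and getFreeActiveSegmentCountUnlock) is a small formula: the limit scales with the number of attached UnorderedInputStreams, is clamped to a floor (3 block slots, 2 active segments), and the free count is the limit minus what is already in flight. A stand-alone restatement of that arithmetic; freeSlots is an illustrative helper, not part of the patch.

// Illustrative re-statement of the back-pressure computation:
// limit = max(floor_value, ceil(stream_count * scale)); free = limit - in_flight.
#include <cmath>
#include <cstdint>
#include <iostream>

int64_t freeSlots(int64_t stream_count, double scale, int64_t floor_value, int64_t in_flight)
{
    auto limit = static_cast<int64_t>(std::ceil(stream_count * scale));
    if (limit < floor_value)
        limit = floor_value;
    return limit - in_flight;
}

int main()
{
    // e.g. 4 streams, scale 1.0, 3 blocks already pending => 1 free block slot.
    std::cout << freeSlots(/*stream_count=*/4, /*scale=*/1.0, /*floor_value=*/3, /*in_flight=*/3) << '\n';
    // e.g. 1 stream, scale 1.5, floor of 2 active segments, none active => 2 free segment slots.
    std::cout << freeSlots(1, 1.5, 2, 0) << '\n';
}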
+#include +#include + +#include + +namespace DB::DM::tests +{ +class Node +{ +public: + explicit Node(uint64_t id_) + : id(id_) + , table_id(1) + , v(true) + {} + + bool valid() const + { + return v; + } + uint64_t poolId() const + { + return id; + } + int64_t tableId() const + { + return table_id; + } + void setInvalid() + { + v = false; + } + +private: + uint64_t id; + int64_t table_id; + bool v; +}; + +TEST(CircularScanListTest, Normal) +{ + CircularScanList lst; + + { + ASSERT_EQ(lst.next(), nullptr); + auto [valid, invalid] = lst.count(0); + ASSERT_EQ(valid, 0); + ASSERT_EQ(invalid, 0); + ASSERT_EQ(lst.get(1), nullptr); + } + + for (uint64_t i = 0; i < 10; i++) + { + lst.add(std::make_shared(i)); + } + + { + auto [valid, invalid] = lst.count(0); + ASSERT_EQ(valid, 10); + ASSERT_EQ(invalid, 0); + } + + for (uint64_t i = 0; i < 20; i++) + { + auto sp = lst.next(); + ASSERT_EQ(sp->poolId(), i % 10); + } + + lst.get(1)->setInvalid(); + lst.get(3)->setInvalid(); + lst.get(5)->setInvalid(); + + { + auto [valid, invalid] = lst.count(0); + ASSERT_EQ(valid, 7); + ASSERT_EQ(invalid, 3); + } + + const std::vector valid_ids = {0, 2, 4, 6, 7, 8, 9}; + for (uint64_t i = 0; i < 20; i++) + { + auto sp = lst.next(); + ASSERT_EQ(sp->poolId(), valid_ids[i % valid_ids.size()]); + } + + { + auto [valid, invalid] = lst.count(0); + ASSERT_EQ(valid, 7); + ASSERT_EQ(invalid, 0); + } + + for (uint64_t id : valid_ids) + { + lst.get(id)->setInvalid(); + } + + { + auto [valid, invalid] = lst.count(0); + ASSERT_EQ(valid, 0); + ASSERT_EQ(invalid, 7); + } + + ASSERT_EQ(lst.next(), nullptr); +} + +TEST(CircularScanListTest, valid) +{ + CircularScanList l; + l.add(std::make_shared(1)); + + ASSERT_EQ(l.next()->poolId(), 1); + ASSERT_EQ(l.next()->poolId(), 1); + + l.next()->setInvalid(); + + ASSERT_EQ(l.next(), nullptr); + ASSERT_EQ(l.next(), nullptr); + l.add(std::make_shared(2)); + + ASSERT_EQ(l.next()->poolId(), 2); + ASSERT_EQ(l.next()->poolId(), 2); +} +} // namespace DB::DM::tests \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp new file mode 100644 index 00000000000..82ac6bdecf2 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp @@ -0,0 +1,122 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+#include +#include +#include +#include + +#include "gtest/gtest.h" + +namespace DB::DM::tests +{ + +constexpr int TEST_PACK_ROWS = 1024; +DataTypePtr TEST_DATA_TYPE = std::make_shared(); + +ColumnPtr createColumn(int packs) +{ + int rows = packs * TEST_PACK_ROWS; + + auto mut_col = TEST_DATA_TYPE->createColumn(); + for (int i = 0; i < rows; i++) + { + Field f = std::to_string(packs * 100000 + i); + mut_col->insert(f); + } + return std::move(mut_col); +} + +void compareColumn(ColumnPtr & col1, ColumnPtr & col2, int rows) +{ + for (int i = 0; i < rows; i++) + { + ASSERT_EQ((*col1)[i].toString(), (*col2)[i].toString()); + } +} + +TEST(ColumnSharingCacheTest, AddAndGet) +{ + ColumnSharingCache cache; + + auto col = createColumn(8); + cache.add(1, 8, col); + + ColumnPtr col1; + auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_HIT); + ASSERT_EQ(col1->size(), 8 * TEST_PACK_ROWS); + compareColumn(col1, col, col1->size()); + + ColumnPtr col2; + st = cache.get(1, 7, 7 * TEST_PACK_ROWS, col2, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_COPY); + ASSERT_EQ(col2->size(), 7 * TEST_PACK_ROWS); + compareColumn(col2, col, col2->size()); + + ColumnPtr col3; + st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col3, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_PART); + ASSERT_EQ(col3, nullptr); + + ColumnPtr col4; + st = cache.get(2, 8, 8 * TEST_PACK_ROWS, col4, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_MISS); + ASSERT_EQ(col4, nullptr); + + auto col5 = createColumn(7); + cache.add(1, 7, col5); + ColumnPtr col6; + st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col6, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_HIT); + ASSERT_EQ(col6->size(), 8 * TEST_PACK_ROWS); + compareColumn(col6, col, col6->size()); + + auto col7 = createColumn(9); + cache.add(1, 9, col7); + ColumnPtr col8; + st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_COPY); + ASSERT_EQ(col8->size(), 8 * TEST_PACK_ROWS); + compareColumn(col8, col7, col8->size()); +} + +TEST(ColumnSharingCacheTest, Del) +{ + ColumnSharingCache cache; + + auto col1 = createColumn(8); + cache.add(1, 8, col1); + + auto col2 = createColumn(8); + cache.add(9, 8, col2); + + auto col3 = createColumn(8); + cache.add(17, 8, col3); + + cache.del(10); + + ColumnPtr col4; + auto st = cache.get(9, 8, 8 * TEST_PACK_ROWS, col4, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_HIT); + ASSERT_EQ(col4->size(), 8 * TEST_PACK_ROWS); + compareColumn(col4, col2, col4->size()); + + ColumnPtr col5; + st = cache.get(17, 6, 6 * TEST_PACK_ROWS, col5, TEST_DATA_TYPE); + ASSERT_EQ(st, ColumnCacheStatus::GET_COPY); + ASSERT_EQ(col5->size(), 6 * TEST_PACK_ROWS); + compareColumn(col5, col2, col5->size()); +} + +} // namespace DB::DM::tests \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 79038e1a349..89fc676dfee 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -262,6 +262,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -301,7 +302,7 @@ try { // test readRaw const auto & columns = store->getTableColumns(); - BlockInputStreamPtr in = store->readRaw(*db_context, db_context->getSettingsRef(), 
columns, 1)[0]; + BlockInputStreamPtr in = store->readRaw(*db_context, db_context->getSettingsRef(), columns, 1, /* keep_order= */ false)[0]; size_t num_rows_read = 0; in->readPrefix(); @@ -405,6 +406,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -491,6 +493,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -563,6 +566,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -601,6 +605,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -687,6 +692,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -773,6 +779,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -805,6 +812,7 @@ try /* max_version= */ UInt64(1), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -861,6 +869,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -901,6 +910,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -961,6 +971,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -985,6 +996,7 @@ try /* max_version= */ tso2, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1009,6 +1021,7 @@ try /* max_version= */ tso1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1033,6 +1046,7 @@ try /* max_version= */ tso1 - 1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1095,6 +1109,7 @@ try /* max_version= */ tso1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1133,6 +1148,7 @@ try /* max_version= */ tso2 - 1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1172,6 +1188,7 @@ try /* max_version= */ tso3 - 1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1198,6 +1215,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, 
TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1224,6 +1242,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1286,6 +1305,7 @@ try /* max_version= */ tso1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); @@ -1324,6 +1344,7 @@ try /* max_version= */ tso2 - 1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); @@ -1363,6 +1384,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); @@ -1420,6 +1442,7 @@ try /* max_version= */ tso1, EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); @@ -1458,6 +1481,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); @@ -1545,6 +1569,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1654,6 +1679,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1762,6 +1788,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1851,6 +1878,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -1955,6 +1983,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2032,6 +2061,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2109,6 +2139,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2186,6 +2217,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2263,6 +2295,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2338,6 +2371,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2412,6 +2446,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* 
expected_block_size= */ 1024)[0]; @@ -2503,6 +2538,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -2638,6 +2674,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -2700,6 +2737,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); @@ -2800,6 +2838,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2854,6 +2893,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -2971,6 +3011,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; @@ -3009,7 +3050,7 @@ try { // test readRaw const auto & columns = store->getTableColumns(); - BlockInputStreamPtr in = store->readRaw(*db_context, db_context->getSettingsRef(), columns, 1)[0]; + BlockInputStreamPtr in = store->readRaw(*db_context, db_context->getSettingsRef(), columns, 1, /* keep_order= */ false)[0]; size_t num_rows_read = 0; in->readPrefix(); @@ -3101,6 +3142,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -3176,6 +3218,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -3208,6 +3251,7 @@ try /* max_version= */ UInt64(1), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -3267,6 +3311,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -3318,6 +3363,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -3392,6 +3438,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp index 360f3e96315..8cfa7c07642 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp @@ -89,6 +89,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastModeWithOnlyInsertWithoutRangeFilter) /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 
1024)[0]; @@ -192,6 +193,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastModeWithOnlyInsertWithRangeFilter) /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; @@ -287,6 +289,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -433,6 +436,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -580,6 +584,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -731,6 +736,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -826,6 +832,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1011,6 +1018,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1052,6 +1060,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1114,6 +1123,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1153,6 +1163,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order = */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1229,6 +1240,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1331,6 +1343,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1445,6 +1458,7 @@ try /* max_version= */ UInt64(1), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1513,6 +1527,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -1571,6 +1586,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, TRACING_NAME, + /* keep_order= */ false, /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index a575d022476..f066237d8e2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ 
b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -124,7 +124,7 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; - auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter, name); + auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter, name, false); streams[0]->readPrefix(); auto rows = streams[0]->read().rows(); streams[0]->readSuffix(); diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index faa0100e9e2..93877181e55 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -190,7 +190,7 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) auto filter = EMPTY_FILTER; int excepted_block_size = 1024; uint64_t read_ts = ts_gen->get(); - auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, "DTWorkload", opts->is_fast_mode, excepted_block_size); + auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, "DTWorkload", false, opts->is_fast_mode, excepted_block_size); std::vector threads; threads.reserve(streams.size()); for (auto & stream : streams) diff --git a/dbms/src/Storages/SelectQueryInfo.cpp b/dbms/src/Storages/SelectQueryInfo.cpp index 075c0ad0631..9b5dbf0d30f 100644 --- a/dbms/src/Storages/SelectQueryInfo.cpp +++ b/dbms/src/Storages/SelectQueryInfo.cpp @@ -29,6 +29,7 @@ SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & rhs) , mvcc_query_info(rhs.mvcc_query_info != nullptr ? std::make_unique(*rhs.mvcc_query_info) : nullptr) , dag_query(rhs.dag_query != nullptr ? 
std::make_unique(*rhs.dag_query) : nullptr) , req_id(rhs.req_id) + , keep_order(rhs.keep_order) {} SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept @@ -37,6 +38,7 @@ SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept , mvcc_query_info(std::move(rhs.mvcc_query_info)) , dag_query(std::move(rhs.dag_query)) , req_id(std::move(rhs.req_id)) + , keep_order(rhs.keep_order) {} } // namespace DB diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index d49d4c831d1..0807647aa26 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -51,7 +51,7 @@ struct SelectQueryInfo std::unique_ptr dag_query; std::string req_id; - + bool keep_order = true; SelectQueryInfo(); ~SelectQueryInfo(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index c39a882d53c..1bf3e7c1b73 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -632,6 +632,7 @@ BlockInputStreams StorageDeltaMerge::read( context.getSettingsRef(), columns_to_read, num_streams, + query_info.keep_order, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); } @@ -756,6 +757,7 @@ BlockInputStreams StorageDeltaMerge::read( /*max_version=*/mvcc_query_info.read_tso, rs_operator, query_info.req_id, + query_info.keep_order, /* is_fast_mode */ tidb_table_info.tiflash_mode == TiDB::TiFlashMode::Fast, // read in normal mode or read in fast mode max_block_size, parseSegmentSet(select_query.segment_expression_list), @@ -838,7 +840,8 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM 1, std::numeric_limits::max(), EMPTY_FILTER, - /*tracing_id*/ "getRows")[0]; + /*tracing_id*/ "getRows", + /*keep_order*/ false)[0]; stream->readPrefix(); Block block; while ((block = stream->read())) @@ -863,7 +866,8 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context 1, std::numeric_limits::max(), EMPTY_FILTER, - /*tracing_id*/ "getRange")[0]; + /*tracing_id*/ "getRange", + /*keep_order*/ false)[0]; stream->readPrefix(); Block block; size_t index = 0; diff --git a/dbms/src/Storages/Transaction/Collator.cpp b/dbms/src/Storages/Transaction/Collator.cpp index 1b0221a6829..d11d693a8a4 100644 --- a/dbms/src/Storages/Transaction/Collator.cpp +++ b/dbms/src/Storages/Transaction/Collator.cpp @@ -92,15 +92,12 @@ class Pattern : public ITiDBCollator::IPattern tp = MatchType::Match; if (offset < pattern.length()) { - auto old_offset = offset; - c = Collator::decodeChar(pattern.data(), old_offset); - if (c == escape || c == '_' || c == '%') - offset = old_offset; - else - { - assert(escape >= 0); - c = static_cast(escape); // NOLINT(bugprone-signed-char-misuse) - } + // use next char to match + c = Collator::decodeChar(pattern.data(), offset); + } + else + { + // use `escape` to match } } else if (c == '_') diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index 26c456b5b31..9c53ccb9084 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -13,6 +13,9 @@ // limitations under the License. 
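The Collator.cpp change above makes `<escape>X` match X literally for any X, and lets a trailing escape character match itself, which is what the new like.test cases later in this patch exercise. A simplified, ASCII-only sketch of that compile step; compileLike and Token are illustrative names, and the real code works on decoded characters with wildcard handling elsewhere.

// Illustrative sketch of the escape handling fixed above: inside a LIKE pattern,
// <escape><X> matches X literally, and a trailing <escape> matches the escape char itself.
#include <cassert>
#include <string>
#include <vector>

struct Token
{
    enum Type { Match, One, Any } type; // literal char, '_', '%'
    char c = 0;
};

std::vector<Token> compileLike(const std::string & pattern, char escape)
{
    std::vector<Token> tokens;
    for (size_t i = 0; i < pattern.size(); ++i)
    {
        char c = pattern[i];
        if (c == escape)
        {
            if (i + 1 < pattern.size())
                c = pattern[++i]; // use the next char to match, whatever it is
            // else: use the escape character itself to match
            tokens.push_back({Token::Match, c});
        }
        else if (c == '_')
            tokens.push_back({Token::One, 0});
        else if (c == '%')
            tokens.push_back({Token::Any, 0});
        else
            tokens.push_back({Token::Match, c});
    }
    return tokens;
}

int main()
{
    // "15234" with escape '5' compiles to the literals 1 2 3 4, as in the new test case.
    auto tokens = compileLike("15234", '5');
    assert(tokens.size() == 4 && tokens[1].type == Token::Match && tokens[1].c == '2');
    // A trailing escape matches itself: "1234" with escape '4' still expects a literal '4'.
    tokens = compileLike("1234", '4');
    assert(tokens.back().type == Token::Match && tokens.back().c == '4');
}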
#include +#include +#include +#include #include namespace DB::FailPoints @@ -25,6 +28,10 @@ int main(int argc, char ** argv) { DB::tests::TiFlashTestEnv::setupLogger(); DB::tests::TiFlashTestEnv::initializeGlobalContext(); + DB::ServerInfo server_info; + DB::DM::SegmentReaderPoolManager::instance().init(server_info); + DB::DM::SegmentReadTaskScheduler::instance(); + DB::DM::DMFileReaderPool::instance(); #ifdef FIU_ENABLE fiu_init(0); // init failpoint diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index cc8160761e6..c5c25d1adff 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -270,8 +270,10 @@ DAGRequestBuilder & DAGRequestBuilder::aggregation(ASTPtr agg_func, ASTPtr group { auto agg_funcs = std::make_shared(); auto group_by_exprs = std::make_shared(); - agg_funcs->children.push_back(agg_func); - group_by_exprs->children.push_back(group_by_expr); + if (agg_func) + agg_funcs->children.push_back(agg_func); + if (group_by_expr) + group_by_exprs->children.push_back(group_by_expr); return buildAggregation(agg_funcs, group_by_exprs); } diff --git a/tests/fullstack-test/expr/like.test b/tests/fullstack-test/expr/like.test index af76095d157..5f13c7a8159 100644 --- a/tests/fullstack-test/expr/like.test +++ b/tests/fullstack-test/expr/like.test @@ -38,4 +38,29 @@ mysql> set @@tidb_isolation_read_engines='tiflash'; select * from test.t where ' | a | b | +------+------+ | aaaa | %a% | -+------+------+ \ No newline at end of file ++------+------+ + +mysql> insert into test.t values ('1234', ''); + +mysql> set @@tidb_isolation_read_engines='tiflash'; select a from test.t where a like '1234' escape '4'; ++------+ +| a | ++------+ +| 1234 | ++------+ + +mysql> set @@tidb_isolation_read_engines='tiflash'; select a from test.t where a like '1234' escape '2'; + +mysql> set @@tidb_isolation_read_engines='tiflash'; select a from test.t where a like '15234' escape '5'; ++------+ +| a | ++------+ +| 1234 | ++------+ + +mysql> set @@tidb_isolation_read_engines='tiflash'; select a from test.t where a like '_223_' escape '2'; ++------+ +| a | ++------+ +| 1234 | ++------+
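As the gtests_dbms_main.cpp hunk above shows, a binary that exercises the new read-thread path has to bring the singletons up once at startup. A minimal helper capturing that order; the include paths are assumptions based on the files added in this patch and the usual TiFlash include layout, and may differ from the actual headers.

// Sketch of the startup order mirrored from gtests_dbms_main.cpp above.
#include <Server/ServerInfo.h>                                        // assumed path for DB::ServerInfo
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>  // added by this patch
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>             // added by this patch
#include <Storages/DeltaMerge/ReadThread/DMFileReader.h>              // assumed header for DMFileReaderPool

void initReadThreadEnvironment(const DB::ServerInfo & server_info)
{
    // One SegmentReaderPool per NUMA node, with threads pinned accordingly.
    DB::DM::SegmentReaderPoolManager::instance().init(server_info);
    // Constructing the singletons up front; the scheduler owns the background scheduling thread.
    DB::DM::SegmentReadTaskScheduler::instance();
    DB::DM::DMFileReaderPool::instance();
}

int main()
{
    DB::ServerInfo server_info; // default-constructed, as in the test main above
    initReadThreadEnvironment(server_info);
}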