diff --git a/CMakeLists.txt b/CMakeLists.txt index 4e14c205f18..f2ec9f3316b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -432,9 +432,6 @@ else (ENABLE_FAILPOINTS) message (STATUS "Failpoints are disabled") endif (ENABLE_FAILPOINTS) -# Enable PageStorage V3 test. -option (ENABLE_V3_PAGESTORAGE "Enables V3 PageStorage" ON) - # Flags for test coverage option (TEST_COVERAGE "Enables flags for test coverage" OFF) option (TEST_COVERAGE_XML "Output XML report for test coverage" OFF) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index cce11bd6997..e1e52fab73b 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -91,12 +91,10 @@ add_headers_and_sources(dbms src/Storages/Page/V2/VersionSet) add_headers_and_sources(dbms src/Storages/Page/V2/gc) add_headers_and_sources(dbms src/WindowFunctions) add_headers_and_sources(dbms src/TiDB/Schema) -if (ENABLE_V3_PAGESTORAGE) - add_headers_and_sources(dbms src/Storages/Page/V3) - add_headers_and_sources(dbms src/Storages/Page/V3/LogFile) - add_headers_and_sources(dbms src/Storages/Page/V3/WAL) - add_headers_and_sources(dbms src/Storages/Page/V3/spacemap) -endif() +add_headers_and_sources(dbms src/Storages/Page/V3) +add_headers_and_sources(dbms src/Storages/Page/V3/LogFile) +add_headers_and_sources(dbms src/Storages/Page/V3/WAL) +add_headers_and_sources(dbms src/Storages/Page/V3/spacemap) add_headers_and_sources(dbms src/Storages/Page/) add_headers_and_sources(dbms src/TiDB) add_headers_and_sources(dbms src/Client) @@ -323,6 +321,9 @@ if (ENABLE_TESTS) if (ENABLE_TIFLASH_DTWORKLOAD) target_link_libraries(bench_dbms dt-workload-lib) endif () + if (ENABLE_TIFLASH_PAGEWORKLOAD) + target_link_libraries(bench_dbms page-workload-lib) + endif () add_check(bench_dbms) endif () diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h new file mode 100644 index 00000000000..4fa33262e66 --- /dev/null +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -0,0 +1,246 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} // namespace ErrorCodes + +class MultiPartitionStreamPool +{ +public: + MultiPartitionStreamPool() = default; + + void addPartitionStreams(const BlockInputStreams & cur_streams) + { + if (cur_streams.empty()) + return; + std::unique_lock lk(mu); + streams_queue_by_partition.push_back( + std::make_shared>>()); + for (const auto & stream : cur_streams) + streams_queue_by_partition.back()->push(stream); + added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end()); + } + + std::shared_ptr pickOne() + { + std::unique_lock lk(mu); + if (streams_queue_by_partition.empty()) + return nullptr; + if (streams_queue_id >= static_cast(streams_queue_by_partition.size())) + streams_queue_id = 0; + + auto & q = *streams_queue_by_partition[streams_queue_id]; + std::shared_ptr ret = nullptr; + assert(!q.empty()); + ret = q.front(); + q.pop(); + if (q.empty()) + streams_queue_id = removeQueue(streams_queue_id); + else + streams_queue_id = nextQueueId(streams_queue_id); + return ret; + } + + int exportAddedStreams(BlockInputStreams & ret_streams) + { + std::unique_lock lk(mu); + for (auto & stream : added_streams) + ret_streams.push_back(stream); + return added_streams.size(); + } + + int addedStreamsCnt() + { + std::unique_lock lk(mu); + return added_streams.size(); + } + +private: + int removeQueue(int queue_id) + { + streams_queue_by_partition[queue_id] = nullptr; + if (queue_id != static_cast(streams_queue_by_partition.size()) - 1) + { + swap(streams_queue_by_partition[queue_id], streams_queue_by_partition.back()); + streams_queue_by_partition.pop_back(); + return queue_id; + } + else + { + streams_queue_by_partition.pop_back(); + return 0; + } + } + + int nextQueueId(int queue_id) const + { + if (queue_id + 1 < static_cast(streams_queue_by_partition.size())) + return queue_id + 1; + else + return 0; + } + + static void swap(std::shared_ptr>> & a, + std::shared_ptr>> & b) + { + a.swap(b); + } + + std::vector< + std::shared_ptr>>> + streams_queue_by_partition; + std::vector> added_streams; + int streams_queue_id = 0; + std::mutex mu; +}; + +class MultiplexInputStream final : public IProfilingBlockInputStream +{ +private: + static constexpr auto NAME = "Multiplex"; + +public: + MultiplexInputStream( + std::shared_ptr & shared_pool, + const String & req_id) + : log(Logger::get(NAME, req_id)) + , shared_pool(shared_pool) + { + shared_pool->exportAddedStreams(children); + size_t num_children = children.size(); + if (num_children > 1) + { + Block header = children.at(0)->getHeader(); + for (size_t i = 1; i < num_children; ++i) + assertBlocksHaveEqualStructure( + children[i]->getHeader(), + header, + "MULTIPLEX"); + } + } + + String getName() const override { return NAME; } + + ~MultiplexInputStream() override + { + try + { + if (!all_read) + cancel(false); + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + } + + /** Different from the default implementation by trying to stop all sources, + * skipping failed by execution. + */ + void cancel(bool kill) override + { + if (kill) + is_killed = true; + + bool old_val = false; + if (!is_cancelled.compare_exchange_strong( + old_val, + true, + std::memory_order_seq_cst, + std::memory_order_relaxed)) + return; + + if (cur_stream) + { + if (IProfilingBlockInputStream * child = dynamic_cast(&*cur_stream)) + { + child->cancel(kill); + } + } + } + + Block getHeader() const override { return children.at(0)->getHeader(); } + +protected: + /// Do nothing, to make the preparation when underlying InputStream is picked from the pool + void readPrefix() override + { + } + + /** The following options are possible: + * 1. `readImpl` function is called until it returns an empty block. + * Then `readSuffix` function is called and then destructor. + * 2. `readImpl` function is called. At some point, `cancel` function is called perhaps from another thread. + * Then `readSuffix` function is called and then destructor. + * 3. At any time, the object can be destroyed (destructor called). + */ + + Block readImpl() override + { + if (all_read) + return {}; + + Block ret; + while (!cur_stream || !(ret = cur_stream->read())) + { + if (cur_stream) + cur_stream->readSuffix(); // release old inputstream + cur_stream = shared_pool->pickOne(); + if (!cur_stream) + { // shared_pool is empty + all_read = true; + return {}; + } + cur_stream->readPrefix(); + } + return ret; + } + + /// Called either after everything is read, or after cancel. + void readSuffix() override + { + if (!all_read && !is_cancelled) + throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR); + + if (cur_stream) + { + cur_stream->readSuffix(); + cur_stream = nullptr; + } + } + +private: + LoggerPtr log; + + std::shared_ptr shared_pool; + std::shared_ptr cur_stream; + + bool all_read = false; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index df7e504d2c4..14cddd94730 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -634,6 +635,9 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max if (total_local_region_num == 0) return; const auto table_query_infos = generateSelectQueryInfos(); + bool has_multiple_partitions = table_query_infos.size() > 1; + // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case + std::shared_ptr stream_pool = has_multiple_partitions ? std::make_shared() : nullptr; for (const auto & table_query_info : table_query_infos) { DAGPipeline current_pipeline; @@ -642,9 +646,6 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max size_t region_num = query_info.mvcc_query_info->regions_query_info.size(); if (region_num == 0) continue; - /// calculate weighted max_streams for each partition, note at least 1 stream is needed for each partition - size_t current_max_streams = table_query_infos.size() == 1 ? max_streams : (max_streams * region_num + total_local_region_num - 1) / total_local_region_num; - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; assert(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end()); auto & storage = storages_with_structure_lock[table_id].storage; @@ -654,7 +655,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max { try { - current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, current_max_streams); + current_pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams); // After getting streams from storage, we need to validate whether Regions have changed or not after learner read. // (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those `streams` @@ -778,7 +779,19 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max throw; } } - pipeline.streams.insert(pipeline.streams.end(), current_pipeline.streams.begin(), current_pipeline.streams.end()); + if (has_multiple_partitions) + stream_pool->addPartitionStreams(current_pipeline.streams); + else + pipeline.streams.insert(pipeline.streams.end(), current_pipeline.streams.begin(), current_pipeline.streams.end()); + } + if (has_multiple_partitions) + { + String req_info = dag_context.isMPPTask() ? dag_context.getMPPTaskId().toString() : ""; + int exposed_streams_cnt = std::min(static_cast(max_streams), stream_pool->addedStreamsCnt()); + for (int i = 0; i < exposed_streams_cnt; ++i) + { + pipeline.streams.push_back(std::make_shared(stream_pool, req_info)); + } } } diff --git a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp index df6c881c306..1e9da93ffe3 100644 --- a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp +++ b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -48,7 +47,6 @@ class BasicManualCompactTest BasicManualCompactTest() { - log = &Poco::Logger::get(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); pk_type = GetParam(); } @@ -63,7 +61,7 @@ class BasicManualCompactTest setupStorage(); // In tests let's only compact one segment. - db_context->setSetting("manual_compact_more_until_ms", UInt64(0)); + db_context->setSetting("manual_compact_more_until_ms", Field(UInt64(0))); // Split into 4 segments, and prepare some delta data for first 3 segments. helper = std::make_unique(*db_context); @@ -116,8 +114,6 @@ class BasicManualCompactTest std::unique_ptr manager; DM::tests::DMTestEnv::PkType pk_type; - - [[maybe_unused]] Poco::Logger * log; }; @@ -315,7 +311,7 @@ CATCH TEST_P(BasicManualCompactTest, CompactMultiple) try { - db_context->setSetting("manual_compact_more_until_ms", UInt64(60 * 1000)); // Hope it's long enough! + db_context->setSetting("manual_compact_more_until_ms", Field(UInt64(60 * 1000))); // Hope it's long enough! auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(TABLE_ID); diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt index 63cf6d0e1f9..104b4f34e4a 100644 --- a/dbms/src/Server/CMakeLists.txt +++ b/dbms/src/Server/CMakeLists.txt @@ -22,6 +22,7 @@ option(ENABLE_CLICKHOUSE_SERVER "Enable server" ${ENABLE_CLICKHOUSE_ALL}) option(ENABLE_CLICKHOUSE_CLIENT "Enable client" ${ENABLE_CLICKHOUSE_ALL}) option(ENABLE_TIFLASH_DTTOOL "Enable dttool: tools to manage dmfile" ${ENABLE_CLICKHOUSE_ALL}) option(ENABLE_TIFLASH_DTWORKLOAD "Enable dtworkload: tools to test and stress DeltaTree" ${ENABLE_CLICKHOUSE_ALL}) +option(ENABLE_TIFLASH_PAGEWORKLOAD "Enable pageworkload: tools to test and stress PageStorage" ${ENABLE_CLICKHOUSE_ALL}) option(ENABLE_TIFLASH_PAGECTL "Enable pagectl: tools to debug page storage" ${ENABLE_CLICKHOUSE_ALL}) configure_file (config_tools.h.in ${CMAKE_CURRENT_BINARY_DIR}/config_tools.h) @@ -136,6 +137,9 @@ endif () if (ENABLE_TIFLASH_DTWORKLOAD) target_link_libraries(tiflash dt-workload-lib) endif () +if (ENABLE_TIFLASH_PAGEWORKLOAD) + target_link_libraries(tiflash page-workload-lib) +endif() if (ENABLE_TIFLASH_PAGECTL) target_link_libraries(tiflash page-ctl-lib) endif () diff --git a/dbms/src/Server/config_tools.h.in b/dbms/src/Server/config_tools.h.in index 61aa3f41591..03a478a6473 100644 --- a/dbms/src/Server/config_tools.h.in +++ b/dbms/src/Server/config_tools.h.in @@ -6,4 +6,5 @@ #cmakedefine01 ENABLE_CLICKHOUSE_CLIENT #cmakedefine01 ENABLE_TIFLASH_DTTOOL #cmakedefine01 ENABLE_TIFLASH_DTWORKLOAD +#cmakedefine01 ENABLE_TIFLASH_PAGEWORKLOAD #cmakedefine01 ENABLE_TIFLASH_PAGECTL diff --git a/dbms/src/Server/main.cpp b/dbms/src/Server/main.cpp index 11cccf84729..dbcaa4f38fc 100644 --- a/dbms/src/Server/main.cpp +++ b/dbms/src/Server/main.cpp @@ -36,7 +36,10 @@ #include #endif #if ENABLE_TIFLASH_DTWORKLOAD -#include +#include +#endif +#if ENABLE_TIFLASH_PAGEWORKLOAD +#include #endif #if ENABLE_TIFLASH_PAGECTL #include @@ -107,6 +110,9 @@ std::pair clickhouse_applications[] = { #if ENABLE_TIFLASH_DTWORKLOAD {"dtworkload", DB::DM::tests::DTWorkload::mainEntry}, #endif +#if ENABLE_TIFLASH_PAGEWORKLOAD + {"pageworkload", DB::PS::tests::StressWorkload::mainEntry}, +#endif #if ENABLE_TIFLASH_PAGECTL {"pagectl", DB::PageStorageCtl::mainEntry}, #endif diff --git a/dbms/src/Storages/CMakeLists.txt b/dbms/src/Storages/CMakeLists.txt index 90cc7a01d5b..68a2e6c9a74 100644 --- a/dbms/src/Storages/CMakeLists.txt +++ b/dbms/src/Storages/CMakeLists.txt @@ -15,16 +15,15 @@ add_subdirectory (System) add_subdirectory (Page) add_subdirectory (DeltaMerge/File/dtpb) -add_subdirectory (DeltaMerge/tools) +add_subdirectory (DeltaMerge/workload) +add_subdirectory (Page/workload) if (ENABLE_TESTS) add_subdirectory (tests EXCLUDE_FROM_ALL) add_subdirectory (Transaction/tests EXCLUDE_FROM_ALL) add_subdirectory (Page/V2/tests EXCLUDE_FROM_ALL) - if (ENABLE_V3_PAGESTORAGE) - add_subdirectory (Page/V3 EXCLUDE_FROM_ALL) - add_subdirectory (Page/V3/tests EXCLUDE_FROM_ALL) - endif () + add_subdirectory (Page/V3 EXCLUDE_FROM_ALL) + add_subdirectory (Page/V3/tests EXCLUDE_FROM_ALL) add_subdirectory (DeltaMerge/tests EXCLUDE_FROM_ALL) endif () diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a74404f3dbb..195ed5c53c2 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -1137,9 +1138,11 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, } fiu_do_on(FailPoints::force_slow_page_storage_snapshot_release, { - std::thread thread_hold_snapshots([tasks]() { + std::thread thread_hold_snapshots([this, tasks]() { + LOG_FMT_WARNING(log, "failpoint force_slow_page_storage_snapshot_release begin"); std::this_thread::sleep_for(std::chrono::seconds(5 * 60)); (void)tasks; + LOG_FMT_WARNING(log, "failpoint force_slow_page_storage_snapshot_release end"); }); thread_hold_snapshots.detach(); }); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 752898f9c75..2791a74e9e3 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -624,8 +624,8 @@ PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, cons auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false); fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, { - static size_t fail_point_called = 0; - if (existed_path.empty() && fail_point_called % 10 == 0) + static std::atomic fail_point_called(0); + if (existed_path.empty() && fail_point_called.load() % 10 == 0) { existed_path = ""; } diff --git a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h index 7c5b0b2416d..787a521ded3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h +++ b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h @@ -88,6 +88,7 @@ class MultiSegmentTestUtil : private boost::noncopyable // Check there is only one segment ASSERT_EQ(store->segments.size(), 1); const auto & [_key, seg] = *store->segments.begin(); + (void)_key; ASSERT_EQ(seg->getDelta()->getRows(), n_avg_rows_per_segment * 4); ASSERT_EQ(seg->getStable()->getRows(), 0); @@ -108,6 +109,7 @@ class MultiSegmentTestUtil : private boost::noncopyable auto segment_idx = 0; for (auto & [_key, seg] : store->segments) { + (void)_key; LOG_FMT_INFO(log, "Segment #{}: Range = {}", segment_idx, seg->getRowKeyRange().toDebugString()); ASSERT_EQ(seg->getDelta()->getRows(), 0); ASSERT_GT(seg->getStable()->getRows(), 0); // We don't check the exact rows of each segment. @@ -147,6 +149,7 @@ class MultiSegmentTestUtil : private boost::noncopyable auto segment_idx = 0; for (auto & [_key, seg] : store->segments) { + (void)_key; ASSERT_EQ(seg->getDelta()->getRows(), expected_delta_rows[segment_idx]) << "Assert failed for segment #" << segment_idx; ASSERT_EQ(seg->getStable()->getRows(), expected_stable_rows[segment_idx]) << "Assert failed for segment #" << segment_idx; segment_idx++; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index e934f7a2049..d46e1b7aa36 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3564,7 +3564,6 @@ class DeltaMergeStoreMergeDeltaBySegmentTest public: DeltaMergeStoreMergeDeltaBySegmentTest() { - log = &Poco::Logger::get(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); std::tie(ps_ver, pk_type) = GetParam(); } @@ -3607,8 +3606,6 @@ class DeltaMergeStoreMergeDeltaBySegmentTest UInt64 ps_ver; DMTestEnv::PkType pk_type; - - [[maybe_unused]] Poco::Logger * log; }; INSTANTIATE_TEST_CASE_P( diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Main.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/Main.cpp deleted file mode 100644 index 092c8a89a42..00000000000 --- a/dbms/src/Storages/DeltaMerge/tools/workload/Main.cpp +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -using namespace DB::DM::tests; - -int main(int argc, char ** argv) -{ - return DTWorkload::mainEntry(argc, argv); -} diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/workload/CMakeLists.txt similarity index 86% rename from dbms/src/Storages/DeltaMerge/tools/workload/CMakeLists.txt rename to dbms/src/Storages/DeltaMerge/workload/CMakeLists.txt index 7227f1cf563..7a83cbec57c 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/CMakeLists.txt +++ b/dbms/src/Storages/DeltaMerge/workload/CMakeLists.txt @@ -18,6 +18,3 @@ set(dt-workload-src MainEntry.cpp DTWorkload.cpp KeyGenerator.cpp TableGenerator add_library(dt-workload-lib ${dt-workload-src}) target_link_libraries(dt-workload-lib dbms clickhouse_functions clickhouse-server-lib) - -add_executable(dt-workload Main.cpp ${dt-workload-src}) -target_link_libraries(dt-workload dbms gtest clickhouse_functions clickhouse-server-lib) diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp similarity index 94% rename from dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp rename to dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index a6113f91d91..a53a1b9ebbd 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -19,16 +19,16 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.h b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.h similarity index 97% rename from dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.h rename to dbms/src/Storages/DeltaMerge/workload/DTWorkload.h index 26cc5b6e07c..1ee5ba6b871 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.h +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.h @@ -73,7 +73,7 @@ class ThreadStat class Statistics { public: - Statistics(int write_thread_count = 0, int read_thread_count = 0) + explicit Statistics(int write_thread_count = 0, int read_thread_count = 0) : init_ms(0) , write_stats(write_thread_count) , read_stats(read_thread_count) diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp b/dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp similarity index 95% rename from dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp rename to dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp index be6ff1dcbbe..479977d46d1 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp @@ -13,11 +13,11 @@ // limitations under the License. #include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include #include #include @@ -33,7 +33,7 @@ class RandomDataGenerator : public DataGenerator , rand_gen(std::random_device()()) {} - virtual std::tuple get(uint64_t key) override + std::tuple get(uint64_t key) override { Block block; // Generate 'rowkeys'. @@ -227,7 +227,9 @@ class RandomDataGenerator : public DataGenerator struct tm randomLocalTime() { time_t t = randomUTCTimestamp(); - struct tm res; + struct tm res + { + }; if (localtime_r(&t, &res) == nullptr) { throw std::invalid_argument(fmt::format("localtime_r({}) ret {}", t, strerror(errno))); diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.h b/dbms/src/Storages/DeltaMerge/workload/DataGenerator.h similarity index 96% rename from dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.h rename to dbms/src/Storages/DeltaMerge/workload/DataGenerator.h index e32de4591e6..cd29f1a3a80 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.h +++ b/dbms/src/Storages/DeltaMerge/workload/DataGenerator.h @@ -27,7 +27,7 @@ class DataGenerator public: static std::unique_ptr create(const WorkloadOptions & opts, const TableInfo & table_info, TimestampGenerator & ts_gen); virtual std::tuple get(uint64_t key) = 0; - virtual ~DataGenerator() {} + virtual ~DataGenerator() = default; }; std::string blockToString(const Block & block); diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Handle.h b/dbms/src/Storages/DeltaMerge/workload/Handle.h similarity index 90% rename from dbms/src/Storages/DeltaMerge/tools/workload/Handle.h rename to dbms/src/Storages/DeltaMerge/workload/Handle.h index eb117a4fddd..c4949c15a1f 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/Handle.h +++ b/dbms/src/Storages/DeltaMerge/workload/Handle.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include @@ -40,7 +40,7 @@ class HandleLock static constexpr uint64_t default_lock_count = 4096; static std::unique_ptr create(const TableInfo & table_info); - HandleLock(uint64_t lock_count = default_lock_count) + explicit HandleLock(uint64_t lock_count = default_lock_count) : rmtxs(lock_count) {} @@ -51,14 +51,14 @@ class HandleLock std::vector> getLocks(const std::vector & handles) { - std::vector indexes; + std::vector indexes(handles.size()); for (const auto & h : handles) { indexes.push_back(index(h)); } // Sort mutex indexes to avoid dead lock. sort(indexes.begin(), indexes.end()); - std::vector> locks; + std::vector> locks(indexes.size()); for (auto i : indexes) { locks.push_back(getLockByIndex(i)); @@ -105,7 +105,7 @@ class HandleTable std::lock_guard lock(mtx); handle_to_ts[handle] = ts; Record r{handle, ts}; - if (wal != nullptr && wal->write((char *)&r, sizeof(r)) != sizeof(r)) + if (wal != nullptr && wal->write(reinterpret_cast(&r), sizeof(r)) != sizeof(r)) { throw std::runtime_error(fmt::format("write ret {}", strerror(errno))); } @@ -134,8 +134,8 @@ class HandleTable try { PosixRandomAccessFile f(fname, -1); - Record r; - while (f.read((char *)&r, sizeof(r)) == sizeof(r)) + Record r{}; + while (f.read(reinterpret_cast(&r), sizeof(r)) == sizeof(r)) { handle_to_ts[r.handle] = r.ts; } @@ -156,7 +156,7 @@ class HandleTable for (const auto & pa : handle_to_ts) { Record r{pa.first, pa.second}; - if (f.write((char *)&r, sizeof(r)) != sizeof(r)) + if (f.write(reinterpret_cast(&r), sizeof(r)) != sizeof(r)) { throw std::runtime_error(fmt::format("write ret {}", strerror(errno))); } @@ -191,7 +191,7 @@ class SharedHandleTable public: static constexpr uint64_t default_shared_count = 4096; - SharedHandleTable(uint64_t max_key_count, const std::string & waldir = "", uint64_t shared_cnt = default_shared_count) + explicit SharedHandleTable(uint64_t max_key_count, const std::string & waldir = "", uint64_t shared_cnt = default_shared_count) : tables(shared_cnt) { uint64_t max_key_count_per_shared = max_key_count / default_shared_count + 1; diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/KeyGenerator.cpp b/dbms/src/Storages/DeltaMerge/workload/KeyGenerator.cpp similarity index 92% rename from dbms/src/Storages/DeltaMerge/tools/workload/KeyGenerator.cpp rename to dbms/src/Storages/DeltaMerge/workload/KeyGenerator.cpp index bb2f2253279..f899ec71b4b 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/KeyGenerator.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/KeyGenerator.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include +#include +#include #include #include @@ -31,7 +31,7 @@ class IncrementalKeyGenerator : public KeyGenerator , key(0) {} - virtual uint64_t get64() override + uint64_t get64() override { return key.fetch_add(1, std::memory_order_relaxed) % key_count + start_key; } @@ -54,7 +54,7 @@ class UniformDistributionKeyGenerator : public KeyGenerator , uniform_dist(0, key_count) {} - virtual uint64_t get64() override + uint64_t get64() override { std::lock_guard lock(mtx); return uniform_dist(rand_gen); @@ -78,7 +78,7 @@ class NormalDistributionKeyGenerator : public KeyGenerator , normal_dist(key_count / 2.0, key_count / 20.0) {} - virtual uint64_t get64() override + uint64_t get64() override { std::lock_guard lock(mtx); return normal_dist(rand_gen); diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/KeyGenerator.h b/dbms/src/Storages/DeltaMerge/workload/KeyGenerator.h similarity index 92% rename from dbms/src/Storages/DeltaMerge/tools/workload/KeyGenerator.h rename to dbms/src/Storages/DeltaMerge/workload/KeyGenerator.h index 447f3ffc27a..7c8b8fd0080 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/KeyGenerator.h +++ b/dbms/src/Storages/DeltaMerge/workload/KeyGenerator.h @@ -23,8 +23,8 @@ class KeyGenerator public: static std::unique_ptr create(const WorkloadOptions & opts); - KeyGenerator() {} - virtual ~KeyGenerator() {} + KeyGenerator() = default; + virtual ~KeyGenerator() = default; virtual uint64_t get64() = 0; }; diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Limiter.cpp b/dbms/src/Storages/DeltaMerge/workload/Limiter.cpp similarity index 77% rename from dbms/src/Storages/DeltaMerge/tools/workload/Limiter.cpp rename to dbms/src/Storages/DeltaMerge/workload/Limiter.cpp index 73764d27bc5..65f9e3ce72c 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/Limiter.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/Limiter.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include -#include -#include +#include +#include #include #include @@ -24,10 +24,10 @@ namespace DB::DM::tests class ConstantLimiter : public Limiter { public: - ConstantLimiter(uint64_t rate_per_sec) + explicit ConstantLimiter(uint64_t rate_per_sec) : limiter(rate_per_sec, LimiterType::UNKNOW) {} - virtual void request() override + void request() override { limiter.request(1); } @@ -38,7 +38,7 @@ class ConstantLimiter : public Limiter std::unique_ptr Limiter::create(const WorkloadOptions & opts) { - uint64_t per_sec = std::ceil(static_cast(opts.max_write_per_sec / opts.write_thread_count)); + uint64_t per_sec = std::ceil(opts.max_write_per_sec * 1.0 / opts.write_thread_count); return std::make_unique(per_sec); } diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Limiter.h b/dbms/src/Storages/DeltaMerge/workload/Limiter.h similarity index 96% rename from dbms/src/Storages/DeltaMerge/tools/workload/Limiter.h rename to dbms/src/Storages/DeltaMerge/workload/Limiter.h index e2892b178a2..da2d31c7915 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/Limiter.h +++ b/dbms/src/Storages/DeltaMerge/workload/Limiter.h @@ -23,6 +23,6 @@ class Limiter public: static std::unique_ptr create(const WorkloadOptions & opts); virtual void request() = 0; - virtual ~Limiter() {} + virtual ~Limiter() = default; }; } // namespace DB::DM::tests \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/MainEntry.cpp b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp similarity index 97% rename from dbms/src/Storages/DeltaMerge/tools/workload/MainEntry.cpp rename to dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp index f79d414f20b..88cf0b6322f 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/MainEntry.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp @@ -14,10 +14,10 @@ #include #include -#include -#include -#include -#include +#include +#include +#include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Options.cpp b/dbms/src/Storages/DeltaMerge/workload/Options.cpp similarity index 98% rename from dbms/src/Storages/DeltaMerge/tools/workload/Options.cpp rename to dbms/src/Storages/DeltaMerge/workload/Options.cpp index 1c6409f3c53..8545d22ca8d 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/Options.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/Options.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include -#include -#include +#include +#include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Options.h b/dbms/src/Storages/DeltaMerge/workload/Options.h similarity index 100% rename from dbms/src/Storages/DeltaMerge/tools/workload/Options.h rename to dbms/src/Storages/DeltaMerge/workload/Options.h diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/ReadColumnsGenerator.h b/dbms/src/Storages/DeltaMerge/workload/ReadColumnsGenerator.h similarity index 93% rename from dbms/src/Storages/DeltaMerge/tools/workload/ReadColumnsGenerator.h rename to dbms/src/Storages/DeltaMerge/workload/ReadColumnsGenerator.h index 180409f89e1..c881bb148a2 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/ReadColumnsGenerator.h +++ b/dbms/src/Storages/DeltaMerge/workload/ReadColumnsGenerator.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include @@ -28,7 +28,7 @@ class ReadColumnsGenerator return std::make_unique(table_info); } - ReadColumnsGenerator(const TableInfo & table_info_) + explicit ReadColumnsGenerator(const TableInfo & table_info_) : table_info(table_info_) , rand_gen(std::random_device()()) , uniform_dist(0, table_info_.columns->size() - 1) diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/TableGenerator.cpp b/dbms/src/Storages/DeltaMerge/workload/TableGenerator.cpp similarity index 97% rename from dbms/src/Storages/DeltaMerge/tools/workload/TableGenerator.cpp rename to dbms/src/Storages/DeltaMerge/workload/TableGenerator.cpp index cf52e808ab1..ec29a476d6a 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/TableGenerator.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/TableGenerator.cpp @@ -15,8 +15,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -237,7 +237,7 @@ class RandomTableGenerator : public TableGenerator , rand_gen(std::random_device()()) {} - virtual TableInfo get(int64_t table_id, std::string table_name) override + TableInfo get(int64_t table_id, std::string table_name) override { TableInfo table_info; @@ -293,7 +293,7 @@ class RandomTableGenerator : public TableGenerator class ConstantTableGenerator : public TableGenerator { - virtual TableInfo get(int64_t table_id, std::string table_name) override + TableInfo get(int64_t table_id, std::string table_name) override { TableInfo table_info; diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/TableGenerator.h b/dbms/src/Storages/DeltaMerge/workload/TableGenerator.h similarity index 96% rename from dbms/src/Storages/DeltaMerge/tools/workload/TableGenerator.h rename to dbms/src/Storages/DeltaMerge/workload/TableGenerator.h index aba5c1590b7..b88bf2b72e2 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/TableGenerator.h +++ b/dbms/src/Storages/DeltaMerge/workload/TableGenerator.h @@ -38,6 +38,6 @@ class TableGenerator virtual TableInfo get(int64_t table_id, std::string table_name) = 0; - virtual ~TableGenerator() {} + virtual ~TableGenerator() = default; }; } // namespace DB::DM::tests \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/TimestampGenerator.h b/dbms/src/Storages/DeltaMerge/workload/TimestampGenerator.h similarity index 100% rename from dbms/src/Storages/DeltaMerge/tools/workload/TimestampGenerator.h rename to dbms/src/Storages/DeltaMerge/workload/TimestampGenerator.h diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Utils.cpp b/dbms/src/Storages/DeltaMerge/workload/Utils.cpp similarity index 94% rename from dbms/src/Storages/DeltaMerge/tools/workload/Utils.cpp rename to dbms/src/Storages/DeltaMerge/workload/Utils.cpp index 1cefae724c6..80d9f788016 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/Utils.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/Utils.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include #include @@ -83,7 +83,7 @@ std::string fieldToString(const DataTypePtr & data_type, const Field & f) } else if (t == Field::Types::Which::Decimal256) { - auto i = f.get(); + const auto & i = f.get(); auto scale = dynamic_cast(data_type.get())->getScale(); return i.toString(scale); } @@ -105,8 +105,8 @@ std::vector colToVec(const DataTypePtr & data_type, const ColumnPtr std::string blockToString(const Block & block) { std::string s = "id name type values\n"; - auto & cols = block.getColumnsWithTypeAndName(); - for (auto & col : cols) + const auto & cols = block.getColumnsWithTypeAndName(); + for (const auto & col : cols) { s += fmt::format("{} {} {} {}\n", col.column_id, col.name, col.type->getFamilyName(), colToVec(col.type, col.column)); } diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/Utils.h b/dbms/src/Storages/DeltaMerge/workload/Utils.h similarity index 100% rename from dbms/src/Storages/DeltaMerge/tools/workload/Utils.h rename to dbms/src/Storages/DeltaMerge/workload/Utils.h diff --git a/dbms/src/Storages/Page/CMakeLists.txt b/dbms/src/Storages/Page/CMakeLists.txt index cead83fa126..f208dc84be2 100644 --- a/dbms/src/Storages/Page/CMakeLists.txt +++ b/dbms/src/Storages/Page/CMakeLists.txt @@ -14,13 +14,3 @@ add_subdirectory(V2) add_subdirectory(tools) - -# PageStorage Stress test -if (ENABLE_V3_PAGESTORAGE) - add_headers_and_sources(page_stress_testing stress) - add_headers_and_sources(page_stress_testing stress/workload) - add_executable(page_stress_testing EXCLUDE_FROM_ALL ${page_stress_testing_sources}) - target_link_libraries(page_stress_testing dbms page_storage_v3) - target_include_directories(page_stress_testing PRIVATE stress) - target_compile_options(page_stress_testing PRIVATE -Wno-format -lc++) # turn off printf format check -endif() \ No newline at end of file diff --git a/dbms/src/Storages/Page/PageUtil.h b/dbms/src/Storages/Page/PageUtil.h index cebcbdb27f2..b0d8f0f88c8 100644 --- a/dbms/src/Storages/Page/PageUtil.h +++ b/dbms/src/Storages/Page/PageUtil.h @@ -281,7 +281,7 @@ void readFile(T & file, } if (unlikely(bytes_read != expected_bytes)) - throw DB::TiFlashException(fmt::format("No enough data in file {}, read bytes: {} , expected bytes: {}", file->getFileName(), bytes_read, expected_bytes), + throw DB::TiFlashException(fmt::format("No enough data in file {}, read bytes: {}, expected bytes: {}, offset: {}", file->getFileName(), bytes_read, expected_bytes, offset), Errors::PageStorage::FileSizeNotMatch); } diff --git a/dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp b/dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp index e72c7a87541..c4dd2178eb9 100644 --- a/dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp +++ b/dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB { @@ -30,6 +31,7 @@ namespace tests static const std::string FileName = "page_util_test"; TEST(PageUtilsTest, ReadWriteFile) +try { ::remove(FileName.c_str()); @@ -52,6 +54,7 @@ TEST(PageUtilsTest, ReadWriteFile) ::remove(FileName.c_str()); } +CATCH TEST(PageUtilsTest, FileNotExists) { diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index d5f71841b91..37a4fd429f4 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -555,7 +556,7 @@ void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler, for (const auto & [page_id_v3, entry] : entries) { - auto blob_file = read(entry.file_id, entry.offset, data_buf, entry.size, read_limiter); + auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, entry.size, read_limiter); if constexpr (BLOBSTORE_CHECKSUM_ON_READ) { @@ -635,7 +636,7 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li // TODO: Continuously fields can read by one system call. const auto [beg_offset, end_offset] = entry.getFieldOffsets(field_index); const auto size_to_read = end_offset - beg_offset; - auto blob_file = read(entry.file_id, entry.offset + beg_offset, write_offset, size_to_read, read_limiter); + auto blob_file = read(page_id_v3, entry.file_id, entry.offset + beg_offset, write_offset, size_to_read, read_limiter); fields_offset_in_page.emplace(field_index, read_size_this_entry); if constexpr (BLOBSTORE_CHECKSUM_ON_READ) @@ -732,7 +733,7 @@ PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & rea PageMap page_map; for (const auto & [page_id_v3, entry] : entries) { - auto blob_file = read(entry.file_id, entry.offset, pos, entry.size, read_limiter); + auto blob_file = read(page_id_v3, entry.file_id, entry.offset, pos, entry.size, read_limiter); if constexpr (BLOBSTORE_CHECKSUM_ON_READ) { @@ -797,7 +798,7 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r free(p, buf_size); }); - auto blob_file = read(entry.file_id, entry.offset, data_buf, buf_size, read_limiter); + auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, buf_size, read_limiter); if constexpr (BLOBSTORE_CHECKSUM_ON_READ) { ChecksumClass digest; @@ -824,11 +825,20 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r return page; } -BlobFilePtr BlobStore::read(BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background) +BlobFilePtr BlobStore::read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background) { assert(buffers != nullptr); - auto blob_file = getBlobFile(blob_id); - blob_file->read(buffers, offset, size, read_limiter, background); + BlobFilePtr blob_file = getBlobFile(blob_id); + try + { + blob_file->read(buffers, offset, size, read_limiter, background); + } + catch (DB::Exception & e) + { + // add debug message + e.addMessage(fmt::format("(error while reading page data [page_id={}] [blob_id={}] [offset={}] [size={}] [background={}])", page_id_v3, blob_id, offset, size, background)); + e.rethrow(); + } return blob_file; } @@ -1117,21 +1127,15 @@ PageEntriesEdit BlobStore::gc(std::map & std::tie(blobfile_id, file_offset_beg) = getPosFromStats(next_alloc_size); } - PageEntryV3 new_entry; - - read(file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true); - - // No need do crc again, crc won't be changed. - new_entry.checksum = entry.checksum; - - // Need copy the field_offsets - new_entry.field_offsets = entry.field_offsets; - - // Entry size won't be changed. - new_entry.size = entry.size; + // Read the data into buffer by old entry + read(page_id, file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true); + // Most vars of the entry is not changed, but the file id and offset + // need to be updated. + PageEntryV3 new_entry = entry; new_entry.file_id = blobfile_id; new_entry.offset = file_offset_beg + offset_in_data; + new_entry.padded_size = 0; // reset padded size to be zero offset_in_data += new_entry.size; data_pos += new_entry.size; diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index 24bf4652123..6b139b98557 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -296,7 +296,7 @@ class BlobStore : private Allocator PageEntriesEdit handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr); - BlobFilePtr read(BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false); + BlobFilePtr read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false); /** * Ask BlobStats to get a span from BlobStat. diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index 048140ed04f..fdd08c7cb8e 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -82,6 +82,7 @@ try stats.restoreByEntry(PageEntryV3{ .file_id = file_id1, .size = 128, + .padded_size = 0, .tag = 0, .offset = 1024, .checksum = 0x4567, @@ -89,6 +90,7 @@ try stats.restoreByEntry(PageEntryV3{ .file_id = file_id1, .size = 512, + .padded_size = 0, .tag = 0, .offset = 2048, .checksum = 0x4567, @@ -96,6 +98,7 @@ try stats.restoreByEntry(PageEntryV3{ .file_id = file_id2, .size = 512, + .padded_size = 0, .tag = 0, .offset = 2048, .checksum = 0x4567, @@ -531,7 +534,8 @@ TEST_F(BlobStoreTest, testWriteRead) ASSERT_EQ(record.entry.file_id, 1); // Read directly from the file - blob_store.read(record.entry.file_id, + blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id), + record.entry.file_id, record.entry.offset, c_buff_read + index * buff_size, record.entry.size, @@ -631,7 +635,8 @@ TEST_F(BlobStoreTest, testWriteReadWithIOLimiter) { for (const auto & record : edits[i].getRecords()) { - blob_store.read(record.entry.file_id, + blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id), + record.entry.file_id, record.entry.offset, c_buff_read + i * buff_size, record.entry.size, @@ -809,7 +814,8 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) ASSERT_EQ(check_field_sizes, offsets); // Read - blob_store.read(record.entry.file_id, + blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id), + record.entry.file_id, record.entry.offset, c_buff_read + index * buff_size, record.entry.size, diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 6e2b0efa1ea..83e07f75d37 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -75,7 +75,7 @@ try auto snap0 = dir->createSnapshot(); EXPECT_ENTRY_NOT_EXIST(dir, 1, snap0); - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -85,7 +85,7 @@ try auto snap1 = dir->createSnapshot(); EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(2, entry2); @@ -102,7 +102,7 @@ try EXPECT_ENTRIES_EQ(expected_entries, dir, ids, snap2); } - PageEntryV3 entry2_v2{.file_id = 2 + 102, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2_v2{.file_id = 2 + 102, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -123,7 +123,7 @@ try auto snap0 = dir->createSnapshot(); EXPECT_ENTRY_NOT_EXIST(dir, page_id, snap0); - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry1); @@ -133,7 +133,7 @@ try auto snap1 = dir->createSnapshot(); EXPECT_ENTRY_EQ(entry1, dir, page_id, snap1); - PageEntryV3 entry2{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x1234, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x1234, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry2); @@ -151,7 +151,7 @@ try // Put identical page within one `edit` page_id++; - PageEntryV3 entry3{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x12345, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x12345, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry1); @@ -172,8 +172,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyPutDelRead) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -185,8 +185,8 @@ try EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); - PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry4{.file_id = 4, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry4{.file_id = 4, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -217,8 +217,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyUpdateOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -236,14 +236,14 @@ try EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); // Update on ref page is not allowed - PageEntryV3 entry_updated{.file_id = 999, .size = 16, .tag = 0, .offset = 0x123, .checksum = 0x123}; + PageEntryV3 entry_updated{.file_id = 999, .size = 16, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x123}; { PageEntriesEdit edit; edit.put(3, entry_updated); ASSERT_ANY_THROW(dir->apply(std::move(edit))); } - PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .tag = 0, .offset = 0x123, .checksum = 0x123}; + PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x123}; { PageEntriesEdit edit; edit.put(2, entry_updated2); @@ -255,8 +255,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyDeleteOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -305,8 +305,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyRefOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -343,8 +343,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyDuplicatedRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -410,8 +410,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyCollapseDuplicatedRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -447,9 +447,9 @@ CATCH TEST_F(PageDirectoryTest, ApplyRefToNotExistEntry) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -628,12 +628,12 @@ try } CATCH -#define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ - PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ +#define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ + PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ entries.createNewEntry(PageVersion(VERSION), entry_v##VERSION); #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) -#define INSERT_GC_ENTRY(VERSION, EPOCH) \ - PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ +#define INSERT_GC_ENTRY(VERSION, EPOCH) \ + PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .padded_size = 0, .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ entries.createNewEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); class VersionedEntriesTest : public ::testing::Test @@ -1271,12 +1271,12 @@ class PageDirectoryGCTest : public PageDirectoryTest { }; -#define INSERT_ENTRY_TO(PAGE_ID, VERSION, BLOB_FILE_ID) \ - PageEntryV3 entry_v##VERSION{.file_id = (BLOB_FILE_ID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ - { \ - PageEntriesEdit edit; \ - edit.put((PAGE_ID), entry_v##VERSION); \ - dir->apply(std::move(edit)); \ +#define INSERT_ENTRY_TO(PAGE_ID, VERSION, BLOB_FILE_ID) \ + PageEntryV3 entry_v##VERSION{.file_id = (BLOB_FILE_ID), .size = (VERSION), .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ + { \ + PageEntriesEdit edit; \ + edit.put((PAGE_ID), entry_v##VERSION); \ + dir->apply(std::move(edit)); \ } // Insert an entry into mvcc directory #define INSERT_ENTRY(PAGE_ID, VERSION) INSERT_ENTRY_TO(PAGE_ID, VERSION, 1) @@ -1566,7 +1566,7 @@ try INSERT_ENTRY_ACQ_SNAP(page_id, 5); INSERT_ENTRY(another_page_id, 6); INSERT_ENTRY(another_page_id, 7); - PageEntryV3 entry_v8{.file_id = 1, .size = 8, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_v8{.file_id = 1, .size = 8, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(page_id); @@ -1756,7 +1756,7 @@ TEST_F(PageDirectoryGCTest, GCOnRefedEntries) try { // 10->entry1, 11->10=>11->entry1; del 10->entry1 - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(10, entry1); @@ -1793,7 +1793,7 @@ TEST_F(PageDirectoryGCTest, GCOnRefedEntries2) try { // 10->entry1, 11->10=>11->entry1; del 10->entry1 - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(10, entry1); @@ -1836,7 +1836,7 @@ TEST_F(PageDirectoryGCTest, UpsertOnRefedEntries) try { // 10->entry1, 11->10, 12->10 - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(10, entry1); @@ -1860,7 +1860,7 @@ try } // upsert 10->entry2 - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; auto full_gc_entries = dir->getEntriesByBlobIds({1}); @@ -2024,10 +2024,10 @@ try return d; }; - PageEntryV3 entry_1_v1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_1_v2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_2_v1{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_2_v2{.file_id = 2, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_1_v1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_1_v2{.file_id = 1, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_2_v1{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_2_v2{.file_id = 2, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry_1_v1); @@ -2055,8 +2055,8 @@ try // 10->ext, 11->10, del 10->ext // 50->entry, 51->50, 52->51=>50, del 50 - PageEntryV3 entry_50{.file_id = 1, .size = 50, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_60{.file_id = 1, .size = 90, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_50{.file_id = 1, .size = 50, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_60{.file_id = 1, .size = 90, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -2218,9 +2218,9 @@ try Poco::File(fmt::format("{}/{}{}", path, BlobFile::BLOB_PREFIX_NAME, file_id1)).createFile(); Poco::File(fmt::format("{}/{}{}", path, BlobFile::BLOB_PREFIX_NAME, file_id2)).createFile(); - PageEntryV3 entry_1_v1{.file_id = file_id1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_5_v1{.file_id = file_id2, .size = 255, .tag = 0, .offset = 0x100, .checksum = 0x4567}; - PageEntryV3 entry_5_v2{.file_id = file_id2, .size = 255, .tag = 0, .offset = 0x400, .checksum = 0x4567}; + PageEntryV3 entry_1_v1{.file_id = file_id1, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_5_v1{.file_id = file_id2, .size = 255, .padded_size = 0, .tag = 0, .offset = 0x100, .checksum = 0x4567}; + PageEntryV3 entry_5_v2{.file_id = file_id2, .size = 255, .padded_size = 0, .tag = 0, .offset = 0x400, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry_1_v1); @@ -2275,8 +2275,8 @@ CATCH TEST_F(PageDirectoryGCTest, CleanAfterDecreaseRef) try { - PageEntryV3 entry_50_1{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_50_2{.file_id = 2, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_50_1{.file_id = 1, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_50_2{.file_id = 2, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; auto restore_from_edit = [](const PageEntriesEdit & edit) { auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index f7ba33c46c8..f9ef25cb973 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1441,6 +1441,55 @@ try } CATCH +TEST_F(PageStorageTest, EntryTagAfterFullGC) +try +{ + { + PageStorage::Config config; + config.blob_heavy_gc_valid_rate = 1.0; /// always run full gc + page_storage = reopenWithConfig(config); + } + + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + PageId page_id = 120; + UInt64 tag = 12345; + { + WriteBatch batch; + batch.putPage(page_id, tag, std::make_shared(c_buff, buf_sz), buf_sz, {}); + page_storage->write(std::move(batch)); + } + + { + auto entry = page_storage->getEntry(page_id); + ASSERT_EQ(entry.tag, tag); + auto page = page_storage->read(page_id); + for (size_t i = 0; i < buf_sz; ++i) + { + EXPECT_EQ(*(page.data.begin() + i), static_cast(i % 0xff)); + } + } + + auto done_full_gc = page_storage->gc(); + EXPECT_TRUE(done_full_gc); + + { + auto entry = page_storage->getEntry(page_id); + ASSERT_EQ(entry.tag, tag); + auto page = page_storage->read(page_id); + for (size_t i = 0; i < buf_sz; ++i) + { + EXPECT_EQ(*(page.data.begin() + i), static_cast(i % 0xff)); + } + } +} +CATCH } // namespace PS::V3::tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index 6d47adabbc5..b4e6c2d9204 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -34,8 +34,8 @@ namespace DB::PS::V3::tests { TEST(WALSeriTest, AllPuts) { - PageEntryV3 entry_p1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p2{.file_id = 1, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver20(/*seq=*/20); PageEntriesEdit edit; edit.put(1, entry_p1); @@ -56,8 +56,8 @@ TEST(WALSeriTest, AllPuts) TEST(WALSeriTest, PutsAndRefsAndDels) try { - PageEntryV3 entry_p3{.file_id = 1, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p5{.file_id = 1, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p3{.file_id = 1, .size = 3, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p5{.file_id = 1, .size = 5, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver21(/*seq=*/21); PageEntriesEdit edit; edit.put(3, entry_p3); @@ -104,9 +104,9 @@ CATCH TEST(WALSeriTest, Upserts) { - PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver20_1(/*seq=*/20, /*epoch*/ 1); PageVersion ver21_1(/*seq=*/21, /*epoch*/ 1); PageEntriesEdit edit; @@ -164,7 +164,7 @@ TEST(WALSeriTest, RefExternalAndEntry) { PageEntriesEdit edit; - PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; edit.varEntry(1, ver1_0, entry_p1_2, 2); edit.varDel(1, ver2_0); edit.varRef(2, ver3_0, 1); @@ -405,8 +405,8 @@ try ASSERT_NE(wal, nullptr); // Stage 2. Apply with only puts - PageEntryV3 entry_p1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p2{.file_id = 1, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver20(/*seq=*/20); { PageEntriesEdit edit; @@ -435,8 +435,8 @@ try } // Stage 3. Apply with puts and refs - PageEntryV3 entry_p3{.file_id = 1, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p5{.file_id = 1, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p3{.file_id = 1, .size = 3, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p5{.file_id = 1, .size = 5, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver21(/*seq=*/21); { PageEntriesEdit edit; @@ -468,9 +468,9 @@ try // Stage 4. Apply with delete and upsert - PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver20_1(/*seq=*/20, /*epoch*/ 1); PageVersion ver21_1(/*seq=*/21, /*epoch*/ 1); { @@ -514,8 +514,8 @@ try std::vector size_each_edit; // Stage 1. Apply with only puts - PageEntryV3 entry_p1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p2{.file_id = 1, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver20(/*seq=*/20); { PageEntriesEdit edit; @@ -526,8 +526,8 @@ try } // Stage 2. Apply with puts and refs - PageEntryV3 entry_p3{.file_id = 1, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p5{.file_id = 1, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p3{.file_id = 1, .size = 3, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p5{.file_id = 1, .size = 5, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver21(/*seq=*/21); { PageEntriesEdit edit; @@ -540,9 +540,9 @@ try } // Stage 3. Apply with delete and upsert - PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageVersion ver20_1(/*seq=*/20, /*epoch*/ 1); PageVersion ver21_1(/*seq=*/21, /*epoch*/ 1); { @@ -615,7 +615,7 @@ try PageVersion ver(/*seq*/ 32); for (size_t i = 0; i < num_edits_test; ++i) { - PageEntryV3 entry{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntriesEdit edit; const size_t num_pages_put = d_20(rd); for (size_t p = 0; p < num_pages_put; ++p) @@ -660,7 +660,7 @@ try .persisted_log_files = persisted_log_files}; PageEntriesEdit snap_edit; - PageEntryV3 entry{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; std::uniform_int_distribution<> d_10000(0, 10000); // just fill in some random entry for (size_t i = 0; i < 70; ++i) diff --git a/dbms/src/Storages/Page/stress/stress_page_storage.cpp b/dbms/src/Storages/Page/stress/stress_page_storage.cpp deleted file mode 100644 index 818be710363..00000000000 --- a/dbms/src/Storages/Page/stress/stress_page_storage.cpp +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -namespace DB -{ -// Define is_background_thread for this binary -// It is required for `RateLimiter` but we do not link with `BackgroundProcessingPool`. -#if __APPLE__ && __clang__ -__thread bool is_background_thread = false; -#else -thread_local bool is_background_thread = false; -#endif -} // namespace DB - -int main(int argc, char ** argv) -try -{ - StressEnv::initGlobalLogger(); - auto env = StressEnv::parse(argc, argv); - env.setup(); - - auto & mamager = StressWorkloadManger::getInstance(); - mamager.setEnv(env); - mamager.runWorkload(); - - return StressEnvStatus::getInstance().isSuccess(); -} -catch (...) -{ - DB::tryLogCurrentException(""); - exit(-1); -} diff --git a/dbms/src/Storages/DeltaMerge/tools/CMakeLists.txt b/dbms/src/Storages/Page/workload/CMakeLists.txt similarity index 53% rename from dbms/src/Storages/DeltaMerge/tools/CMakeLists.txt rename to dbms/src/Storages/Page/workload/CMakeLists.txt index 36270a0c8e4..5c8ecb34d97 100644 --- a/dbms/src/Storages/DeltaMerge/tools/CMakeLists.txt +++ b/dbms/src/Storages/Page/workload/CMakeLists.txt @@ -14,4 +14,9 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) -add_subdirectory (workload EXCLUDE_FROM_ALL) +set (page-workload-src HeavyMemoryCostInGC.cpp HeavyRead.cpp HeavySkewWriteRead.cpp HeavyWrite.cpp HighValidBigFileGC.cpp HoldSnapshotsLongTime.cpp Normal.cpp + PageStorageInMemoryCapacity.cpp ThousandsOfOffset.cpp MainEntry.cpp Normal.cpp PageStorageInMemoryCapacity.cpp PSBackground.cpp PSRunnable.cpp PSStressEnv.cpp PSWorkload.cpp) + +add_library (page-workload-lib ${page-workload-src}) +target_link_libraries (page-workload-lib dbms clickhouse_functions clickhouse-server-lib) +target_compile_options (page-workload-lib PRIVATE -Wno-format -lc++) \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/HeavyMemoryCostInGC.cpp b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.cpp similarity index 96% rename from dbms/src/Storages/Page/stress/workload/HeavyMemoryCostInGC.cpp rename to dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.cpp index 40595f0cb59..7e745e29fc2 100644 --- a/dbms/src/Storages/Page/stress/workload/HeavyMemoryCostInGC.cpp +++ b/dbms/src/Storages/Page/workload/HeavyMemoryCostInGC.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class HeavyMemoryCostInGC : public StressWorkload , public StressWorkloadFunc @@ -81,3 +83,4 @@ class HeavyMemoryCostInGC }; REGISTER_WORKLOAD(HeavyMemoryCostInGC) +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/workload/HeavyRead.cpp b/dbms/src/Storages/Page/workload/HeavyRead.cpp similarity index 94% rename from dbms/src/Storages/Page/stress/workload/HeavyRead.cpp rename to dbms/src/Storages/Page/workload/HeavyRead.cpp index 15aeb1320cf..a67c435e84c 100644 --- a/dbms/src/Storages/Page/stress/workload/HeavyRead.cpp +++ b/dbms/src/Storages/Page/workload/HeavyRead.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class HeavyRead : public StressWorkload , public StressWorkloadFunc { @@ -69,4 +71,5 @@ class HeavyRead : public StressWorkload } }; -REGISTER_WORKLOAD(HeavyRead) \ No newline at end of file +REGISTER_WORKLOAD(HeavyRead) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/HeavySkewWriteRead.cpp b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.cpp similarity index 95% rename from dbms/src/Storages/Page/stress/workload/HeavySkewWriteRead.cpp rename to dbms/src/Storages/Page/workload/HeavySkewWriteRead.cpp index 78ffa5b60e0..805bf105358 100644 --- a/dbms/src/Storages/Page/stress/workload/HeavySkewWriteRead.cpp +++ b/dbms/src/Storages/Page/workload/HeavySkewWriteRead.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class HeavySkewWriteRead : public StressWorkload , public StressWorkloadFunc { @@ -84,4 +86,5 @@ class HeavySkewWriteRead : public StressWorkload } }; -REGISTER_WORKLOAD(HeavySkewWriteRead) \ No newline at end of file +REGISTER_WORKLOAD(HeavySkewWriteRead) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/HeavyWrite.cpp b/dbms/src/Storages/Page/workload/HeavyWrite.cpp similarity index 94% rename from dbms/src/Storages/Page/stress/workload/HeavyWrite.cpp rename to dbms/src/Storages/Page/workload/HeavyWrite.cpp index 265b289db56..8dfd7f810f7 100644 --- a/dbms/src/Storages/Page/stress/workload/HeavyWrite.cpp +++ b/dbms/src/Storages/Page/workload/HeavyWrite.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class HeavyWrite : public StressWorkload , public StressWorkloadFunc { @@ -71,4 +73,5 @@ class HeavyWrite : public StressWorkload } }; -REGISTER_WORKLOAD(HeavyWrite) \ No newline at end of file +REGISTER_WORKLOAD(HeavyWrite) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/HighValidBigFileGC.cpp b/dbms/src/Storages/Page/workload/HighValidBigFileGC.cpp similarity index 97% rename from dbms/src/Storages/Page/stress/workload/HighValidBigFileGC.cpp rename to dbms/src/Storages/Page/workload/HighValidBigFileGC.cpp index 866782c9578..a9af6aebb76 100644 --- a/dbms/src/Storages/Page/stress/workload/HighValidBigFileGC.cpp +++ b/dbms/src/Storages/Page/workload/HighValidBigFileGC.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class HighValidBigFileGCWorkload : public StressWorkload , public StressWorkloadFunc @@ -129,3 +131,4 @@ class HighValidBigFileGCWorkload }; REGISTER_WORKLOAD(HighValidBigFileGCWorkload) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/HoldSnapshotsLongTime.cpp b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.cpp similarity index 95% rename from dbms/src/Storages/Page/stress/workload/HoldSnapshotsLongTime.cpp rename to dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.cpp index b49347fc858..f02fbf65bcd 100644 --- a/dbms/src/Storages/Page/stress/workload/HoldSnapshotsLongTime.cpp +++ b/dbms/src/Storages/Page/workload/HoldSnapshotsLongTime.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class HoldSnapshotsLongTime : public StressWorkload , public StressWorkloadFunc { @@ -93,4 +95,5 @@ class HoldSnapshotsLongTime : public StressWorkload } }; -REGISTER_WORKLOAD(HoldSnapshotsLongTime) \ No newline at end of file +REGISTER_WORKLOAD(HoldSnapshotsLongTime) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/workload/MainEntry.cpp b/dbms/src/Storages/Page/workload/MainEntry.cpp new file mode 100644 index 00000000000..ac82e1ea4bc --- /dev/null +++ b/dbms/src/Storages/Page/workload/MainEntry.cpp @@ -0,0 +1,70 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include + +using namespace DB::PS::tests; + +int StressWorkload::mainEntry(int argc, char ** argv) +{ + { + // maybe due to sequence of linking, REGISTER_WORKLOAD is not visible to main function in dbms/src/Server/main.cpp + // cause that REGISTER_WORKLOAD will not be triggered before mainEntry + // we do this to trigger REGISTER_WORKLOAD explicitly. + void _work_load_register_named_HeavyMemoryCostInGC(); + void (*f)() = _work_load_register_named_HeavyMemoryCostInGC; + (void)f; + void _work_load_register_named_HeavyRead(); + f = _work_load_register_named_HeavyRead; + (void)f; + void _work_load_register_named_HeavySkewWriteRead(); + f = _work_load_register_named_HeavySkewWriteRead; + (void)f; + void _work_load_register_named_HeavyWrite(); + f = _work_load_register_named_HeavyWrite; + (void)f; + void _work_load_register_named_HighValidBigFileGCWorkload(); + f = _work_load_register_named_HighValidBigFileGCWorkload; + (void)f; + void _work_load_register_named_HoldSnapshotsLongTime(); + f = _work_load_register_named_HoldSnapshotsLongTime; + (void)f; + void _work_load_register_named_PageStorageInMemoryCapacity(); + f = _work_load_register_named_PageStorageInMemoryCapacity; + (void)f; + void _work_load_register_named_NormalWorkload(); + f = _work_load_register_named_NormalWorkload; + (void)f; + void _work_load_register_named_ThousandsOfOffset(); + f = _work_load_register_named_ThousandsOfOffset; + (void)f; + } + try + { + StressEnv::initGlobalLogger(); + auto env = StressEnv::parse(argc, argv); + env.setup(); + + auto & mamager = StressWorkloadManger::getInstance(); + mamager.setEnv(env); + mamager.runWorkload(); + + return StressEnvStatus::getInstance().isSuccess(); + } + catch (...) + { + DB::tryLogCurrentException(""); + exit(-1); + } +} \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/Normal.cpp b/dbms/src/Storages/Page/workload/Normal.cpp similarity index 95% rename from dbms/src/Storages/Page/stress/workload/Normal.cpp rename to dbms/src/Storages/Page/workload/Normal.cpp index 0323b857613..57229395809 100644 --- a/dbms/src/Storages/Page/stress/workload/Normal.cpp +++ b/dbms/src/Storages/Page/workload/Normal.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class NormalWorkload : public StressWorkload , public StressWorkloadFunc @@ -77,3 +79,4 @@ class NormalWorkload }; REGISTER_WORKLOAD(NormalWorkload) +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSBackground.cpp b/dbms/src/Storages/Page/workload/PSBackground.cpp similarity index 96% rename from dbms/src/Storages/Page/stress/PSBackground.cpp rename to dbms/src/Storages/Page/workload/PSBackground.cpp index af7329e8348..247bea23dcc 100644 --- a/dbms/src/Storages/Page/stress/PSBackground.cpp +++ b/dbms/src/Storages/Page/workload/PSBackground.cpp @@ -13,11 +13,14 @@ // limitations under the License. #include -#include #include #include +#include #include + +namespace DB::PS::tests +{ void PSMetricsDumper::onTime(Poco::Timer & /*timer*/) { for (auto & metric : metrics) @@ -107,3 +110,4 @@ void StressTimeout::start() { timeout_timer.start(Poco::TimerCallback(*this, &StressTimeout::onTime)); } +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSBackground.h b/dbms/src/Storages/Page/workload/PSBackground.h similarity index 97% rename from dbms/src/Storages/Page/stress/PSBackground.h rename to dbms/src/Storages/Page/workload/PSBackground.h index 8c22458c5e8..c91dad1361f 100644 --- a/dbms/src/Storages/Page/stress/PSBackground.h +++ b/dbms/src/Storages/Page/workload/PSBackground.h @@ -15,14 +15,16 @@ #pragma once #include #include -#include #include +#include namespace CurrentMetrics { extern const Metric PSMVCCSnapshotsList; } +namespace DB::PS::tests +{ class PSMetricsDumper { public: @@ -162,3 +164,4 @@ class StressTimeout Poco::Timer timeout_timer; }; using StressTimeoutPtr = std::shared_ptr; +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp similarity index 97% rename from dbms/src/Storages/Page/stress/PSRunnable.cpp rename to dbms/src/Storages/Page/workload/PSRunnable.cpp index 5d6c8ecc5c6..5e9774ccc99 100644 --- a/dbms/src/Storages/Page/stress/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -16,14 +16,16 @@ #include #include #include -#include #include #include +#include #include #include #include +namespace DB::PS::tests +{ void PSRunnable::run() try { @@ -69,7 +71,7 @@ DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder std::uniform_int_distribution<> dist(0, 3000); const size_t buff_sz = approx_page_mb * DB::MB + dist(size_gen); - char * buff = static_cast(malloc(buff_sz)); + char * buff = static_cast(malloc(buff_sz)); // NOLINT if (buff == nullptr) { throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); @@ -78,7 +80,7 @@ DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder const char buff_ch = pageId % 0xFF; memset(buff, buff_ch, buff_sz); - holder = DB::createMemHolder(buff, [&](char * p) { free(p); }); + holder = DB::createMemHolder(buff, [&](char * p) { free(p); }); // NOLINT return std::make_shared(const_cast(buff), buff_sz); } @@ -88,7 +90,7 @@ void PSWriter::updatedRandomData() size_t memory_size = approx_page_mb * DB::MB * 2; if (memory == nullptr) { - memory = static_cast(malloc(memory_size)); + memory = static_cast(malloc(memory_size)); // NOLINT if (memory == nullptr) { throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); @@ -147,7 +149,7 @@ void PSCommonWriter::updatedRandomData() if (memory == nullptr) { - memory = static_cast(malloc(memory_size)); + memory = static_cast(malloc(memory_size)); // NOLINT if (memory == nullptr) { throw DB::Exception("Alloc fix memory failed.", DB::ErrorCodes::LOGICAL_ERROR); @@ -415,3 +417,4 @@ DB::PageId PSIncreaseWriter::genRandomPageId() { return static_cast(begin_page_id++); } +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSRunnable.h b/dbms/src/Storages/Page/workload/PSRunnable.h similarity index 90% rename from dbms/src/Storages/Page/stress/PSRunnable.h rename to dbms/src/Storages/Page/workload/PSRunnable.h index 3ddcd73c093..b723236391d 100644 --- a/dbms/src/Storages/Page/stress/PSRunnable.h +++ b/dbms/src/Storages/Page/workload/PSRunnable.h @@ -13,12 +13,14 @@ // limitations under the License. #pragma once -#include #include #include +#include const DB::PageId MAX_PAGE_ID_DEFAULT = 1000; +namespace DB::PS::tests +{ class PSRunnable : public Poco::Runnable { public: @@ -46,7 +48,7 @@ class PSWriter : public PSRunnable gen.seed(time(nullptr)); } - virtual ~PSWriter() + ~PSWriter() override { if (memory != nullptr) { @@ -54,7 +56,7 @@ class PSWriter : public PSRunnable } } - virtual String description() override + String description() override { return fmt::format("(Stress Test Writer {})", index); } @@ -67,7 +69,7 @@ class PSWriter : public PSRunnable static void fillAllPages(const PSPtr & ps); - virtual bool runImpl() override; + bool runImpl() override; protected: virtual DB::PageId genRandomPageId(); @@ -91,11 +93,11 @@ class PSCommonWriter : public PSWriter : PSWriter(ps_, index_) {} - virtual void updatedRandomData() override; + void updatedRandomData() override; - virtual String description() override { return fmt::format("(Stress Test Common Writer {})", index); } + String description() override { return fmt::format("(Stress Test Common Writer {})", index); } - virtual bool runImpl() override; + bool runImpl() override; void setBatchBufferNums(size_t numbers); @@ -120,7 +122,7 @@ class PSCommonWriter : public PSWriter DB::PageFieldSizes data_sizes = {}; - virtual DB::PageId genRandomPageId() override; + DB::PageId genRandomPageId() override; virtual size_t genBufferSize(); }; @@ -154,7 +156,7 @@ class PSWindowWriter : public PSCommonWriter void setNormalDistributionSigma(size_t sigma); protected: - virtual DB::PageId genRandomPageId() override; + DB::PageId genRandomPageId() override; protected: size_t window_size = 100; @@ -170,12 +172,12 @@ class PSIncreaseWriter : public PSCommonWriter String description() override { return fmt::format("(Stress Test Increase Writer {})", index); } - virtual bool runImpl() override; + bool runImpl() override; void setPageRange(size_t page_range); protected: - virtual DB::PageId genRandomPageId() override; + DB::PageId genRandomPageId() override; protected: size_t begin_page_id = 1; @@ -192,9 +194,9 @@ class PSReader : public PSRunnable gen.seed(time(nullptr)); } - virtual String description() override { return fmt::format("(Stress Test PSReader {})", index); } + String description() override { return fmt::format("(Stress Test PSReader {})", index); } - virtual bool runImpl() override; + bool runImpl() override; void setPageReadOnce(size_t page_read_once); @@ -242,7 +244,7 @@ class PSWindowReader : public PSReader void setWriterNums(size_t writer_nums); protected: - virtual DB::PageIds genRandomPageIds() override; + DB::PageIds genRandomPageIds() override; protected: size_t window_size = 100; @@ -261,12 +263,13 @@ class PSSnapshotReader : public PSReader : PSReader(ps_, index_) {} - virtual bool runImpl() override; + bool runImpl() override; void setSnapshotGetIntervalMs(size_t snapshot_get_interval_ms_); protected: - size_t snapshots_hold_num; + size_t snapshots_hold_num = 0; size_t snapshot_get_interval_ms = 0; std::list snapshots; -}; \ No newline at end of file +}; +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSStressEnv.cpp b/dbms/src/Storages/Page/workload/PSStressEnv.cpp similarity index 97% rename from dbms/src/Storages/Page/stress/PSStressEnv.cpp rename to dbms/src/Storages/Page/workload/PSStressEnv.cpp index 7d680cd43c0..f5cead0a158 100644 --- a/dbms/src/Storages/Page/stress/PSStressEnv.cpp +++ b/dbms/src/Storages/Page/workload/PSStressEnv.cpp @@ -16,18 +16,20 @@ #include #include #include -#include -#include #include #include #include #include #include #include +#include +#include #include #include +namespace DB::PS::tests +{ Poco::Logger * StressEnv::logger; void StressEnv::initGlobalLogger() { @@ -146,3 +148,4 @@ void StressEnv::setup() init_pages = true; setupSignal(); } +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSStressEnv.h b/dbms/src/Storages/Page/workload/PSStressEnv.h similarity index 98% rename from dbms/src/Storages/Page/stress/PSStressEnv.h rename to dbms/src/Storages/Page/workload/PSStressEnv.h index 1c7d8ee761f..e67cb325430 100644 --- a/dbms/src/Storages/Page/stress/PSStressEnv.h +++ b/dbms/src/Storages/Page/workload/PSStressEnv.h @@ -25,6 +25,8 @@ namespace Poco class Logger; } +namespace DB::PS::tests +{ using PSPtr = std::shared_ptr; enum StressEnvStat @@ -124,3 +126,4 @@ struct StressEnv void setup(); }; +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSWorkload.cpp b/dbms/src/Storages/Page/workload/PSWorkload.cpp similarity index 98% rename from dbms/src/Storages/Page/stress/PSWorkload.cpp rename to dbms/src/Storages/Page/workload/PSWorkload.cpp index ce1f8d92ce0..81f13527f48 100644 --- a/dbms/src/Storages/Page/stress/PSWorkload.cpp +++ b/dbms/src/Storages/Page/workload/PSWorkload.cpp @@ -14,12 +14,14 @@ #include #include -#include #include #include #include +#include #include +namespace DB::PS::tests +{ void StressWorkload::onDumpResult() { UInt64 time_interval = stop_watch.elapsedMilliseconds(); @@ -177,3 +179,4 @@ void StressWorkloadManger::runWorkload() } } } +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/PSWorkload.h b/dbms/src/Storages/Page/workload/PSWorkload.h similarity index 92% rename from dbms/src/Storages/Page/stress/PSWorkload.h rename to dbms/src/Storages/Page/workload/PSWorkload.h index cb099b4203a..eaaaf4eba5b 100644 --- a/dbms/src/Storages/Page/stress/PSWorkload.h +++ b/dbms/src/Storages/Page/workload/PSWorkload.h @@ -16,15 +16,17 @@ #include #include -#include -#include -#include #include #include #include +#include +#include +#include #include #define NORMAL_WORKLOAD 0 +namespace DB::PS::tests +{ template class StressWorkloadFunc { @@ -45,6 +47,8 @@ class StressWorkloadFunc class StressWorkload { public: + static int mainEntry(int argc, char ** argv); + explicit StressWorkload(StressEnv options_) : options(options_) {} @@ -189,13 +193,15 @@ class StressWorkloadManger StressEnv options; }; -#define REGISTER_WORKLOAD(WORKLOAD) \ - static void __attribute__((constructor)) _work_load_register_named_##WORKLOAD(void) \ - { \ - StressWorkloadManger::getInstance().reg( \ - WORKLOAD::nameFunc(), \ - WORKLOAD::maskFunc(), \ - [](const StressEnv & opts) -> std::shared_ptr { \ - return std::make_shared(opts); \ - }); \ +#define REGISTER_WORKLOAD(WORKLOAD) \ + void __attribute__((constructor)) _work_load_register_named_##WORKLOAD(void) \ + { \ + StressWorkloadManger::getInstance().reg( \ + WORKLOAD::nameFunc(), \ + WORKLOAD::maskFunc(), \ + [](const StressEnv & opts) -> std::shared_ptr { \ + return std::make_shared(opts); \ + }); \ } + +} // namespace DB::PS::tests diff --git a/dbms/src/Storages/Page/stress/workload/PageStorageInMemoryCapacity.cpp b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.cpp similarity index 96% rename from dbms/src/Storages/Page/stress/workload/PageStorageInMemoryCapacity.cpp rename to dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.cpp index 190cbf6b323..6ab321d1a10 100644 --- a/dbms/src/Storages/Page/stress/workload/PageStorageInMemoryCapacity.cpp +++ b/dbms/src/Storages/Page/workload/PageStorageInMemoryCapacity.cpp @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include #include #include #include - #ifdef __APPLE__ #include @@ -27,6 +26,8 @@ #include #endif +namespace DB::PS::tests +{ class PageStorageInMemoryCapacity : public StressWorkload , public StressWorkloadFunc { @@ -89,14 +90,14 @@ class PageStorageInMemoryCapacity : public StressWorkload } FILE * file = fopen("/proc/meminfo", "r"); - if (file != NULL) + if (file != nullptr) { char buffer[128]; #define MEMORY_TOTAL_LABEL "MemTotal:" while (fgets(buffer, 128, file)) { if ((strncmp((buffer), (MEMORY_TOTAL_LABEL), strlen(MEMORY_TOTAL_LABEL)) == 0) - && sscanf(buffer + strlen(MEMORY_TOTAL_LABEL), " %32llu kB", &total_mem)) + && sscanf(buffer + strlen(MEMORY_TOTAL_LABEL), " %32llu kB", &total_mem)) // NOLINT { break; } @@ -174,4 +175,5 @@ class PageStorageInMemoryCapacity : public StressWorkload } }; -REGISTER_WORKLOAD(PageStorageInMemoryCapacity) \ No newline at end of file +REGISTER_WORKLOAD(PageStorageInMemoryCapacity) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/stress/workload/ThousandsOfOffset.cpp b/dbms/src/Storages/Page/workload/ThousandsOfOffset.cpp similarity index 97% rename from dbms/src/Storages/Page/stress/workload/ThousandsOfOffset.cpp rename to dbms/src/Storages/Page/workload/ThousandsOfOffset.cpp index 3a215f76769..5a02ef48d68 100644 --- a/dbms/src/Storages/Page/stress/workload/ThousandsOfOffset.cpp +++ b/dbms/src/Storages/Page/workload/ThousandsOfOffset.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +namespace DB::PS::tests +{ class ThousandsOfOffset : public StressWorkload , public StressWorkloadFunc { @@ -168,4 +170,5 @@ class ThousandsOfOffset : public StressWorkload } }; -REGISTER_WORKLOAD(ThousandsOfOffset) \ No newline at end of file +REGISTER_WORKLOAD(ThousandsOfOffset) +} // namespace DB::PS::tests \ No newline at end of file diff --git a/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp b/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp index dcf727614b1..ea19ff08dd3 100644 --- a/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp +++ b/dbms/src/TestUtils/ColumnsToTiPBExpr.cpp @@ -36,6 +36,7 @@ void columnToTiPBExpr(tipb::Expr * expr, const ColumnWithTypeAndName column, siz if (column.column->isColumnNullable()) { auto [col, null_map] = removeNullable(column.column.get()); + (void)null_map; is_const = col->isColumnConst(); } } @@ -97,6 +98,7 @@ void columnsToTiPBExprForTiDBCast( if (type_column.column->isColumnNullable()) { auto [col, null_map] = removeNullable(type_column.column.get()); + (void)null_map; is_const = col->isColumnConst(); } }