Skip to content

Commit

Permalink
Fix PageStorage GC with high valid rate PageFile (#2436)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Aug 3, 2021
1 parent ae1fc3b commit 543e252
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 65 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(random_exception_after_page_storage_sequence_acquired) \
M(random_slow_page_storage_remove_expired_snapshots) \
M(random_slow_page_storage_list_all_live_files) \
M(force_set_safepoint_when_decode_block)
M(force_set_safepoint_when_decode_block) \
M(force_set_page_data_compact_batch)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/stress/DMStressProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ struct StressOptions
String table_name;
bool verify;
UInt32 verify_sleep_sec;

std::vector<std::string> failpoints;
};

template <typename T>
Expand Down
44 changes: 29 additions & 15 deletions dbms/src/Storages/DeltaMerge/tests/stress/main.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Common/FailPoint.h>
#include <Storages/DeltaMerge/tests/stress/DMStressProxy.h>
#include <fmt/format.h>

#include <boost/program_options.hpp>

Expand All @@ -9,19 +10,23 @@ StressOptions parseStressOptions(int argc, char * argv[])
{
using boost::program_options::value;
boost::program_options::options_description desc("Allowed options");
desc.add_options()("help", "produce help message")(
"insert_concurrency", value<UInt32>()->default_value(58), "number of insert thread")(
"update_concurrency", value<UInt32>()->default_value(40), "number of update thread")(
"delete_concurrency", value<UInt32>()->default_value(1), "number of delete thread")(
"write_sleep_us", value<UInt32>()->default_value(10), "sleep microseconds between write operators")(
"write_rows_per_block", value<UInt32>()->default_value(8), "number of rows per write(insert or update)")(
"read_concurrency", value<UInt32>()->default_value(20), "number of read thread")(
"read_sleep_us", value<UInt32>()->default_value(20), "sleep microseconds between read operations")(
"gen_total_rows", value<UInt64>()->default_value(100000000), "generate data total rows")(
"gen_rows_per_block", value<UInt32>()->default_value(128), "generate data rows per block")(
"gen_concurrency", value<UInt32>()->default_value(100), "number of generate thread")(
"table_name", value<String>()->default_value("stress2"), "Table name")("verify", value<bool>()->default_value(true), "Verify")(
"verify_sleep_sec", value<UInt32>()->default_value(120), "Verify sleep seconds");
desc.add_options() //
("help", "produce help message") //
("insert_concurrency", value<UInt32>()->default_value(58), "number of insert thread") //
("update_concurrency", value<UInt32>()->default_value(40), "number of update thread") //
("delete_concurrency", value<UInt32>()->default_value(1), "number of delete thread") //
("write_sleep_us", value<UInt32>()->default_value(10), "sleep microseconds between write operators") //
("write_rows_per_block", value<UInt32>()->default_value(8), "number of rows per write(insert or update)") //
("read_concurrency", value<UInt32>()->default_value(20), "number of read thread") //
("read_sleep_us", value<UInt32>()->default_value(20), "sleep microseconds between read operations") //
("gen_total_rows", value<UInt64>()->default_value(100000000), "generate data total rows") //
("gen_rows_per_block", value<UInt32>()->default_value(128), "generate data rows per block") //
("gen_concurrency", value<UInt32>()->default_value(100), "number of generate thread") //
("table_name", value<String>()->default_value("stress2"), "Table name") //
("verify", value<bool>()->default_value(true), "Verify") //
("verify_sleep_sec", value<UInt32>()->default_value(120), "Verify sleep seconds") //
("failpoints,F", value<std::vector<std::string>>(), "failpoint(s) to enable") //
;

boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
Expand All @@ -47,14 +52,19 @@ StressOptions parseStressOptions(int argc, char * argv[])
stress_options.verify = options["verify"].as<bool>();
stress_options.verify_sleep_sec = options["verify_sleep_sec"].as<UInt32>();

if (options.count("failpoints"))
stress_options.failpoints = options["failpoints"].as<std::vector<std::string>>();

std::cout << " insert_concurrency: " << stress_options.insert_concurrency
<< " update_concurrency: " << stress_options.update_concurrency
<< " delete_concurrency: " << stress_options.delete_concurrency << " write_sleep_us: " << stress_options.write_sleep_us
<< " write_row_per_block: " << stress_options.write_rows_per_block << " read_concurrency: " << stress_options.read_concurrency
<< " read_sleep_us: " << stress_options.read_sleep_us << " gen_row_count: " << stress_options.gen_total_rows
<< " gen_row_per_block: " << stress_options.gen_rows_per_block << " gen_concurrency: " << stress_options.gen_concurrency
<< " table_name: " << stress_options.table_name << " verify: " << stress_options.verify
<< "verify_sleep_sec: " << stress_options.verify_sleep_sec << std::endl;
<< " verify_sleep_sec: " << stress_options.verify_sleep_sec //
<< fmt::format(" failpoints: [{}]", fmt::join(stress_options.failpoints.begin(), stress_options.failpoints.end(), ","))
<< std::endl;

return stress_options;
}
Expand All @@ -72,7 +82,11 @@ int main(int argc, char * argv[])
{
init();
auto opts = parseStressOptions(argc, argv);
auto log = &Poco::Logger::get("DMStressProxy");
for (const auto & fp : opts.failpoints)
{
DB::FailPointHelper::enableFailPoint(fp);
}
auto log = &Poco::Logger::get("DMStressProxy");
try
{
UInt64 run_count = 0;
Expand Down
Loading

0 comments on commit 543e252

Please sign in to comment.