diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index aa91b7988d..f04ac19294 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -77,6 +77,10 @@ info_collector::info_collector() "storage_size_fetch_interval_seconds", 3600, // default value 1h "storage size fetch interval seconds"); + _hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector", + "hotspot_detect_algorithm", + "hotspot_algo_qps_variance", + "hotspot_detect_algorithm"); // _storage_size_retry_wait_seconds is in range of [1, 60] _storage_size_retry_wait_seconds = std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10)); @@ -148,6 +152,9 @@ void info_collector::on_app_stat() // hotspot_calculator is to detect hotspots hotspot_calculator *hotspot_calculator = get_hotspot_calculator(app_rows.first, app_rows.second.size()); + if (!hotspot_calculator) { + continue; + } hotspot_calculator->aggregate(app_rows.second); // new policy can be designed by strategy pattern in hotspot_partition_data.h hotspot_calculator->start_alg(); @@ -285,9 +292,20 @@ hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &ap if (iter != _hotspot_calculator_store.end()) { return iter->second; } - hotspot_calculator *calculator_address = new hotspot_calculator(app_name, partition_num); - _hotspot_calculator_store[app_name] = calculator_address; - return calculator_address; + std::unique_ptr policy; + if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") { + policy.reset(new hotspot_algo_qps_variance()); + } else if (_hotspot_detect_algorithm == "hotspot_algo_qps_skew") { + policy.reset(new hotspot_algo_qps_skew()); + } else { + dwarn("hotspot detection is disabled"); + _hotspot_calculator_store[app_name] = nullptr; + return nullptr; + } + hotspot_calculator *calculator = + new hotspot_calculator(app_name, partition_num, std::move(policy)); + _hotspot_calculator_store[app_name] = calculator; + return calculator; } } // namespace server diff --git a/src/server/info_collector.h b/src/server/info_collector.h index e06d121dd0..a55fa1824a 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -132,6 +132,7 @@ class info_collector uint32_t _storage_size_fetch_interval_seconds; uint32_t _storage_size_retry_wait_seconds; uint32_t _storage_size_retry_max_count; + std::string _hotspot_detect_algorithm; ::dsn::task_ptr _storage_size_stat_timer_task; ::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock; // mapping 'node address' --> 'last updated timestamp' diff --git a/src/server/table_hotspot_policy.h b/src/server/table_hotspot_policy.h index 87881ddca0..2eff0229bc 100644 --- a/src/server/table_hotspot_policy.h +++ b/src/server/table_hotspot_policy.h @@ -8,6 +8,8 @@ #include #include +#include + #include namespace pegasus { @@ -20,14 +22,14 @@ class hotspot_policy // vector is used to save the partitions' data of this app // hotspot_partition_data is used to save data of one partition virtual void analysis(const std::queue> &hotspot_app_data, - std::vector<::dsn::perf_counter_wrapper> &hot_points) = 0; + std::vector<::dsn::perf_counter_wrapper> &perf_counters) = 0; }; class hotspot_algo_qps_skew : public hotspot_policy { public: void analysis(const std::queue> &hotspot_app_data, - std::vector<::dsn::perf_counter_wrapper> &hot_points) + std::vector<::dsn::perf_counter_wrapper> &perf_counters) { const auto &anly_data = hotspot_app_data.back(); double min_total_qps = INT_MAX; @@ -35,9 +37,56 @@ class hotspot_algo_qps_skew : public hotspot_policy min_total_qps = std::min(min_total_qps, partition_anly_data.total_qps); } min_total_qps = std::max(1.0, min_total_qps); - dassert(anly_data.size() == hot_points.size(), "partition counts error, please check"); - for (int i = 0; i < hot_points.size(); i++) { - hot_points[i]->set(anly_data[i].total_qps / min_total_qps); + dassert(anly_data.size() == perf_counters.size(), "partition counts error, please check"); + for (int i = 0; i < perf_counters.size(); i++) { + perf_counters[i]->set(anly_data[i].total_qps / min_total_qps); + } + } +}; + +// PauTa Criterion +class hotspot_algo_qps_variance : public hotspot_policy +{ +public: + void analysis(const std::queue> &hotspot_app_data, + std::vector<::dsn::perf_counter_wrapper> &perf_counters) + { + dassert(hotspot_app_data.back().size() == perf_counters.size(), + "partition counts error, please check"); + std::vector data_samples; + data_samples.reserve(hotspot_app_data.size() * perf_counters.size()); + auto temp_data = hotspot_app_data; + double total = 0, sd = 0, avg = 0; + int sample_count = 0; + // avg: Average number + // sd: Standard deviation + // sample_count: Number of samples + while (!temp_data.empty()) { + for (auto partition_data : temp_data.front()) { + if (partition_data.total_qps - 1.00 > 0) { + data_samples.push_back(partition_data.total_qps); + total += partition_data.total_qps; + sample_count++; + } + } + temp_data.pop(); + } + if (sample_count == 0) { + ddebug("hotspot_app_data size == 0"); + return; + } + avg = total / sample_count; + for (auto data_sample : data_samples) { + sd += pow((data_sample - avg), 2); + } + sd = sqrt(sd / sample_count); + const auto &anly_data = hotspot_app_data.back(); + for (int i = 0; i < perf_counters.size(); i++) { + double hot_point = (anly_data[i].total_qps - avg) / sd; + // perf_counter->set can only be unsigned __int64 + // use ceil to guarantee conversion results + hot_point = ceil(std::max(hot_point, double(0))); + perf_counters[i]->set(hot_point); } } }; @@ -46,8 +95,10 @@ class hotspot_algo_qps_skew : public hotspot_policy class hotspot_calculator { public: - hotspot_calculator(const std::string &app_name, const int partition_num) - : _app_name(app_name), _points(partition_num), _policy(new hotspot_algo_qps_skew()) + hotspot_calculator(const std::string &app_name, + const int partition_num, + std::unique_ptr policy) + : _app_name(app_name), _points(partition_num), _policy(std::move(policy)) { init_perf_counter(partition_num); } @@ -62,6 +113,7 @@ class hotspot_calculator std::unique_ptr _policy; static const int kMaxQueueSize = 100; + FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_variance); FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_skew); }; } // namespace server diff --git a/src/server/test/pegasus_tablehotspot_test.cpp b/src/server/test/pegasus_tablehotspot_test.cpp index 5cbe6dd015..cc6ee83a41 100644 --- a/src/server/test/pegasus_tablehotspot_test.cpp +++ b/src/server/test/pegasus_tablehotspot_test.cpp @@ -14,7 +14,8 @@ TEST(table_hotspot_policy, hotspot_algo_qps_skew) std::vector test_rows(2); test_rows[0].get_qps = 1234.0; test_rows[1].get_qps = 4321.0; - hotspot_calculator test_hotspot_calculator("TEST", 2); + std::unique_ptr policy(new hotspot_algo_qps_skew()); + hotspot_calculator test_hotspot_calculator("TEST", 2, std::move(policy)); test_hotspot_calculator.aggregate(test_rows); test_hotspot_calculator.start_alg(); std::vector result(2); @@ -25,5 +26,28 @@ TEST(table_hotspot_policy, hotspot_algo_qps_skew) ASSERT_EQ(expect_vector, result); } +TEST(table_hotspot_policy, hotspot_algo_qps_variance) +{ + std::vector test_rows(8); + test_rows[0].get_qps = 1000.0; + test_rows[1].get_qps = 1000.0; + test_rows[2].get_qps = 1000.0; + test_rows[3].get_qps = 1000.0; + test_rows[4].get_qps = 1000.0; + test_rows[5].get_qps = 1000.0; + test_rows[6].get_qps = 1000.0; + test_rows[7].get_qps = 5000.0; + std::unique_ptr policy(new hotspot_algo_qps_variance()); + hotspot_calculator test_hotspot_calculator("TEST", 8, std::move(policy)); + test_hotspot_calculator.aggregate(test_rows); + test_hotspot_calculator.start_alg(); + std::vector result(8); + for (int i = 0; i < test_hotspot_calculator._points.size(); i++) { + result[i] = test_hotspot_calculator._points[i]->get_value(); + } + std::vector expect_vector{0, 0, 0, 0, 0, 0, 0, 3}; + ASSERT_EQ(expect_vector, result); +} + } // namespace server } // namespace pegasus