diff --git a/be/src/http/action/compaction_score_action.cpp b/be/src/http/action/compaction_score_action.cpp new file mode 100644 index 00000000000000..10b8cc6bdbab04 --- /dev/null +++ b/be/src/http/action/compaction_score_action.cpp @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "http/action/compaction_score_action.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cloud/config.h" +#include "common/status.h" +#include "http/http_channel.h" +#include "http/http_handler_with_auth.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/tablet_fwd.h" +#include "olap/tablet_manager.h" +#include "util/stopwatch.hpp" + +namespace doris { + +const std::string TOP_N = "top_n"; +const std::string SYNC_META = "sync_meta"; +const std::string COMPACTION_SCORE = "compaction_score"; +constexpr size_t DEFAULT_TOP_N = std::numeric_limits::max(); +constexpr bool DEFAULT_SYNC_META = false; +constexpr std::string_view TABLET_ID = "tablet_id"; + +template +concept CompactionScoreAccessble = requires(T t) { + { t.get_real_compaction_score() } -> std::same_as; +}; + +template +std::vector calculate_compaction_scores( + std::span> tablets) { + std::vector result; + result.reserve(tablets.size()); + std::ranges::transform(tablets, std::back_inserter(result), + [](const std::shared_ptr& tablet) -> CompactionScoreResult { + return {.tablet_id = tablet->tablet_id(), + .compaction_score = tablet->get_real_compaction_score()}; + }); + return result; +} + +struct LocalCompactionScoreAccessor final : CompactionScoresAccessor { + LocalCompactionScoreAccessor(TabletManager* tablet_mgr) : tablet_mgr(tablet_mgr) {} + + std::vector get_all_tablet_compaction_scores() override { + auto tablets = tablet_mgr->get_all_tablet(); + std::span s = {tablets.begin(), tablets.end()}; + return calculate_compaction_scores(s); + } + + TabletManager* tablet_mgr; +}; + +struct CloudCompactionScoresAccessor final : CompactionScoresAccessor { + CloudCompactionScoresAccessor(CloudTabletMgr& tablet_mgr) : tablet_mgr(tablet_mgr) {} + + std::vector get_all_tablet_compaction_scores() override { + auto tablets = get_all_tablets(); + std::span s = {tablets.begin(), tablets.end()}; + return calculate_compaction_scores(s); + } + + Status sync_meta() { + auto tablets = get_all_tablets(); + LOG(INFO) << "start to sync meta from ms"; + + MonotonicStopWatch stopwatch; + stopwatch.start(); + + for (const auto& tablet : tablets) { + RETURN_IF_ERROR(tablet->sync_meta()); + RETURN_IF_ERROR(tablet->sync_rowsets()); + } + + stopwatch.stop(); + LOG(INFO) << "sync meta finish, time=" << stopwatch.elapsed_time() << "ns"; + + return Status::OK(); + } + + std::vector get_all_tablets() { + auto weak_tablets = tablet_mgr.get_weak_tablets(); + std::vector tablets; + tablets.reserve(weak_tablets.size()); + for (auto& weak_tablet : weak_tablets) { + if (auto tablet = weak_tablet.lock(); + tablet != nullptr and tablet->tablet_state() == TABLET_RUNNING) { + tablets.push_back(std::move(tablet)); + } + } + return tablets; + } + + CloudTabletMgr& tablet_mgr; +}; + +static rapidjson::Value jsonfy_tablet_compaction_score( + const CompactionScoreResult& result, rapidjson::MemoryPoolAllocator<>& allocator) { + rapidjson::Value node; + node.SetObject(); + + rapidjson::Value tablet_id_key; + tablet_id_key.SetString(TABLET_ID.data(), TABLET_ID.length(), allocator); + rapidjson::Value tablet_id_val; + auto tablet_id_str = std::to_string(result.tablet_id); + tablet_id_val.SetString(tablet_id_str.c_str(), tablet_id_str.length(), allocator); + + rapidjson::Value score_key; + score_key.SetString(COMPACTION_SCORE.data(), COMPACTION_SCORE.size()); + rapidjson::Value score_val; + auto score_str = std::to_string(result.compaction_score); + score_val.SetString(score_str.c_str(), score_str.length(), allocator); + node.AddMember(score_key, score_val, allocator); + + node.AddMember(tablet_id_key, tablet_id_val, allocator); + return node; +} + +CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, TabletManager* tablet_mgr) + : HttpHandlerWithAuth(exec_env, hier, type), + _accessor(std::make_unique(tablet_mgr)) {} + +CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, CloudTabletMgr& tablet_mgr) + : HttpHandlerWithAuth(exec_env, hier, type), + _accessor(std::make_unique(tablet_mgr)) {} + +void CompactionScoreAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data()); + auto top_n_param = req->param(TOP_N); + + size_t top_n = DEFAULT_TOP_N; + if (!top_n_param.empty()) { + try { + auto tmp_top_n = std::stoll(top_n_param); + if (tmp_top_n < 0) { + throw std::invalid_argument("`top_n` cannot less than 0"); + } + top_n = tmp_top_n; + } catch (const std::exception& e) { + LOG(WARNING) << "convert failed:" << e.what(); + auto msg = fmt::format("invalid argument: top_n={}", top_n_param); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + } + + auto sync_meta_param = req->param(SYNC_META); + bool sync_meta = DEFAULT_SYNC_META; + if (!sync_meta_param.empty() and !config::is_cloud_mode()) { + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + "param `sync_meta` is only available for cloud mode"); + return; + } + if (sync_meta_param == "true") { + sync_meta = true; + } else if (sync_meta_param == "false") { + sync_meta = false; + } else if (!sync_meta_param.empty()) { + auto msg = fmt::format("invalid argument: sync_meta={}", sync_meta_param); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + + std::string result; + if (auto st = _handle(top_n, sync_meta, &result); !st) { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json()); + return; + } + HttpChannel::send_reply(req, HttpStatus::OK, result); +} + +Status CompactionScoreAction::_handle(size_t top_n, bool sync_meta, std::string* result) { + if (sync_meta) { + DCHECK(config::is_cloud_mode()); + RETURN_IF_ERROR(static_cast(_accessor.get())->sync_meta()); + } + + auto scores = _accessor->get_all_tablet_compaction_scores(); + top_n = std::min(top_n, scores.size()); + std::partial_sort(scores.begin(), scores.begin() + top_n, scores.end(), std::greater<>()); + + rapidjson::Document root; + root.SetArray(); + auto& allocator = root.GetAllocator(); + std::for_each(scores.begin(), scores.begin() + top_n, [&](const auto& score) { + root.PushBack(jsonfy_tablet_compaction_score(score, allocator), allocator); + }); + rapidjson::StringBuffer str_buf; + rapidjson::PrettyWriter writer(str_buf); + root.Accept(writer); + *result = str_buf.GetString(); + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/http/action/compaction_score_action.h b/be/src/http/action/compaction_score_action.h new file mode 100644 index 00000000000000..1c345a4ae24c65 --- /dev/null +++ b/be/src/http/action/compaction_score_action.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include + +#include "cloud/cloud_tablet_mgr.h" +#include "common/status.h" +#include "http/http_handler_with_auth.h" +#include "http/http_request.h" +#include "olap/storage_engine.h" +#include "runtime/exec_env.h" +namespace doris { + +struct CompactionScoreResult { + int64_t tablet_id; + size_t compaction_score; +}; + +inline bool operator>(const CompactionScoreResult& lhs, const CompactionScoreResult& rhs) { + return lhs.compaction_score > rhs.compaction_score; +} + +struct CompactionScoresAccessor { + virtual ~CompactionScoresAccessor() = default; + + virtual std::vector get_all_tablet_compaction_scores() = 0; +}; + +// topn, sync +class CompactionScoreAction : public HttpHandlerWithAuth { +public: + explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, TabletManager* tablet_mgr); + + explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, CloudTabletMgr& tablet_mgr); + + void handle(HttpRequest* req) override; + +private: + Status _handle(size_t top_n, bool sync_meta, std::string* result); + + std::unique_ptr _accessor; +}; + +} // namespace doris diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 143c1ad706bbe7..1fd3b785b9072f 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -29,6 +29,7 @@ #include "olap/rowid_conversion.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_reader.h" #include "olap/tablet_fwd.h" #include "olap/txn_manager.h" @@ -182,6 +183,14 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_ return Status::OK(); } +uint32_t BaseTablet::get_real_compaction_score() const { + const auto& rs_metas = _tablet_meta->all_rs_metas(); + return std::accumulate(rs_metas.begin(), rs_metas.end(), 0, + [](uint32_t score, const RowsetMetaSharedPtr& rs_meta) { + return score + rs_meta->get_compaction_score(); + }); +} + Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path, std::vector* rs_splits) const { DCHECK(rs_splits != nullptr && rs_splits->empty()); diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index cfaf536902e03e..943f815581809a 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -105,6 +105,10 @@ class BaseTablet { virtual size_t tablet_footprint() = 0; + // this method just return the compaction sum on each rowset + // note(tsy): we should unify the compaction score calculation finally + uint32_t get_real_compaction_score() const; + // MUST hold shared meta lock Status capture_rs_readers_unlocked(const Versions& version_path, std::vector* rs_splits) const; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index da7a4ec8a6e260..66278afdb666ee 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1023,6 +1023,9 @@ uint32_t Tablet::calc_cold_data_compaction_score() const { uint32_t Tablet::_calc_cumulative_compaction_score( std::shared_ptr cumulative_compaction_policy) { + if (cumulative_compaction_policy == nullptr) [[unlikely]] { + return 0; + } #ifndef BE_TEST if (_cumulative_compaction_policy == nullptr || _cumulative_compaction_policy->name() != cumulative_compaction_policy->name()) { diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 9f98a86bda4c98..f2c325bebc7806 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -37,6 +38,7 @@ #include "http/action/checksum_action.h" #include "http/action/clear_cache_action.h" #include "http/action/compaction_action.h" +#include "http/action/compaction_score_action.h" #include "http/action/config_action.h" #include "http/action/debug_point_action.h" #include "http/action/download_action.h" @@ -381,6 +383,11 @@ void HttpService::register_local_handler(StorageEngine& engine) { new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file", show_nested_index_file_action); + + CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction( + _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_manager())); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", + compaction_score_action); } void HttpService::register_cloud_handler(CloudStorageEngine& engine) { @@ -417,6 +424,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file", show_nested_index_file_action); + CompactionScoreAction* compaction_score_action = _pool.add(new CompactionScoreAction( + _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_mgr())); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", + compaction_score_action); } // NOLINTEND(readability-function-size) diff --git a/regression-test/suites/compaction/test_compaction_score_action.groovy b/regression-test/suites/compaction/test_compaction_score_action.groovy new file mode 100644 index 00000000000000..9ab8743778fb10 --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_score_action.groovy @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_compaction_score_action") { + def tableName = "test_compaction_score_action"; + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id INT NOT NULL, + name STRING NOT NULL + ) DUPLICATE KEY (`id`) + PROPERTIES ("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + for (i in 0..<30) { + sql """ INSERT INTO ${tableName} VALUES(1, "Vedal") """ + sql """ INSERT INTO ${tableName} VALUES(2, "Neuro") """ + sql """ INSERT INTO ${tableName} VALUES(3, "Evil") """ + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (int i=0;i= 90) + } else { + def (code, text, err) = curl("GET",beHttpAddress+"/api/compaction_score?top_n=1") + def score_str = parseJson(text).get(0).get("compaction_score") + def score = Integer.parseInt(score_str) + assertTrue(score >= 90) + } + } +}