diff --git a/dbms/src/Common/getNumberOfLogicalCPUCores.cpp b/dbms/src/Common/getNumberOfLogicalCPUCores.cpp new file mode 100644 index 00000000000..16854909636 --- /dev/null +++ b/dbms/src/Common/getNumberOfLogicalCPUCores.cpp @@ -0,0 +1,34 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace CPUCores +{ +UInt16 number_of_logical_cpu_cores = std::thread::hardware_concurrency(); +} // namespace CPUCores + + +UInt16 getNumberOfLogicalCPUCores() +{ + return CPUCores::number_of_logical_cpu_cores; +} + +// We should call this function before Context has been created, +// which will call `getNumberOfLogicalCPUCores`, or we can not +// set cpu cores any more. +void setNumberOfLogicalCPUCores(UInt16 max_logical_cpu_cores) +{ + CPUCores::number_of_logical_cpu_cores = max_logical_cpu_cores; +} diff --git a/dbms/src/Common/getNumberOfLogicalCPUCores.h b/dbms/src/Common/getNumberOfLogicalCPUCores.h new file mode 100644 index 00000000000..ec7a9fadaf9 --- /dev/null +++ b/dbms/src/Common/getNumberOfLogicalCPUCores.h @@ -0,0 +1,17 @@ +#pragma once + +<<<<<<< HEAD:dbms/src/Common/getNumberOfPhysicalCPUCores.h +/// Get number of CPU cores without hyper-threading. +unsigned getNumberOfPhysicalCPUCores(); +======= +#include + +#include + +UInt16 getNumberOfLogicalCPUCores(); + +// We should call this function before Context has been created, +// which will call `getNumberOfLogicalCPUCores`, or we can not +// set cpu cores any more. +void setNumberOfLogicalCPUCores(UInt16 max_logical_cpu_cores); +>>>>>>> 966e7e228e (Get correct cpu cores in k8s pod (#6430)):dbms/src/Common/getNumberOfLogicalCPUCores.h diff --git a/dbms/src/Common/getNumberOfPhysicalCPUCores.h b/dbms/src/Common/getNumberOfPhysicalCPUCores.h deleted file mode 100644 index 827e95e1bea..00000000000 --- a/dbms/src/Common/getNumberOfPhysicalCPUCores.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once - -/// Get number of CPU cores without hyper-threading. -unsigned getNumberOfPhysicalCPUCores(); diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index da76211b4ab..900cb9eaf05 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -1,5 +1,10 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +>>>>>>> 966e7e228e (Get correct cpu cores in k8s pod (#6430)) #include #include #include @@ -31,8 +36,22 @@ FlashService::FlashService(IServer & server_) security_config(server_.securityConfig()), log(&Logger::get("FlashService")) { +<<<<<<< HEAD auto settings = server_.context().getSettingsRef(); const size_t default_size = 2 * getNumberOfPhysicalCPUCores(); +======= + security_config = &security_config_; + context = &context_; + log = &Poco::Logger::get("FlashService"); + manual_compact_manager = std::make_unique( + context->getGlobalContext(), + context->getGlobalContext().getSettingsRef()); + + auto settings = context->getSettingsRef(); + enable_local_tunnel = settings.enable_local_tunnel; + enable_async_grpc_client = settings.enable_async_grpc_client; + const size_t default_size = getNumberOfLogicalCPUCores(); +>>>>>>> 966e7e228e (Get correct cpu cores in k8s pod (#6430)) size_t cop_pool_size = static_cast(settings.cop_pool_size); cop_pool_size = cop_pool_size ? cop_pool_size : default_size; diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp new file mode 100644 index 00000000000..8107a9e3712 --- /dev/null +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -0,0 +1,269 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB +{ +namespace FailPoints +{ +extern const char random_min_tso_scheduler_failpoint[]; +} // namespace FailPoints + +constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); +constexpr UInt64 OS_THREAD_SOFT_LIMIT = 100000; + +MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 active_set_soft_limit_) + : min_tso(MAX_UINT64) + , thread_soft_limit(soft_limit) + , thread_hard_limit(hard_limit) + , estimated_thread_usage(0) + , active_set_soft_limit(active_set_soft_limit_) + , log(Logger::get()) +{ + auto cores = static_cast(getNumberOfLogicalCPUCores() / 2); + if (active_set_soft_limit == 0 || active_set_soft_limit > 10 * cores) + { + /// set active_set_soft_limit to a reasonable value + active_set_soft_limit = (cores + 2) / 2; /// at least 1 + } + if (isDisabled()) + { + LOG_INFO(log, "MinTSOScheduler is disabled!"); + } + else + { + if (thread_hard_limit <= thread_soft_limit || thread_hard_limit > OS_THREAD_SOFT_LIMIT) /// the general soft limit of OS threads is no more than 100000. + { + thread_hard_limit = 10000; + thread_soft_limit = 5000; + LOG_INFO(log, "hard limit {} should > soft limit {} and under maximum {}, so MinTSOScheduler set them as {}, {} by default, and active_set_soft_limit is {}.", hard_limit, soft_limit, OS_THREAD_SOFT_LIMIT, thread_hard_limit, thread_soft_limit, active_set_soft_limit); + } + else + { + LOG_INFO(log, "thread_hard_limit is {}, thread_soft_limit is {}, and active_set_soft_limit is {} in MinTSOScheduler.", thread_hard_limit, thread_soft_limit, active_set_soft_limit); + } + GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_tso); + GET_METRIC(tiflash_task_scheduler, type_thread_soft_limit).Set(thread_soft_limit); + GET_METRIC(tiflash_task_scheduler, type_thread_hard_limit).Set(thread_hard_limit); + GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); + GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(0); + GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(0); + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Set(0); + GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Set(0); + GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Set(0); + } +} + +bool MinTSOScheduler::tryToSchedule(MPPTaskScheduleEntry & schedule_entry, MPPTaskManager & task_manager) +{ + /// check whether this schedule is disabled or not + if (isDisabled()) + { + return true; + } + const auto & id = schedule_entry.getMPPTaskId(); + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.start_ts); + if (nullptr == query_task_set || !query_task_set->isInNormalState()) + { + LOG_WARNING(log, "{} is scheduled with miss or abort.", id.toString()); + return true; + } + bool has_error = false; + return scheduleImp(id.start_ts, query_task_set, schedule_entry, false, has_error); +} + +/// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled. +/// the cancelled query maybe hang, so trigger scheduling as needed when deleting cancelled query. +void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled) +{ + if (isDisabled()) + { + return; + } + + LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size()); + active_set.erase(tso); + waiting_set.erase(tso); + GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); + GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); + + if (is_cancelled) /// cancelled queries may have waiting tasks, and finished queries haven't. + { + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso); + if (query_task_set) /// release all waiting tasks + { + while (!query_task_set->waiting_tasks.empty()) + { + auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front()); + if (task_it != query_task_set->task_map.end() && task_it->second != nullptr) + task_it->second->scheduleThisTask(ScheduleState::FAILED); + query_task_set->waiting_tasks.pop(); + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); + } + } + } + + /// NOTE: if updated min_tso query has waiting tasks, they should be scheduled, especially when the soft-limited threads are amost used and active tasks are in resources deadlock which cannot release threads soon. + if (updateMinTSO(tso, true, is_cancelled ? "when cancelling it" : "as finishing it")) + { + scheduleWaitingQueries(task_manager); + } +} + +/// NOTE: should not throw exceptions due to being called when destruction. +void MinTSOScheduler::releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager) +{ + if (isDisabled()) + { + return; + } + + auto updated_estimated_threads = static_cast(estimated_thread_usage) - needed_threads; + RUNTIME_ASSERT(updated_estimated_threads >= 0, log, "estimated_thread_usage should not be smaller than 0, actually is {}.", updated_estimated_threads); + + estimated_thread_usage = updated_estimated_threads; + GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); + GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Decrement(); + /// as tasks release some threads, so some tasks would get scheduled. + scheduleWaitingQueries(task_manager); +} + +void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) +{ + /// schedule new tasks + while (!waiting_set.empty()) + { + auto current_query_id = *waiting_set.begin(); + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(current_query_id); + if (nullptr == query_task_set) /// silently solve this rare case + { + LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id); + updateMinTSO(current_query_id, true, "as it is not in the task manager."); + active_set.erase(current_query_id); + waiting_set.erase(current_query_id); + GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); + GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); + continue; + } + + LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_tso, query_task_set->waiting_tasks.size(), waiting_set.size()); + /// schedule tasks one by one + while (!query_task_set->waiting_tasks.empty()) + { + auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front()); + bool has_error = false; + if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second->getScheduleEntry(), true, has_error)) + { + if (has_error) + { + query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors. + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); + } + return; + } + query_task_set->waiting_tasks.pop(); + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); + } + LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_tso, waiting_set.size()); + waiting_set.erase(current_query_id); /// all waiting tasks of this query are fully active + GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); + } +} + +/// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases. +bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) +{ + auto needed_threads = schedule_entry.getNeededThreads(); + auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit; + auto check_for_not_min_tso = (active_set.size() < active_set_soft_limit || tso <= *active_set.rbegin()) && (estimated_thread_usage + needed_threads <= thread_soft_limit); + if (check_for_new_min_tso || check_for_not_min_tso) + { + updateMinTSO(tso, false, isWaiting ? "from the waiting set" : "when directly schedule it"); + active_set.insert(tso); + if (schedule_entry.schedule(ScheduleState::SCHEDULED)) + { + estimated_thread_usage += needed_threads; + GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment(); + } + GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); + GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); + LOG_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", schedule_entry.getMPPTaskId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit); + return true; + } + else + { + bool is_tso_min = tso <= min_tso; + fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_tso_min = true;); + if (is_tso_min) /// the min_tso query should fully run, otherwise throw errors here. + { + has_error = true; + auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); + LOG_ERROR(log, "{}", msg); + GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); + if (isWaiting) + { + /// set this task be failed to schedule, and the task will throw exception, then TiDB will finally notify this tiflash node canceling all tasks of this tso and update metrics. + schedule_entry.schedule(ScheduleState::EXCEEDED); + waiting_set.erase(tso); /// avoid the left waiting tasks of this query reaching here many times. + } + else + { + throw Exception(msg); + } + return false; + } + if (!isWaiting) + { + waiting_set.insert(tso); + query_task_set->waiting_tasks.push(schedule_entry.getMPPTaskId()); + GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); + } + LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", tso, active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); + return false; + } +} + +/// if return true, then need to schedule the waiting tasks of the min_tso. +bool MinTSOScheduler::updateMinTSO(const UInt64 tso, const bool retired, const String & msg) +{ + auto old_min_tso = min_tso; + bool force_scheduling = false; + if (retired) + { + if (tso == min_tso) /// elect a new min_tso from all queries. + { + min_tso = active_set.empty() ? MAX_UINT64 : *active_set.begin(); + min_tso = waiting_set.empty() ? min_tso : std::min(*waiting_set.begin(), min_tso); + force_scheduling = waiting_set.find(min_tso) != waiting_set.end(); /// if this min_tso has waiting tasks, these tasks should force being scheduled. + } + } + else + { + min_tso = std::min(tso, min_tso); + } + if (min_tso != old_min_tso) /// if min_tso == MAX_UINT64 and the query tso is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. + { + GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_tso); + LOG_INFO(log, "min_tso query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_tso, min_tso, msg, estimated_thread_usage, active_set.size(), waiting_set.size()); + } + return force_scheduling; +} + +} // namespace DB diff --git a/dbms/src/Interpreters/SettingsCommon.h b/dbms/src/Interpreters/SettingsCommon.h index c9a0632bdd2..014a1307cad 100644 --- a/dbms/src/Interpreters/SettingsCommon.h +++ b/dbms/src/Interpreters/SettingsCommon.h @@ -1,5 +1,11 @@ #pragma once +<<<<<<< HEAD +======= +#include +#include +#include +>>>>>>> 966e7e228e (Get correct cpu cores in k8s pod (#6430)) #include #include #include @@ -12,7 +18,6 @@ #include #include - namespace DB { @@ -148,17 +153,27 @@ struct SettingMaxThreads is_auto = true; } - UInt64 getAutoValue() const + static UInt64 getAutoValue() { - static auto res = getAutoValueImpl(); + static auto res = getNumberOfLogicalCPUCores(); return res; } +<<<<<<< HEAD /// Executed once for all time. Executed from one thread. UInt64 getAutoValueImpl() const { return getNumberOfPhysicalCPUCores(); } +======= + UInt64 get() const + { + return value; + } + +private: + UInt64 value; +>>>>>>> 966e7e228e (Get correct cpu cores in k8s pod (#6430)) }; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index bbbac8eb229..4effef63397 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -909,7 +909,28 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "tiflash proxy thread is joined"); }); +<<<<<<< HEAD CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get()); +======= + /// get CPU/memory/disk info of this server + if (tiflash_instance_wrap.proxy_helper) + { + diagnosticspb::ServerInfoRequest request; + request.set_tp(static_cast(1)); + diagnosticspb::ServerInfoResponse response; + std::string req = request.SerializeAsString(); + auto * helper = tiflash_instance_wrap.proxy_helper; + helper->fn_server_info(helper->proxy_ptr, strIntoView(&req), &response); + server_info.parseSysInfo(response); + setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores); + LOG_INFO(log, "ServerInfo: {}", server_info.debugString()); + } + else + { + setNumberOfLogicalCPUCores(std::thread::hardware_concurrency()); + LOG_INFO(log, "TiFlashRaftProxyHelper is null, failed to get server info"); + } +>>>>>>> 966e7e228e (Get correct cpu cores in k8s pod (#6430)) // print necessary grpc log. grpc_log = &Logger::get("grpc"); diff --git a/dbms/src/Server/ServerInfo.h b/dbms/src/Server/ServerInfo.h new file mode 100644 index 00000000000..d9731e37bb8 --- /dev/null +++ b/dbms/src/Server/ServerInfo.h @@ -0,0 +1,98 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#ifdef __clang__ +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#endif +#include +#pragma GCC diagnostic pop + +namespace DB +{ +class ServerInfo +{ +public: + struct CPUInfo + { + /// number of logical CPU cores + UInt16 logical_cores = std::thread::hardware_concurrency(); + /// number of physical CPU cores + UInt16 physical_cores = std::thread::hardware_concurrency() / 2; + /// number of L1 cache size + /// units: Byte + UInt32 l1_cache_size = 16384; // 16KB (typical value) + /// number of L2 cache size + /// units: Byte + UInt32 l2_cache_size = 65536; // 64KB (typical value) + /// number of L3 cache size + /// units: Byte + UInt32 l3_cache_size = 2097152; // 2MB (typical value) + /// number of L1 cache line size + UInt8 l1_cache_line_size = 64; // 64B (typical value) + /// number of L2 cache line size + UInt8 l2_cache_line_size = 64; // 64B (typical value) + /// number of L3 cache line size + UInt8 l3_cache_line_size = 64; // 64B (typical value) + /// CPU architecture + String arch; + /// CPU frequency + String frequency; + }; + + struct Disk + { + String name; + enum DiskType + { + UNKNOWN = 0, + HDD = 1, + SSD = 2 + }; + DiskType disk_type; + UInt64 total_space = 0; + UInt64 free_space = 0; + String mount_point; + String fs_type; + }; + using DiskInfo = std::vector; + + struct MemoryInfo + { + /// total memory size + /// units: Byte + UInt64 capacity = getMemoryAmount(); + }; + + ServerInfo() = default; + ~ServerInfo() = default; + void parseCPUInfo(const diagnosticspb::ServerInfoItem & cpu_info_item); + void parseDiskInfo(const diagnosticspb::ServerInfoItem & disk_info_item); + void parseMemoryInfo(const diagnosticspb::ServerInfoItem & memory_info_item); + void parseSysInfo(const diagnosticspb::ServerInfoResponse & sys_info_response); + String debugString() const; + + CPUInfo cpu_info; + DiskInfo disk_infos; + MemoryInfo memory_info; +}; +} // namespace DB