Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get correct cpu cores in k8s pod (#6430) #6449

Merged
74 changes: 74 additions & 0 deletions dbms/src/Common/getNumberOfLogicalCPUCores.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Common/getNumberOfLogicalCPUCores.h>
#include <common/logger_useful.h>
#include <common/types.h>

namespace CPUCores
{
namespace
{
/// Turns a detected core count into a usable default.
/// `std::thread::hardware_concurrency()` is allowed to return 0 when the value
/// cannot be determined, and halving it yields 0 on a single-CPU host, so the
/// result is clamped to at least 1 to keep consumers from seeing zero cores.
UInt16 defaultCoreCount(unsigned detected_cores)
{
    return detected_cores > 0 ? static_cast<UInt16>(detected_cores) : 1;
}
} // namespace

/// Number of logical CPU cores this process may use.
/// Defaults to the hardware concurrency; can be overridden via `setNumberOfLogicalCPUCores`.
UInt16 number_of_logical_cpu_cores = defaultCoreCount(std::thread::hardware_concurrency());
/// Number of physical CPU cores this process may use.
/// Defaults to half of the hardware concurrency; recomputed by `computeAndSetNumberOfPhysicalCPUCores`.
UInt16 number_of_physical_cpu_cores = defaultCoreCount(std::thread::hardware_concurrency() / 2);
} // namespace CPUCores


/// Returns the currently recorded number of logical CPU cores
/// (the value stored in `CPUCores::number_of_logical_cpu_cores`).
UInt16 getNumberOfLogicalCPUCores()
{
    using CPUCores::number_of_logical_cpu_cores;
    return number_of_logical_cpu_cores;
}

/// Returns the currently recorded number of physical CPU cores
/// (the value stored in `CPUCores::number_of_physical_cpu_cores`).
UInt16 getNumberOfPhysicalCPUCores()
{
    using CPUCores::number_of_physical_cpu_cores;
    return number_of_physical_cpu_cores;
}

// Overrides the recorded number of logical CPU cores.
// This function should be called before Context has been created:
// creating the Context calls `getNumberOfLogicalCPUCores`, and after
// that point the number of cpu cores can not be set any more.
void setNumberOfLogicalCPUCores(UInt16 number_of_logical_cpu_cores_)
{
    using CPUCores::number_of_logical_cpu_cores;
    number_of_logical_cpu_cores = number_of_logical_cpu_cores_;
}

/// Derives the number of physical CPU cores available to this process and
/// stores it in `CPUCores::number_of_physical_cpu_cores` (always at least 1).
///
/// @param number_of_logical_cpu_cores_ logical cpu cores the tiflash process could use
///        (no matter in physical or virtual environment), assigned from ServerInfo.
/// @param number_of_hardware_physical_cores physical cpu cores of the host physical machine.
void computeAndSetNumberOfPhysicalCPUCores(UInt16 number_of_logical_cpu_cores_, UInt16 number_of_hardware_physical_cores)
{
    // We need to take two situations into consideration:
    // 1. tiflash on a physical machine.
    //    In old code, tiflash sets max_threads equal to the number of physical
    //    cpu cores, so we need to ensure this behavior is not broken.
    // 2. tiflash in a virtual environment.
    //    When setting max_threads we can't directly get the physical cpu cores,
    //    because only the host machine's physical cpu cores can be reached. So
    //    the number of physical cpu cores can only be calculated from the
    //    logical cpu cores.
    //
    // - `hardware_logical_cpu_cores` is how many logical cpu cores the host physical machine has.
    // - `logical_cores_per_physical_core` is how many logical cpu cores one physical cpu core has,
    //   i.e. `hardware_logical_cpu_cores / number_of_hardware_physical_cores`.
    // - `number_of_logical_cpu_cores_ / logical_cores_per_physical_core` is how many physical
    //   cpu cores the tiflash process could use. (Actually, it's needless to get physical cpu
    //   cores in a virtual environment, but we must ensure behavior `1` is not broken.)
    const auto hardware_logical_cpu_cores = std::thread::hardware_concurrency();

    // The ratio is only meaningful when it is at least 1. If the physical core
    // count is 0 (e.g. `hardware_concurrency() / 2` on a single-CPU host) or is
    // larger than the detected logical core count, the integer ratio would be 0
    // or undefined and the division below would divide by zero. Fall back to
    // one logical core per physical core in that case.
    unsigned logical_cores_per_physical_core = 1;
    if (number_of_hardware_physical_cores > 0 && hardware_logical_cpu_cores >= number_of_hardware_physical_cores)
        logical_cores_per_physical_core = hardware_logical_cpu_cores / number_of_hardware_physical_cores;

    const UInt16 physical_cpu_cores = static_cast<UInt16>(number_of_logical_cpu_cores_ / logical_cores_per_physical_core);
    // Never report zero physical cores.
    CPUCores::number_of_physical_cpu_cores = physical_cpu_cores > 0 ? physical_cpu_cores : 1;

    auto log = DB::Logger::get("CPUCores");
    LOG_INFO(
        log,
        "logical cpu cores: {}, hardware logical cpu cores: {}, hardware physical cpu cores: {}, physical cpu cores: {}, number_of_physical_cpu_cores: {}",
        number_of_logical_cpu_cores_,
        hardware_logical_cpu_cores,
        number_of_hardware_physical_cores,
        physical_cpu_cores,
        CPUCores::number_of_physical_cpu_cores);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,16 @@

#pragma once

/// Get number of CPU cores without hyper-threading.
unsigned getNumberOfPhysicalCPUCores();
#include <common/types.h>

#include <thread>

/// Returns the currently recorded number of logical CPU cores.
UInt16 getNumberOfLogicalCPUCores();
/// Returns the currently recorded number of physical CPU cores.
UInt16 getNumberOfPhysicalCPUCores();

// Overrides the recorded number of logical CPU cores.
// We should call this function before Context has been created,
// which will call `getNumberOfLogicalCPUCores`, or we can not
// set cpu cores any more.
void setNumberOfLogicalCPUCores(UInt16 number_of_logical_cpu_cores_);

/// Computes the number of physical CPU cores usable by this process from the
/// usable logical core count and the host's physical core count, and records
/// the result so that `getNumberOfPhysicalCPUCores` returns it.
void computeAndSetNumberOfPhysicalCPUCores(UInt16 number_of_logical_cpu_cores, UInt16 number_of_hardware_physical_cores);
65 changes: 0 additions & 65 deletions dbms/src/Common/getNumberOfPhysicalCPUCores.cpp

This file was deleted.

4 changes: 3 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Common/Stopwatch.h>
#include <Common/ThreadMetricUtil.h>
#include <Common/TiFlashMetrics.h>
#include <Common/VariantOp.h>
#include <Common/getNumberOfLogicalCPUCores.h>
#include <Common/setThreadName.h>
#include <Core/Types.h>
#include <Flash/BatchCoprocessorHandler.h>
Expand Down Expand Up @@ -55,7 +57,7 @@ FlashService::FlashService(IServer & server_)
auto settings = server_.context().getSettingsRef();
enable_local_tunnel = settings.enable_local_tunnel;
enable_async_grpc_client = settings.enable_async_grpc_client;
const size_t default_size = 2 * getNumberOfPhysicalCPUCores();
const size_t default_size = getNumberOfLogicalCPUCores();

auto cop_pool_size = static_cast<size_t>(settings.cop_pool_size);
cop_pool_size = cop_pool_size ? cop_pool_size : default_size;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Common/getNumberOfLogicalCPUCores.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Flash/Mpp/MinTSOScheduler.h>

Expand All @@ -28,8 +29,12 @@ MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit)
, estimated_thread_usage(0)
, log(&Poco::Logger::get("MinTSOScheduler"))
{
auto cores = getNumberOfPhysicalCPUCores();
active_set_soft_limit = (cores + 2) / 2; /// at least 1
auto cores = static_cast<size_t>(getNumberOfLogicalCPUCores() / 2);
if (active_set_soft_limit == 0 || active_set_soft_limit > 10 * cores)
{
/// set active_set_soft_limit to a reasonable value
active_set_soft_limit = (cores + 2) / 2; /// at least 1
}
if (isDisabled())
{
LOG_FMT_INFO(log, "MinTSOScheduler is disabled!");
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Interpreters/SettingsCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Common/Checksum.h>
#include <Common/FieldVisitors.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfLogicalCPUCores.h>
#include <Core/Field.h>
#include <DataStreams/SizeLimits.h>
#include <IO/CompressedStream.h>
Expand All @@ -26,7 +26,6 @@
#include <Poco/String.h>
#include <Poco/Timespan.h>


namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -166,18 +165,12 @@ struct SettingMaxThreads
is_auto = true;
}

UInt64 getAutoValue() const
static UInt64 getAutoValue()
{
static auto res = getAutoValueImpl();
static auto res = getNumberOfLogicalCPUCores();
return res;
}

/// Executed once for all time. Executed from one thread.
UInt64 getAutoValueImpl() const
{
return getNumberOfPhysicalCPUCores();
}

UInt64 get() const
{
return value;
Expand Down
23 changes: 21 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include <Common/formatReadable.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getNumberOfLogicalCPUCores.h>
#include <Common/setThreadName.h>
#include <Encryption/DataKeyManager.h>
#include <Encryption/FileProvider.h>
Expand Down Expand Up @@ -1049,7 +1049,26 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_FMT_INFO(log, "tiflash proxy thread is joined");
});

CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get());
/// get CPU/memory/disk info of this server
if (tiflash_instance_wrap.proxy_helper)
{
diagnosticspb::ServerInfoRequest request;
request.set_tp(static_cast<diagnosticspb::ServerInfoType>(1));
diagnosticspb::ServerInfoResponse response;
std::string req = request.SerializeAsString();
auto * helper = tiflash_instance_wrap.proxy_helper;
helper->fn_server_info(helper->proxy_ptr, strIntoView(&req), &response);
server_info.parseSysInfo(response);
setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores);
computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores);
LOG_INFO(log, "ServerInfo: {}", server_info.debugString());
}
else
{
setNumberOfLogicalCPUCores(std::thread::hardware_concurrency());
computeAndSetNumberOfPhysicalCPUCores(std::thread::hardware_concurrency(), std::thread::hardware_concurrency() / 2);
LOG_INFO(log, "TiFlashRaftProxyHelper is null, failed to get server info");
}

// print necessary grpc log.
grpc_log = &Poco::Logger::get("grpc");
Expand Down
98 changes: 98 additions & 0 deletions dbms/src/Server/ServerInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <common/getMemoryAmount.h>
#include <common/types.h>

#include <thread>
#include <vector>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#ifdef __clang__
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <kvproto/diagnosticspb.grpc.pb.h>
#pragma GCC diagnostic pop

namespace DB
{
/// Holds CPU / disk / memory information about the server this process runs on.
/// Every field starts with a default value; the `parse*` methods fill the
/// fields in from a `diagnosticspb` server-info response.
class ServerInfo
{
public:
    struct CPUInfo
    {
        /// number of logical CPU cores
        UInt16 logical_cores = std::thread::hardware_concurrency();
        /// number of physical CPU cores
        UInt16 physical_cores = std::thread::hardware_concurrency() / 2;
        /// L1 cache size
        /// units: Byte
        UInt32 l1_cache_size = 16384; // 16KB (typical value)
        /// L2 cache size
        /// units: Byte
        UInt32 l2_cache_size = 65536; // 64KB (typical value)
        /// L3 cache size
        /// units: Byte
        UInt32 l3_cache_size = 2097152; // 2MB (typical value)
        /// L1 cache line size
        /// units: Byte
        UInt8 l1_cache_line_size = 64; // 64B (typical value)
        /// L2 cache line size
        /// units: Byte
        UInt8 l2_cache_line_size = 64; // 64B (typical value)
        /// L3 cache line size
        /// units: Byte
        UInt8 l3_cache_line_size = 64; // 64B (typical value)
        /// CPU architecture
        String arch;
        /// CPU frequency
        String frequency;
    };

    struct Disk
    {
        /// disk name
        String name;
        enum DiskType
        {
            UNKNOWN = 0,
            HDD = 1,
            SSD = 2
        };
        /// Kind of disk. Initialized to UNKNOWN so that a default-constructed
        /// Disk never carries an indeterminate value.
        DiskType disk_type = UNKNOWN;
        /// total space of the disk
        UInt64 total_space = 0;
        /// free space of the disk
        UInt64 free_space = 0;
        /// mount point of the disk
        String mount_point;
        /// file system type of the disk
        String fs_type;
    };
    using DiskInfo = std::vector<Disk>;

    struct MemoryInfo
    {
        /// total memory size
        /// units: Byte
        UInt64 capacity = getMemoryAmount();
    };

    ServerInfo() = default;
    ~ServerInfo() = default;
    /// Fill `cpu_info` from a server-info item.
    void parseCPUInfo(const diagnosticspb::ServerInfoItem & cpu_info_item);
    /// Fill `disk_infos` from a server-info item.
    void parseDiskInfo(const diagnosticspb::ServerInfoItem & disk_info_item);
    /// Fill `memory_info` from a server-info item.
    void parseMemoryInfo(const diagnosticspb::ServerInfoItem & memory_info_item);
    /// Parse a whole server-info response.
    void parseSysInfo(const diagnosticspb::ServerInfoResponse & sys_info_response);
    /// Human-readable dump of the collected information, for logging.
    String debugString() const;

    CPUInfo cpu_info;
    DiskInfo disk_infos;
    MemoryInfo memory_info;
};
} // namespace DB