Skip to content

Commit

Permalink
Optimize exchange operator & Support mpp version (pingcap#6596)
Browse files Browse the repository at this point in the history
close pingcap#6620

Signed-off-by: ywqzzy <592838129@qq.com>
  • Loading branch information
solotzg authored and ywqzzy committed Feb 13, 2023
1 parent 938bf07 commit e264033
Show file tree
Hide file tree
Showing 66 changed files with 2,382 additions and 303 deletions.
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,6 @@ if (ARCH_AMD64)
else()
add_definitions(-DTIFLASH_COMPILER_VPCLMULQDQ_SUPPORT=0)
endif()

check_cxx_compiler_flag("-mmovbe" TIFLASH_COMPILER_MOVBE_SUPPORT)
else()
add_definitions(-DTIFLASH_COMPILER_VPCLMULQDQ_SUPPORT=0)
endif()
Expand Down
17 changes: 16 additions & 1 deletion cmake/cpu_features.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ elseif (ARCH_AMD64)
# so we do not set the flags to avoid core dump in old machines
option (TIFLASH_ENABLE_AVX_SUPPORT "Use AVX/AVX2 instructions on x86_64" ON)
option (TIFLASH_ENABLE_AVX512_SUPPORT "Use AVX512 instructions on x86_64" ON)

# `haswell` was released since 2013 with cpu feature avx2, bmi2. It's a practical arch for optimizer
option (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT "Use instructions based on architecture `haswell` on x86_64" ON)

option (NO_SSE42_OR_HIGHER "Disable SSE42 or higher on x86_64 for maximum compatibility with older/embedded hardware." OFF)
if (NO_SSE42_OR_HIGHER)
SET(TIFLASH_ENABLE_AVX_SUPPORT OFF)
SET(TIFLASH_ENABLE_AVX512_SUPPORT OFF)
SET (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT OFF)
endif()

set (TEST_FLAG "-mssse3")
Expand Down Expand Up @@ -171,7 +175,8 @@ elseif (ARCH_AMD64)
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${TEST_FLAG}")
endif ()

set (TEST_FLAG "-mavx -mavx2")
set (TIFLASH_COMPILER_AVX2_FLAG "-mavx2")
set (TEST_FLAG "${TIFLASH_COMPILER_AVX2_FLAG}")
set (CMAKE_REQUIRED_FLAGS "${TEST_FLAG} -O0")
check_cxx_source_compiles("
#include <immintrin.h>
Expand Down Expand Up @@ -208,6 +213,16 @@ elseif (ARCH_AMD64)
add_definitions(-DTIFLASH_ENABLE_AVX512_SUPPORT=1)
endif ()

set (TIFLASH_COMPILER_MOVBE_FLAG "-mmovbe")
check_cxx_compiler_flag("${TIFLASH_COMPILER_MOVBE_FLAG}" TIFLASH_COMPILER_MOVBE_SUPPORT)
set (TIFLASH_COMPILER_BMI2_FLAG "-mbmi2")
check_cxx_compiler_flag("${TIFLASH_COMPILER_BMI2_FLAG}" TIFLASH_COMPILER_BMI2_SUPPORT)

set (TIFLASH_COMPILER_ARCH_HASWELL_FLAG "-march=haswell")
check_cxx_compiler_flag("${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}" TIFLASH_COMPILER_ARCH_HASWELL_SUPPORT)
if (NOT TIFLASH_COMPILER_ARCH_HASWELL_SUPPORT)
set (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT OFF)
endif ()
else ()
# ignore all other platforms
endif ()
Expand Down
5 changes: 5 additions & 0 deletions contrib/lz4-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ add_library (lz4
target_compile_definitions(lz4 PUBLIC LZ4_DISABLE_DEPRECATE_WARNINGS=1)

target_include_directories(lz4 PUBLIC ${LIBRARY_DIR})

if (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}")
endif ()
5 changes: 5 additions & 0 deletions contrib/zstd-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,8 @@ ENABLE_LANGUAGE(ASM)
ADD_LIBRARY(zstd ${Sources} ${Headers})

target_include_directories (zstd PUBLIC ${LIBRARY_DIR})

if (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}")
endif ()
7 changes: 6 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,13 @@ add_headers_and_sources(dbms src/Client)
add_headers_only(dbms src/Flash/Coprocessor)
add_headers_only(dbms src/Server)

add_sources_compile_flag_avx2 (
check_then_add_sources_compile_flag (
TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT
"${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}"
src/Columns/ColumnString.cpp
src/Columns/ColumnsCommon.cpp
src/Columns/ColumnVector.cpp
src/DataTypes/DataTypeString.cpp
)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
Expand Down Expand Up @@ -202,6 +206,7 @@ target_link_libraries (clickhouse_common_io
magic_enum
libsymbolization
)

target_include_directories (clickhouse_common_io BEFORE PRIVATE ${kvClient_SOURCE_DIR}/include)
target_compile_definitions(clickhouse_common_io PUBLIC -DTIFLASH_SOURCE_PREFIX=\"${TiFlash_SOURCE_DIR}\")
target_link_libraries (dbms
Expand Down
129 changes: 75 additions & 54 deletions dbms/src/Columns/ColumnsCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,81 @@
#include <Columns/IColumn.h>
#include <common/memcpy.h>

#ifdef TIFLASH_ENABLE_AVX_SUPPORT
ASSERT_USE_AVX2_COMPILE_FLAG
#endif

namespace DB
{
#if defined(__SSE2__) && defined(__POPCNT__)
#if defined(__AVX2__) || defined(__SSE2__)
/// Transform 64-byte mask to 64-bit mask.
static UInt64 toBits64(const Int8 * bytes64)
inline UInt64 ToBits64(const Int8 * bytes64)
{
static const __m128i zero16 = _mm_setzero_si128();
#if defined(__AVX2__)
const auto check_block = _mm256_setzero_si256();
uint64_t mask0 = mem_utils::details::get_block32_cmp_eq_mask(bytes64, check_block);
uint64_t mask1 = mem_utils::details::get_block32_cmp_eq_mask(bytes64 + mem_utils::details::BLOCK32_SIZE, check_block);
auto res = mask0 | (mask1 << mem_utils::details::BLOCK32_SIZE);
return ~res;
#elif defined(__SSE2__)
const auto zero16 = _mm_setzero_si128();
UInt64 res
= static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64)), zero16)))
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 16)), zero16))) << 16)
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 32)), zero16))) << 32)
| (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 48)), zero16))) << 48);

return ~res;
#endif
}
#endif

size_t countBytesInFilter(const UInt8 * filt, size_t sz)
ALWAYS_INLINE inline static size_t
CountBytesInFilter(const UInt8 * filt, size_t start, size_t end)
{
#if defined(__AVX2__)
size_t size = end - start;
auto zero_cnt = mem_utils::details::avx2_byte_count(reinterpret_cast<const char *>(filt + start), size, 0);
return size - zero_cnt;
#else
size_t count = 0;

/** NOTE: In theory, `filt` should only contain zeros and ones.
* But, just in case, here the condition > 0 (to signed bytes) is used.
* It would be better to use != 0, then this does not allow SSE2.
*/

const Int8 * pos = reinterpret_cast<const Int8 *>(filt);
const Int8 * end = pos + sz;

#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + sz / 64 * 64;
const char * pos = reinterpret_cast<const char *>(filt);
pos += start;

for (; pos < end64; pos += 64)
{
count += __builtin_popcountll(toBits64(pos));
}
/// TODO Add duff device for tail?
#endif

for (; pos < end; ++pos)
const char * end_pos = pos + (end - start);
for (; pos < end_pos; ++pos)
count += *pos != 0;

return count;
#endif
}

size_t countBytesInFilter(const UInt8 * filt, size_t sz)
{
return CountBytesInFilter(filt, 0, sz);
}

size_t countBytesInFilter(const IColumn::Filter & filt)
{
return countBytesInFilter(filt.data(), filt.size());
return CountBytesInFilter(filt.data(), 0, filt.size());
}

size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map)
static inline size_t CountBytesInFilterWithNull(const Int8 * p1, const Int8 * p2, size_t size)
{
size_t count = 0;
for (size_t i = 0; i < size; ++i)
{
count += (p1[i] & ~p2[i]) != 0;
}
return count;
}

static inline size_t CountBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map, size_t start, size_t end)
{
size_t count = 0;

Expand All @@ -75,25 +99,27 @@ size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * nu
* It would be better to use != 0, then this does not allow SSE2.
*/

const Int8 * pos = reinterpret_cast<const Int8 *>(filt.data());
const Int8 * pos2 = reinterpret_cast<const Int8 *>(null_map);
const Int8 * end = pos + filt.size();
const Int8 * p1 = reinterpret_cast<const Int8 *>(filt.data()) + start;
const Int8 * p2 = reinterpret_cast<const Int8 *>(null_map) + start;
size_t size = end - start;

#if defined(__SSE2__) && defined(__POPCNT__)
const Int8 * end64 = pos + filt.size() / 64 * 64;

for (; pos < end64; pos += 64, pos2 += 64)
count += __builtin_popcountll(toBits64(pos) & ~toBits64(pos2));

/// TODO Add duff device for tail?
#if defined(__SSE2__) || defined(__AVX2__)
for (; size >= 64;)
{
count += __builtin_popcountll(ToBits64(p1) & ~ToBits64(p2));
p1 += 64, p2 += 64;
size -= 64;
}
#endif

for (; pos < end; ++pos)
count += (*pos & ~*pos2) != 0;

count += CountBytesInFilterWithNull(p1, p2, size);
return count;
}

size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map)
{
return CountBytesInFilterWithNull(filt, null_map, 0, filt.size());
}

std::vector<size_t> countColumnsSizeInSelector(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector)
{
std::vector<size_t> counts(num_columns);
Expand Down Expand Up @@ -151,7 +177,7 @@ struct ResultOffsetsBuilder

if (diff_offset > 0)
{
auto * const res_offsets_pos = &res_offsets[offsets_size_old];
auto * res_offsets_pos = &res_offsets[offsets_size_old];

/// adjust offsets
for (size_t i = 0; i < SIMD_BYTES; ++i)
Expand All @@ -178,7 +204,6 @@ struct NoResultOffsetsBuilder
}
};


template <typename T, typename ResultOffsetsBuilder>
void filterArraysImplGeneric(
const PaddedPODArray<T> & src_elems,
Expand All @@ -204,40 +229,36 @@ void filterArraysImplGeneric(
res_elems.reserve((result_size_hint * src_elems.size() + size - 1) / size);
}

const UInt8 * filt_pos = &filt[0];
const auto * const filt_end = filt_pos + size;
const UInt8 * filt_pos = filt.data();
const auto * filt_end = filt_pos + size;

const auto * offsets_pos = &src_offsets[0];
const auto * const offsets_begin = offsets_pos;
const auto * offsets_pos = src_offsets.data();
const auto * offsets_begin = offsets_pos;

/// copy array ending at *end_offset_ptr
const auto copy_array = [&](const IColumn::Offset * offset_ptr) {
const auto offset = offset_ptr == offsets_begin ? 0 : offset_ptr[-1];
const auto size = *offset_ptr - offset;
const auto arr_offset = offset_ptr == offsets_begin ? 0 : offset_ptr[-1];
const auto arr_size = *offset_ptr - arr_offset;

result_offsets_builder.insertOne(size);
result_offsets_builder.insertOne(arr_size);

const auto elems_size_old = res_elems.size();
res_elems.resize(elems_size_old + size);
inline_memcpy(&res_elems[elems_size_old], &src_elems[offset], size * sizeof(T));
res_elems.resize(elems_size_old + arr_size);
inline_memcpy(&res_elems[elems_size_old], &src_elems[arr_offset], arr_size * sizeof(T));
};

#if __SSE2__
const __m128i zero_vec = _mm_setzero_si128();
static constexpr size_t SIMD_BYTES = 16;
const auto * const filt_end_aligned = filt_pos + size / SIMD_BYTES * SIMD_BYTES;
static constexpr size_t SIMD_BYTES = mem_utils::details::BLOCK16_SIZE;
const auto * filt_end_aligned = filt_pos + size / SIMD_BYTES * SIMD_BYTES;
const auto zero_vec = _mm_setzero_si128();

while (filt_pos < filt_end_aligned)
{
uint32_t mask = _mm_movemask_epi8(_mm_cmpgt_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)),
mem_utils::details::load_block16(filt_pos),
zero_vec));

if (mask == 0)
{
/// SIMD_BYTES consecutive rows do not pass the filter
}
else if (mask == 0xffff)
if (mem_utils::details::Block16Mask == mask)
{
/// SIMD_BYTES consecutive rows pass the filter
const auto first = offsets_pos == offsets_begin;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_ingest_via_replace) \
M(unblock_query_init_after_write) \
M(exception_in_merged_task_init) \
M(invalid_mpp_version) \
M(force_fail_in_flush_region_data)


Expand Down
12 changes: 5 additions & 7 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ class MPMCQueue
template <typename Duration>
ALWAYS_INLINE Result popTimeout(T & obj, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
auto deadline = SteadyClock::now() + timeout;
return popObj<true>(obj, &deadline);
}

Expand Down Expand Up @@ -138,8 +137,7 @@ class MPMCQueue
template <typename U, typename Duration>
ALWAYS_INLINE Result pushTimeout(U && u, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
auto deadline = SteadyClock::now() + timeout;
return pushObj<true>(std::forward<U>(u), &deadline);
}

Expand All @@ -162,8 +160,7 @@ class MPMCQueue
template <typename... Args, typename Duration>
ALWAYS_INLINE Result emplaceTimeout(Args &&... args, const Duration & timeout)
{
/// std::condition_variable::wait_until will always use system_clock.
auto deadline = std::chrono::system_clock::now() + timeout;
auto deadline = SteadyClock::now() + timeout;
return emplaceObj<true>(&deadline, std::forward<Args>(args)...);
}

Expand Down Expand Up @@ -222,7 +219,8 @@ class MPMCQueue
}

private:
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
using SteadyClock = std::chrono::steady_clock;
using TimePoint = SteadyClock::time_point;
using WaitingNode = MPMCQueueDetail::WaitingNode;

void notifyAll()
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ namespace DB
F(type_mpp_establish_conn, {{"type", "mpp_tunnel"}}), \
F(type_mpp_establish_conn_local, {{"type", "mpp_tunnel_local"}}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}})) \
M(tiflash_exchange_data_bytes, "Total bytes sent by exchange operators", Counter, \
F(type_hash_original, {"type", "hash_original"}), /*the original data size by hash exchange*/ \
F(type_hash_none_compression_remote, {"type", "hash_none_compression_remote"}), /*the remote exchange data size by hash partition with no compression*/\
F(type_hash_none_compression_local, {"type", "hash_none_compression_local"}), /*the local exchange data size by hash partition with no compression*/ \
F(type_hash_lz4_compression, {"type", "hash_lz4_compression"}), /*the exchange data size by hash partition with lz4 compression*/ \
F(type_hash_zstd_compression, {"type", "hash_zstd_compression"}), /*the exchange data size by hash partition with zstd compression*/ \
F(type_broadcast_passthrough_original, {"type", "broadcast_passthrough_original"}), /*the original exchange data size by broadcast/passthough*/ \
F(type_broadcast_passthrough_none_compression_local, {"type", "broadcast_passthrough_none_compression_local"}), /*the local exchange data size by broadcast/passthough with no compression*/ \
F(type_broadcast_passthrough_none_compression_remote, {"type", "broadcast_passthrough_none_compression_remote"}), /*the remote exchange data size by broadcast/passthough with no compression*/ \
) \
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
Expand Down
Loading

0 comments on commit e264033

Please sign in to comment.