diff --git a/CMakeLists.txt b/CMakeLists.txt index 2f5428909c8..a851788034e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -380,8 +380,6 @@ if (ARCH_AMD64) else() add_definitions(-DTIFLASH_COMPILER_VPCLMULQDQ_SUPPORT=0) endif() - - check_cxx_compiler_flag("-mmovbe" TIFLASH_COMPILER_MOVBE_SUPPORT) else() add_definitions(-DTIFLASH_COMPILER_VPCLMULQDQ_SUPPORT=0) endif() diff --git a/cmake/cpu_features.cmake b/cmake/cpu_features.cmake index 4d09329bf97..5eb56288ebe 100644 --- a/cmake/cpu_features.cmake +++ b/cmake/cpu_features.cmake @@ -95,11 +95,15 @@ elseif (ARCH_AMD64) # so we do not set the flags to avoid core dump in old machines option (TIFLASH_ENABLE_AVX_SUPPORT "Use AVX/AVX2 instructions on x86_64" ON) option (TIFLASH_ENABLE_AVX512_SUPPORT "Use AVX512 instructions on x86_64" ON) + + # `haswell` was released since 2013 with cpu feature avx2, bmi2. It's a practical arch for optimizer + option (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT "Use instructions based on architecture `haswell` on x86_64" ON) option (NO_SSE42_OR_HIGHER "Disable SSE42 or higher on x86_64 for maximum compatibility with older/embedded hardware." OFF) if (NO_SSE42_OR_HIGHER) SET(TIFLASH_ENABLE_AVX_SUPPORT OFF) SET(TIFLASH_ENABLE_AVX512_SUPPORT OFF) + SET (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT OFF) endif() set (TEST_FLAG "-mssse3") @@ -171,7 +175,8 @@ elseif (ARCH_AMD64) set (COMPILER_FLAGS "${COMPILER_FLAGS} ${TEST_FLAG}") endif () - set (TEST_FLAG "-mavx -mavx2") + set (TIFLASH_COMPILER_AVX2_FLAG "-mavx2") + set (TEST_FLAG "${TIFLASH_COMPILER_AVX2_FLAG}") set (CMAKE_REQUIRED_FLAGS "${TEST_FLAG} -O0") check_cxx_source_compiles(" #include @@ -208,6 +213,16 @@ elseif (ARCH_AMD64) add_definitions(-DTIFLASH_ENABLE_AVX512_SUPPORT=1) endif () + set (TIFLASH_COMPILER_MOVBE_FLAG "-mmovbe") + check_cxx_compiler_flag("${TIFLASH_COMPILER_MOVBE_FLAG}" TIFLASH_COMPILER_MOVBE_SUPPORT) + set (TIFLASH_COMPILER_BMI2_FLAG "-mbmi2") + check_cxx_compiler_flag("${TIFLASH_COMPILER_BMI2_FLAG}" TIFLASH_COMPILER_BMI2_SUPPORT) + + set (TIFLASH_COMPILER_ARCH_HASWELL_FLAG "-march=haswell") + check_cxx_compiler_flag("${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}" TIFLASH_COMPILER_ARCH_HASWELL_SUPPORT) + if (NOT TIFLASH_COMPILER_ARCH_HASWELL_SUPPORT) + set (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT OFF) + endif () else () # ignore all other platforms endif () diff --git a/contrib/kvproto b/contrib/kvproto index 9ccc6beaf0a..7cd28226c2a 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 9ccc6beaf0aa9b0a4adad43b497348898ba653cf +Subproject commit 7cd28226c2a21e489b077a87e1f5c9ba2c950944 diff --git a/contrib/lz4-cmake/CMakeLists.txt b/contrib/lz4-cmake/CMakeLists.txt index fd2d3d69004..d46fe8c5905 100644 --- a/contrib/lz4-cmake/CMakeLists.txt +++ b/contrib/lz4-cmake/CMakeLists.txt @@ -10,3 +10,8 @@ add_library (lz4 target_compile_definitions(lz4 PUBLIC LZ4_DISABLE_DEPRECATE_WARNINGS=1) target_include_directories(lz4 PUBLIC ${LIBRARY_DIR}) + +if (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT) + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}") + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}") +endif () diff --git a/contrib/zstd-cmake/CMakeLists.txt b/contrib/zstd-cmake/CMakeLists.txt index 0505ad60774..7ab9edd8469 100644 --- a/contrib/zstd-cmake/CMakeLists.txt +++ b/contrib/zstd-cmake/CMakeLists.txt @@ -155,3 +155,8 @@ ENABLE_LANGUAGE(ASM) ADD_LIBRARY(zstd ${Sources} ${Headers}) target_include_directories (zstd PUBLIC ${LIBRARY_DIR}) + +if (TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT) + set 
(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}") + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}") +endif () diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index f993aead814..eef01dbfc55 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -106,9 +106,13 @@ add_headers_and_sources(dbms src/Client) add_headers_only(dbms src/Flash/Coprocessor) add_headers_only(dbms src/Server) -add_sources_compile_flag_avx2 ( +check_then_add_sources_compile_flag ( + TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT + "${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}" src/Columns/ColumnString.cpp src/Columns/ColumnsCommon.cpp + src/Columns/ColumnVector.cpp + src/DataTypes/DataTypeString.cpp ) list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD}) @@ -202,6 +206,7 @@ target_link_libraries (clickhouse_common_io magic_enum libsymbolization ) + target_include_directories (clickhouse_common_io BEFORE PRIVATE ${kvClient_SOURCE_DIR}/include) target_compile_definitions(clickhouse_common_io PUBLIC -DTIFLASH_SOURCE_PREFIX=\"${TiFlash_SOURCE_DIR}\") target_link_libraries (dbms diff --git a/dbms/src/Columns/ColumnsCommon.cpp b/dbms/src/Columns/ColumnsCommon.cpp index da468c86505..e2d2a130313 100644 --- a/dbms/src/Columns/ColumnsCommon.cpp +++ b/dbms/src/Columns/ColumnsCommon.cpp @@ -16,25 +16,42 @@ #include #include +#ifdef TIFLASH_ENABLE_AVX_SUPPORT +ASSERT_USE_AVX2_COMPILE_FLAG +#endif + namespace DB { -#if defined(__SSE2__) && defined(__POPCNT__) +#if defined(__AVX2__) || defined(__SSE2__) /// Transform 64-byte mask to 64-bit mask. -static UInt64 toBits64(const Int8 * bytes64) +inline UInt64 ToBits64(const Int8 * bytes64) { - static const __m128i zero16 = _mm_setzero_si128(); +#if defined(__AVX2__) + const auto check_block = _mm256_setzero_si256(); + uint64_t mask0 = mem_utils::details::get_block32_cmp_eq_mask(bytes64, check_block); + uint64_t mask1 = mem_utils::details::get_block32_cmp_eq_mask(bytes64 + mem_utils::details::BLOCK32_SIZE, check_block); + auto res = mask0 | (mask1 << mem_utils::details::BLOCK32_SIZE); + return ~res; +#elif defined(__SSE2__) + const auto zero16 = _mm_setzero_si128(); UInt64 res = static_cast(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast(bytes64)), zero16))) | (static_cast(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast(bytes64 + 16)), zero16))) << 16) | (static_cast(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast(bytes64 + 32)), zero16))) << 32) | (static_cast(_mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast(bytes64 + 48)), zero16))) << 48); - return ~res; +#endif } #endif -size_t countBytesInFilter(const UInt8 * filt, size_t sz) +ALWAYS_INLINE inline static size_t +CountBytesInFilter(const UInt8 * filt, size_t start, size_t end) { +#if defined(__AVX2__) + size_t size = end - start; + auto zero_cnt = mem_utils::details::avx2_byte_count(reinterpret_cast(filt + start), size, 0); + return size - zero_cnt; +#else size_t count = 0; /** NOTE: In theory, `filt` should only contain zeros and ones. @@ -42,31 +59,38 @@ size_t countBytesInFilter(const UInt8 * filt, size_t sz) * It would be better to use != 0, then this does not allow SSE2. 
*/ - const Int8 * pos = reinterpret_cast(filt); - const Int8 * end = pos + sz; - -#if defined(__SSE2__) && defined(__POPCNT__) - const Int8 * end64 = pos + sz / 64 * 64; + const char * pos = reinterpret_cast(filt); + pos += start; - for (; pos < end64; pos += 64) - { - count += __builtin_popcountll(toBits64(pos)); - } - /// TODO Add duff device for tail? -#endif - - for (; pos < end; ++pos) + const char * end_pos = pos + (end - start); + for (; pos < end_pos; ++pos) count += *pos != 0; return count; +#endif +} + +size_t countBytesInFilter(const UInt8 * filt, size_t sz) +{ + return CountBytesInFilter(filt, 0, sz); } size_t countBytesInFilter(const IColumn::Filter & filt) { - return countBytesInFilter(filt.data(), filt.size()); + return CountBytesInFilter(filt.data(), 0, filt.size()); } -size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map) +static inline size_t CountBytesInFilterWithNull(const Int8 * p1, const Int8 * p2, size_t size) +{ + size_t count = 0; + for (size_t i = 0; i < size; ++i) + { + count += (p1[i] & ~p2[i]) != 0; + } + return count; +} + +static inline size_t CountBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map, size_t start, size_t end) { size_t count = 0; @@ -75,25 +99,27 @@ size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * nu * It would be better to use != 0, then this does not allow SSE2. */ - const Int8 * pos = reinterpret_cast(filt.data()); - const Int8 * pos2 = reinterpret_cast(null_map); - const Int8 * end = pos + filt.size(); + const Int8 * p1 = reinterpret_cast(filt.data()) + start; + const Int8 * p2 = reinterpret_cast(null_map) + start; + size_t size = end - start; -#if defined(__SSE2__) && defined(__POPCNT__) - const Int8 * end64 = pos + filt.size() / 64 * 64; - - for (; pos < end64; pos += 64, pos2 += 64) - count += __builtin_popcountll(toBits64(pos) & ~toBits64(pos2)); - - /// TODO Add duff device for tail? 
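// Editor's illustrative sketch (not part of the patch): the idea behind the
// ToBits64 helper added above and the popcount loop added just below is to
// turn 64 filter bytes into a 64-bit mask (bit i set when byte i is non-zero)
// and then count the set bits. A minimal scalar equivalent, with hypothetical
// names; the patch's SSE2/AVX2 paths build the same mask with compare +
// movemask instructions instead of this loop.
#include <cstddef>
#include <cstdint>

static inline uint64_t bytesToBits64(const uint8_t * bytes64)
{
    uint64_t mask = 0;
    for (size_t i = 0; i < 64; ++i)
        mask |= static_cast<uint64_t>(bytes64[i] != 0) << i;
    return mask;
}

static inline size_t countNonZeroBytes64(const uint8_t * bytes64)
{
    // Same counting step the patch performs with __builtin_popcountll.
    return static_cast<size_t>(__builtin_popcountll(bytesToBits64(bytes64)));
}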
+#if defined(__SSE2__) || defined(__AVX2__) + for (; size >= 64;) + { + count += __builtin_popcountll(ToBits64(p1) & ~ToBits64(p2)); + p1 += 64, p2 += 64; + size -= 64; + } #endif - - for (; pos < end; ++pos) - count += (*pos & ~*pos2) != 0; - + count += CountBytesInFilterWithNull(p1, p2, size); return count; } +size_t countBytesInFilterWithNull(const IColumn::Filter & filt, const UInt8 * null_map) +{ + return CountBytesInFilterWithNull(filt, null_map, 0, filt.size()); +} + std::vector countColumnsSizeInSelector(IColumn::ColumnIndex num_columns, const IColumn::Selector & selector) { std::vector counts(num_columns); @@ -151,7 +177,7 @@ struct ResultOffsetsBuilder if (diff_offset > 0) { - auto * const res_offsets_pos = &res_offsets[offsets_size_old]; + auto * res_offsets_pos = &res_offsets[offsets_size_old]; /// adjust offsets for (size_t i = 0; i < SIMD_BYTES; ++i) @@ -178,7 +204,6 @@ struct NoResultOffsetsBuilder } }; - template void filterArraysImplGeneric( const PaddedPODArray & src_elems, @@ -204,40 +229,36 @@ void filterArraysImplGeneric( res_elems.reserve((result_size_hint * src_elems.size() + size - 1) / size); } - const UInt8 * filt_pos = &filt[0]; - const auto * const filt_end = filt_pos + size; + const UInt8 * filt_pos = filt.data(); + const auto * filt_end = filt_pos + size; - const auto * offsets_pos = &src_offsets[0]; - const auto * const offsets_begin = offsets_pos; + const auto * offsets_pos = src_offsets.data(); + const auto * offsets_begin = offsets_pos; /// copy array ending at *end_offset_ptr const auto copy_array = [&](const IColumn::Offset * offset_ptr) { - const auto offset = offset_ptr == offsets_begin ? 0 : offset_ptr[-1]; - const auto size = *offset_ptr - offset; + const auto arr_offset = offset_ptr == offsets_begin ? 0 : offset_ptr[-1]; + const auto arr_size = *offset_ptr - arr_offset; - result_offsets_builder.insertOne(size); + result_offsets_builder.insertOne(arr_size); const auto elems_size_old = res_elems.size(); - res_elems.resize(elems_size_old + size); - inline_memcpy(&res_elems[elems_size_old], &src_elems[offset], size * sizeof(T)); + res_elems.resize(elems_size_old + arr_size); + inline_memcpy(&res_elems[elems_size_old], &src_elems[arr_offset], arr_size * sizeof(T)); }; #if __SSE2__ - const __m128i zero_vec = _mm_setzero_si128(); - static constexpr size_t SIMD_BYTES = 16; - const auto * const filt_end_aligned = filt_pos + size / SIMD_BYTES * SIMD_BYTES; + static constexpr size_t SIMD_BYTES = mem_utils::details::BLOCK16_SIZE; + const auto * filt_end_aligned = filt_pos + size / SIMD_BYTES * SIMD_BYTES; + const auto zero_vec = _mm_setzero_si128(); while (filt_pos < filt_end_aligned) { uint32_t mask = _mm_movemask_epi8(_mm_cmpgt_epi8( - _mm_loadu_si128(reinterpret_cast(filt_pos)), + mem_utils::details::load_block16(filt_pos), zero_vec)); - if (mask == 0) - { - /// SIMD_BYTES consecutive rows do not pass the filter - } - else if (mask == 0xffff) + if (mem_utils::details::Block16Mask == mask) { /// SIMD_BYTES consecutive rows pass the filter const auto first = offsets_pos == offsets_begin; diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 205fc41c6ff..6b15cb3db6c 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -95,6 +95,7 @@ std::unordered_map> FailPointHelper::f M(force_ingest_via_replace) \ M(unblock_query_init_after_write) \ M(exception_in_merged_task_init) \ + M(invalid_mpp_version) \ M(force_fail_in_flush_region_data) diff --git a/dbms/src/Common/MPMCQueue.h b/dbms/src/Common/MPMCQueue.h index 
e966cd44547..1cdf4f0eb29 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -108,8 +108,7 @@ class MPMCQueue template ALWAYS_INLINE Result popTimeout(T & obj, const Duration & timeout) { - /// std::condition_variable::wait_until will always use system_clock. - auto deadline = std::chrono::system_clock::now() + timeout; + auto deadline = SteadyClock::now() + timeout; return popObj(obj, &deadline); } @@ -138,8 +137,7 @@ class MPMCQueue template ALWAYS_INLINE Result pushTimeout(U && u, const Duration & timeout) { - /// std::condition_variable::wait_until will always use system_clock. - auto deadline = std::chrono::system_clock::now() + timeout; + auto deadline = SteadyClock::now() + timeout; return pushObj(std::forward(u), &deadline); } @@ -162,8 +160,7 @@ class MPMCQueue template ALWAYS_INLINE Result emplaceTimeout(Args &&... args, const Duration & timeout) { - /// std::condition_variable::wait_until will always use system_clock. - auto deadline = std::chrono::system_clock::now() + timeout; + auto deadline = SteadyClock::now() + timeout; return emplaceObj(&deadline, std::forward(args)...); } @@ -222,7 +219,8 @@ class MPMCQueue } private: - using TimePoint = std::chrono::time_point; + using SteadyClock = std::chrono::steady_clock; + using TimePoint = SteadyClock::time_point; using WaitingNode = MPMCQueueDetail::WaitingNode; void notifyAll() diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index be19b6a8bd1..4273e0c137c 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -86,6 +86,16 @@ namespace DB F(type_mpp_establish_conn, {{"type", "mpp_tunnel"}}), \ F(type_mpp_establish_conn_local, {{"type", "mpp_tunnel_local"}}), \ F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}})) \ + M(tiflash_exchange_data_bytes, "Total bytes sent by exchange operators", Counter, \ + F(type_hash_original, {"type", "hash_original"}), /*the original data size by hash exchange*/ \ + F(type_hash_none_compression_remote, {"type", "hash_none_compression_remote"}), /*the remote exchange data size by hash partition with no compression*/\ + F(type_hash_none_compression_local, {"type", "hash_none_compression_local"}), /*the local exchange data size by hash partition with no compression*/ \ + F(type_hash_lz4_compression, {"type", "hash_lz4_compression"}), /*the exchange data size by hash partition with lz4 compression*/ \ + F(type_hash_zstd_compression, {"type", "hash_zstd_compression"}), /*the exchange data size by hash partition with zstd compression*/ \ + F(type_broadcast_passthrough_original, {"type", "broadcast_passthrough_original"}), /*the original exchange data size by broadcast/passthough*/ \ + F(type_broadcast_passthrough_none_compression_local, {"type", "broadcast_passthrough_none_compression_local"}), /*the local exchange data size by broadcast/passthough with no compression*/ \ + F(type_broadcast_passthrough_none_compression_remote, {"type", "broadcast_passthrough_none_compression_remote"}), /*the remote exchange data size by broadcast/passthough with no compression*/ \ + ) \ M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \ M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \ M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \ diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp index 45698220c7f..07e008b91d0 100644 --- 
a/dbms/src/DataTypes/DataTypeString.cpp +++ b/dbms/src/DataTypes/DataTypeString.cpp @@ -32,7 +32,7 @@ namespace DB { void DataTypeString::serializeBinary(const Field & field, WriteBuffer & ostr) const { - const String & s = get(field); + const auto & s = get(field); writeVarUInt(s.size(), ostr); writeString(s, ostr); } @@ -43,7 +43,7 @@ void DataTypeString::deserializeBinary(Field & field, ReadBuffer & istr) const UInt64 size; readVarUInt(size, istr); field = String(); - String & s = get(field); + auto & s = get(field); s.resize(size); istr.readStrict(&s[0], size); } @@ -59,7 +59,7 @@ void DataTypeString::serializeBinary(const IColumn & column, size_t row_num, Wri void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr) const { - ColumnString & column_string = static_cast(column); + auto & column_string = static_cast(column); ColumnString::Chars_t & data = column_string.getChars(); ColumnString::Offsets & offsets = column_string.getOffsets(); @@ -91,7 +91,7 @@ void DataTypeString::serializeBinaryBulk(const IColumn & column, WriteBuffer & o const ColumnString::Chars_t & data = column_string.getChars(); const ColumnString::Offsets & offsets = column_string.getOffsets(); - size_t size = column.size(); + size_t size = column_string.size(); if (!size) return; @@ -103,7 +103,7 @@ void DataTypeString::serializeBinaryBulk(const IColumn & column, WriteBuffer & o { UInt64 str_size = offsets[0] - 1; writeVarUInt(str_size, ostr); - ostr.write(reinterpret_cast(&data[0]), str_size); + ostr.write(reinterpret_cast(data.data()), str_size); ++offset; } @@ -136,44 +136,21 @@ static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars_t & data, Column if (size) { -#if __SSE2__ +#ifdef __SSE2__ /// An optimistic branch in which more efficient copying is possible. if (offset + 16 * UNROLL_TIMES <= data.capacity() && istr.position() + size + 16 * UNROLL_TIMES <= istr.buffer().end()) { - const __m128i * sse_src_pos = reinterpret_cast(istr.position()); + const auto * sse_src_pos = reinterpret_cast(istr.position()); const __m128i * sse_src_end = sse_src_pos + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES; - __m128i * sse_dst_pos = reinterpret_cast<__m128i *>(&data[offset - size - 1]); + auto * sse_dst_pos = reinterpret_cast<__m128i *>(&data[offset - size - 1]); while (sse_src_pos < sse_src_end) { - /// NOTE gcc 4.9.2 unrolls the loop, but for some reason uses only one xmm register. 
- /// for (size_t j = 0; j < UNROLL_TIMES; ++j) - /// _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j)); + for (size_t j = 0; j < UNROLL_TIMES; ++j) + _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j)); sse_src_pos += UNROLL_TIMES; sse_dst_pos += UNROLL_TIMES; - - if (UNROLL_TIMES >= 4) - __asm__("movdqu %0, %%xmm0" ::"m"(sse_src_pos[-4])); - if (UNROLL_TIMES >= 3) - __asm__("movdqu %0, %%xmm1" ::"m"(sse_src_pos[-3])); - if (UNROLL_TIMES >= 2) - __asm__("movdqu %0, %%xmm2" ::"m"(sse_src_pos[-2])); - if (UNROLL_TIMES >= 1) - __asm__("movdqu %0, %%xmm3" ::"m"(sse_src_pos[-1])); - - if (UNROLL_TIMES >= 4) - __asm__("movdqu %%xmm0, %0" - : "=m"(sse_dst_pos[-4])); - if (UNROLL_TIMES >= 3) - __asm__("movdqu %%xmm1, %0" - : "=m"(sse_dst_pos[-3])); - if (UNROLL_TIMES >= 2) - __asm__("movdqu %%xmm2, %0" - : "=m"(sse_dst_pos[-2])); - if (UNROLL_TIMES >= 1) - __asm__("movdqu %%xmm3, %0" - : "=m"(sse_dst_pos[-1])); } istr.position() += size; @@ -196,22 +173,18 @@ void DataTypeString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, ColumnString::Chars_t & data = column_string.getChars(); ColumnString::Offsets & offsets = column_string.getOffsets(); - double avg_chars_size; + double avg_chars_size = 1; /// By default reserve only for empty strings. - if (avg_value_size_hint && avg_value_size_hint > sizeof(offsets[0])) + if (avg_value_size_hint > 0.0 && avg_value_size_hint > sizeof(offsets[0])) { /// Randomly selected. constexpr auto avg_value_size_hint_reserve_multiplier = 1.2; avg_chars_size = (avg_value_size_hint - sizeof(offsets[0])) * avg_value_size_hint_reserve_multiplier; } - else - { - /// By default reserve only for empty strings. - avg_chars_size = 1; - } - data.reserve(data.size() + std::ceil(limit * avg_chars_size)); + size_t size_to_reserve = data.size() + static_cast(std::ceil(limit * avg_chars_size)); + data.reserve(size_to_reserve); offsets.reserve(offsets.size() + limit); @@ -241,7 +214,7 @@ void DataTypeString::serializeTextEscaped(const IColumn & column, size_t row_num template static inline void read(IColumn & column, Reader && reader) { - ColumnString & column_string = static_cast(column); + auto & column_string = static_cast(column); ColumnString::Chars_t & data = column_string.getChars(); ColumnString::Offsets & offsets = column_string.getOffsets(); diff --git a/dbms/src/Debug/ReadIndexStressTest.cpp b/dbms/src/Debug/ReadIndexStressTest.cpp index a4bd22f8cd5..7ab02671586 100644 --- a/dbms/src/Debug/ReadIndexStressTest.cpp +++ b/dbms/src/Debug/ReadIndexStressTest.cpp @@ -164,9 +164,9 @@ ReadIndexStressTest::TimeCost ReadIndexStressTest::run( const auto & kvstore = *tmt.getKVStore(); size_t timeout_ms = 10 * 1000; const auto wrap_time_cost = [&](std::function && f) { - auto start_time = Clock::now(); + auto start_time = std::chrono::steady_clock::now(); f(); - auto end_time = Clock ::now(); + auto end_time = std::chrono::steady_clock ::now(); auto time_cost = std::chrono::duration_cast(end_time - start_time); LOG_INFO(logger, "time cost {}", time_cost); return time_cost; diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 8ea5c7a4a57..f5cc6ea66cc 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -65,13 +65,15 @@ CHBlockChunkCodec::CHBlockChunkCodec(const DAGSchema & schema) size_t getExtraInfoSize(const Block & block) { - size_t size = 64; /// to hold some length of structures, such as column number, row 
number... + size_t size = 8 + 8; /// to hold some length of structures, such as column number, row number... size_t columns = block.columns(); for (size_t i = 0; i < columns; ++i) { const ColumnWithTypeAndName & column = block.safeGetByPosition(i); size += column.name.size(); + size += 8; size += column.type->getName().size(); + size += 8; if (column.column->isColumnConst()) { size += column.column->byteSize() * column.column->size(); @@ -80,7 +82,7 @@ size_t getExtraInfoSize(const Block & block) return size; } -void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit) +void WriteColumnData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit) { /** If there are columns-constants - then we materialize them. * (Since the data type does not know how to serialize / deserialize constants.) @@ -106,6 +108,11 @@ void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadB type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {}); } +size_t ApproxBlockBytes(const Block & block) +{ + return block.bytes() + getExtraInfoSize(block); +} + void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end) { /// only check block schema in CHBlock codec because for both @@ -117,7 +124,7 @@ void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t e throw TiFlashException("CHBlock encode only support encode whole block", Errors::Coprocessor::Internal); assert(output == nullptr); - output = std::make_unique(block.bytes() + getExtraInfoSize(block)); + output = std::make_unique(ApproxBlockBytes(block)); block.checkNumberOfRows(); size_t columns = block.columns(); @@ -134,7 +141,7 @@ void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t e writeStringBinary(column.type->getName(), *output); if (rows) - writeData(*column.type, column.column, *output, 0, 0); + WriteColumnData(*column.type, column.column, *output, 0, 0); } } diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h index 2fa520e19a9..73f4a890054 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h @@ -25,8 +25,8 @@ class CHBlockChunkCodec final : public ChunkCodec { public: CHBlockChunkCodec() = default; - CHBlockChunkCodec(const Block & header_); - CHBlockChunkCodec(const DAGSchema & schema); + explicit CHBlockChunkCodec(const Block & header_); + explicit CHBlockChunkCodec(const DAGSchema & schema); Block decode(const String &, const DAGSchema & schema) override; static Block decode(const String &, const Block & header); diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp new file mode 100644 index 00000000000..5840b0e8e57 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp @@ -0,0 +1,523 @@ + + +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ + +size_t ApproxBlockHeaderBytes(const Block & block) +{ + size_t size = 8 + 8; /// to hold some length of structures, such as column number, row number... + size_t columns = block.columns(); + for (size_t i = 0; i < columns; ++i) + { + const ColumnWithTypeAndName & column = block.safeGetByPosition(i); + size += column.name.size(); + size += 8; + size += column.type->getName().size(); + size += 8; + } + return size; +} + +void EncodeHeader(WriteBuffer & ostr, const Block & header, size_t rows) +{ + size_t columns = header.columns(); + writeVarUInt(columns, ostr); + writeVarUInt(rows, ostr); + + for (size_t i = 0; i < columns; i++) + { + const ColumnWithTypeAndName & column = header.safeGetByPosition(i); + writeStringBinary(column.name, ostr); + writeStringBinary(column.type->getName(), ostr); + } +} + +Block DecodeHeader(ReadBuffer & istr, const Block & header, size_t & total_rows) +{ + Block res; + + assert(!istr.eof()); + + size_t columns = 0; + { + readVarUInt(columns, istr); + readVarUInt(total_rows, istr); + } + if (header) + CodecUtils::checkColumnSize(header.columns(), columns); + + for (size_t i = 0; i < columns; ++i) + { + ColumnWithTypeAndName column; + { + readBinary(column.name, istr); + if (header) + column.name = header.getByPosition(i).name; + String type_name; + readBinary(type_name, istr); + if (header) + { + CodecUtils::checkDataTypeName(i, header.getByPosition(i).type->getName(), type_name); + column.type = header.getByPosition(i).type; + } + else + { + const auto & data_type_factory = DataTypeFactory::instance(); + column.type = data_type_factory.get(type_name); + } + } + res.insert(std::move(column)); + } + + return res; +} + +static inline void decodeColumnsByBlock(ReadBuffer & istr, Block & res, size_t rows_to_read, size_t reserve_size) +{ + if (!rows_to_read) + return; + + auto && mutable_columns = res.mutateColumns(); + for (auto && column : mutable_columns) + { + if (reserve_size > 0) + column->reserve(std::max(rows_to_read, reserve_size)); + else + column->reserve(rows_to_read + column->size()); + } + + // Contain columns of multi blocks + size_t decode_rows = 0; + for (size_t sz = 0; decode_rows < rows_to_read; decode_rows += sz) + { + readVarUInt(sz, istr); + + assert(sz > 0); + + // Decode columns of one block + for (size_t i = 0; i < res.columns(); ++i) + { + /// Data + res.getByPosition(i).type->deserializeBinaryBulkWithMultipleStreams( + *mutable_columns[i], + [&](const IDataType::SubstreamPath &) { + return &istr; + }, + sz, + 0, + {}, + {}); + } + } + + assert(decode_rows == rows_to_read); + + res.setColumns(std::move(mutable_columns)); +} + +void DecodeColumns(ReadBuffer & istr, Block & res, size_t rows_to_read, size_t reserve_size) +{ + return decodeColumnsByBlock(istr, res, rows_to_read, reserve_size); +} + +CompressionMethod ToInternalCompressionMethod(tipb::CompressionMode compression_mode) +{ + switch (compression_mode) + { + case tipb::CompressionMode::NONE: + return CompressionMethod::NONE; + case tipb::CompressionMode::FAST: + return CompressionMethod::LZ4; // use LZ4 method as fast mode + case tipb::CompressionMode::HIGH_COMPRESSION: + return CompressionMethod::ZSTD; // use ZSTD method as HC mode + default: + return CompressionMethod::NONE; + } +} + +template +constexpr static bool isBlockType() +{ + return std::is_same_v>, Block>; +} + +template +static void 
calcColumnEncodeInfoImpl(ColumnsHolder && columns_holder, size_t & bytes, size_t & total_rows) +{ + bytes += 8 /*rows*/; + + if constexpr (isBlockType()) + { + const Block & block = columns_holder; + if (const auto rows = block.rows(); rows) + { + block.checkNumberOfRows(); + total_rows += rows; + bytes += block.bytes(); + } + } + else + { + // check each column + if likely (columns_holder.front()) + { + const auto rows = columns_holder.front()->size(); + total_rows += rows; + for (const auto & column : columns_holder) + { + RUNTIME_ASSERT(column); + RUNTIME_ASSERT(rows == column->size()); + bytes += column->byteSize(); + } + } + else + { + for (const auto & column : columns_holder) + { + RUNTIME_ASSERT(!column); + } + } + } +} + +static void calcColumnEncodeInfo(const std::vector & batch_columns, size_t & bytes, size_t & rows) +{ + for (auto && columns : batch_columns) + { + calcColumnEncodeInfoImpl(columns, bytes, rows); + } +} +static void calcColumnEncodeInfo(const std::vector & batch_columns, size_t & bytes, size_t & rows) +{ + for (auto && columns : batch_columns) + { + calcColumnEncodeInfoImpl(columns, bytes, rows); + } +} +static void calcColumnEncodeInfo(const std::vector & blocks, size_t & bytes, size_t & rows) +{ + for (auto && block : blocks) + { + calcColumnEncodeInfoImpl(block, bytes, rows); + } +} +static void calcColumnEncodeInfo(const MutableColumns & columns, size_t & bytes, size_t & rows) +{ + calcColumnEncodeInfoImpl(columns, bytes, rows); +} +static void calcColumnEncodeInfo(const Columns & columns, size_t & bytes, size_t & rows) +{ + calcColumnEncodeInfoImpl(columns, bytes, rows); +} +static void calcColumnEncodeInfo(const Block & block, size_t & bytes, size_t & rows) +{ + calcColumnEncodeInfoImpl(block, bytes, rows); +} + +struct CHBlockChunkCodecV1Impl +{ + CHBlockChunkCodecV1 & inner; + + explicit CHBlockChunkCodecV1Impl(CHBlockChunkCodecV1 & inner_) + : inner(inner_) + {} + + CHBlockChunkCodecV1::EncodeRes encode(const Block & block, CompressionMethod compression_method) + { + return encodeImpl(block, compression_method); + } + CHBlockChunkCodecV1::EncodeRes encode(const std::vector & blocks, CompressionMethod compression_method) + { + return encodeImpl(blocks, compression_method); + } + + static const ColumnPtr & toColumnPtr(const Columns & c, size_t index) + { + return c[index]; + } + static ColumnPtr toColumnPtr(Columns && c, size_t index) + { + return std::move(c[index]); + } + static ColumnPtr toColumnPtr(MutableColumns && c, size_t index) + { + return std::move(c[index]); + } + static ColumnPtr toColumnPtr(const MutableColumns & c, size_t index) + { + return c[index]->getPtr(); + } + static const ColumnPtr & toColumnPtr(const Block & block, size_t index) + { + return block.getByPosition(index).column; + } + + template + static size_t getRows(ColumnsHolder && columns_holder) + { + if constexpr (isBlockType()) + { + const Block & block = columns_holder; + size_t rows = block.rows(); + return rows; + } + else + { + if unlikely (!columns_holder.front()) + return 0; + size_t rows = columns_holder.front()->size(); + return rows; + } + } + + template + void encodeColumnImpl(ColumnsHolder && columns_holder, WriteBuffer * ostr_ptr) + { + size_t rows = getRows(std::forward(columns_holder)); + if (!rows) + return; + + // Encode row count for next columns + writeVarUInt(rows, *ostr_ptr); + + // Encode columns data + for (size_t col_index = 0; col_index < inner.header.columns(); ++col_index) + { + auto && col_type_name = inner.header.getByPosition(col_index); + auto && 
column_ptr = toColumnPtr(std::forward(columns_holder), col_index); + WriteColumnData(*col_type_name.type, column_ptr, *ostr_ptr, 0, 0); + } + + inner.encoded_rows += rows; + } + void encodeColumn(const MutableColumns & columns, WriteBuffer * ostr_ptr) + { + return encodeColumnImpl(columns, ostr_ptr); + } + void encodeColumn(const Columns & columns, WriteBuffer * ostr_ptr) + { + return encodeColumnImpl(columns, ostr_ptr); + } + void encodeColumn(const std::vector & batch_columns, WriteBuffer * ostr_ptr) + { + for (auto && batch : batch_columns) + { + encodeColumnImpl(batch, ostr_ptr); + } + } + void encodeColumn(std::vector && batch_columns, WriteBuffer * ostr_ptr) + { + for (auto && batch : batch_columns) + { + encodeColumnImpl(std::move(batch), ostr_ptr); + } + } + void encodeColumn(const std::vector & batch_columns, WriteBuffer * ostr_ptr) + { + for (auto && batch : batch_columns) + { + encodeColumnImpl(batch, ostr_ptr); + } + } + void encodeColumn(std::vector && batch_columns, WriteBuffer * ostr_ptr) + { + for (auto && batch : batch_columns) + { + encodeColumnImpl(std::move(batch), ostr_ptr); + } + } + void encodeColumn(const Block & block, WriteBuffer * ostr_ptr) + { + return encodeColumnImpl(block, ostr_ptr); + } + void encodeColumn(const std::vector & blocks, WriteBuffer * ostr_ptr) + { + for (auto && block : blocks) + { + encodeColumnImpl(std::move(block), ostr_ptr); + } + } + template + CHBlockChunkCodecV1::EncodeRes encodeImpl(VecColumns && batch_columns, CompressionMethod compression_method) + { + size_t column_encode_bytes = 0; + const size_t rows = ({ + size_t rows = 0; + // Calculate total rows and check data valid + calcColumnEncodeInfo(batch_columns, column_encode_bytes, rows); + rows; + }); + + if unlikely (0 == rows) + { + // no rows and no need to encode header + return {}; + } + + // compression method flag; NONE, LZ4, ZSTD, defined in `CompressionMethodByte` + // ... encoded by compression pattern ... 
+ // header meta: + // columns count; + // total row count (multi parts); + // for each column: + // column name; + // column type; + // for each part: + // row count; + // columns data; + + size_t init_size = column_encode_bytes + inner.header_size + 1 /*compression method*/; + auto output_buffer = std::make_unique(init_size); + std::unique_ptr compress_codec{}; + WriteBuffer * ostr_ptr = output_buffer.get(); + + // Init compression writer + if (compression_method != CompressionMethod::NONE) + { + // CompressedWriteBuffer will encode compression method flag as first byte + compress_codec = std::make_unique( + *output_buffer, + CompressionSettings(compression_method), + init_size); + ostr_ptr = compress_codec.get(); + } + else + { + // Write compression method flag + output_buffer->write(static_cast(CompressionMethodByte::NONE)); + } + + // Encode header + EncodeHeader(*ostr_ptr, inner.header, rows); + // Encode column data + encodeColumn(std::forward(batch_columns), ostr_ptr); + + // Flush rest buffer + if (compress_codec) + { + compress_codec->next(); + inner.original_size += compress_codec->getUncompressedBytes(); + inner.compressed_size += compress_codec->getCompressedBytes(); + } + else + { + inner.original_size += output_buffer->count(); + } + + return output_buffer->releaseStr(); + } +}; + +CHBlockChunkCodecV1::CHBlockChunkCodecV1(const Block & header_) + : header(header_) + , header_size(ApproxBlockHeaderBytes(header)) +{ +} + +static void checkSchema(const Block & header, const Block & block) +{ + CodecUtils::checkColumnSize(header.columns(), block.columns()); + for (size_t column_index = 0; column_index < header.columns(); ++column_index) + { + auto && type_name = block.getByPosition(column_index).type->getName(); + CodecUtils::checkDataTypeName(column_index, header.getByPosition(column_index).type->getName(), type_name); + } +} + +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const Block & block, CompressionMethod compression_method, bool check_schema) +{ + if (check_schema) + { + checkSchema(header, block); + } + return CHBlockChunkCodecV1Impl{*this}.encode(block, compression_method); +} + +void CHBlockChunkCodecV1::clear() +{ + encoded_rows = 0; + original_size = 0; + compressed_size = 0; +} + +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const MutableColumns & columns, CompressionMethod compression_method) +{ + return CHBlockChunkCodecV1Impl{*this}.encodeImpl(columns, compression_method); +} +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const Columns & columns, CompressionMethod compression_method) +{ + return CHBlockChunkCodecV1Impl{*this}.encodeImpl(columns, compression_method); +} +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const std::vector & columns, CompressionMethod compression_method) +{ + return CHBlockChunkCodecV1Impl{*this}.encodeImpl(columns, compression_method); +} +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(std::vector && columns, CompressionMethod compression_method) +{ + return CHBlockChunkCodecV1Impl{*this}.encodeImpl(std::move(columns), compression_method); +} +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const std::vector & columns, CompressionMethod compression_method) +{ + return CHBlockChunkCodecV1Impl{*this}.encodeImpl(std::move(columns), compression_method); +} +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(std::vector && columns, CompressionMethod compression_method) +{ + return CHBlockChunkCodecV1Impl{*this}.encodeImpl(std::move(columns), 
compression_method); +} +CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const std::vector & blocks, CompressionMethod compression_method, bool check_schema) +{ + if (check_schema) + { + for (auto && block : blocks) + { + checkSchema(header, block); + } + } + + return CHBlockChunkCodecV1Impl{*this}.encode(blocks, compression_method); +} + +static Block decodeCompression(const Block & header, ReadBuffer & istr) +{ + size_t decoded_rows{}; + auto decoded_block = DecodeHeader(istr, header, decoded_rows); + DecodeColumns(istr, decoded_block, decoded_rows, 0); + assert(decoded_rows == decoded_block.rows()); + return decoded_block; +} + +Block CHBlockChunkCodecV1::decode(const Block & header, std::string_view str) +{ + assert(!str.empty()); + + // read first byte of compression method flag which defined in `CompressionMethodByte` + if (static_cast(str[0]) == CompressionMethodByte::NONE) + { + str = str.substr(1, str.size() - 1); + ReadBufferFromString buff_str(str); + return decodeCompression(header, buff_str); + } + ReadBufferFromString buff_str(str); + auto && istr = CompressedCHBlockChunkReadBuffer(buff_str); + return decodeCompression(header, istr); +} + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.h b/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.h new file mode 100644 index 00000000000..76331ce8314 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.h @@ -0,0 +1,60 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +size_t ApproxBlockHeaderBytes(const Block & block); +using CompressedCHBlockChunkReadBuffer = CompressedReadBuffer; +using CompressedCHBlockChunkWriteBuffer = CompressedWriteBuffer; +void EncodeHeader(WriteBuffer & ostr, const Block & header, size_t rows); +void DecodeColumns(ReadBuffer & istr, Block & res, size_t rows_to_read, size_t reserve_size = 0); +Block DecodeHeader(ReadBuffer & istr, const Block & header, size_t & rows); +CompressionMethod ToInternalCompressionMethod(tipb::CompressionMode compression_mode); +extern void WriteColumnData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit); + +struct CHBlockChunkCodecV1 : boost::noncopyable +{ + using Self = CHBlockChunkCodecV1; + using EncodeRes = std::string; + + const Block & header; + const size_t header_size; + + size_t encoded_rows{}; + size_t original_size{}; + size_t compressed_size{}; + + void clear(); + explicit CHBlockChunkCodecV1(const Block & header_); + // + EncodeRes encode(const MutableColumns & columns, CompressionMethod compression_method); + EncodeRes encode(std::vector && columns, CompressionMethod compression_method); + EncodeRes encode(const std::vector & columns, CompressionMethod compression_method); + EncodeRes encode(const Columns & columns, CompressionMethod compression_method); + EncodeRes encode(const std::vector & columns, CompressionMethod compression_method); + EncodeRes encode(std::vector && columns, CompressionMethod compression_method); + EncodeRes encode(const Block & block, CompressionMethod compression_method, bool check_schema = true); + EncodeRes encode(const std::vector & blocks, CompressionMethod compression_method, bool check_schema = true); + // + static Block decode(const Block & header, std::string_view str); +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp index a61a00cc77b..d609b167f01 100644 --- a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp +++ b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
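// Editor's illustrative usage sketch for the CHBlockChunkCodecV1 interface
// added above; it mirrors the encode/decode round trip exercised by the new
// gtest further down. Assumes `header` and `block` are DB::Block values that
// share one schema; the function name is hypothetical.
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>

static void chunkCodecV1RoundTrip(const DB::Block & header, const DB::Block & block)
{
    DB::CHBlockChunkCodecV1 codec{header};
    // encode() returns an empty string when there are no rows to send.
    auto str = codec.encode(block, DB::CompressionMethod::LZ4);
    if (!str.empty())
    {
        // The first byte carries the CompressionMethodByte flag; decode()
        // reads it to pick the plain or compressed read path.
        DB::Block decoded = DB::CHBlockChunkCodecV1::decode(header, str);
        // codec.encoded_rows / original_size / compressed_size now hold the stats.
        (void)decoded;
    }
}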
+#include #include #include @@ -25,6 +26,58 @@ CHBlockChunkDecodeAndSquash::CHBlockChunkDecodeAndSquash( { } +std::optional CHBlockChunkDecodeAndSquash::decodeAndSquashV1(std::string_view sv) +{ + if unlikely (sv.empty()) + { + std::optional res; + if (accumulated_block) + res.swap(accumulated_block); + return res; + } + + // read first byte of compression method flag which defined in `CompressionMethodByte` + if (static_cast(sv[0]) == CompressionMethodByte::NONE) + { + ReadBufferFromString istr(sv.substr(1, sv.size() - 1)); + return decodeAndSquashV1Impl(istr); + } + + ReadBufferFromString istr(sv); + auto && compress_buffer = CompressedCHBlockChunkReadBuffer(istr); + return decodeAndSquashV1Impl(compress_buffer); +} + +std::optional CHBlockChunkDecodeAndSquash::decodeAndSquashV1Impl(ReadBuffer & istr) +{ + std::optional res; + + if (!accumulated_block) + { + size_t rows{}; + Block block = DecodeHeader(istr, codec.header, rows); + if (rows) + { + DecodeColumns(istr, block, rows, static_cast(rows_limit * 1.5)); + accumulated_block.emplace(std::move(block)); + } + } + else + { + size_t rows{}; + DecodeHeader(istr, codec.header, rows); + DecodeColumns(istr, *accumulated_block, rows, 0); + } + + if (accumulated_block && accumulated_block->rows() >= rows_limit) + { + /// Return accumulated data and reset accumulated_block + res.swap(accumulated_block); + return res; + } + return res; +} + std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & str) { std::optional res; diff --git a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.h b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.h index b3745f89ffc..df5586d01b1 100644 --- a/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.h +++ b/dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.h @@ -27,8 +27,12 @@ class CHBlockChunkDecodeAndSquash CHBlockChunkDecodeAndSquash(const Block & header, size_t rows_limit_); ~CHBlockChunkDecodeAndSquash() = default; std::optional decodeAndSquash(const String &); + std::optional decodeAndSquashV1(std::string_view); std::optional flush(); +private: + std::optional decodeAndSquashV1Impl(ReadBuffer & istr); + private: CHBlockChunkCodec codec; std::optional accumulated_block; diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index ca6b7747d38..2e0a54a3e4f 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -741,7 +741,7 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline) pipeline.transform([&](auto & stream) { // construct writer std::unique_ptr response_writer = newMPPExchangeWriter( - context.getDAGContext()->tunnel_set, + dagContext().tunnel_set, partition_col_ids, partition_col_collators, exchange_sender.tp(), @@ -750,7 +750,9 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline) dagContext(), enable_fine_grained_shuffle, stream_count, - batch_size); + batch_size, + exchange_sender.compression(), + context.getSettingsRef().batch_send_min_limit_compression); stream = std::make_shared(stream, std::move(response_writer), log->identifier()); stream->setExtraInfo(extra_info); }); diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp new file mode 100644 index 00000000000..d9f7a5f5c78 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp @@ -0,0 +1,183 @@ +// Copyright 2022 PingCAP, Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + + +namespace DB::tests +{ + +// Return a block with **rows** and 5 Int64 column. +static Block prepareBlock(size_t rows) +{ + Block block; + for (size_t i = 0; i < 5; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; +} + +template +void test_enocde_release_data(VecCol && batch_columns, const Block & header, const size_t total_rows) +{ + // encode and release columns + const auto mode = CompressionMethod::LZ4; + + auto codec = CHBlockChunkCodecV1{ + header, + }; + auto str = codec.encode(std::forward(batch_columns), mode); + ASSERT_FALSE(str.empty()); + ASSERT_EQ(codec.encoded_rows, total_rows); + ASSERT_NE(codec.compressed_size, 0); + ASSERT_NE(codec.original_size, 0); + auto decoded_block = CHBlockChunkCodecV1::decode(header, str); + ASSERT_EQ(total_rows, decoded_block.rows()); + for (auto && columns : batch_columns) + { + for (auto && col : columns) + { + if (col) + ASSERT_EQ(col->size(), 0); + } + } + { + // test no rows + auto & empty_batch_columns = batch_columns; + auto str = codec.encode(empty_batch_columns, mode); + ASSERT_TRUE(str.empty()); + } +} + +TEST(CHBlockChunkCodec, ChunkCodecV1) +{ + size_t block_num = 10; + size_t rows = 10; + std::vector blocks; + auto header = prepareBlock(0); + for (size_t i = 0; i < block_num; ++i) + { + auto block = prepareBlock(rows); + blocks.emplace_back(std::move(block)); + } + blocks.emplace_back(prepareBlock(0)); + + auto total_rows = rows * block_num; + for (auto mode : {CompressionMethod::NONE, CompressionMethod::LZ4, CompressionMethod::ZSTD}) + { + { + // encode nothing if no rows + auto codec = CHBlockChunkCodecV1{ + header, + }; + auto str = codec.encode(header, mode); + ASSERT_TRUE(str.empty()); + ASSERT_EQ(codec.encoded_rows, 0); + ASSERT_EQ(codec.compressed_size, 0); + ASSERT_EQ(codec.original_size, 0); + } + { + auto codec = CHBlockChunkCodecV1{ + header, + }; + auto str = codec.encode(blocks.front(), mode); + ASSERT_FALSE(str.empty()); + ASSERT_EQ(codec.encoded_rows, blocks.front().rows()); + auto decoded_block = CHBlockChunkCodecV1::decode(header, str); + ASSERT_EQ(blocks.front().rows(), decoded_block.rows()); + } + { + // test encode blocks + auto codec = CHBlockChunkCodecV1{ + header, + }; + auto str = codec.encode(blocks, mode); + ASSERT_FALSE(str.empty()); + ASSERT_EQ(codec.encoded_rows, total_rows); + + if (mode == CompressionMethod::NONE) + ASSERT_EQ(codec.compressed_size, 0); + else + ASSERT_NE(codec.compressed_size, 0); + + ASSERT_NE(codec.original_size, 0); + + auto decoded_block = CHBlockChunkCodecV1::decode(header, str); + ASSERT_EQ(total_rows, decoded_block.rows()); + } + { + auto columns = prepareBlock(rows).getColumns(); + auto codec = CHBlockChunkCodecV1{ + header, + 
}; + auto str = codec.encode(columns, mode); + ASSERT_FALSE(str.empty()); + ASSERT_EQ(codec.encoded_rows, rows); + auto decoded_block = CHBlockChunkCodecV1::decode(header, str); + ASSERT_EQ(decoded_block.rows(), rows); + } + { + auto columns = prepareBlock(rows).mutateColumns(); + auto codec = CHBlockChunkCodecV1{ + header, + }; + auto str = codec.encode(columns, mode); + ASSERT_FALSE(str.empty()); + ASSERT_EQ(codec.encoded_rows, rows); + auto decoded_block = CHBlockChunkCodecV1::decode(header, str); + ASSERT_EQ(decoded_block.rows(), rows); + } + } + { + std::vector batch_columns; + for (size_t i = 0; i < block_num; ++i) + batch_columns.emplace_back(prepareBlock(rows).mutateColumns()); + batch_columns.emplace_back(prepareBlock(0).mutateColumns()); + { + auto tmp = prepareBlock(0).mutateColumns(); + for (auto && col : tmp) + { + col.reset(); + } + batch_columns.emplace_back(std::move(tmp)); + } + test_enocde_release_data(std::move(batch_columns), header, total_rows); + } + { + std::vector batch_columns; + for (size_t i = 0; i < block_num; ++i) + batch_columns.emplace_back(prepareBlock(rows).getColumns()); + batch_columns.emplace_back(prepareBlock(0).getColumns()); + { + auto tmp = prepareBlock(0).getColumns(); + for (auto && col : tmp) + { + col.reset(); + } + batch_columns.emplace_back(std::move(tmp)); + } + test_enocde_release_data(std::move(batch_columns), header, total_rows); + } +} +} // namespace DB::tests diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp index 69b6abc5569..da81bae040f 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -23,7 +25,6 @@ #include #include -#include namespace DB { namespace tests @@ -93,8 +94,8 @@ class TestChunkDecodeAndSquash : public testing::Test void doTestWork(bool flush_something) { - const size_t block_rows = 1024; - const size_t block_num = 256; + const size_t block_rows = 256; + const size_t block_num = 64; std::mt19937_64 rand_gen; // 1. Build Blocks. std::vector blocks; @@ -109,12 +110,29 @@ class TestChunkDecodeAndSquash : public testing::Test // 2. encode all blocks std::unique_ptr codec_stream = std::make_unique()->newCodecStream(makeFields()); std::vector encode_str_vec(block_num); + std::vector encode_str_use_compression(block_num, true); + size_t round_index = 0; for (const auto & block : blocks) { - codec_stream->encode(block, 0, block.rows()); - encode_str_vec.push_back(codec_stream->getString()); - codec_stream->clear(); + if (round_index % 3 == 0) + { + codec_stream->encode(block, 0, block.rows()); + encode_str_vec.push_back(codec_stream->getString()); + codec_stream->clear(); + encode_str_use_compression.emplace_back(false); + } + else + { + auto codec = CHBlockChunkCodecV1{block}; + auto && str = codec.encode(block, CompressionMethod::LZ4); + if (!str.empty()) + assert(static_cast(str[0]) == CompressionMethodByte::LZ4); + encode_str_vec.push_back(std::move(str)); + encode_str_use_compression.emplace_back(true); + } + round_index++; } + round_index = 0; // 3. 
DecodeAndSquash all these blocks Block header = blocks.back(); @@ -122,7 +140,17 @@ class TestChunkDecodeAndSquash : public testing::Test CHBlockChunkDecodeAndSquash decoder(header, block_rows * 4); for (const auto & str : encode_str_vec) { - auto result = decoder.decodeAndSquash(str); + std::optional result{}; + if (!encode_str_use_compression[round_index]) + { + result = decoder.decodeAndSquash(str); + } + else + { + result = decoder.decodeAndSquashV1(str); + } + round_index++; + if (result) decoded_blocks.push_back(std::move(result.value())); } diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 1561a60f31d..5fe4a7d8c80 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -28,11 +28,10 @@ #include #include -#include #include #include #include -#include + namespace DB { @@ -87,8 +86,11 @@ struct MockWriter void broadcastOrPassThroughWrite(Blocks & blocks) { - auto packet = MPPTunnelSetHelper::toPacket(blocks, result_field_types); + auto && packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types); ++total_packets; + if (!packet) + return; + if (!packet->packet.chunks().empty()) total_bytes += packet->packet.ByteSizeLong(); queue->push(std::move(packet)); @@ -120,6 +122,10 @@ struct MockWriter write(tmp); } uint16_t getPartitionNum() const { return 1; } + bool isLocal(size_t index) const + { + return index == 0; + } std::vector result_field_types; diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index f16dfdc5c29..7f9443b7a7f 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -33,6 +34,7 @@ #include #include #include +#include #include @@ -226,6 +228,15 @@ grpc::Status FlashService::DispatchMPPTask( auto check_result = checkGrpcContext(grpc_context); if (!check_result.ok()) return check_result; + + // DO NOT register mpp task and return grpc error + if (auto mpp_version = request->meta().mpp_version(); !DB::CheckMppVersion(mpp_version)) + { + auto && err_msg = fmt::format("Failed to handling mpp dispatch request, reason=`{}`", DB::GenMppVersionErrorMessage(mpp_version)); + LOG_WARNING(log, err_msg); + return grpc::Status(grpc::StatusCode::CANCELLED, std::move(err_msg)); + } + GET_METRIC(tiflash_coprocessor_request_count, type_dispatch_mpp_task).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Increment(); GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Increment(); @@ -268,6 +279,31 @@ grpc::Status FlashService::IsAlive(grpc::ServerContext * grpc_context [[maybe_un auto & tmt_context = context->getTMTContext(); response->set_available(tmt_context.checkRunning()); + response->set_mpp_version(DB::GetMppVersion()); + return grpc::Status::OK; +} + +static grpc::Status CheckMppVersionForEstablishMPPConnection(const mpp::EstablishMPPConnectionRequest * request) +{ + const auto & sender_mpp_version = request->sender_meta().mpp_version(); + const auto & receiver_mpp_version = request->receiver_meta().mpp_version(); + + std::string && err_reason{}; + + if (!DB::CheckMppVersion(sender_mpp_version)) + { + err_reason += fmt::format("sender failed: {}; ", DB::GenMppVersionErrorMessage(sender_mpp_version)); + } + if (!DB::CheckMppVersion(receiver_mpp_version)) + { + 
err_reason += fmt::format("receiver failed: {}; ", DB::GenMppVersionErrorMessage(receiver_mpp_version)); + } + + if (!err_reason.empty()) + { + auto && err_msg = fmt::format("Failed to establish MPP connection, reason=`{}`", err_reason); + return grpc::Status(grpc::StatusCode::INTERNAL, std::move(err_msg)); + } return grpc::Status::OK; } @@ -284,6 +320,12 @@ grpc::Status AsyncFlashService::establishMPPConnectionAsync(grpc::ServerContext if (!check_result.ok()) return check_result; + if (auto res = CheckMppVersionForEstablishMPPConnection(request); !res.ok()) + { + LOG_WARNING(log, res.error_message()); + return res; + } + GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment(); @@ -302,6 +344,13 @@ grpc::Status FlashService::EstablishMPPConnection(grpc::ServerContext * grpc_con auto check_result = checkGrpcContext(grpc_context); if (!check_result.ok()) return check_result; + + if (auto res = CheckMppVersionForEstablishMPPConnection(request); !res.ok()) + { + LOG_WARNING(log, res.error_message()); + return res; + } + GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment(); GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment(); @@ -355,6 +404,14 @@ grpc::Status FlashService::CancelMPPTask( auto check_result = checkGrpcContext(grpc_context); if (!check_result.ok()) return check_result; + + if (auto mpp_version = request->meta().mpp_version(); !DB::CheckMppVersion(mpp_version)) + { + auto && err_msg = fmt::format("Failed to cancel mpp task, reason=`{}`", DB::GenMppVersionErrorMessage(mpp_version)); + LOG_WARNING(log, err_msg); + return grpc::Status(grpc::StatusCode::INTERNAL, std::move(err_msg)); + } + GET_METRIC(tiflash_coprocessor_request_count, type_cancel_mpp_task).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment(); Stopwatch watch; @@ -402,6 +459,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest if (!status.ok()) { auto err = std::make_unique(); + err->set_mpp_version(DB::GetMppVersion()); err->set_msg("error status"); response->set_allocated_error(err.release()); return status; diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index da9a00f59de..88f29305dbe 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -86,7 +86,7 @@ class AsyncRequestHandler : public UnaryCallback { packets.resize(batch_packet_count); for (auto & packet : packets) - packet = std::make_shared(); + packet = std::make_shared(MPPDataPacketV0); start(); } @@ -265,7 +265,7 @@ class AsyncRequestHandler : public UnaryCallback return false; // can't reuse packet since it is sent to readers. 
- packet = std::make_shared(); + packet = std::make_shared(MPPDataPacketV0); } return true; } @@ -575,7 +575,7 @@ void ExchangeReceiverBase::readLoop(const Request & req) for (;;) { LOG_TRACE(log, "begin next "); - TrackedMppDataPacketPtr packet = std::make_shared(); + TrackedMppDataPacketPtr packet = std::make_shared(MPPDataPacketV0); bool success = reader->read(packet); if (!success) break; @@ -648,20 +648,45 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( if (recv_msg->chunks.empty()) return detail; - auto & packet = recv_msg->packet->packet; + auto & packet = recv_msg->packet->getPacket(); // Record total packet size even if fine grained shuffle is enabled. detail.packet_bytes = packet.ByteSizeLong(); - for (const String * chunk : recv_msg->chunks) + + switch (auto version = packet.version(); version) + { + case DB::MPPDataPacketV0: { - auto result = decoder_ptr->decodeAndSquash(*chunk); - if (!result) - continue; - detail.rows += result->rows(); - if likely (result->rows() > 0) + for (const auto * chunk : recv_msg->chunks) { - block_queue.push(std::move(result.value())); + auto result = decoder_ptr->decodeAndSquash(*chunk); + if (!result) + continue; + detail.rows += result->rows(); + if likely (result->rows() > 0) + { + block_queue.push(std::move(result.value())); + } } + return detail; + } + case DB::MPPDataPacketV1: + { + for (const auto * chunk : recv_msg->chunks) + { + auto && result = decoder_ptr->decodeAndSquashV1(*chunk); + if (!result || !result->rows()) + continue; + detail.rows += result->rows(); + block_queue.push(std::move(*result)); + } + return detail; + } + default: + { + RUNTIME_CHECK_MSG(false, "Unknown mpp packet version {}, please update TiFlash instance", version); + break; + } } return detail; } diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp index ad6ac0d6faf..b04851f8e30 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -21,6 +22,9 @@ namespace DB { + +const char * FineGrainedShuffleWriterLabels[] = {"FineGrainedShuffleWriter", "FineGrainedShuffleWriter-V1"}; + template FineGrainedShuffleWriter::FineGrainedShuffleWriter( ExchangeWriterPtr writer_, @@ -28,7 +32,9 @@ FineGrainedShuffleWriter::FineGrainedShuffleWriter( TiDB::TiDBCollators collators_, DAGContext & dag_context_, uint64_t fine_grained_shuffle_stream_count_, - UInt64 fine_grained_shuffle_batch_size_) + UInt64 fine_grained_shuffle_batch_size_, + MPPDataPacketVersion data_codec_version_, + tipb::CompressionMode compression_mode_) : DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_) , writer(writer_) , partition_col_ids(std::move(partition_col_ids_)) @@ -37,6 +43,8 @@ FineGrainedShuffleWriter::FineGrainedShuffleWriter( , fine_grained_shuffle_batch_size(fine_grained_shuffle_batch_size_) , batch_send_row_limit(fine_grained_shuffle_batch_size * fine_grained_shuffle_stream_count) , hash(0) + , data_codec_version(data_codec_version_) + , compression_method(ToInternalCompressionMethod(compression_mode_)) { rows_in_blocks = 0; partition_num = writer_->getPartitionNum(); @@ -60,6 +68,23 @@ void FineGrainedShuffleWriter::prepare(const Block & sample_b num_bucket = partition_num * fine_grained_shuffle_stream_count; partition_key_containers_for_reuse.resize(collators.size()); initScatterColumns(); + + switch (data_codec_version) + { + case MPPDataPacketV0: + break; + case MPPDataPacketV1: + default: + { 
+ for (const auto & field_type : dag_context.result_field_types) + { + expected_types.emplace_back(getDataTypeByFieldTypeForComputingLayer(field_type)); + } + assertBlockSchema(expected_types, header, FineGrainedShuffleWriterLabels[MPPDataPacketV1]); + break; + } + } + prepared = true; } @@ -107,9 +132,12 @@ void FineGrainedShuffleWriter::initScatterColumns() } template -void FineGrainedShuffleWriter::batchWriteFineGrainedShuffle() +template +void FineGrainedShuffleWriter::batchWriteFineGrainedShuffleImpl() { - if (likely(!blocks.empty())) + if (blocks.empty()) + return; + { assert(rows_in_blocks > 0); assert(fine_grained_shuffle_stream_count <= 1024); @@ -118,6 +146,11 @@ void FineGrainedShuffleWriter::batchWriteFineGrainedShuffle() while (!blocks.empty()) { const auto & block = blocks.back(); + if constexpr (version != MPPDataPacketV0) + { + // check schema + assertBlockSchema(expected_types, block, FineGrainedShuffleWriterLabels[MPPDataPacketV1]); + } HashBaseWriterHelper::scatterColumnsForFineGrainedShuffle(block, partition_col_ids, collators, partition_key_containers_for_reuse, partition_num, fine_grained_shuffle_stream_count, hash, selector, scattered); blocks.pop_back(); } @@ -132,12 +165,33 @@ void FineGrainedShuffleWriter::batchWriteFineGrainedShuffle() bucket_idx, fine_grained_shuffle_stream_count, num_columns, - part_id); + part_id, + data_codec_version, + compression_method); } rows_in_blocks = 0; } } +template +void FineGrainedShuffleWriter::batchWriteFineGrainedShuffle() +{ + switch (data_codec_version) + { + case MPPDataPacketV0: + { + batchWriteFineGrainedShuffleImpl(); + break; + } + case MPPDataPacketV1: + default: + { + batchWriteFineGrainedShuffleImpl(); + break; + } + } +} + template class FineGrainedShuffleWriter; } // namespace DB diff --git a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h index e7b5e7603df..44b26dfc2ae 100644 --- a/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h +++ b/dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h @@ -19,9 +19,16 @@ #include #include +namespace DB::HashBaseWriterHelper +{ +struct HashPartitionWriterHelperV1; +} + namespace DB { class DAGContext; +enum class CompressionMethod; +enum MPPDataPacketVersion : int64_t; template class FineGrainedShuffleWriter : public DAGResponseWriter @@ -33,15 +40,18 @@ class FineGrainedShuffleWriter : public DAGResponseWriter TiDB::TiDBCollators collators_, DAGContext & dag_context_, UInt64 fine_grained_shuffle_stream_count_, - UInt64 fine_grained_shuffle_batch_size); + UInt64 fine_grained_shuffle_batch_size, + MPPDataPacketVersion data_codec_version_, + tipb::CompressionMode compression_mode_); void prepare(const Block & sample_block) override; void write(const Block & block) override; void flush() override; private: void batchWriteFineGrainedShuffle(); - void initScatterColumns(); + template + void batchWriteFineGrainedShuffleImpl(); private: ExchangeWriterPtr writer; @@ -60,6 +70,10 @@ class FineGrainedShuffleWriter : public DAGResponseWriter WeakHash32 hash; IColumn::Selector selector; std::vector scattered; // size = num_columns + // support data compression + DataTypes expected_types; + MPPDataPacketVersion data_codec_version; + CompressionMethod compression_method{}; }; } // namespace DB diff --git a/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp b/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp index 5698f27424e..ffed38b1664 100644 --- a/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp +++ b/dbms/src/Flash/Mpp/HashBaseWriterHelper.cpp @@ -12,7 +12,9 @@ // See the 
License for the specific language governing permissions and // limitations under the License. +#include #include +#include namespace DB::HashBaseWriterHelper { @@ -169,4 +171,17 @@ void scatterColumnsForFineGrainedShuffle(const Block & block, } } +HashPartitionWriterHelperV1::HashPartitionWriterHelperV1(const std::vector & field_types) +{ + for (const auto & field_type : field_types) + { + expected_types.emplace_back(getDataTypeByFieldTypeForComputingLayer(field_type)); + } +} +void HashPartitionWriterHelperV1::checkBlock(const Block & block) const +{ + DB::assertBlockSchema(expected_types, block, "HashPartitionWriterHelper"); + block.checkNumberOfRows(); +} + } // namespace DB::HashBaseWriterHelper diff --git a/dbms/src/Flash/Mpp/HashBaseWriterHelper.h b/dbms/src/Flash/Mpp/HashBaseWriterHelper.h index 5684e59177d..24da3286961 100644 --- a/dbms/src/Flash/Mpp/HashBaseWriterHelper.h +++ b/dbms/src/Flash/Mpp/HashBaseWriterHelper.h @@ -55,4 +55,12 @@ void scatterColumnsForFineGrainedShuffle(const Block & block, WeakHash32 & hash, IColumn::Selector & selector, std::vector & scattered); + +// Used to hold expected types for codec +struct HashPartitionWriterHelperV1 +{ + DataTypes expected_types; + explicit HashPartitionWriterHelperV1(const std::vector & field_types); + void checkBlock(const Block & block) const; +}; } // namespace DB::HashBaseWriterHelper diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp index 7ae30a1b4e7..18e87bdc1c2 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.cpp +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -21,59 +22,180 @@ namespace DB { +constexpr ssize_t MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE = 1024 * 1024 * 64; // 64MB: 8192 Rows * 256 Byte/row * 32 partitions +const char * HashPartitionWriterLabels[] = {"HashPartitionWriter", "HashPartitionWriter-V1"}; + template HashPartitionWriter::HashPartitionWriter( ExchangeWriterPtr writer_, std::vector partition_col_ids_, TiDB::TiDBCollators collators_, Int64 batch_send_min_limit_, - DAGContext & dag_context_) + DAGContext & dag_context_, + MPPDataPacketVersion data_codec_version_, + tipb::CompressionMode compression_mode_) : DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_) , batch_send_min_limit(batch_send_min_limit_) , writer(writer_) , partition_col_ids(std::move(partition_col_ids_)) , collators(std::move(collators_)) + , data_codec_version(data_codec_version_) + , compression_method(ToInternalCompressionMethod(compression_mode_)) { rows_in_blocks = 0; partition_num = writer_->getPartitionNum(); RUNTIME_CHECK(partition_num > 0); RUNTIME_CHECK(dag_context.encode_type == tipb::EncodeType::TypeCHBlock); + + switch (data_codec_version) + { + case MPPDataPacketV0: + break; + case MPPDataPacketV1: + default: + { + // make `batch_send_min_limit` always GT 0 + if (batch_send_min_limit <= 0) + { + // set upper limit if not specified + batch_send_min_limit = 8 * 1024 * partition_num /* 8K * partition-num */; + } + for (const auto & field_type : dag_context.result_field_types) + { + expected_types.emplace_back(getDataTypeByFieldTypeForComputingLayer(field_type)); + } + break; + } + } } template void HashPartitionWriter::flush() { - if (rows_in_blocks > 0) + if (0 == rows_in_blocks) + return; + + switch (data_codec_version) + { + case MPPDataPacketV0: + { partitionAndWriteBlocks(); + break; + } + case MPPDataPacketV1: + default: + { + partitionAndWriteBlocksV1(); + break; + } + } } template -void 
HashPartitionWriter::write(const Block & block) +void HashPartitionWriter::writeImplV1(const Block & block) { - RUNTIME_CHECK_MSG( - block.columns() == dag_context.result_field_types.size(), - "Output column size mismatch with field type size"); size_t rows = block.rows(); if (rows > 0) { rows_in_blocks += rows; + mem_size_in_blocks += block.bytes(); blocks.push_back(block); } + if (static_cast(rows_in_blocks) >= batch_send_min_limit + || mem_size_in_blocks >= MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE) + partitionAndWriteBlocksV1(); +} +template +void HashPartitionWriter::writeImpl(const Block & block) +{ + size_t rows = block.rows(); + if (rows > 0) + { + rows_in_blocks += rows; + blocks.push_back(block); + } if (static_cast(rows_in_blocks) > batch_send_min_limit) partitionAndWriteBlocks(); } +template +void HashPartitionWriter::write(const Block & block) +{ + RUNTIME_CHECK_MSG( + block.columns() == dag_context.result_field_types.size(), + "Output column size mismatch with field type size"); + + switch (data_codec_version) + { + case MPPDataPacketV0: + { + return writeImpl(block); + } + case MPPDataPacketV1: + default: + { + return writeImplV1(block); + } + } +} + +template +void HashPartitionWriter::partitionAndWriteBlocksV1() +{ + assert(rows_in_blocks > 0); + assert(mem_size_in_blocks > 0); + assert(!blocks.empty()); + + HashBaseWriterHelper::materializeBlocks(blocks); + // All blocks are same, use one block's meta info as header + Block dest_block_header = blocks.back().cloneEmpty(); + std::vector partition_key_containers(collators.size()); + std::vector> dest_columns(partition_num); + size_t total_rows = 0; + + while (!blocks.empty()) + { + const auto & block = blocks.back(); + { + // check schema + assertBlockSchema(expected_types, block, HashPartitionWriterLabels[MPPDataPacketV1]); + } + auto && dest_tbl_cols = HashBaseWriterHelper::createDestColumns(block, partition_num); + HashBaseWriterHelper::scatterColumns(block, partition_col_ids, collators, partition_key_containers, partition_num, dest_tbl_cols); + blocks.pop_back(); + + for (size_t part_id = 0; part_id < partition_num; ++part_id) + { + auto & columns = dest_tbl_cols[part_id]; + if unlikely (!columns.front()) + continue; + size_t expect_size = columns.front()->size(); + total_rows += expect_size; + dest_columns[part_id].emplace_back(std::move(columns)); + } + } + RUNTIME_CHECK(rows_in_blocks, total_rows); + + for (size_t part_id = 0; part_id < partition_num; ++part_id) + { + writer->partitionWrite(dest_block_header, std::move(dest_columns[part_id]), part_id, data_codec_version, compression_method); + } + + assert(blocks.empty()); + rows_in_blocks = 0; + mem_size_in_blocks = 0; +} + template void HashPartitionWriter::partitionAndWriteBlocks() { + if unlikely (blocks.empty()) + return; + std::vector partition_blocks; partition_blocks.resize(partition_num); - - if (!blocks.empty()) { - assert(rows_in_blocks > 0); - HashBaseWriterHelper::materializeBlocks(blocks); std::vector partition_key_containers(collators.size()); diff --git a/dbms/src/Flash/Mpp/HashPartitionWriter.h b/dbms/src/Flash/Mpp/HashPartitionWriter.h index f90dc4ddb7f..096e6df465c 100644 --- a/dbms/src/Flash/Mpp/HashPartitionWriter.h +++ b/dbms/src/Flash/Mpp/HashPartitionWriter.h @@ -22,6 +22,8 @@ namespace DB { class DAGContext; +enum class CompressionMethod; +enum MPPDataPacketVersion : int64_t; template class HashPartitionWriter : public DAGResponseWriter @@ -32,12 +34,17 @@ class HashPartitionWriter : public DAGResponseWriter std::vector partition_col_ids_, 
TiDB::TiDBCollators collators_, Int64 batch_send_min_limit_, - DAGContext & dag_context_); + DAGContext & dag_context_, + MPPDataPacketVersion data_codec_version_, + tipb::CompressionMode compression_mode_); void write(const Block & block) override; void flush() override; private: + void writeImpl(const Block & block); + void writeImplV1(const Block & block); void partitionAndWriteBlocks(); + void partitionAndWriteBlocksV1(); void writePartitionBlocks(std::vector & partition_blocks); @@ -49,6 +56,11 @@ class HashPartitionWriter : public DAGResponseWriter TiDB::TiDBCollators collators; size_t rows_in_blocks; uint16_t partition_num; + // support data compression + int64_t mem_size_in_blocks{}; + DataTypes expected_types; + MPPDataPacketVersion data_codec_version; + CompressionMethod compression_method{}; }; } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index 55d0f7ab9be..a308a9717a3 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -33,7 +34,7 @@ void checkPacketSize(size_t size) TrackedMppDataPacketPtr serializePacket(const tipb::SelectResponse & response) { - auto tracked_packet = std::make_shared(); + auto tracked_packet = std::make_shared(MPPDataPacketV0); tracked_packet->serializeByResponse(response); checkPacketSize(tracked_packet->getPacket().ByteSizeLong()); return tracked_packet; @@ -62,28 +63,156 @@ void MPPTunnelSetBase::write(tipb::SelectResponse & response) tunnels.back()->write(serializePacket(response)); } +static inline void updatePartitionWriterMetrics(size_t packet_bytes, bool is_local) +{ + // statistic + GET_METRIC(tiflash_exchange_data_bytes, type_hash_original).Increment(packet_bytes); + // compression method is always NONE + if (is_local) + GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_local).Increment(packet_bytes); + else + GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_remote).Increment(packet_bytes); +} + +static inline void updatePartitionWriterMetrics(CompressionMethod method, size_t original_size, size_t sz, bool is_local) +{ + // statistic + GET_METRIC(tiflash_exchange_data_bytes, type_hash_original).Increment(original_size); + + switch (method) + { + case CompressionMethod::NONE: + { + if (is_local) + { + GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_local).Increment(sz); + } + else + { + GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_remote).Increment(sz); + } + break; + } + case CompressionMethod::LZ4: + { + GET_METRIC(tiflash_exchange_data_bytes, type_hash_lz4_compression).Increment(sz); + break; + } + case CompressionMethod::ZSTD: + { + GET_METRIC(tiflash_exchange_data_bytes, type_hash_zstd_compression).Increment(sz); + break; + } + default: + break; + } +} + template void MPPTunnelSetBase::broadcastOrPassThroughWrite(Blocks & blocks) { RUNTIME_CHECK(!tunnels.empty()); - auto tracked_packet = MPPTunnelSetHelper::toPacket(blocks, result_field_types); - checkPacketSize(tracked_packet->getPacket().ByteSizeLong()); + auto && tracked_packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types); + if (!tracked_packet) + return; + auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); + checkPacketSize(packet_bytes); // TODO avoid copy packet for broadcast. 
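// The statistic block further below multiplies the encoded packet size by the tunnel counts,
// because broadcast/pass-through sends one copy of the same packet per tunnel and local
// tunnels are accounted separately from remote ones. A small sketch of that accounting
// (hypothetical helper, not part of the patch):
struct BroadcastBytes
{
    size_t total = 0;
    size_t local = 0;
    size_t remote = 0;
};
inline BroadcastBytes accountBroadcastBytes(size_t packet_bytes, size_t tunnel_cnt, size_t local_tunnel_cnt)
{
    BroadcastBytes bytes;
    bytes.total = packet_bytes * tunnel_cnt; // every tunnel receives a copy of the packet
    bytes.local = packet_bytes * local_tunnel_cnt; // copies that go through local tunnels
    bytes.remote = bytes.total - bytes.local; // the rest is counted as remote traffic
    return bytes;
}
// e.g. a 1 MiB packet broadcast over 4 tunnels with 1 local tunnel is accounted as 4 MiB
// original, 1 MiB local and 3 MiB remote, matching the three metrics incremented below.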
for (size_t i = 1; i < tunnels.size(); ++i) tunnels[i]->write(tracked_packet->copy()); tunnels[0]->write(std::move(tracked_packet)); + { + // statistic + size_t data_bytes = 0; + size_t local_data_bytes = 0; + { + auto tunnel_cnt = getPartitionNum(); + size_t local_tunnel_cnt = 0; + for (size_t i = 0; i < tunnel_cnt; ++i) + { + local_tunnel_cnt += isLocal(i); + } + data_bytes = packet_bytes * tunnel_cnt; + local_data_bytes = packet_bytes * local_tunnel_cnt; + } + GET_METRIC(tiflash_exchange_data_bytes, type_broadcast_passthrough_original).Increment(data_bytes); + GET_METRIC(tiflash_exchange_data_bytes, type_broadcast_passthrough_none_compression_local).Increment(local_data_bytes); + GET_METRIC(tiflash_exchange_data_bytes, type_broadcast_passthrough_none_compression_remote).Increment(data_bytes - local_data_bytes); + } } template void MPPTunnelSetBase::partitionWrite(Blocks & blocks, int16_t partition_id) { - auto tracked_packet = MPPTunnelSetHelper::toPacket(blocks, result_field_types); - if (likely(tracked_packet->getPacket().chunks_size() > 0)) - { - checkPacketSize(tracked_packet->getPacket().ByteSizeLong()); - tunnels[partition_id]->write(std::move(tracked_packet)); - } + auto && tracked_packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types); + if (!tracked_packet) + return; + auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); + checkPacketSize(packet_bytes); + tunnels[partition_id]->write(std::move(tracked_packet)); + updatePartitionWriterMetrics(packet_bytes, isLocal(partition_id)); +} + +template +void MPPTunnelSetBase::partitionWrite( + const Block & header, + std::vector && part_columns, + int16_t partition_id, + MPPDataPacketVersion version, + CompressionMethod compression_method) +{ + assert(version > MPPDataPacketV0); + + bool is_local = isLocal(partition_id); + compression_method = is_local ? CompressionMethod::NONE : compression_method; + + size_t original_size = 0; + auto tracked_packet = MPPTunnelSetHelper::ToPacket(header, std::move(part_columns), version, compression_method, original_size); + if (!tracked_packet) + return; + + auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); + checkPacketSize(packet_bytes); + tunnels[partition_id]->write(std::move(tracked_packet)); + updatePartitionWriterMetrics(compression_method, original_size, packet_bytes, is_local); +} + +template +void MPPTunnelSetBase::fineGrainedShuffleWrite( + const Block & header, + std::vector & scattered, + size_t bucket_idx, + UInt64 fine_grained_shuffle_stream_count, + size_t num_columns, + int16_t partition_id, + MPPDataPacketVersion version, + CompressionMethod compression_method) +{ + if (version == MPPDataPacketV0) + return fineGrainedShuffleWrite(header, scattered, bucket_idx, fine_grained_shuffle_stream_count, num_columns, partition_id); + + bool is_local = isLocal(partition_id); + compression_method = is_local ? 
CompressionMethod::NONE : compression_method; + + size_t original_size = 0; + auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacket( + header, + scattered, + bucket_idx, + fine_grained_shuffle_stream_count, + num_columns, + version, + compression_method, + original_size); + + if unlikely (tracked_packet->getPacket().chunks_size() <= 0) + return; + + auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); + checkPacketSize(packet_bytes); + tunnels[partition_id]->write(std::move(tracked_packet)); + updatePartitionWriterMetrics(compression_method, original_size, packet_bytes, is_local); } template @@ -95,18 +224,21 @@ void MPPTunnelSetBase::fineGrainedShuffleWrite( size_t num_columns, int16_t partition_id) { - auto tracked_packet = MPPTunnelSetHelper::toFineGrainedPacket( + auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacketV0( header, scattered, bucket_idx, fine_grained_shuffle_stream_count, num_columns, result_field_types); - if (likely(tracked_packet->getPacket().chunks_size() > 0)) - { - checkPacketSize(tracked_packet->getPacket().ByteSizeLong()); - tunnels[partition_id]->write(std::move(tracked_packet)); - } + + if unlikely (tracked_packet->getPacket().chunks_size() <= 0) + return; + + auto packet_bytes = tracked_packet->getPacket().ByteSizeLong(); + checkPacketSize(packet_bytes); + tunnels[partition_id]->write(std::move(tracked_packet)); + updatePartitionWriterMetrics(packet_bytes, isLocal(partition_id)); } template @@ -150,6 +282,13 @@ typename MPPTunnelSetBase::TunnelPtr MPPTunnelSetBase::getTunnel return tunnels[it->second]; } +template +bool MPPTunnelSetBase::isLocal(size_t index) const +{ + assert(getPartitionNum() > index); + return getTunnels()[index]->isLocal(); +} + /// Explicit template instantiations - to avoid code bloat in headers. template class MPPTunnelSetBase; diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index ac0431eef0a..e0a8d4115d2 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -16,16 +16,7 @@ #include #include -#ifdef __clang__ -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wdeprecated-declarations" -#endif -#include -#ifdef __clang__ -#pragma clang diagnostic pop -#endif - -#include +#include namespace DB { @@ -41,10 +32,15 @@ class MPPTunnelSetBase : private boost::noncopyable // this is a root mpp writing. void write(tipb::SelectResponse & response); // this is a broadcast or pass through writing. + // data codec version V0 void broadcastOrPassThroughWrite(Blocks & blocks); // this is a partition writing. + // data codec version V0 void partitionWrite(Blocks & blocks, int16_t partition_id); + // data codec version > V0 + void partitionWrite(const Block & header, std::vector && part_columns, int16_t partition_id, MPPDataPacketVersion version, CompressionMethod compression_method); // this is a fine grained shuffle writing. + // data codec version V0 void fineGrainedShuffleWrite( const Block & header, std::vector & scattered, @@ -52,6 +48,15 @@ class MPPTunnelSetBase : private boost::noncopyable UInt64 fine_grained_shuffle_stream_count, size_t num_columns, int16_t partition_id); + void fineGrainedShuffleWrite( + const Block & header, + std::vector & scattered, + size_t bucket_idx, + UInt64 fine_grained_shuffle_stream_count, + size_t num_columns, + int16_t partition_id, + MPPDataPacketVersion version, + CompressionMethod compression_method); /// this is a execution summary writing. 
/// for both broadcast writing and partition/fine grained shuffle writing, only /// return meaningful execution summary for the first tunnel, @@ -78,6 +83,9 @@ class MPPTunnelSetBase : private boost::noncopyable const std::vector & getTunnels() const { return tunnels; } +private: + bool isLocal(size_t index) const; + private: std::vector tunnels; std::unordered_map receiver_task_id_to_index_map; diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp index d17f52b8284..eb65327a7f8 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp @@ -13,15 +13,43 @@ // limitations under the License. #include +#include #include namespace DB::MPPTunnelSetHelper { -TrackedMppDataPacketPtr toPacket(Blocks & blocks, const std::vector & field_types) + +TrackedMppDataPacketPtr ToPacket( + const Block & header, + std::vector && part_columns, + MPPDataPacketVersion version, + CompressionMethod method, + size_t & original_size) +{ + assert(version > MPPDataPacketV0); + + auto && codec = CHBlockChunkCodecV1{ + header, + }; + + auto && res = codec.encode(std::move(part_columns), method); + if unlikely (res.empty()) + return nullptr; + + auto tracked_packet = std::make_shared(version); + tracked_packet->addChunk(std::move(res)); + original_size += codec.original_size; + return tracked_packet; +} + +TrackedMppDataPacketPtr ToPacketV0(Blocks & blocks, const std::vector & field_types) { + if (blocks.empty()) + return nullptr; + CHBlockChunkCodec codec; auto codec_stream = codec.newCodecStream(field_types); - auto tracked_packet = std::make_shared(); + auto tracked_packet = std::make_shared(MPPDataPacketV0); while (!blocks.empty()) { const auto & block = blocks.back(); @@ -33,7 +61,50 @@ TrackedMppDataPacketPtr toPacket(Blocks & blocks, const std::vector & scattered, + size_t bucket_idx, + UInt64 fine_grained_shuffle_stream_count, + size_t num_columns, + MPPDataPacketVersion version, + CompressionMethod method, + size_t & original_size) +{ + assert(version > MPPDataPacketV0); + + auto && codec = CHBlockChunkCodecV1{ + header, + }; + auto tracked_packet = std::make_shared(version); + + for (uint64_t stream_idx = 0; stream_idx < fine_grained_shuffle_stream_count; ++stream_idx) + { + // assemble scatter columns into a block + MutableColumns columns; + columns.reserve(num_columns); + for (size_t col_id = 0; col_id < num_columns; ++col_id) + columns.emplace_back(std::move(scattered[col_id][bucket_idx + stream_idx])); + + auto && res = codec.encode(columns, method); + if (!res.empty()) + { + tracked_packet->addChunk(std::move(res)); + tracked_packet->getPacket().add_stream_ids(stream_idx); + } + + for (size_t col_id = 0; col_id < num_columns; ++col_id) + { + columns[col_id]->popBack(columns[col_id]->size()); // clear column + scattered[col_id][bucket_idx + stream_idx] = std::move(columns[col_id]); + } + } + + original_size += codec.original_size; + return tracked_packet; +} + +TrackedMppDataPacketPtr ToFineGrainedPacketV0( const Block & header, std::vector & scattered, size_t bucket_idx, @@ -43,7 +114,7 @@ TrackedMppDataPacketPtr toFineGrainedPacket( { CHBlockChunkCodec codec; auto codec_stream = codec.newCodecStream(field_types); - auto tracked_packet = std::make_shared(); + auto tracked_packet = std::make_shared(MPPDataPacketV0); for (uint64_t stream_idx = 0; stream_idx < fine_grained_shuffle_stream_count; ++stream_idx) { // assemble scatter columns into a block @@ -52,12 +123,14 @@ TrackedMppDataPacketPtr toFineGrainedPacket( for 
(size_t col_id = 0; col_id < num_columns; ++col_id) columns.emplace_back(std::move(scattered[col_id][bucket_idx + stream_idx])); auto block = header.cloneWithColumns(std::move(columns)); - - // encode into packet - codec_stream->encode(block, 0, block.rows()); - tracked_packet->addChunk(codec_stream->getString()); - tracked_packet->getPacket().add_stream_ids(stream_idx); - codec_stream->clear(); + if (block.rows()) + { + // encode into packet + codec_stream->encode(block, 0, block.rows()); + tracked_packet->addChunk(codec_stream->getString()); + tracked_packet->getPacket().add_stream_ids(stream_idx); + codec_stream->clear(); + } // disassemble the block back to scatter columns columns = block.mutateColumns(); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.h b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.h index 38bddcef962..050547dd330 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.h @@ -15,18 +15,41 @@ #pragma once #include +#include #include -#include + +namespace DB +{ +enum class CompressionMethod; +} namespace DB::MPPTunnelSetHelper { -TrackedMppDataPacketPtr toPacket(Blocks & blocks, const std::vector & field_types); +TrackedMppDataPacketPtr ToPacketV0(Blocks & blocks, const std::vector & field_types); + +TrackedMppDataPacketPtr ToPacket( + const Block & header, + std::vector && part_columns, + MPPDataPacketVersion version, + CompressionMethod compression_method, + size_t & original_size); -TrackedMppDataPacketPtr toFineGrainedPacket( +TrackedMppDataPacketPtr ToFineGrainedPacketV0( const Block & header, std::vector & scattered, size_t bucket_idx, UInt64 fine_grained_shuffle_stream_count, size_t num_columns, const std::vector & field_types); + +TrackedMppDataPacketPtr ToFineGrainedPacket( + const Block & header, + std::vector & scattered, + size_t bucket_idx, + UInt64 fine_grained_shuffle_stream_count, + size_t num_columns, + MPPDataPacketVersion version, + CompressionMethod compression_method, + size_t & original_size); + } // namespace DB::MPPTunnelSetHelper diff --git a/dbms/src/Flash/Mpp/MppVersion.h b/dbms/src/Flash/Mpp/MppVersion.h new file mode 100644 index 00000000000..b3ca0546fb5 --- /dev/null +++ b/dbms/src/Flash/Mpp/MppVersion.h @@ -0,0 +1,41 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include + +namespace DB +{ +enum MppVersion : int64_t +{ + MppVersionV0 = 0, + MppVersionV1, + // + MppVersionMAX, +}; + +enum MPPDataPacketVersion : int64_t +{ + MPPDataPacketV0 = 0, + MPPDataPacketV1, + // + MPPDataPacketMAX, +}; + +bool CheckMppVersion(int64_t mpp_version); +std::string GenMppVersionErrorMessage(int64_t mpp_version); +int64_t GetMppVersion(); + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/TrackedMppDataPacket.h b/dbms/src/Flash/Mpp/TrackedMppDataPacket.h index ad7e2a3cdd2..f9530cb689b 100644 --- a/dbms/src/Flash/Mpp/TrackedMppDataPacket.h +++ b/dbms/src/Flash/Mpp/TrackedMppDataPacket.h @@ -116,13 +116,17 @@ struct TrackedMppDataPacket packet = data; } - explicit TrackedMppDataPacket() + explicit TrackedMppDataPacket(int64_t version) : mem_tracker_wrapper(current_memory_tracker) - {} + { + packet.set_version(version); + } - explicit TrackedMppDataPacket(MemoryTracker * memory_tracker) + explicit TrackedMppDataPacket(MemoryTracker * memory_tracker, int64_t version) : mem_tracker_wrapper(memory_tracker) - {} + { + packet.set_version(version); + } TrackedMppDataPacket(const mpp::MPPDataPacket & data, size_t size, MemoryTracker * memory_tracker) : mem_tracker_wrapper(size, memory_tracker) diff --git a/dbms/src/Flash/Mpp/Utils.cpp b/dbms/src/Flash/Mpp/Utils.cpp index 21d89b3cd52..0ddfb7ed958 100644 --- a/dbms/src/Flash/Mpp/Utils.cpp +++ b/dbms/src/Flash/Mpp/Utils.cpp @@ -12,17 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include +#include +#include +#include #include namespace DB { + +namespace FailPoints +{ +extern const char invalid_mpp_version[]; +} // namespace FailPoints + mpp::MPPDataPacket getPacketWithError(String reason) { mpp::MPPDataPacket data; auto err = std::make_unique(); + err->set_mpp_version(DB::GetMppVersion()); err->set_msg(std::move(reason)); data.set_allocated_error(err.release()); return data; @@ -38,4 +50,36 @@ void trimStackTrace(String & message) } } -} // namespace DB +// Latest mpp-version supported by TiFlash +static MppVersion NewestMppVersion = MppVersion(MppVersion::MppVersionMAX - 1); +static MppVersion MinMppVersion = MppVersion::MppVersionV0; + +// Check mpp-version is illegal +bool CheckMppVersion(int64_t mpp_version) +{ + fiu_do_on(FailPoints::invalid_mpp_version, { + mpp_version = -1; + }); + return mpp_version >= MinMppVersion && mpp_version <= NewestMppVersion; +} + +std::string GenMppVersionErrorMessage(int64_t mpp_version) +{ + fiu_do_on(FailPoints::invalid_mpp_version, { + mpp_version = -1; + }); + auto err_msg = fmt::format("Invalid mpp version {}, TiFlash expects version: min {}, max {}, should upgrade {}", + mpp_version, + MinMppVersion, + NewestMppVersion, + (mpp_version < MinMppVersion) ? 
"TiDB/planner" : "TiFlash"); + return err_msg; +} + +// Get latest mpp-version supported by TiFlash +int64_t GetMppVersion() +{ + return (NewestMppVersion); +} + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h index 1ce68030354..9cb23d11734 100644 --- a/dbms/src/Flash/Mpp/newMPPExchangeWriter.h +++ b/dbms/src/Flash/Mpp/newMPPExchangeWriter.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB { @@ -32,11 +33,16 @@ std::unique_ptr newMPPExchangeWriter( DAGContext & dag_context, bool enable_fine_grained_shuffle, UInt64 fine_grained_shuffle_stream_count, - UInt64 fine_grained_shuffle_batch_size) + UInt64 fine_grained_shuffle_batch_size, + tipb::CompressionMode compression_mode, + Int64 batch_send_min_limit_compression) { RUNTIME_CHECK(dag_context.isMPPTask()); if (dag_context.isRootMPPTask()) { + // No need to use use data compression + RUNTIME_CHECK(compression_mode == tipb::CompressionMode::NONE); + RUNTIME_CHECK(!enable_fine_grained_shuffle); RUNTIME_CHECK(exchange_type == tipb::ExchangeType::PassThrough); return std::make_unique>( @@ -49,6 +55,11 @@ std::unique_ptr newMPPExchangeWriter( { if (exchange_type == tipb::ExchangeType::Hash) { + auto mpp_version = dag_context.getMPPTaskMeta().mpp_version(); + auto data_codec_version = mpp_version == MppVersionV0 + ? MPPDataPacketV0 + : MPPDataPacketV1; + if (enable_fine_grained_shuffle) { return std::make_unique>( @@ -57,20 +68,31 @@ std::unique_ptr newMPPExchangeWriter( partition_col_collators, dag_context, fine_grained_shuffle_stream_count, - fine_grained_shuffle_batch_size); + fine_grained_shuffle_batch_size, + data_codec_version, + compression_mode); } else { + auto chosen_batch_send_min_limit = mpp_version == MppVersionV0 + ? batch_send_min_limit + : batch_send_min_limit_compression; + return std::make_unique>( writer, partition_col_ids, partition_col_collators, - batch_send_min_limit, - dag_context); + chosen_batch_send_min_limit, + dag_context, + data_codec_version, + compression_mode); } } else { + // TODO: support data compression if necessary + RUNTIME_CHECK(compression_mode == tipb::CompressionMode::NONE); + RUNTIME_CHECK(!enable_fine_grained_shuffle); return std::make_unique>( writer, @@ -79,4 +101,5 @@ std::unique_ptr newMPPExchangeWriter( } } } + } // namespace DB diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index 08c53b51aa5..0ce4fda70dd 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,8 @@ namespace DB { namespace tests { +static CompressionMethodByte GetCompressionMethodByte(CompressionMethod m); + class TestMPPExchangeWriter : public testing::Test { protected: @@ -125,14 +128,51 @@ struct MockExchangeWriter , part_num(part_num_) , result_field_types(dag_context.result_field_types) {} - + void partitionWrite( + const Block & header, + std::vector && part_columns, + int16_t part_id, + MPPDataPacketVersion version, + CompressionMethod method) + { + assert(version > MPPDataPacketV0); + method = isLocal(part_id) ? 
CompressionMethod::NONE : method; + size_t original_size = 0; + auto tracked_packet = MPPTunnelSetHelper::ToPacket(header, std::move(part_columns), version, method, original_size); + checker(tracked_packet, part_id); + } + void fineGrainedShuffleWrite( + const Block & header, + std::vector & scattered, + size_t bucket_idx, + UInt64 fine_grained_shuffle_stream_count, + size_t num_columns, + int16_t part_id, + MPPDataPacketVersion version, + CompressionMethod method) + { + if (version == MPPDataPacketV0) + return fineGrainedShuffleWrite(header, scattered, bucket_idx, fine_grained_shuffle_stream_count, num_columns, part_id); + method = isLocal(part_id) ? CompressionMethod::NONE : method; + size_t original_size = 0; + auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacket( + header, + scattered, + bucket_idx, + fine_grained_shuffle_stream_count, + num_columns, + version, + method, + original_size); + checker(tracked_packet, part_id); + } void broadcastOrPassThroughWrite(Blocks & blocks) { - checker(MPPTunnelSetHelper::toPacket(blocks, result_field_types), 0); + checker(MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types), 0); } void partitionWrite(Blocks & blocks, uint16_t part_id) { - checker(MPPTunnelSetHelper::toPacket(blocks, result_field_types), part_id); + checker(MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types), part_id); } void fineGrainedShuffleWrite( const Block & header, @@ -142,7 +182,7 @@ struct MockExchangeWriter size_t num_columns, int16_t part_id) { - auto tracked_packet = MPPTunnelSetHelper::toFineGrainedPacket( + auto tracked_packet = MPPTunnelSetHelper::ToFineGrainedPacketV0( header, scattered, bucket_idx, @@ -152,14 +192,20 @@ struct MockExchangeWriter checker(tracked_packet, part_id); } - void write(tipb::SelectResponse &) { FAIL() << "cannot reach here, only consider CH Block format"; } + static void write(tipb::SelectResponse &) { FAIL() << "cannot reach here, only consider CH Block format"; } void sendExecutionSummary(const tipb::SelectResponse & response) { - auto tracked_packet = std::make_shared(); + auto tracked_packet = std::make_shared(MPPDataPacketV0); tracked_packet->serializeByResponse(response); checker(tracked_packet, 0); } uint16_t getPartitionNum() const { return part_num; } + bool isLocal(size_t index) const + { + assert(getPartitionNum() > index); + // make only part 0 use local tunnel + return index == 0; + } private: MockExchangeWriterChecker checker; @@ -199,7 +245,9 @@ try part_col_collators, *dag_context_ptr, fine_grained_shuffle_stream_count, - fine_grained_shuffle_batch_size); + fine_grained_shuffle_batch_size, + DB::MPPDataPacketV0, + tipb::CompressionMode::NONE); dag_writer->prepare(block.cloneEmpty()); dag_writer->write(block); dag_writer->flush(); @@ -226,6 +274,92 @@ try } CATCH +TEST_F(TestMPPExchangeWriter, TestFineGrainedShuffleWriterV1) +try +{ + const size_t block_rows = 64; + const size_t block_num = 64; + const uint16_t part_num = 4; + const uint32_t fine_grained_shuffle_stream_count = 8; + const Int64 fine_grained_shuffle_batch_size = 108; + + // 1. Build Block. + std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + blocks.emplace_back(prepareUniformBlock(block_rows)); + blocks.emplace_back(prepareUniformBlock(0)); + } + const auto & header = blocks.back().cloneEmpty(); + + for (auto mode : {tipb::CompressionMode::NONE, tipb::CompressionMode::FAST, tipb::CompressionMode::HIGH_COMPRESSION}) + { + // 2. Build MockExchangeWriter. 
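// The assertions in the test below rely on the framing of MPPDataPacketV1 chunks: the first
// byte of every encoded chunk carries the CompressionMethodByte, and the writer forces local
// tunnels to CompressionMethod::NONE regardless of the requested mode. Sketch of the expected
// value the test computes (hypothetical helper; GetCompressionMethodByte is the static mapping
// defined near the end of this test file):
inline CompressionMethodByte expectedChunkFirstByte(bool is_local, tipb::CompressionMode mode)
{
    auto method = is_local ? CompressionMethod::NONE : ToInternalCompressionMethod(mode);
    return GetCompressionMethodByte(method);
}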
+ std::unordered_map write_report; + auto checker = [&write_report](const TrackedMppDataPacketPtr & packet, uint16_t part_id) { + write_report[part_id].emplace_back(packet); + }; + auto mock_writer = std::make_shared(checker, part_num, *dag_context_ptr); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + *dag_context_ptr, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size, + DB::MPPDataPacketV1, + mode); + dag_writer->prepare(blocks[0].cloneEmpty()); + for (const auto & block : blocks) + dag_writer->write(block); + dag_writer->flush(); + + // 4. Start to check write_report. + size_t per_part_rows = block_rows * block_num / part_num; + ASSERT_EQ(write_report.size(), part_num); + std::vector rows_of_stream_ids(fine_grained_shuffle_stream_count, 0); + + CHBlockChunkDecodeAndSquash decoder(header, 512); + + for (size_t part_index = 0; part_index < part_num; ++part_index) + { + size_t part_decoded_block_rows = 0; + + for (const auto & packet : write_report[part_index]) + { + ASSERT_EQ(packet->getPacket().chunks_size(), packet->getPacket().stream_ids_size()); + ASSERT_EQ(DB::MPPDataPacketV1, packet->getPacket().version()); + + for (int i = 0; i < packet->getPacket().chunks_size(); ++i) + { + const auto & chunk = packet->getPacket().chunks(i); + + auto tar_method_byte = mock_writer->isLocal(part_index) ? CompressionMethodByte::NONE : GetCompressionMethodByte(ToInternalCompressionMethod(mode)); + + ASSERT_EQ(CompressionMethodByte(chunk[0]), tar_method_byte); + auto && result = decoder.decodeAndSquashV1(chunk); + if (!result) + { + result = decoder.flush(); + } + assert(result); + auto decoded_block = std::move(*result); + part_decoded_block_rows += decoded_block.rows(); + rows_of_stream_ids[packet->getPacket().stream_ids(i)] += decoded_block.rows(); + } + } + ASSERT_EQ(part_decoded_block_rows, per_part_rows); + } + + size_t per_stream_id_rows = block_rows * block_num / fine_grained_shuffle_stream_count; + for (size_t rows : rows_of_stream_ids) + ASSERT_EQ(rows, per_stream_id_rows); + } +} +CATCH + TEST_F(TestMPPExchangeWriter, testFineGrainedShuffleWriter) try { @@ -258,7 +392,9 @@ try part_col_collators, *dag_context_ptr, fine_grained_shuffle_stream_count, - fine_grained_shuffle_batch_size); + fine_grained_shuffle_batch_size, + DB::MPPDataPacketV0, + tipb::CompressionMode::NONE); dag_writer->prepare(blocks[0].cloneEmpty()); for (const auto & block : blocks) dag_writer->write(block); @@ -319,7 +455,9 @@ try part_col_ids, part_col_collators, batch_send_min_limit, - *dag_context_ptr); + *dag_context_ptr, + DB::MPPDataPacketV0, + tipb::CompressionMode::NONE); for (const auto & block : blocks) dag_writer->write(block); dag_writer->flush(); @@ -391,5 +529,96 @@ try } CATCH +static CompressionMethodByte GetCompressionMethodByte(CompressionMethod m) +{ + switch (m) + { + case CompressionMethod::LZ4: + return CompressionMethodByte::LZ4; + case CompressionMethod::NONE: + return CompressionMethodByte::NONE; + case CompressionMethod::ZSTD: + return CompressionMethodByte::ZSTD; + default: + RUNTIME_CHECK(false); + } + return CompressionMethodByte::NONE; +} + +TEST_F(TestMPPExchangeWriter, TestHashPartitionWriterV1) +try +{ + const size_t block_rows = 64; + const size_t block_num = 64; + const size_t batch_send_min_limit = 1024 * 1024 * 1024; + const uint16_t part_num = 4; + + // 1. Build Blocks. 
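// HashPartitionWriter in V1 mode (see HashPartitionWriter.cpp earlier in this patch) buffers
// incoming blocks and flushes either when enough rows are queued or when the buffered blocks
// reach the 64MB memory cap; this test sets batch_send_min_limit far above the total row
// count, so in practice the data only goes out on the explicit flush(). Sketch of that
// trigger as a standalone function (illustrative, not the member function itself):
inline bool shouldFlushV1(ssize_t rows_in_blocks, int64_t mem_size_in_blocks, Int64 batch_send_min_limit)
{
    constexpr int64_t max_mem_size = 1024 * 1024 * 64; // MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE, 64MB
    return rows_in_blocks >= batch_send_min_limit || mem_size_in_blocks >= max_mem_size;
}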
+ std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + blocks.emplace_back(prepareUniformBlock(block_rows)); + blocks.emplace_back(prepareUniformBlock(0)); + } + const auto & header = blocks.back().cloneEmpty(); + + for (auto mode : {tipb::CompressionMode::NONE, tipb::CompressionMode::FAST, tipb::CompressionMode::HIGH_COMPRESSION}) + { + // 2. Build MockExchangeWriter. + std::unordered_map write_report; + auto checker = [&write_report](const TrackedMppDataPacketPtr & packet, uint16_t part_id) { + write_report[part_id].emplace_back(packet); + }; + auto mock_writer = std::make_shared(checker, part_num, *dag_context_ptr); + + // 3. Start to write. + auto dag_writer = std::make_shared>>( + mock_writer, + part_col_ids, + part_col_collators, + batch_send_min_limit, + *dag_context_ptr, + DB::MPPDataPacketV1, + mode); + for (const auto & block : blocks) + dag_writer->write(block); + dag_writer->write(header); // write empty + dag_writer->flush(); + + // 4. Start to check write_report. + size_t per_part_rows = block_rows * block_num / part_num; + ASSERT_EQ(write_report.size(), part_num); + + CHBlockChunkDecodeAndSquash decoder(header, 512); + + for (size_t part_index = 0; part_index < part_num; ++part_index) + { + size_t decoded_block_rows = 0; + for (const auto & tracked_packet : write_report[part_index]) + { + auto & packet = tracked_packet->getPacket(); + + ASSERT_EQ(packet.version(), DB::MPPDataPacketV1); + + for (auto && chunk : packet.chunks()) + { + auto tar_method_byte = mock_writer->isLocal(part_index) ? CompressionMethodByte::NONE : GetCompressionMethodByte(ToInternalCompressionMethod(mode)); + ASSERT_EQ(CompressionMethodByte(chunk[0]), tar_method_byte); + auto && result = decoder.decodeAndSquashV1(chunk); + if (!result) + continue; + decoded_block_rows += result->rows(); + } + } + { + auto result = decoder.flush(); + if (result) + decoded_block_rows += result->rows(); + } + ASSERT_EQ(decoded_block_rows, per_part_rows); + } + } +} +CATCH } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 7d740a7eb44..e07ab938c07 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -39,7 +39,7 @@ namespace { TrackedMppDataPacketPtr newDataPacket(const String & data) { - auto data_packet_ptr = std::make_shared(); + auto data_packet_ptr = std::make_shared(MPPDataPacketV0); data_packet_ptr->getPacket().set_data(data); return data_packet_ptr; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp index 5efae216198..a887d777188 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.cpp @@ -22,6 +22,7 @@ #include #include + namespace DB { PhysicalPlanNodePtr PhysicalExchangeSender::build( @@ -44,7 +45,8 @@ PhysicalPlanNodePtr PhysicalExchangeSender::build( partition_col_ids, partition_col_collators, exchange_sender.tp(), - fine_grained_shuffle); + fine_grained_shuffle, + exchange_sender.compression()); // executeUnion will be call after sender.transform, so don't need to restore concurrency. 
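// The buildBlockInputStreamImpl hunk below threads the planner-chosen compression mode and the
// new batch_send_min_limit_compression setting into newMPPExchangeWriter (changed earlier in
// this patch), which selects the packet codec version from the task's negotiated mpp_version.
// Sketch of that selection; the helper names here are illustrative only:
inline MPPDataPacketVersion chooseDataCodecVersion(int64_t mpp_version)
{
    return mpp_version == MppVersionV0 ? MPPDataPacketV0 : MPPDataPacketV1;
}
inline Int64 chooseBatchSendMinLimit(int64_t mpp_version, Int64 batch_send_min_limit, Int64 batch_send_min_limit_compression)
{
    // batch_send_min_limit_compression defaults to -1; a non-positive value makes the V1 hash
    // writer fall back to 8 * 1024 * partition_num rows, per HashPartitionWriter above.
    return mpp_version == MppVersionV0 ? batch_send_min_limit : batch_send_min_limit_compression;
}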
physical_exchange_sender->disableRestoreConcurrency(); return physical_exchange_sender; @@ -78,7 +80,9 @@ void PhysicalExchangeSender::buildBlockInputStreamImpl(DAGPipeline & pipeline, C dag_context, fine_grained_shuffle.enable(), fine_grained_shuffle.stream_count, - fine_grained_shuffle.batch_size); + fine_grained_shuffle.batch_size, + compression_mode, + context.getSettingsRef().batch_send_min_limit_compression); stream = std::make_shared(stream, std::move(response_writer), log->identifier()); stream->setExtraInfo(extra_info); }); @@ -93,4 +97,5 @@ const Block & PhysicalExchangeSender::getSampleBlock() const { return child->getSampleBlock(); } + } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h index 3829882b13b..88fc4b7773b 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalExchangeSender.h @@ -39,11 +39,13 @@ class PhysicalExchangeSender : public PhysicalUnary const std::vector & partition_col_ids_, const TiDB::TiDBCollators & collators_, const tipb::ExchangeType & exchange_type_, - const FineGrainedShuffle & fine_grained_shuffle_) + const FineGrainedShuffle & fine_grained_shuffle_, + const tipb::CompressionMode & compression_mode_) : PhysicalUnary(executor_id_, PlanType::ExchangeSender, schema_, req_id, child_) , partition_col_ids(partition_col_ids_) , partition_col_collators(collators_) , exchange_type(exchange_type_) + , compression_mode(compression_mode_) , fine_grained_shuffle(fine_grained_shuffle_) {} @@ -57,6 +59,7 @@ class PhysicalExchangeSender : public PhysicalUnary std::vector partition_col_ids; TiDB::TiDBCollators partition_col_collators; tipb::ExchangeType exchange_type; + tipb::CompressionMode compression_mode; FineGrainedShuffle fine_grained_shuffle; }; diff --git a/dbms/src/Functions/CMakeLists.txt b/dbms/src/Functions/CMakeLists.txt index defd75d64e5..562af2bcf40 100644 --- a/dbms/src/Functions/CMakeLists.txt +++ b/dbms/src/Functions/CMakeLists.txt @@ -19,8 +19,11 @@ add_headers_and_sources(clickhouse_functions ./GatherUtils) add_headers_and_sources(clickhouse_functions ./Conditional) add_headers_and_sources(clickhouse_functions ${TiFlash_BINARY_DIR}/dbms/src/Functions) -check_then_add_sources_compile_flag (TIFLASH_ENABLE_AVX_SUPPORT "-mavx2" CollationStringOptimized.cpp) -check_then_add_sources_compile_flag (TIFLASH_COMPILER_MOVBE_SUPPORT "-mmovbe" CollationStringOptimized.cpp) +check_then_add_sources_compile_flag ( + TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT + "${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}" + CollationStringOptimized.cpp +) list(REMOVE_ITEM clickhouse_functions_sources IFunction.cpp FunctionFactory.cpp FunctionHelpers.cpp) list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h FunctionHelpers.h) diff --git a/dbms/src/Interpreters/AsynchronousMetrics.cpp b/dbms/src/Interpreters/AsynchronousMetrics.cpp index 7a2d37829fb..9c809ef2c8f 100644 --- a/dbms/src/Interpreters/AsynchronousMetrics.cpp +++ b/dbms/src/Interpreters/AsynchronousMetrics.cpp @@ -82,8 +82,8 @@ void AsynchronousMetrics::run() /// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter. 
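// This group of hunks (AsynchronousMetrics, MetricsTransmitter, PDTiKVClient, ReadIndexWorker)
// moves periodic scheduling and "time since last update" bookkeeping from system_clock to
// steady_clock, which is monotonic, so wall-clock adjustments no longer shift the next wake-up
// or make an elapsed interval appear negative. A sketch equivalent to the get_next_minute
// lambda below, written out with an explicit template argument (assumed to be
// std::chrono::minutes, matching the "next minute + 30 seconds" comment):
#include <chrono>
inline std::chrono::steady_clock::time_point nextMetricsDeadline()
{
    return std::chrono::time_point_cast<std::chrono::minutes>(std::chrono::steady_clock::now() + std::chrono::minutes(1))
        + std::chrono::seconds(30);
}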
const auto get_next_minute = [] { - return std::chrono::time_point_cast( - std::chrono::system_clock::now() + std::chrono::minutes(1)) + return std::chrono::time_point_cast( + std::chrono::steady_clock::now() + std::chrono::minutes(1)) + std::chrono::seconds(30); }; diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index dc79161f356..be64319f516 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -50,6 +50,7 @@ struct Settings M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \ M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \ M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minimal chunk size of exchanging data among TiFlash.") \ + M(SettingInt64, batch_send_min_limit_compression, -1, "default minimal chunk size of exchanging data among TiFlash when using data compression.") \ M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ M(SettingUInt64, mpp_task_timeout, DEFAULT_MPP_TASK_TIMEOUT, "mpp task max endurable time.") \ M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \ diff --git a/dbms/src/Server/MetricsTransmitter.cpp b/dbms/src/Server/MetricsTransmitter.cpp index e596ae0a95c..0ccd2d9346b 100644 --- a/dbms/src/Server/MetricsTransmitter.cpp +++ b/dbms/src/Server/MetricsTransmitter.cpp @@ -57,8 +57,8 @@ void MetricsTransmitter::run() /// To avoid time drift and transmit values exactly each interval: /// next time aligned to system seconds /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00 - return std::chrono::system_clock::time_point( - (std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds + return std::chrono::steady_clock::time_point( + (std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()) / seconds) * seconds + std::chrono::seconds(seconds)); }; diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index 7bd21b67be2..cba3b64248b 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -30,9 +30,8 @@ #include #include #include -#include +#include -#include namespace DB { @@ -443,8 +442,8 @@ LearnerReadSnapshot doLearnerRead( auto end_time = Clock::now(); LOG_DEBUG( log, - "[Learner Read] batch read index | wait index cost {} ms totally, regions_num={}, concurrency={}", - std::chrono::duration_cast(end_time - start_time).count(), + "[Learner Read] batch read index | wait index cost {} totally, regions_num={}, concurrency={}", + std::chrono::duration_cast(end_time - start_time), num_regions, concurrent_num); diff --git a/dbms/src/Storages/Transaction/PDTiKVClient.cpp b/dbms/src/Storages/Transaction/PDTiKVClient.cpp index a06f1a3ae64..ed46aeb18c2 100644 --- a/dbms/src/Storages/Transaction/PDTiKVClient.cpp +++ b/dbms/src/Storages/Transaction/PDTiKVClient.cpp @@ -23,6 +23,6 @@ extern const int LOGICAL_ERROR; } std::atomic PDClientHelper::cached_gc_safe_point = 0; -std::atomic> PDClientHelper::safe_point_last_update_time; +std::atomic> PDClientHelper::safe_point_last_update_time; } // namespace DB diff --git a/dbms/src/Storages/Transaction/PDTiKVClient.h b/dbms/src/Storages/Transaction/PDTiKVClient.h index 111ae9862ed..f1be9a0a5c0 100644 --- 
a/dbms/src/Storages/Transaction/PDTiKVClient.h +++ b/dbms/src/Storages/Transaction/PDTiKVClient.h @@ -45,7 +45,7 @@ struct PDClientHelper if (!ignore_cache) { // In case we cost too much to update safe point from PD. - std::chrono::time_point now = std::chrono::system_clock::now(); + auto now = std::chrono::steady_clock::now(); const auto duration = std::chrono::duration_cast(now - safe_point_last_update_time.load()); const auto min_interval = std::max(Int64(1), safe_point_update_interval_seconds); // at least one second if (duration.count() < min_interval) @@ -59,7 +59,7 @@ struct PDClientHelper { auto safe_point = pd_client->getGCSafePoint(); cached_gc_safe_point = safe_point; - safe_point_last_update_time = std::chrono::system_clock::now(); + safe_point_last_update_time = std::chrono::steady_clock::now(); return safe_point; } catch (pingcap::Exception & e) @@ -71,7 +71,7 @@ struct PDClientHelper private: static std::atomic cached_gc_safe_point; - static std::atomic> safe_point_last_update_time; + static std::atomic> safe_point_last_update_time; }; diff --git a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp index c8f3f807f7a..b3e342daf29 100644 --- a/dbms/src/Storages/Transaction/ReadIndexWorker.cpp +++ b/dbms/src/Storages/Transaction/ReadIndexWorker.cpp @@ -156,7 +156,7 @@ struct BlockedReadIndexHelperTrait uint64_t timeout_ms; }; -struct BlockedReadIndexHelper : BlockedReadIndexHelperTrait +struct BlockedReadIndexHelper final : BlockedReadIndexHelperTrait { public: BlockedReadIndexHelper(uint64_t timeout_ms_, AsyncWaker & waker_) @@ -181,7 +181,7 @@ struct BlockedReadIndexHelper : BlockedReadIndexHelperTrait AsyncWaker & waker; }; -struct BlockedReadIndexHelperV3 : BlockedReadIndexHelperTrait +struct BlockedReadIndexHelperV3 final : BlockedReadIndexHelperTrait { BlockedReadIndexHelperV3(uint64_t timeout_ms_, AsyncWaker::Notifier & notifier_) : BlockedReadIndexHelperTrait(timeout_ms_) @@ -335,7 +335,7 @@ struct ReadIndexNotifyCtrl : MutexLockWrap AsyncWaker::NotifierPtr notifier; }; -struct RegionReadIndexNotifier : AsyncNotifier +struct RegionReadIndexNotifier final : AsyncNotifier { void wake() override { @@ -446,7 +446,7 @@ void ReadIndexDataNode::ReadIndexElement::doPoll(const TiFlashRaftProxyHelper & clean_task = true; } - else if (Clock::now() > timeout + start_time) + else if (std::chrono::steady_clock::now() > timeout + start_time) { TEST_LOG_FMT("poll ReadIndexElement timeout for region {}", region_id); @@ -458,7 +458,7 @@ void ReadIndexDataNode::ReadIndexElement::doPoll(const TiFlashRaftProxyHelper & TEST_LOG_FMT( "poll ReadIndexElement failed for region {}, time cost {}, timeout {}, start time {}", region_id, - Clock::now() - start_time, + std::chrono::steady_clock::now() - start_time, timeout, start_time); } @@ -706,7 +706,7 @@ void ReadIndexWorker::consumeReadIndexNotifyCtrl() } } -void ReadIndexWorker::consumeRegionNotifies(Duration min_dur) +void ReadIndexWorker::consumeRegionNotifies(std::chrono::steady_clock::duration min_dur) { if (!lastRunTimeout(min_dur)) { @@ -721,7 +721,7 @@ void ReadIndexWorker::consumeRegionNotifies(Duration min_dur) } TEST_LOG_FMT("worker {} set last run time {}", getID(), Clock::now()); - last_run_time.store(Clock::now(), std::memory_order_release); + last_run_time.store(std::chrono::steady_clock::now(), std::memory_order_release); } ReadIndexFuturePtr ReadIndexWorker::genReadIndexFuture(const kvrpcpb::ReadIndexRequest & req) @@ -737,7 +737,7 @@ ReadIndexFuturePtr 
ReadIndexWorkerManager::genReadIndexFuture(const kvrpcpb::Rea return getWorkerByRegion(req.context().region_id()).genReadIndexFuture(req); } -void ReadIndexWorker::runOneRound(Duration min_dur) +void ReadIndexWorker::runOneRound(std::chrono::steady_clock::duration min_dur) { if (!read_index_notify_ctrl->empty()) { @@ -759,10 +759,10 @@ ReadIndexWorker::ReadIndexWorker( { } -bool ReadIndexWorker::lastRunTimeout(Duration timeout) const +bool ReadIndexWorker::lastRunTimeout(std::chrono::steady_clock::duration timeout) const { TEST_LOG_FMT("worker {}, last run time {}, timeout {}", getID(), last_run_time.load(std::memory_order_relaxed), timeout); - return last_run_time.load(std::memory_order_relaxed) + timeout < Clock::now(); + return last_run_time.load(std::memory_order_relaxed) + timeout < std::chrono::steady_clock::now(); } ReadIndexWorker & ReadIndexWorkerManager::getWorkerByRegion(RegionID region_id) @@ -828,13 +828,13 @@ ReadIndexWorkerManager::~ReadIndexWorkerManager() stop(); } -void ReadIndexWorkerManager::runOneRoundAll(Duration min_dur) +void ReadIndexWorkerManager::runOneRoundAll(std::chrono::steady_clock::duration min_dur) { for (size_t id = 0; id < runners.size(); ++id) runOneRound(min_dur, id); } -void ReadIndexWorkerManager::runOneRound(Duration min_dur, size_t id) +void ReadIndexWorkerManager::runOneRound(std::chrono::steady_clock::duration min_dur, size_t id) { runners[id]->runOneRound(min_dur); } @@ -1003,7 +1003,7 @@ void ReadIndexWorkerManager::ReadIndexRunner::blockedWaitFor(std::chrono::millis global_notifier->blockedWaitFor(timeout); } -void ReadIndexWorkerManager::ReadIndexRunner::runOneRound(Duration min_dur) +void ReadIndexWorkerManager::ReadIndexRunner::runOneRound(std::chrono::steady_clock::duration min_dur) { for (size_t i = id; i < workers.size(); i += runner_cnt) workers[i]->runOneRound(min_dur); diff --git a/dbms/src/Storages/Transaction/ReadIndexWorker.h b/dbms/src/Storages/Transaction/ReadIndexWorker.h index e26671a2303..ebc36874226 100644 --- a/dbms/src/Storages/Transaction/ReadIndexWorker.h +++ b/dbms/src/Storages/Transaction/ReadIndexWorker.h @@ -35,7 +35,7 @@ class ReadIndexTest; struct AsyncWaker { - struct Notifier : AsyncNotifier + struct Notifier final : AsyncNotifier , MutexLockWrap { mutable std::condition_variable cv; @@ -46,7 +46,7 @@ struct AsyncWaker AsyncNotifier::Status blockedWaitFor(std::chrono::milliseconds timeout) override; void wake() override; - virtual ~Notifier() = default; + ~Notifier() override = default; }; using NotifierPtr = std::shared_ptr; @@ -88,7 +88,7 @@ class ReadIndexWorkerManager : boost::noncopyable void wakeAll(); // wake all runners to handle tasks void asyncRun(); - void runOneRound(Duration min_dur, size_t id); + void runOneRound(std::chrono::steady_clock::duration min_dur, size_t id); void stop(); ~ReadIndexWorkerManager(); BatchReadIndexRes batchReadIndex( @@ -104,7 +104,7 @@ class ReadIndexWorkerManager : boost::noncopyable ReadIndexFuturePtr genReadIndexFuture(const kvrpcpb::ReadIndexRequest & req); private: - void runOneRoundAll(Duration min_dur = std::chrono::milliseconds{0}); + void runOneRoundAll(std::chrono::steady_clock::duration min_dur = std::chrono::milliseconds{0}); enum class State : uint8_t { @@ -124,7 +124,7 @@ class ReadIndexWorkerManager : boost::noncopyable void blockedWaitFor(std::chrono::milliseconds timeout) const; /// Traverse its workers and try to execute tasks. 
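Editor's note: several concrete helper types above gain `final`, and `virtual ~Notifier() = default` becomes `~Notifier() override = default`. A small sketch of what this buys, with placeholder types rather than the patch's classes:

    struct AsyncNotifierBase
    {
        virtual ~AsyncNotifierBase() = default;
        virtual void wake() = 0;
    };

    struct CondVarNotifier final : AsyncNotifierBase
    {
        // `override` documents intent and lets the compiler reject signature drift;
        // `final` on the class lets calls through CondVarNotifier* be devirtualized.
        ~CondVarNotifier() override = default;
        void wake() override {}
    };
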
- void runOneRound(Duration min_dur); + void runOneRound(std::chrono::steady_clock::duration min_dur); /// Create one thread to run asynchronously. void asyncRun(); @@ -219,7 +219,7 @@ struct ReadIndexDataNode : MutexLockWrap Task task_pair; kvrpcpb::ReadIndexResponse resp; std::deque callbacks; - Timepoint start_time = Clock::now(); + std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); }; struct WaitingTasks : MutexLockWrap @@ -298,12 +298,12 @@ struct ReadIndexWorker void consumeReadIndexNotifyCtrl(); - void consumeRegionNotifies(Duration min_dur); + void consumeRegionNotifies(std::chrono::steady_clock::duration min_dur); ReadIndexFuturePtr genReadIndexFuture(const kvrpcpb::ReadIndexRequest & req); // try to consume read-index response notifications & region waiting list - void runOneRound(Duration min_dur); + void runOneRound(std::chrono::steady_clock::duration min_dur); explicit ReadIndexWorker( const TiFlashRaftProxyHelper & proxy_helper_, @@ -329,7 +329,7 @@ struct ReadIndexWorker // x = x == 0 ? 1 : x; // max_read_index_history = x; // } - bool lastRunTimeout(Duration timeout) const; + bool lastRunTimeout(std::chrono::steady_clock::duration timeout) const; void removeRegion(uint64_t); @@ -348,7 +348,7 @@ struct ReadIndexWorker RegionNotifyMap region_notify_map; // no need to be protected - std::atomic last_run_time{Timepoint::min()}; + std::atomic last_run_time{std::chrono::steady_clock::time_point::min()}; }; struct MockStressTestCfg diff --git a/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp b/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp index 48ee974a6e2..5c5ac03dd8e 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_read_index_worker.cpp @@ -381,9 +381,9 @@ void ReadIndexTest::testNormal() std::vector reqs; reqs = {make_read_index_reqs(5, 12), make_read_index_reqs(1, 12), make_read_index_reqs(2, 12)}; - Timepoint start = Clock::now(); + auto start = std::chrono::steady_clock::now(); auto resps = manager->batchReadIndex(reqs, 20); - auto time_cost = Clock::now() - start; + auto time_cost = std::chrono::steady_clock::now() - start; ASSERT_GE(time_cost, std::chrono::milliseconds{20}); // meet timeout ASSERT_EQ(resps[0].first.read_index(), 669); ASSERT_EQ(resps[1].first.region_error().has_region_not_found(), true); // timeout to region error not found diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 01f1fa9ac37..971d23c2c29 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -20,7 +20,6 @@ endif () add_subdirectory (libcommon) add_subdirectory (libpocoext) add_subdirectory (libdaemon) - add_subdirectory (libmemcpy) if (GLIBC_COMPATIBILITY) diff --git a/libs/libcommon/CMakeLists.txt b/libs/libcommon/CMakeLists.txt index b17867ed662..a72e228eb2a 100644 --- a/libs/libcommon/CMakeLists.txt +++ b/libs/libcommon/CMakeLists.txt @@ -158,22 +158,18 @@ endif () # Region for specialized CPU flags tuning -check_then_add_sources_compile_flag ( - TIFLASH_ENABLE_AVX_SUPPORT - "-mavx2" - src/mem_utils_avx2.cpp - src/crc64_avx2.cpp - src/avx2_mem_utils_impl.cpp -) check_then_add_sources_compile_flag ( TIFLASH_COMPILER_VPCLMULQDQ_SUPPORT "-mvpclmulqdq;-Wno-ignored-attributes" src/crc64_avx2.cpp src/crc64_avx512.cpp ) + check_then_add_sources_compile_flag ( - TIFLASH_COMPILER_MOVBE_SUPPORT - "-mmovbe" + TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT + "${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}" + src/mem_utils_avx2.cpp + src/crc64_avx2.cpp 
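Editor's note: the test change above times batchReadIndex with steady_clock, and last_run_time becomes an atomic steady_clock time point. A minimal sketch of the measurement pattern (helper name and the commented usage are illustrative); steady_clock is monotonic, so the measured duration cannot go backwards even if the system clock is adjusted mid-test:

    #include <chrono>

    template <typename F>
    std::chrono::steady_clock::duration time_call(F && f)
    {
        const auto start = std::chrono::steady_clock::now();
        f();
        return std::chrono::steady_clock::now() - start;
    }

    // usage (illustrative):
    //   auto cost = time_call([&] { manager->batchReadIndex(reqs, 20); });
    //   ASSERT_GE(cost, std::chrono::milliseconds{20});
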
src/avx2_mem_utils_impl.cpp ) diff --git a/libs/libcommon/include/common/avx2_byte_count.h b/libs/libcommon/include/common/avx2_byte_count.h new file mode 100644 index 00000000000..1c59cc8a9a8 --- /dev/null +++ b/libs/libcommon/include/common/avx2_byte_count.h @@ -0,0 +1,78 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace mem_utils::details +{ +#if defined(MEM_UTILS_FUNC_NO_SANITIZE) +MEM_UTILS_FUNC_NO_SANITIZE +#else +ALWAYS_INLINE static inline +#endif +uint64_t avx2_byte_count(const char * src, size_t size, char target) +{ + uint64_t zero_bytes_cnt = 0; + const auto check_block32 = _mm256_set1_epi8(target); + + if (uint8_t right_offset = OFFSET_FROM_ALIGNED(size_t(src), BLOCK32_SIZE); right_offset != 0) + { + // align to 32 + src = reinterpret_cast(ALIGNED_ADDR(size_t(src), BLOCK32_SIZE)); + + // right shift offset to remove useless mask bit + auto mask = get_block32_cmp_eq_mask(src, check_block32); + size_t left_remain = BLOCK32_SIZE - right_offset; + + if unlikely (left_remain >= size) + { + left_remain -= size; + mask <<= left_remain; + mask >>= left_remain; + mask >>= right_offset; + return __builtin_popcount(mask); + } + + mask >>= right_offset; + zero_bytes_cnt += __builtin_popcount(mask); + size -= left_remain; + src += BLOCK32_SIZE; + } + + assert(size_t(src) % BLOCK32_SIZE == 0); + + // clang will unroll by step 4 automatically with flags `-mavx2` if size >= 4 * 32 + // unrolled by step 8 with flags `-march=haswell` + for (; size >= BLOCK32_SIZE;) + { + auto mask = get_block32_cmp_eq_mask(src, check_block32); + zero_bytes_cnt += __builtin_popcount(mask); + size -= BLOCK32_SIZE, src += BLOCK32_SIZE; + } + + if unlikely (size != 0) + { + auto mask = get_block32_cmp_eq_mask(src, check_block32); + uint32_t left_remain = BLOCK32_SIZE - size; + mask <<= left_remain; + mask >>= left_remain; + zero_bytes_cnt += __builtin_popcount(mask); + } + + return zero_bytes_cnt; +} + +} // namespace mem_utils::details diff --git a/libs/libcommon/include/common/avx2_mem_utils.h b/libs/libcommon/include/common/avx2_mem_utils.h index 28051208637..e56087c893c 100644 --- a/libs/libcommon/include/common/avx2_mem_utils.h +++ b/libs/libcommon/include/common/avx2_mem_utils.h @@ -24,6 +24,10 @@ #include #include +#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) +#define MEM_UTILS_FUNC_NO_SANITIZE [[maybe_unused]] static NO_INLINE NO_SANITIZE_ADDRESS NO_SANITIZE_THREAD +#endif + namespace mem_utils::details { @@ -32,7 +36,7 @@ template ALWAYS_INLINE static inline T clear_rightmost_bit_one(const T value) { assert(value != 0); - + // recommended to use compile flag `-mbmi` under AMD64 platform return value & (value - 1); } @@ -67,9 +71,18 @@ FLATTEN_INLINE static inline void write(void * tar, const S & src) { *reinterpret_cast(tar) = src; } +template FLATTEN_INLINE_PURE static inline Block32 load_block32(const void * p) { - return _mm256_loadu_si256(reinterpret_cast(p)); + if constexpr (aligned) + { + if constexpr 
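Editor's note: for reference, a scalar equivalent of the new avx2_byte_count above; a sketch intended as a test oracle, not part of the patch:

    #include <algorithm>
    #include <cstdint>
    #include <cstddef>

    inline uint64_t byte_count_scalar(const char * src, size_t size, char target)
    {
        return static_cast<uint64_t>(std::count(src, src + size, target));
    }

The vectorized version reads whole aligned 32-byte blocks even at the edges; the left/right shifts applied to the comparison mask discard the bits for bytes outside [src, src + size) before popcounting, which keeps the head and tail handling branch-light.
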
(non_temporal) + return _mm256_stream_load_si256(reinterpret_cast(p)); + else + return _mm256_load_si256(reinterpret_cast(p)); + } + else + return _mm256_loadu_si256(reinterpret_cast(p)); } FLATTEN_INLINE_PURE static inline Block16 load_block16(const void * p) { @@ -83,11 +96,16 @@ FLATTEN_INLINE static inline void write_block16(void * p, const Block16 & src) else _mm_storeu_si128(reinterpret_cast(p), src); } -template +template FLATTEN_INLINE static inline void write_block32(void * p, const Block32 & src) { if constexpr (aligned) - _mm256_store_si256(reinterpret_cast(p), src); + { + if constexpr (non_temporal) + _mm256_stream_si256(reinterpret_cast(p), src); + else + _mm256_store_si256(reinterpret_cast(p), src); + } else _mm256_storeu_si256(reinterpret_cast(p), src); } @@ -96,11 +114,27 @@ FLATTEN_INLINE_PURE static inline uint32_t get_block32_cmp_eq_mask(const void * uint32_t mask = _mm256_movemask_epi8(_mm256_cmpeq_epi8(load_block32(p1), load_block32(p2))); return mask; } +FLATTEN_INLINE_PURE static inline uint32_t get_block32_cmp_eq_mask( + const void * s, + const Block32 & check_block) +{ + const auto block = load_block32(s); + uint32_t mask = _mm256_movemask_epi8(_mm256_cmpeq_epi8(block, check_block)); + return mask; +} FLATTEN_INLINE_PURE static inline uint32_t get_block16_cmp_eq_mask(const void * p1, const void * p2) { uint32_t mask = _mm_movemask_epi8(_mm_cmpeq_epi8(load_block16(p1), load_block16(p2))); return mask; } +FLATTEN_INLINE_PURE static inline uint32_t get_block16_cmp_eq_mask( + const void * s, + const Block16 & check_block) +{ + const auto block = load_block16(s); + uint32_t mask = _mm_movemask_epi8(_mm_cmpeq_epi8(block, check_block)); + return mask; +} FLATTEN_INLINE_PURE static inline bool check_block32_eq(const char * a, const char * b) { auto data = _mm256_xor_si256( diff --git a/libs/libcommon/include/common/avx2_memcpy.h b/libs/libcommon/include/common/avx2_memcpy.h index d8937928b5c..02f4ec0aa1d 100644 --- a/libs/libcommon/include/common/avx2_memcpy.h +++ b/libs/libcommon/include/common/avx2_memcpy.h @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -37,11 +38,7 @@ ALWAYS_INLINE static inline void * avx2_inline_memcpy(void * __restrict dst_, co #define MEM_CP_END(n) tiflash_compiler_builtin_memcpy(dst + size - (n), src + size - (n), (n)); #define MEM_CP_HEAD(n) tiflash_compiler_builtin_memcpy(dst, src, (n)); -#ifdef AVX2_MEMCPY_USE_PREFETCH #define PREFETCH(addr) __builtin_prefetch(addr) -#else -#define PREFETCH(addr) -#endif constexpr int block32_size = details::BLOCK32_SIZE; @@ -58,7 +55,7 @@ ALWAYS_INLINE static inline void * avx2_inline_memcpy(void * __restrict dst_, co } /// No bytes remaining. } - else if (unlikely(size <= 4)) + else if (unlikely(size < 4)) { /// Chunks of 2..3 bytes. details::memcpy_ignore_overlap<2>(dst, src, size); @@ -95,8 +92,12 @@ ALWAYS_INLINE static inline void * avx2_inline_memcpy(void * __restrict dst_, co dst -= offset; src -= offset; size += offset; + assert(size_t(dst) % block32_size == 0); } + // TODO: use non-temporal way(mark data unlikely to be used again soon) to minimize caching for large memory size(bigger than L2/L3 cache size) if necessary. + // TODO: check whether source address is aligned to 32 and use specific aligned instructions if necessary. + /// Aligned unrolled copy. 
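Editor's note: the get_block32_cmp_eq_mask overload added above compares one 32-byte block against a pre-broadcast byte. A sketch of how such a mask is typically consumed (assumes -mavx2; illustrative helper, not the library code):

    #include <immintrin.h>
    #include <cstdint>

    // Offset of the first byte equal to `target` within one 32-byte block, or -1.
    inline int first_eq_in_block32(const char * p, char target)
    {
        const __m256i block = _mm256_loadu_si256(reinterpret_cast<const __m256i *>(p));
        const __m256i needle = _mm256_set1_epi8(target);
        const uint32_t mask = static_cast<uint32_t>(_mm256_movemask_epi8(_mm256_cmpeq_epi8(block, needle)));
        return mask ? __builtin_ctz(mask) : -1; // one mask bit per byte lane
    }
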
while (size >= block32_size * loop_block32_cnt) { diff --git a/libs/libcommon/include/common/avx2_strstr.h b/libs/libcommon/include/common/avx2_strstr.h index ad0b6715413..706719ab2f8 100644 --- a/libs/libcommon/include/common/avx2_strstr.h +++ b/libs/libcommon/include/common/avx2_strstr.h @@ -19,19 +19,6 @@ namespace mem_utils::details { -FLATTEN_INLINE_PURE static inline uint32_t get_block32_cmp_eq_mask( - const void * s, - const Block32 & check_block) -{ - /* - vpcmpeqb ymm0, ymm0, ymmword ptr [...] - */ - // `_mm256_loadu_si256` and `_mm256_load_si256` are same in such case - const auto block = load_block32(s); - uint32_t mask = _mm256_movemask_epi8(_mm256_cmpeq_epi8(block, check_block)); - return mask; -} - template ALWAYS_INLINE static inline bool check_aligned_block32_may_exceed(const char * src, ssize_t n, const char *& res, const Block32 & check_block, F && callback) { @@ -248,12 +235,13 @@ ALWAYS_INLINE static inline const char * avx2_strstr_impl(const char * src, size #undef M } -ALWAYS_INLINE static inline size_t avx2_strstr(const char * src, size_t n, const char * needle, size_t k) -{ -#if defined(ADDRESS_SANITIZER) - return std::string_view{src, n}.find({needle, k}); // memchr@plt -> bcmp@plt +#if defined(MEM_UTILS_FUNC_NO_SANITIZE) +MEM_UTILS_FUNC_NO_SANITIZE +#else +ALWAYS_INLINE static inline #endif - +size_t avx2_strstr(const char * src, size_t n, const char * needle, size_t k) +{ const auto * p = avx2_strstr_impl(src, n, needle, k); return p ? p - src : std::string_view::npos; } @@ -261,12 +249,14 @@ ALWAYS_INLINE static inline size_t avx2_strstr(std::string_view src, std::string { return avx2_strstr(src.data(), src.size(), needle.data(), needle.size()); } -ALWAYS_INLINE static inline const char * avx2_memchr(const char * src, size_t n, char target) -{ -#if defined(ADDRESS_SANITIZER) - return static_cast(std::memchr(src, target, n)); // memchr@plt -#endif +#if defined(MEM_UTILS_FUNC_NO_SANITIZE) +MEM_UTILS_FUNC_NO_SANITIZE +#else +ALWAYS_INLINE static inline +#endif +const char * avx2_memchr(const char * src, size_t n, char target) +{ if (unlikely(n < 1)) { return nullptr; diff --git a/libs/libcommon/include/common/fixed_mem_eq.h b/libs/libcommon/include/common/fixed_mem_eq.h index 85691008c6d..39980660418 100644 --- a/libs/libcommon/include/common/fixed_mem_eq.h +++ b/libs/libcommon/include/common/fixed_mem_eq.h @@ -123,7 +123,7 @@ ALWAYS_INLINE inline bool memcmp_eq_fixed_size(const char * a, const char * b) ret */ - return std::memcmp(a, b, k) == 0; + return __builtin_memcmp(a, b, k) == 0; } else if constexpr (k > 8) { diff --git a/libs/libcommon/include/common/mem_utils_opt.h b/libs/libcommon/include/common/mem_utils_opt.h index 0b5e86aa6fe..98f47111256 100644 --- a/libs/libcommon/include/common/mem_utils_opt.h +++ b/libs/libcommon/include/common/mem_utils_opt.h @@ -25,6 +25,7 @@ constexpr bool tiflash_use_avx2_compile_flag = true; // if cpp source file is compiled with flag `-mavx2`, it's recommended to use inline function for better performance. 
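Editor's note: fixed_mem_eq.h above switches std::memcmp to __builtin_memcmp on the general fixed-size branch. With a compile-time constant length the builtin is typically expanded inline as a few loads and compares instead of a libc call through the PLT. A sketch of the idea (assumes GCC/Clang; illustrative helper):

    #include <cstddef>

    template <size_t K>
    inline bool fixed_size_eq(const char * a, const char * b)
    {
        return __builtin_memcmp(a, b, K) == 0;
    }

    // usage: fixed_size_eq<16>(lhs, rhs);
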
+#include #include #include @@ -60,6 +61,9 @@ bool avx2_mem_equal(const char * p1, const char * p2, size_t n); // same function like `std::memcmp` int avx2_mem_cmp(const char * p1, const char * p2, size_t n); +// return count of target byte +uint64_t avx2_byte_count(const char * src, size_t size, char target); + } // namespace mem_utils #endif diff --git a/libs/libcommon/src/avx2_mem_utils_impl.cpp b/libs/libcommon/src/avx2_mem_utils_impl.cpp index fad0ee3962e..2b4bfe7ec79 100644 --- a/libs/libcommon/src/avx2_mem_utils_impl.cpp +++ b/libs/libcommon/src/avx2_mem_utils_impl.cpp @@ -44,6 +44,11 @@ const char * avx2_memchr(const char * src, size_t n, char target) return details::avx2_memchr(src, n, target); } +uint64_t avx2_byte_count(const char * src, size_t size, char target) +{ + return details::avx2_byte_count(src, size, target); +} + } // namespace mem_utils #endif diff --git a/libs/libcommon/src/tests/CMakeLists.txt b/libs/libcommon/src/tests/CMakeLists.txt index 04498763105..0e029466385 100644 --- a/libs/libcommon/src/tests/CMakeLists.txt +++ b/libs/libcommon/src/tests/CMakeLists.txt @@ -39,7 +39,7 @@ add_executable (gtests_libcommon gtest_crc64.cpp gtest_logger.cpp gtest_arithmetic_overflow.cpp - ) +) add_sources_compile_flag_avx2 (gtest_mem_utils_opt.cpp) diff --git a/libs/libcommon/src/tests/gtest_mem_utils_opt.cpp b/libs/libcommon/src/tests/gtest_mem_utils_opt.cpp index 04941075d32..6fd9fd26d1f 100644 --- a/libs/libcommon/src/tests/gtest_mem_utils_opt.cpp +++ b/libs/libcommon/src/tests/gtest_mem_utils_opt.cpp @@ -26,8 +26,6 @@ #include #include -#include "../../libmemcpy/folly/FollyMemcpy.h" - #if defined(TIFLASH_ENABLE_AVX_SUPPORT) void TestFunc(size_t size) @@ -232,4 +230,26 @@ TEST(MemUtilsTestOPT, Memcopy) } } +void TestMemByteCount(size_t size) +{ + char target = 8; + std::string oa(size + 100, target); + char * start = oa.data(); + for (auto * pos = start; pos < start + 32; ++pos) + { + ASSERT_EQ(mem_utils::avx2_byte_count(pos, size, target), size); + std::memset(pos, target - 1, size); + ASSERT_EQ(mem_utils::avx2_byte_count(pos, size, target), 0); + std::memset(pos, target, size); + } +} + +TEST(MemUtilsTestOPT, MemByteCount) +{ + for (size_t size = 0; size <= 32 * 6; ++size) + { + TestMemByteCount(size); + } +} + #endif \ No newline at end of file diff --git a/libs/libmemcpy/CMakeLists.txt b/libs/libmemcpy/CMakeLists.txt index ae91ddf7bb6..a9ba22a9ced 100644 --- a/libs/libmemcpy/CMakeLists.txt +++ b/libs/libmemcpy/CMakeLists.txt @@ -39,7 +39,11 @@ else () list (APPEND memcpy_sources memcpy.cpp) endif() -add_sources_compile_flag_avx2 (${memcpy_sources}) +check_then_add_sources_compile_flag ( + TIFLASH_ENABLE_ARCH_HASWELL_SUPPORT + "${TIFLASH_COMPILER_ARCH_HASWELL_FLAG}" + ${memcpy_sources} +) add_library (memcpy STATIC ${memcpy_sources}) target_include_directories(memcpy PUBLIC ${TiFlash_SOURCE_DIR}/libs/libcommon/include) diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index bf9e300af1c..eb8d77937a2 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -3370,6 +3370,105 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 24 + }, + "hiddenSeries": false, + "id": 165, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + 
"min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tiflash_exchange_data_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Exchange Bytes/Seconds", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -10681,4 +10780,4 @@ "title": "Test-Cluster-TiFlash-Summary", "uid": "SVbh2xUWk", "version": 1 -} +} \ No newline at end of file diff --git a/tests/fullstack-test2/mpp/mpp-version.test b/tests/fullstack-test2/mpp/mpp-version.test new file mode 100644 index 00000000000..bb11ab9b291 --- /dev/null +++ b/tests/fullstack-test2/mpp/mpp-version.test @@ -0,0 +1,49 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Preparation. +=> DBGInvoke __init_fail_point() + +mysql> drop table if exists test.t; +mysql> create table if not exists test.t(a int, b int); + +mysql> insert into test.t values(1,5), (2,5), (3,5), (4,5); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t + +mysql> use test; set tidb_enforce_mpp=on; set tidb_isolation_read_engines='tiflash'; select count(1) as c from t group by b; + ++---+ +| c | ++---+ +| 4 | ++---+ + +=> DBGInvoke __enable_fail_point(invalid_mpp_version) + +mysql> use test; set tidb_enforce_mpp=on; set tidb_isolation_read_engines='tiflash'; select count(1) as c from t group by b; + +ERROR 1105 (HY000) at line 1: rpc error: code = Canceled desc = Failed to handling mpp dispatch request, reason=`Invalid mpp version -1, TiFlash expects version: min 0, max 1, should upgrade TiDB/planner` + +=> DBGInvoke __disable_fail_point(invalid_mpp_version) + +mysql> use test; set tidb_enforce_mpp=on; set tidb_isolation_read_engines='tiflash'; select count(1) as c from t group by b; + ++---+ +| c | ++---+ +| 4 | ++---+ + +mysql> drop table if exists test.t