diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index 17315a5598e..53fe9a225c3 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -31,32 +31,8 @@ namespace ErrorCodes extern const int INCORRECT_INDEX; extern const int LOGICAL_ERROR; extern const int CANNOT_READ_ALL_DATA; -extern const int NOT_IMPLEMENTED; } // namespace ErrorCodes -namespace -{ -void checkColumnSize(size_t expected, size_t actual) -{ - if (expected != actual) - throw Exception( - fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual), - ErrorCodes::LOGICAL_ERROR); -} - -void checkDataTypeName(size_t column_index, const String & expected, const String & actual) -{ - if (expected != actual) - throw Exception( - fmt::format( - "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}", - column_index, - expected, - actual), - ErrorCodes::LOGICAL_ERROR); -} -} // namespace - NativeBlockInputStream::NativeBlockInputStream( ReadBuffer & istr_, UInt64 server_revision_, @@ -179,9 +155,9 @@ Block NativeBlockInputStream::readImpl() } if (header) - checkColumnSize(header.columns(), columns); + CodecUtils::checkColumnSize(header.columns(), columns); else if (!output_names.empty()) - checkColumnSize(output_names.size(), columns); + CodecUtils::checkColumnSize(output_names.size(), columns); for (size_t i = 0; i < columns; ++i) { @@ -208,7 +184,7 @@ Block NativeBlockInputStream::readImpl() readBinary(type_name, istr); if (header) { - checkDataTypeName(i, header_datatypes[i].name, type_name); + CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name); column.type = header_datatypes[i].type; } else diff --git a/dbms/src/DataStreams/NativeBlockInputStream.h b/dbms/src/DataStreams/NativeBlockInputStream.h index 6a3c2eedd76..418a93fe4f7 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.h +++ b/dbms/src/DataStreams/NativeBlockInputStream.h @@ -17,6 +17,7 @@ #include #include #include +#include #include namespace DB @@ -116,19 +117,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream Block header; UInt64 server_revision; bool align_column_name_with_header = false; - - struct DataTypeWithTypeName - { - DataTypeWithTypeName(const DataTypePtr & t, const String & n) - : type(t) - , name(n) - { - } - - DataTypePtr type; - String name; - }; - std::vector header_datatypes; + std::vector header_datatypes; bool use_index = false; IndexForNativeFormat::Blocks::const_iterator index_block_it; @@ -136,7 +125,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it; /// If an index is specified, then `istr` must be CompressedReadBufferFromFile. - CompressedReadBufferFromFile<> * istr_concrete; + CompressedReadBufferFromFile<> * istr_concrete = nullptr; PODArray avg_value_size_hints; diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index c128c4d260e..83290e8a422 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -64,6 +65,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream // CoprocessorBlockInputStream doesn't take care of this. size_t stream_id; + std::unique_ptr decoder_ptr; + void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { for (const auto & execution_summary : resp.execution_summaries()) @@ -84,6 +87,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream { if (resp.execution_summaries_size() == 0) return; + if (!execution_summaries_inited[index].load()) { initRemoteExecutionSummaries(resp, index); @@ -128,7 +132,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream { while (true) { - auto result = remote_reader->nextResult(block_queue, sample_block, stream_id); + auto result = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr); if (result.meet_error) { LOG_WARNING(log, "remote reader meets error: {}", result.error_msg); @@ -155,21 +159,22 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream } const auto & decode_detail = result.decode_detail; + total_rows += decode_detail.rows; size_t index = 0; if constexpr (is_streaming_reader) index = result.call_index; - ++connection_profile_infos[index].packets; + connection_profile_infos[index].packets += decode_detail.packets; connection_profile_infos[index].bytes += decode_detail.packet_bytes; - total_rows += decode_detail.rows; LOG_TRACE( log, "recv {} rows from remote for {}, total recv row num: {}", decode_detail.rows, result.req_info, total_rows); + if (decode_detail.rows > 0) return true; // else continue @@ -193,6 +198,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); + constexpr size_t squash_rows_limit = 8192; + if constexpr (is_streaming_reader) + decoder_ptr = std::make_unique(sample_block, squash_rows_limit); } Block getHeader() const override { return sample_block; } @@ -211,7 +219,6 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream if (!fetchRemoteResult()) return {}; } - // todo should merge some blocks to make sure the output block is big enough Block block = block_queue.front(); block_queue.pop(); return block; @@ -222,6 +229,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr; } + size_t getTotalRows() const { return total_rows; } size_t getSourceNum() const { return source_num; } bool isStreamingCall() const { return is_streaming_reader; } const std::vector & getConnectionProfileInfos() const { return connection_profile_infos; } diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp index 8f3d6e10d67..75b53c90398 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,20 @@ class CHBlockChunkCodecStream : public ChunkCodecStream DataTypes expected_types; }; +CHBlockChunkCodec::CHBlockChunkCodec( + const Block & header_) + : header(header_) +{ + for (const auto & column : header) + header_datatypes.emplace_back(column.type, column.type->getName()); +} + +CHBlockChunkCodec::CHBlockChunkCodec(const DAGSchema & schema) +{ + for (const auto & c : schema) + output_names.push_back(c.first); +} + size_t getExtraInfoSize(const Block & block) { size_t size = 64; /// to hold some length of structures, such as column number, row number... @@ -83,6 +98,14 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {}); } +void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows) +{ + IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) { + return &istr; + }; + type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {}); +} + void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end) { /// only check block schema in CHBlock codec because for both @@ -120,21 +143,83 @@ std::unique_ptr CHBlockChunkCodec::newCodecStream(const std::v return std::make_unique(field_types); } +Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr, size_t reserve_size) +{ + Block res; + if (istr.eof()) + { + return res; + } + + /// Dimensions + size_t columns = 0; + size_t rows = 0; + readBlockMeta(istr, columns, rows); + + for (size_t i = 0; i < columns; ++i) + { + ColumnWithTypeAndName column; + readColumnMeta(i, istr, column); + + /// Data + MutableColumnPtr read_column = column.type->createColumn(); + if (reserve_size > 0) + read_column->reserve(std::max(rows, reserve_size)); + else if (rows) + read_column->reserve(rows); + + if (rows) /// If no rows, nothing to read. + readData(*column.type, *read_column, istr, rows); + + column.column = std::move(read_column); + res.insert(std::move(column)); + } + return res; +} +void CHBlockChunkCodec::readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const +{ + readVarUInt(columns, istr); + readVarUInt(rows, istr); + + if (header) + CodecUtils::checkColumnSize(header.columns(), columns); + else if (!output_names.empty()) + CodecUtils::checkColumnSize(output_names.size(), columns); +} + +void CHBlockChunkCodec::readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column) +{ + /// Name + readBinary(column.name, istr); + if (header) + column.name = header.getByPosition(i).name; + else if (!output_names.empty()) + column.name = output_names[i]; + + /// Type + String type_name; + readBinary(type_name, istr); + const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); + if (header) + { + CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name); + column.type = header_datatypes[i].type; + } + else + { + column.type = data_type_factory.get(type_name); + } +} + Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema) { ReadBufferFromString read_buffer(str); - std::vector output_names; - for (const auto & c : schema) - output_names.push_back(c.first); - NativeBlockInputStream block_in(read_buffer, 0, std::move(output_names)); - return block_in.read(); + return CHBlockChunkCodec(schema).decodeImpl(read_buffer); } Block CHBlockChunkCodec::decode(const String & str, const Block & header) { ReadBufferFromString read_buffer(str); - NativeBlockInputStream block_in(read_buffer, header, 0, /*align_column_name_with_header=*/true); - return block_in.read(); + return CHBlockChunkCodec(header).decodeImpl(read_buffer); } - } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h index 79dc797cb47..2fa520e19a9 100644 --- a/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h +++ b/dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h @@ -15,16 +15,34 @@ #pragma once #include +#include namespace DB { -class CHBlockChunkCodec : public ChunkCodec +class CHBlockChunkDecodeAndSquash; + +class CHBlockChunkCodec final : public ChunkCodec { public: CHBlockChunkCodec() = default; + CHBlockChunkCodec(const Block & header_); + CHBlockChunkCodec(const DAGSchema & schema); + Block decode(const String &, const DAGSchema & schema) override; static Block decode(const String &, const Block & header); std::unique_ptr newCodecStream(const std::vector & field_types) override; + +private: + friend class CHBlockChunkDecodeAndSquash; + void readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column); + void readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const; + static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows); + /// 'reserve_size' used for Squash usage, and takes effect when 'reserve_size' > 0 + Block decodeImpl(ReadBuffer & istr, size_t reserve_size = 0); + + Block header; + std::vector header_datatypes; + std::vector output_names; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.cpp b/dbms/src/Flash/Coprocessor/CodecUtils.cpp new file mode 100644 index 00000000000..ceffcbdc7c6 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CodecUtils.cpp @@ -0,0 +1,49 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} // namespace ErrorCodes + +namespace CodecUtils +{ +void checkColumnSize(size_t expected, size_t actual) +{ + if unlikely (expected != actual) + throw Exception( + fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual), + ErrorCodes::LOGICAL_ERROR); +} + +void checkDataTypeName(size_t column_index, const String & expected, const String & actual) +{ + if unlikely (expected != actual) + throw Exception( + fmt::format( + "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}", + column_index, + expected, + actual), + ErrorCodes::LOGICAL_ERROR); +} + +} // namespace CodecUtils +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.h b/dbms/src/Flash/Coprocessor/CodecUtils.h new file mode 100644 index 00000000000..c668c622fda --- /dev/null +++ b/dbms/src/Flash/Coprocessor/CodecUtils.h @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +namespace CodecUtils +{ +struct DataTypeWithTypeName +{ + DataTypeWithTypeName(const DataTypePtr & t, const String & n) + : type(t) + , name(n) + { + } + + DataTypePtr type; + String name; +}; + +void checkColumnSize(size_t expected, size_t actual); +void checkDataTypeName(size_t column_index, const String & expected, const String & actual); +} // namespace CodecUtils +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index b48fdbcd6dc..cebf2c080f3 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -139,12 +140,13 @@ class CoprocessorReader return detail; } - // stream_id is only meaningful for ExchagneReceiver. - CoprocessorReaderResult nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/) + // stream_id, decoder_ptr are only meaningful for ExchagneReceiver. + CoprocessorReaderResult nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/, std::unique_ptr & /*decoder_ptr*/) { auto && [result, has_next] = resp_iter.next(); if (!result.error.empty()) return {nullptr, true, result.error.message(), false}; + if (!has_next) return {nullptr, false, "", true}; @@ -156,12 +158,13 @@ class CoprocessorReader return {nullptr, true, resp->error().DebugString(), false}; } else if (has_enforce_encode_type && resp->encode_type() != tipb::EncodeType::TypeCHBlock && resp->chunks_size() > 0) - return { - nullptr, - true, - "Encode type of coprocessor response is not CHBlock, " - "maybe the version of some TiFlash node in the cluster is not match with this one", - false}; + { + return {nullptr, + true, + "Encode type of coprocessor response is not CHBlock, " + "maybe the version of some TiFlash node in the cluster is not match with this one", + false}; + } auto detail = decodeChunks(resp, block_queue, header, schema); return {resp, false, "", false, detail}; } diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 7adb88b3d6b..5adba4775bb 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -484,7 +483,6 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline) query_block.source_name, /*stream_id=*/enable_fine_grained_shuffle ? i : 0); exchange_receiver_io_input_streams.push_back(stream); - stream = std::make_shared(stream, 8192, 0, log->identifier()); stream->setExtraInfo(extra_info); pipeline.streams.push_back(stream); } diff --git a/dbms/src/Flash/Coprocessor/DecodeDetail.h b/dbms/src/Flash/Coprocessor/DecodeDetail.h index 91851650d9e..2c501e96820 100644 --- a/dbms/src/Flash/Coprocessor/DecodeDetail.h +++ b/dbms/src/Flash/Coprocessor/DecodeDetail.h @@ -21,6 +21,9 @@ namespace DB /// Detail of the packet that decoding in TiRemoteInputStream.RemoteReader.decodeChunks() struct DecodeDetail { + // Responding packets count, usually be 1, be 0 when flush data before eof + Int64 packets = 1; + // For fine grained shuffle, each ExchangeReceiver/thread will decode its own blocks. // So this is the row number of partial blocks of the original packet. // This will be the row number of all blocks of the original packet if it's not fine grained shuffle. diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp new file mode 100644 index 00000000000..8ef5e6a910c --- /dev/null +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.cpp @@ -0,0 +1,85 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +CHBlockChunkDecodeAndSquash::CHBlockChunkDecodeAndSquash( + const Block & header, + size_t rows_limit_) + : codec(header) + , rows_limit(rows_limit_) +{ +} + +std::optional CHBlockChunkDecodeAndSquash::decodeAndSquash(const String & str) +{ + std::optional res; + ReadBufferFromString istr(str); + if (istr.eof()) + { + if (accumulated_block) + res.swap(accumulated_block); + return res; + } + + if (!accumulated_block) + { + /// hard-code 1.5 here, since final column size will be more than rows_limit in most situations, + /// so it should be larger than 1.0, just use 1.5 here, no special meaning + Block block = codec.decodeImpl(istr, static_cast(rows_limit * 1.5)); + if (block) + accumulated_block.emplace(std::move(block)); + } + else + { + /// Dimensions + size_t columns = 0; + size_t rows = 0; + codec.readBlockMeta(istr, columns, rows); + + if (rows) + { + auto mutable_columns = accumulated_block->mutateColumns(); + for (size_t i = 0; i < columns; ++i) + { + ColumnWithTypeAndName column; + codec.readColumnMeta(i, istr, column); + CHBlockChunkCodec::readData(*column.type, *(mutable_columns[i]), istr, rows); + } + accumulated_block->setColumns(std::move(mutable_columns)); + } + } + + if (accumulated_block && accumulated_block->rows() >= rows_limit) + { + /// Return accumulated data and reset accumulated_block + res.swap(accumulated_block); + return res; + } + return res; +} + +std::optional CHBlockChunkDecodeAndSquash::flush() +{ + if (!accumulated_block) + return accumulated_block; + std::optional res; + accumulated_block.swap(res); + return res; +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h new file mode 100644 index 00000000000..9ba8e821535 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/IChunkDecodeAndSquash.h @@ -0,0 +1,48 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +class IChunkDecodeAndSquash +{ +public: + virtual ~IChunkDecodeAndSquash() = default; + /// The returned optional value can only have block that block.operator bool() is true + virtual std::optional decodeAndSquash(const String &) = 0; + /// Return value should be false if 'flush' is invoked consecutively more than once + virtual std::optional flush() = 0; +}; + +class CHBlockChunkDecodeAndSquash final : public IChunkDecodeAndSquash +{ +public: + explicit CHBlockChunkDecodeAndSquash(const Block & header, size_t rows_limit_); + virtual ~CHBlockChunkDecodeAndSquash() = default; + std::optional decodeAndSquash(const String &); + std::optional flush(); + +private: + CHBlockChunkCodec codec; + std::optional accumulated_block; + size_t rows_limit; +}; + + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp new file mode 100644 index 00000000000..ace861b1c41 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_chunk_decode_and_squash.cpp @@ -0,0 +1,152 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +namespace DB +{ +namespace tests +{ +class TestChunkDecodeAndSquash : public testing::Test +{ +protected: + void SetUp() override + { + } + +public: + TestChunkDecodeAndSquash() + : context(TiFlashTestEnv::getContext()) + {} + + static Block squashBlocks(std::vector & blocks) + { + std::vector reference_block_vec; + SquashingTransform squash_transform(std::numeric_limits::max(), 0, ""); + for (auto & block : blocks) + squash_transform.add(std::move(block)); + Block empty; + auto result = squash_transform.add(std::move(empty)); + return result.block; + } + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + fields[i].set_flag(TiDB::ColumnFlagNotNull); + } + return fields; + } + + static DAGSchema makeSchema() + { + auto fields = makeFields(); + DAGSchema schema; + for (size_t i = 0; i < fields.size(); ++i) + { + ColumnInfo info = TiDB::fieldTypeToColumnInfo(fields[i]); + schema.emplace_back(String("col") + std::to_string(i), std::move(info)); + } + return schema; + } + + // Return a block with **rows** and 10 Int64 column. + static Block prepareBlock(size_t rows) + { + Block block; + for (size_t i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + void doTestWork(bool flush_something) + { + const size_t block_rows = 1024; + const size_t block_num = 256; + std::mt19937_64 rand_gen; + // 1. Build Blocks. + std::vector blocks; + for (size_t i = 0; i < block_num; ++i) + { + UInt64 rows = flush_something ? static_cast(rand_gen()) % (block_rows * 4) : block_rows; + blocks.emplace_back(prepareBlock(rows)); + if (flush_something) + blocks.emplace_back(prepareBlock(0)); /// Adds this empty block, so even unluckily, total_rows % rows_limit == 0, it would flush an empty block with header + } + + // 2. encode all blocks + std::unique_ptr codec_stream = std::make_unique()->newCodecStream(makeFields()); + std::vector encode_str_vec(block_num); + for (const auto & block : blocks) + { + codec_stream->encode(block, 0, block.rows()); + encode_str_vec.push_back(codec_stream->getString()); + codec_stream->clear(); + } + + // 3. DecodeAndSquash all these blocks + Block header = blocks.back(); + std::vector decoded_blocks; + CHBlockChunkDecodeAndSquash decoder(header, block_rows * 4); + for (const auto & str : encode_str_vec) + { + auto result = decoder.decodeAndSquash(str); + if (result) + decoded_blocks.push_back(std::move(result.value())); + } + auto last_block = decoder.flush(); + if (last_block) + decoded_blocks.push_back(std::move(last_block.value())); + /// flush after flush should return empty optional + ASSERT_TRUE(!decoder.flush()); + + // 4. Check correctness + Block reference_block = squashBlocks(blocks); + Block decoded_block = squashBlocks(decoded_blocks); + ASSERT_BLOCK_EQ(reference_block, decoded_block); + } + Context context; +}; + +TEST_F(TestChunkDecodeAndSquash, testDecodeAndSquash) +try +{ + doTestWork(true); + doTestWork(false); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp new file mode 100644 index 00000000000..c6598e556c1 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -0,0 +1,446 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ +namespace tests +{ +using Packet = TrackedMppDataPacket; +using PacketPtr = std::shared_ptr; +using PacketQueue = MPMCQueue; +using PacketQueuePtr = std::shared_ptr; + +bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & right) +{ + return (left.concurrency == right.concurrency) && (left.num_iterations == right.num_iterations) && (left.num_produced_rows == right.num_produced_rows) && (left.time_processed_ns == right.time_processed_ns); +} + +struct MockWriter +{ + explicit MockWriter(PacketQueuePtr queue_) + : queue(queue_) + {} + + ExecutionSummary mockExecutionSummary() + { + ExecutionSummary summary; + summary.time_processed_ns = 100; + summary.num_produced_rows = 10000; + summary.num_iterations = 50; + summary.concurrency = 1; + return summary; + } + + void write(mpp::MPPDataPacket &, uint16_t) { FAIL() << "cannot reach here."; } + void write(mpp::MPPDataPacket & packet) + { + auto tracked_packet = std::make_shared(packet, nullptr); + if (add_summary) + { + tipb::SelectResponse response; + auto * summary_ptr = response.add_execution_summaries(); + auto summary = mockExecutionSummary(); + summary_ptr->set_time_processed_ns(summary.time_processed_ns); + summary_ptr->set_num_produced_rows(summary.num_produced_rows); + summary_ptr->set_num_iterations(summary.num_iterations); + summary_ptr->set_concurrency(summary.concurrency); + summary_ptr->set_executor_id("Executor_0"); + tracked_packet->serializeByResponse(response); + } + ++total_packets; + if (!tracked_packet->packet.chunks().empty()) + total_bytes += tracked_packet->packet.ByteSizeLong(); + queue->push(tracked_packet); + } + void write(tipb::SelectResponse &, uint16_t) { FAIL() << "cannot reach here."; } + void write(tipb::SelectResponse & response) + { + ++total_packets; + if (!response.chunks().empty()) + total_bytes += response.ByteSizeLong(); + mpp::MPPDataPacket packet; + auto tracked_packet = std::make_shared(packet, nullptr); + tracked_packet->serializeByResponse(response); + queue->push(tracked_packet); + } + uint16_t getPartitionNum() const { return 1; } + + PacketQueuePtr queue; + bool add_summary = false; + size_t total_packets = 0; + size_t total_bytes = 0; +}; + +// NOLINTBEGIN(readability-convert-member-functions-to-static) +struct MockReceiverContext +{ + using Status = ::grpc::Status; + struct Request + { + String debugString() const + { + return "{Request}"; + } + + int source_index = 0; + int send_task_id = 0; + int recv_task_id = -1; + }; + + struct Reader + { + explicit Reader(const PacketQueuePtr & queue_) + : queue(queue_) + {} + + void initialize() const + { + } + + bool read(PacketPtr & packet [[maybe_unused]]) const + { + PacketPtr res; + if (queue->pop(res) == MPMCQueueResult::OK) + { + *packet = *res; // avoid change shared packets + return true; + } + return false; + } + + Status finish() const + { + return ::grpc::Status(); + } + + void cancel(const String &) + { + } + + PacketQueuePtr queue; + }; + + struct MockAsyncGrpcExchangePacketReader + { + // Not implement benchmark for Async GRPC for now. + void init(UnaryCallback *) { assert(0); } + void read(TrackedMppDataPacketPtr &, UnaryCallback *) { assert(0); } + void finish(::grpc::Status &, UnaryCallback *) { assert(0); } + }; + + using AsyncReader = MockAsyncGrpcExchangePacketReader; + + MockReceiverContext( + PacketQueuePtr & queue_, + const std::vector & field_types_) + : queue(queue_) + , field_types(field_types_) + { + } + + void fillSchema(DAGSchema & schema) const + { + schema.clear(); + for (size_t i = 0; i < field_types.size(); ++i) + { + String name = "exchange_receiver_" + std::to_string(i); + ColumnInfo info = TiDB::fieldTypeToColumnInfo(field_types[i]); + schema.emplace_back(std::move(name), std::move(info)); + } + } + + Request makeRequest(int index) const + { + return {index, index, -1}; + } + + std::shared_ptr makeReader(const Request &) + { + return std::make_shared(queue); + } + + static Status getStatusOK() + { + return ::grpc::Status(); + } + + bool supportAsync(const Request &) const { return false; } + void makeAsyncReader( + const Request &, + std::shared_ptr &, + UnaryCallback *) const {} + + PacketQueuePtr queue; + std::vector field_types; +}; +using MockExchangeReceiver = ExchangeReceiverBase; +using MockWriterPtr = std::shared_ptr; +using MockExchangeReceiverInputStream = TiRemoteBlockInputStream; + +class TestTiRemoteBlockInputStream : public testing::Test +{ +protected: + void SetUp() override + { + dag_context_ptr = std::make_unique(1024); + dag_context_ptr->is_mpp_task = true; + dag_context_ptr->is_root_mpp_task = true; + dag_context_ptr->result_field_types = makeFields(); + dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; + context.setDAGContext(dag_context_ptr.get()); + } + +public: + TestTiRemoteBlockInputStream() + : context(TiFlashTestEnv::getContext()) + {} + + static Block squashBlocks(std::vector & blocks) + { + std::vector reference_block_vec; + SquashingTransform squash_transform(std::numeric_limits::max(), 0, ""); + for (auto & block : blocks) + squash_transform.add(std::move(block)); + Block empty; + auto result = squash_transform.add(std::move(empty)); + return result.block; + } + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + fields[i].set_flag(TiDB::ColumnFlagNotNull); + } + return fields; + } + + static DAGSchema makeSchema() + { + auto fields = makeFields(); + DAGSchema schema; + for (size_t i = 0; i < fields.size(); ++i) + { + ColumnInfo info = TiDB::fieldTypeToColumnInfo(fields[i]); + schema.emplace_back(String("col") + std::to_string(i), std::move(info)); + } + return schema; + } + + // Return a block with **rows** and 10 Int64 column. + static Block prepareBlock(size_t rows) + { + Block block; + for (size_t i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + auto int64_column = ColumnGenerator::instance().generate({rows, "Int64", RANDOM}).column; + block.insert(ColumnWithTypeAndName{ + std::move(int64_column), + int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + static void prepareBlocks( + std::vector & source_blocks, + bool empty_last_packet) + { + const size_t block_rows = 8192 * 3 / 8; + /// 61 is specially chosen so that the last packet contains 3072 rows, and then end of the queue + size_t block_num = empty_last_packet ? 60 : 61; + // 1. Build Blocks. + for (size_t i = 0; i < block_num; ++i) + source_blocks.emplace_back(prepareBlock(block_rows)); + } + + void prepareQueue( + std::shared_ptr & writer, + std::vector & source_blocks, + bool empty_last_packet) + { + prepareBlocks(source_blocks, empty_last_packet); + + const size_t batch_send_min_limit = 4096; + auto dag_writer = std::make_shared>( + writer, + batch_send_min_limit, + true, + *dag_context_ptr); + + // 2. encode all blocks + for (const auto & block : source_blocks) + dag_writer->write(block); + writer->add_summary = true; + dag_writer->finishWrite(); + } + + void prepareQueueV2( + std::shared_ptr & writer, + std::vector & source_blocks, + bool empty_last_packet) + { + dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; + prepareBlocks(source_blocks, empty_last_packet); + + const size_t batch_send_min_limit = 4096; + auto dag_writer = std::make_shared>( + writer, + 0, + batch_send_min_limit, + true, + *dag_context_ptr); + + // 2. encode all blocks + for (const auto & block : source_blocks) + dag_writer->write(block); + dag_writer->finishWrite(); + } + + void checkChunkInResponse( + std::vector & source_blocks, + std::vector & decoded_blocks, + std::shared_ptr & receiver_stream, + std::shared_ptr & writer) + { + /// Check Connection Info + auto infos = receiver_stream->getConnectionProfileInfos(); + ASSERT_EQ(infos.size(), 1); + ASSERT_EQ(infos[0].packets, writer->total_packets); + ASSERT_EQ(infos[0].bytes, writer->total_bytes); + + Block reference_block = squashBlocks(source_blocks); + Block decoded_block = squashBlocks(decoded_blocks); + ASSERT_EQ(receiver_stream->getTotalRows(), reference_block.rows()); + ASSERT_BLOCK_EQ(reference_block, decoded_block); + } + + void checkNoChunkInResponse( + std::vector & source_blocks, + std::vector & decoded_blocks, + std::shared_ptr & receiver_stream, + std::shared_ptr & writer) + { + /// Check Execution Summary + auto summary = receiver_stream->getRemoteExecutionSummaries(0); + ASSERT_EQ(summary->size(), 1); + ASSERT_EQ(summary->begin()->first, "Executor_0"); + ASSERT_TRUE(equalSummaries(writer->mockExecutionSummary(), summary->begin()->second)); + + /// Check Connection Info + auto infos = receiver_stream->getConnectionProfileInfos(); + ASSERT_EQ(infos.size(), 1); + ASSERT_EQ(infos[0].packets, writer->total_packets); + ASSERT_EQ(infos[0].bytes, writer->total_bytes); + + Block reference_block = squashBlocks(source_blocks); + Block decoded_block = squashBlocks(decoded_blocks); + ASSERT_BLOCK_EQ(reference_block, decoded_block); + ASSERT_EQ(receiver_stream->getTotalRows(), reference_block.rows()); + } + + std::shared_ptr makeExchangeReceiverInputStream( + PacketQueuePtr queue_ptr) + { + auto receiver = std::make_shared( + std::make_shared(queue_ptr, makeFields()), + 1, + 1, + "mock_req_id", + "mock_exchange_receiver_id", + 0); + auto receiver_stream = std::make_shared( + receiver, + "mock_req_id", + "executor_0", + 0); + return receiver_stream; + } + + void doTestNoChunkInResponse(bool empty_last_packet) + { + PacketQueuePtr queue_ptr = std::make_shared(1000); + std::vector source_blocks; + auto writer = std::make_shared(queue_ptr); + prepareQueue(writer, source_blocks, empty_last_packet); + queue_ptr->finish(); + + auto receiver_stream = makeExchangeReceiverInputStream(queue_ptr); + receiver_stream->readPrefix(); + std::vector decoded_blocks; + while (const auto & block = receiver_stream->read()) + decoded_blocks.emplace_back(block); + receiver_stream->readSuffix(); + checkNoChunkInResponse(source_blocks, decoded_blocks, receiver_stream, writer); + } + + void doTestChunkInResponse(bool empty_last_packet) + { + PacketQueuePtr queue_ptr = std::make_shared(1000); + std::vector source_blocks; + auto writer = std::make_shared(queue_ptr); + prepareQueueV2(writer, source_blocks, empty_last_packet); + queue_ptr->finish(); + auto receiver_stream = makeExchangeReceiverInputStream(queue_ptr); + receiver_stream->readPrefix(); + std::vector decoded_blocks; + while (const auto & block = receiver_stream->read()) + decoded_blocks.emplace_back(block); + receiver_stream->readSuffix(); + checkChunkInResponse(source_blocks, decoded_blocks, receiver_stream, writer); + } + + Context context; + std::unique_ptr dag_context_ptr; +}; + +TEST_F(TestTiRemoteBlockInputStream, testNoChunkInResponse) +try +{ + doTestNoChunkInResponse(true); + doTestNoChunkInResponse(false); +} +CATCH + +TEST_F(TestTiRemoteBlockInputStream, testChunksInResponse) +try +{ + doTestChunkInResponse(true); + doTestChunkInResponse(false); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 521e9549cb5..874b1f027d3 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -681,7 +681,7 @@ template DecodeDetail ExchangeReceiverBase::decodeChunks( const std::shared_ptr & recv_msg, std::queue & block_queue, - const Block & header) + std::unique_ptr & decoder_ptr) { assert(recv_msg != nullptr); DecodeDetail detail; @@ -692,40 +692,74 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( // Record total packet size even if fine grained shuffle is enabled. detail.packet_bytes = packet.ByteSizeLong(); - for (const String * chunk : recv_msg->chunks) { - Block block = CHBlockChunkCodec::decode(*chunk, header); - detail.rows += block.rows(); - if (unlikely(block.rows() == 0)) + auto result = decoder_ptr->decodeAndSquash(*chunk); + if (!result) continue; - block_queue.push(std::move(block)); + detail.rows += result->rows(); + if likely (result->rows() > 0) + { + block_queue.push(std::move(result.value())); + } } return detail; } template -ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queue & block_queue, const Block & header, size_t stream_id) +ExchangeReceiverResult ExchangeReceiverBase::nextResult( + std::queue & block_queue, + const Block & header, + size_t stream_id, + std::unique_ptr & decoder_ptr) { if (unlikely(stream_id >= msg_channels.size())) { LOG_ERROR(exc_log, "stream_id out of range, stream_id: {}, total_stream_count: {}", stream_id, msg_channels.size()); return ExchangeReceiverResult::newError(0, "", "stream_id out of range"); } + std::shared_ptr recv_msg; if (msg_channels[stream_id]->pop(recv_msg) != MPMCQueueResult::OK) { - std::unique_lock lock(mu); - return state != ExchangeReceiverState::NORMAL - ? ExchangeReceiverResult::newError(0, name, constructStatusString(state, err_msg)) - : ExchangeReceiverResult::newEOF(name); /// live_connections == 0, msg_channel is finished, and state is NORMAL, that is the end. + return handleUnnormalChannel(block_queue, decoder_ptr); } else { assert(recv_msg != nullptr); if (unlikely(recv_msg->error_ptr != nullptr)) return ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg()); - return toDecodeResult(block_queue, header, recv_msg); + return toDecodeResult(block_queue, header, recv_msg, decoder_ptr); + } +} + +template +ExchangeReceiverResult ExchangeReceiverBase::handleUnnormalChannel( + std::queue & block_queue, + std::unique_ptr & decoder_ptr) +{ + std::optional last_block = decoder_ptr->flush(); + std::unique_lock lock(mu); + if (this->state != DB::ExchangeReceiverState::NORMAL) + { + return DB::ExchangeReceiverResult::newError(0, DB::ExchangeReceiverBase::name, DB::constructStatusString(this->state, this->err_msg)); + } + else + { + /// If there are cached data in squashDecoder, then just push the block and return EOF next iteration + if (last_block && last_block->rows() > 0) + { + /// Can't get correct caller_index here, use 0 instead + auto result = ExchangeReceiverResult::newOk(nullptr, 0, ""); + result.decode_detail.packets = 0; + result.decode_detail.rows = last_block->rows(); + block_queue.push(std::move(last_block.value())); + return result; + } + else + { + return DB::ExchangeReceiverResult::newEOF(DB::ExchangeReceiverBase::name); /// live_connections == 0, msg_channel is finished, and state is NORMAL, that is the end. + } } } @@ -733,7 +767,8 @@ template ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( std::queue & block_queue, const Block & header, - const std::shared_ptr & recv_msg) + const std::shared_ptr & recv_msg, + std::unique_ptr & decoder_ptr) { assert(recv_msg != nullptr); if (recv_msg->resp_ptr != nullptr) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries. @@ -757,7 +792,7 @@ ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( } else if (!recv_msg->chunks.empty()) { - result.decode_detail = decodeChunks(recv_msg, block_queue, header); + result.decode_detail = decodeChunks(recv_msg, block_queue, decoder_ptr); } return result; } @@ -765,7 +800,7 @@ ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( else /// the non-last packets { auto result = ExchangeReceiverResult::newOk(nullptr, recv_msg->source_index, recv_msg->req_info); - result.decode_detail = decodeChunks(recv_msg, block_queue, header); + result.decode_detail = decodeChunks(recv_msg, block_queue, decoder_ptr); return result; } } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index b1c4caa0a80..37c905ffaf3 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -142,7 +143,8 @@ class ExchangeReceiverBase ExchangeReceiverResult nextResult( std::queue & block_queue, const Block & header, - size_t stream_id); + size_t stream_id, + std::unique_ptr & decoder_ptr); size_t getSourceNum() const { return source_num; } uint64_t getFineGrainedShuffleStreamCount() const { return fine_grained_shuffle_stream_count; } @@ -177,10 +179,14 @@ class ExchangeReceiverBase bool setEndState(ExchangeReceiverState new_state); String getStatusString(); + ExchangeReceiverResult handleUnnormalChannel( + std::queue & block_queue, + std::unique_ptr & decoder_ptr); + DecodeDetail decodeChunks( const std::shared_ptr & recv_msg, std::queue & block_queue, - const Block & header); + std::unique_ptr & decoder_ptr); void connectionDone( bool meet_error, @@ -193,7 +199,8 @@ class ExchangeReceiverBase ExchangeReceiverResult toDecodeResult( std::queue & block_queue, const Block & header, - const std::shared_ptr & recv_msg); + const std::shared_ptr & recv_msg, + std::unique_ptr & decoder_ptr); private: std::shared_ptr rpc_context; diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp index 03fd2804fb3..68799368860 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include #include @@ -84,7 +83,6 @@ void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & c execId(), /*stream_id=*/enable_fine_grained_shuffle ? i : 0); exchange_receiver_io_input_streams.push_back(stream); - stream = std::make_shared(stream, 8192, 0, log->identifier()); stream->setExtraInfo(extra_info); pipeline.streams.push_back(stream); }