Optimize data size of Broadcast / Passthrough exchange operator #6880

Merged · 47 commits · Mar 3, 2023
Commits
1483c51 · x · solotzg · Feb 13, 2023
6e9e3fb · wip · solotzg · Feb 15, 2023
1bbb013 · 1 · solotzg · Feb 15, 2023
e67e6c6 · 2 · solotzg · Feb 15, 2023
6d07c6c · 3: hack tests · solotzg · Feb 15, 2023
e01e939 · 4 · solotzg · Feb 15, 2023
a89d417 · 5 · solotzg · Feb 15, 2023
5b9ccdc · 6 · solotzg · Feb 15, 2023
1d9bd32 · 7 · solotzg · Feb 20, 2023
64172b8 · 8 · solotzg · Feb 20, 2023
e224da1 · 9 · solotzg · Feb 20, 2023
20e7167 · 10 · solotzg · Feb 20, 2023
9b13aeb · 11 · solotzg · Feb 20, 2023
49d2d16 · 12 · solotzg · Feb 21, 2023
a79e60d · 13 · solotzg · Feb 21, 2023
a72f995 · 14 · solotzg · Feb 21, 2023
d26c6eb · 15 · solotzg · Feb 21, 2023
e0a5c6a · 16 · solotzg · Feb 21, 2023
1c48c0e · 17 · solotzg · Feb 21, 2023
aa6e719 · 18 · solotzg · Feb 21, 2023
be78910 · 19 · solotzg · Feb 22, 2023
f0643dc · 20 · solotzg · Feb 22, 2023
fae2812 · 21 · solotzg · Feb 22, 2023
7856dfc · 22 · solotzg · Feb 22, 2023
a1dba62 · 23 · solotzg · Feb 22, 2023
4e84fa4 · fix typo · solotzg · Feb 24, 2023
da5cfc6 · reduce memory usage · solotzg · Feb 28, 2023
3e6af50 · 24 · solotzg · Mar 1, 2023
1be9771 · 25 · solotzg · Mar 1, 2023
f5f0ec8 · use mpp version 0 as default in StorageDisaggregated · solotzg · Mar 1, 2023
1c42148 · 26 · solotzg · Mar 2, 2023
fd1ec17 · refine code · solotzg · Mar 2, 2023
8709113 · 27 · solotzg · Mar 2, 2023
38ce6e9 · Merge remote-tracking branch 'pingcap/master' into bc-pass-exchange-c… · solotzg · Mar 2, 2023
c1d2884 · 28 · solotzg · Mar 2, 2023
41d20fb · Merge branch 'master' into bc-pass-exchange-compress · solotzg · Mar 2, 2023
0964359 · 29 · solotzg · Mar 2, 2023
52a2de2 · Update MPPTunnelSetWriter.cpp · solotzg · Mar 2, 2023
5a5eda0 · 30 · solotzg · Mar 3, 2023
c106983 · 31 · solotzg · Mar 3, 2023
3b778c3 · 32 · solotzg · Mar 3, 2023
7864b03 · Merge branch 'master' into bc-pass-exchange-compress · solotzg · Mar 3, 2023
9cc0b9b · Merge branch 'master' into bc-pass-exchange-compress · solotzg · Mar 3, 2023
eafe6ad · 33 · solotzg · Mar 3, 2023
f71ccb2 · Merge remote-tracking branch 'origin/bc-pass-exchange-compress' into … · solotzg · Mar 3, 2023
ccb115c · Merge branch 'master' into bc-pass-exchange-compress · solotzg · Mar 3, 2023
de15156 · Merge branch 'master' into bc-pass-exchange-compress · ti-chi-bot · Mar 3, 2023
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
@@ -89,6 +89,7 @@ check_then_add_sources_compile_flag (
src/Columns/ColumnsCommon.cpp
src/Columns/ColumnVector.cpp
src/DataTypes/DataTypeString.cpp
src/Interpreters/Join.cpp
)

list (APPEND tiflash_common_io_sources ${CONFIG_BUILD})
26 changes: 16 additions & 10 deletions dbms/src/Common/TiFlashMetrics.h
@@ -88,16 +88,22 @@ namespace DB
F(type_mpp_establish_conn, {{"type", "mpp_tunnel"}}), \
F(type_mpp_establish_conn_local, {{"type", "mpp_tunnel_local"}}), \
F(type_cancel_mpp_task, {{"type", "cancel_mpp_task"}})) \
M(tiflash_exchange_data_bytes, "Total bytes sent by exchange operators", Counter, \
F(type_hash_original, {"type", "hash_original"}), /*the original data size by hash exchange*/ \
F(type_hash_none_compression_remote, {"type", "hash_none_compression_remote"}), /*the remote exchange data size by hash partition with no compression*/\
F(type_hash_none_compression_local, {"type", "hash_none_compression_local"}), /*the local exchange data size by hash partition with no compression*/ \
F(type_hash_lz4_compression, {"type", "hash_lz4_compression"}), /*the exchange data size by hash partition with lz4 compression*/ \
F(type_hash_zstd_compression, {"type", "hash_zstd_compression"}), /*the exchange data size by hash partition with zstd compression*/ \
F(type_broadcast_passthrough_original, {"type", "broadcast_passthrough_original"}), /*the original exchange data size by broadcast/passthough*/ \
F(type_broadcast_passthrough_none_compression_local, {"type", "broadcast_passthrough_none_compression_local"}), /*the local exchange data size by broadcast/passthough with no compression*/ \
F(type_broadcast_passthrough_none_compression_remote, {"type", "broadcast_passthrough_none_compression_remote"}), /*the remote exchange data size by broadcast/passthough with no compression*/ \
) \
M(tiflash_exchange_data_bytes, "Total bytes sent by exchange operators", Counter, \
F(type_hash_original, {"type", "hash_original"}), \
F(type_hash_none_compression_remote, {"type", "hash_none_compression_remote"}), \
F(type_hash_none_compression_local, {"type", "hash_none_compression_local"}), \
F(type_hash_lz4_compression, {"type", "hash_lz4_compression"}), \
F(type_hash_zstd_compression, {"type", "hash_zstd_compression"}), \
F(type_broadcast_original, {"type", "broadcast_original"}), \
F(type_broadcast_none_compression_local, {"type", "broadcast_none_compression_local"}), \
F(type_broadcast_none_compression_remote, {"type", "broadcast_none_compression_remote"}), \
F(type_broadcast_lz4_compression, {"type", "broadcast_lz4_compression"}), \
F(type_broadcast_zstd_compression, {"type", "broadcast_zstd_compression"}), \
F(type_passthrough_original, {"type", "passthrough_original"}), \
F(type_passthrough_none_compression_local, {"type", "passthrough_none_compression_local"}), \
F(type_passthrough_none_compression_remote, {"type", "passthrough_none_compression_remote"}), \
F(type_passthrough_lz4_compression, {"type", "passthrough_lz4_compression"}), \
F(type_passthrough_zstd_compression, {"type", "passthrough_zstd_compression"})) \
M(tiflash_schema_version, "Current version of tiflash cached schema", Gauge) \
M(tiflash_schema_applying, "Whether the schema is applying or not (holding lock)", Gauge) \
M(tiflash_schema_apply_count, "Total number of each kinds of apply", Counter, F(type_diff, {"type", "diff"}), \
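For orientation, these per-type counters are bumped wherever an exchange packet is materialized. A minimal, hedged sketch of how one of the new broadcast counters would be incremented, assuming TiFlash's existing GET_METRIC macro; the real call sites live in the MPPTunnelSet writer code, which is not part of this excerpt, and the two size variables are assumed locals:

// Hypothetical call site: after building an LZ4-compressed broadcast packet.
// `compressed_size` and `original_size` are assumed to hold the packet's
// serialized size and the pre-compression payload size, respectively.
GET_METRIC(tiflash_exchange_data_bytes, type_broadcast_lz4_compression).Increment(compressed_size);
GET_METRIC(tiflash_exchange_data_bytes, type_broadcast_original).Increment(original_size);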
44 changes: 44 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp
@@ -248,6 +248,10 @@ struct CHBlockChunkCodecV1Impl
{
return encodeImpl(blocks, compression_method);
}
CHBlockChunkCodecV1::EncodeRes encode(std::vector<Block> && blocks, CompressionMethod compression_method)
{
return encodeImpl(std::move(blocks), compression_method);
}

static const ColumnPtr & toColumnPtr(const Columns & c, size_t index)
{
@@ -269,6 +273,10 @@ struct CHBlockChunkCodecV1Impl
{
return block.getByPosition(index).column;
}
static ColumnPtr toColumnPtr(Block && block, size_t index)
{
return std::move(block.getByPosition(index).column);
}

template <typename ColumnsHolder>
static size_t getRows(ColumnsHolder && columns_holder)
@@ -349,6 +357,13 @@ struct CHBlockChunkCodecV1Impl
return encodeColumnImpl(block, ostr_ptr);
}
void encodeColumn(const std::vector<Block> & blocks, WriteBuffer * ostr_ptr)
{
for (auto && block : blocks)
{
encodeColumnImpl(block, ostr_ptr);
}
}
void encodeColumn(std::vector<Block> && blocks, WriteBuffer * ostr_ptr)
{
for (auto && block : blocks)
{
@@ -495,6 +510,19 @@ CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(const std::vector<Blo
return CHBlockChunkCodecV1Impl{*this}.encode(blocks, compression_method);
}

CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(std::vector<Block> && blocks, CompressionMethod compression_method, bool check_schema)
{
if (check_schema)
{
for (auto && block : blocks)
{
checkSchema(header, block);
}
}

return CHBlockChunkCodecV1Impl{*this}.encode(std::move(blocks), compression_method);
}

static Block decodeCompression(const Block & header, ReadBuffer & istr)
{
size_t decoded_rows{};
@@ -504,6 +532,22 @@ static Block decodeCompression(const Block & header, ReadBuffer & istr)
return decoded_block;
}

template <typename Buffer>
extern size_t CompressionEncode(
std::string_view source,
const CompressionSettings & compression_settings,
Buffer & compressed_buffer);

CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(std::string_view str, CompressionMethod compression_method)
{
assert(compression_method != CompressionMethod::NONE);

String compressed_buffer;
size_t compressed_size = CompressionEncode(str, CompressionSettings(compression_method), compressed_buffer);
compressed_buffer.resize(compressed_size);
return compressed_buffer;
}

Block CHBlockChunkCodecV1::decode(const Block & header, std::string_view str)
{
assert(!str.empty());
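The new static encode(std::string_view, CompressionMethod) overload compresses a payload that has already been chunk-encoded without compression, so a block batch can be serialized once and compressed only for the destinations that need it. A hedged usage sketch, mirroring the round-trip test added later in this PR; the `header` and `blocks` objects are assumed to exist as in that test:

// Encode once without compression; byte 0 is the compression-method marker.
auto plain = CHBlockChunkCodecV1{header}.encode(blocks, CompressionMethod::NONE);
// Derive an LZ4-compressed copy for remote receivers from the already-encoded
// body (skipping the 1-byte method header), without re-encoding the blocks.
auto remote_payload = CHBlockChunkCodecV1::encode(
    std::string_view{&plain[1], plain.size() - 1},
    CompressionMethod::LZ4);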
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.h
@@ -53,7 +53,9 @@ struct CHBlockChunkCodecV1 : boost::noncopyable
EncodeRes encode(std::vector<Columns> && columns, CompressionMethod compression_method);
EncodeRes encode(const Block & block, CompressionMethod compression_method, bool check_schema = true);
EncodeRes encode(const std::vector<Block> & blocks, CompressionMethod compression_method, bool check_schema = true);
EncodeRes encode(std::vector<Block> && blocks, CompressionMethod compression_method, bool check_schema = true);
//
static EncodeRes encode(std::string_view str, CompressionMethod compression_method);
static Block decode(const Block & header, std::string_view str);
};

49 changes: 49 additions & 0 deletions dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp
@@ -126,6 +126,41 @@ TEST(CHBlockChunkCodec, ChunkCodecV1)
auto decoded_block = CHBlockChunkCodecV1::decode(header, str);
ASSERT_EQ(total_rows, decoded_block.rows());
}
{
std::vector<Block> blocks_to_move;
blocks_to_move.reserve(blocks.size());
for (auto && block : blocks)
{
blocks_to_move.emplace_back(block);
}
for (auto && block : blocks_to_move)
{
for (auto && col : block)
{
ASSERT_TRUE(col.column);
}
}
auto codec = CHBlockChunkCodecV1{
header,
};
auto str = codec.encode(std::move(blocks_to_move), mode);
for (auto && block : blocks_to_move)
{
ASSERT_EQ(block.rows(), 0);
}
ASSERT_FALSE(str.empty());
ASSERT_EQ(codec.encoded_rows, total_rows);

if (mode == CompressionMethod::NONE)
ASSERT_EQ(codec.compressed_size, 0);
else
ASSERT_NE(codec.compressed_size, 0);

ASSERT_NE(codec.original_size, 0);

auto decoded_block = CHBlockChunkCodecV1::decode(header, str);
ASSERT_EQ(total_rows, decoded_block.rows());
}
{
auto columns = prepareBlock(rows).getColumns();
auto codec = CHBlockChunkCodecV1{
@@ -179,5 +214,19 @@
}
test_enocde_release_data(std::move(batch_columns), header, total_rows);
}
{
auto source_str = CHBlockChunkCodecV1{header}.encode(blocks.front(), CompressionMethod::NONE);
ASSERT_FALSE(source_str.empty());
ASSERT_EQ(static_cast<CompressionMethodByte>(source_str[0]), CompressionMethodByte::NONE);

for (auto mode : {CompressionMethod::LZ4, CompressionMethod::ZSTD})
{
auto compressed_str_a = CHBlockChunkCodecV1::encode({&source_str[1], source_str.size() - 1}, mode);
auto compressed_str_b = CHBlockChunkCodecV1{header}.encode(blocks.front(), mode);

ASSERT_EQ(compressed_str_a, compressed_str_b);
}
}
}

} // namespace DB::tests
@@ -86,17 +86,48 @@ struct MockWriter
return summary;
}

void broadcastOrPassThroughWrite(Blocks & blocks)
void broadcastOrPassThroughWriteV0(Blocks & blocks)
{
auto && packet = MPPTunnelSetHelper::ToPacketV0(blocks, result_field_types);
++total_packets;
if (!packet)
return;

if (!packet->packet.chunks().empty())
total_bytes += packet->packet.ByteSizeLong();
total_bytes += packet->packet.ByteSizeLong();
queue->push(std::move(packet));
}

void broadcastWrite(Blocks & blocks)
{
return broadcastOrPassThroughWriteV0(blocks);
}
void passThroughWrite(Blocks & blocks)
{
return broadcastOrPassThroughWriteV0(blocks);
}
void broadcastOrPassThroughWrite(Blocks & blocks, MPPDataPacketVersion version, CompressionMethod compression_method)
{
if (version == MPPDataPacketV0)
return broadcastOrPassThroughWriteV0(blocks);

size_t original_size{};
auto && packet = MPPTunnelSetHelper::ToPacket(std::move(blocks), version, compression_method, original_size);
++total_packets;
if (!packet)
return;

total_bytes += packet->packet.ByteSizeLong();
queue->push(std::move(packet));
}
void broadcastWrite(Blocks & blocks, MPPDataPacketVersion version, CompressionMethod compression_method)
{
return broadcastOrPassThroughWrite(blocks, version, compression_method);
}
void passThroughWrite(Blocks & blocks, MPPDataPacketVersion version, CompressionMethod compression_method)
{
return broadcastOrPassThroughWrite(blocks, version, compression_method);
}

void write(tipb::SelectResponse & response)
{
if (add_summary)
@@ -119,10 +150,6 @@ struct MockWriter
queue->push(tracked_packet);
}
uint16_t getPartitionNum() const { return 1; }
bool isLocal(size_t index) const
{
return index == 0;
}
bool isReadyForWrite() const { throw Exception("Unsupport async write"); }

std::vector<tipb::FieldType> result_field_types;
@@ -357,7 +384,10 @@ class TestTiRemoteBlockInputStream : public testing::Test
auto dag_writer = std::make_shared<BroadcastOrPassThroughWriter<MockWriterPtr>>(
writer,
batch_send_min_limit,
*dag_context_ptr);
*dag_context_ptr,
MPPDataPacketVersion::MPPDataPacketV1,
tipb::CompressionMode::FAST,
tipb::ExchangeType::Broadcast);

// 2. encode all blocks
for (const auto & block : source_blocks)
45 changes: 42 additions & 3 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
@@ -14,6 +14,7 @@

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/BroadcastOrPassThroughWriter.h>
#include <Flash/Mpp/MPPTunnelSetWriter.h>
@@ -24,13 +25,41 @@
BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
ExchangeWriterPtr writer_,
Int64 batch_send_min_limit_,
DAGContext & dag_context_)
DAGContext & dag_context_,
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_)
: DAGResponseWriter(/*records_per_chunk=*/-1, dag_context_)
, batch_send_min_limit(batch_send_min_limit_)
, writer(writer_)
, exchange_type(exchange_type_)
, data_codec_version(data_codec_version_)
, compression_method(ToInternalCompressionMethod(compression_mode_))
{
rows_in_blocks = 0;
RUNTIME_CHECK(dag_context.encode_type == tipb::EncodeType::TypeCHBlock);
RUNTIME_CHECK(exchange_type == tipb::ExchangeType::Broadcast || exchange_type == tipb::ExchangeType::PassThrough);

switch (data_codec_version)
{
case MPPDataPacketV0:
break;
case MPPDataPacketV1:
default:
{
// make `batch_send_min_limit` always GT 0
if (batch_send_min_limit <= 0)
{
// set upper limit if not specified
batch_send_min_limit = 8 * 1024 /* 8K */;
}
for (const auto & field_type : dag_context.result_field_types)
{
expected_types.emplace_back(getDataTypeByFieldTypeForComputingLayer(field_type));
}
break;
}
}
}

template <class ExchangeWriterPtr>
@@ -66,10 +95,20 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::writeBlocks()
{
if (unlikely(blocks.empty()))
if unlikely (blocks.empty())
return;

writer->broadcastOrPassThroughWrite(blocks);
// check schema
if (!expected_types.empty())
{
for (auto && block : blocks)
assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter");
}
Review comment on lines +102 to +106 (Contributor):
how about wrapping the check in a debug-only guard:

#ifndef NDEBUG
if (!expected_types.empty())
{
for (auto && block : blocks)
assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter");
}
#endif

Reply (solotzg, Contributor, Author):
assertBlockSchema is always necessary when using compression, because the compression codec does not check the expected types.

if (exchange_type == tipb::ExchangeType::Broadcast)
Review comment (Contributor):
how about using a template?

template <class ExchangeWriterPtr, bool is_broadcast>
class BroadcastOrPassThroughWriter : public DAGResponseWriter

Reply (solotzg, Contributor, Author):
That is also a viable option.

(A standalone sketch of this template-based alternative follows this file's diff.)

writer->broadcastWrite(blocks, data_codec_version, compression_method);
else
writer->passThroughWrite(blocks, data_codec_version, compression_method);
blocks.clear();
rows_in_blocks = 0;
}
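
As referenced in the review thread above, the broadcast/pass-through dispatch could alternatively be lifted into a compile-time parameter. A minimal standalone sketch of that alternative, not what this PR adopts; `Blocks`, the writer type, and the two write methods are taken from the surrounding diff:

template <class ExchangeWriterPtr, bool is_broadcast>
void writeBlocksImpl(
    ExchangeWriterPtr & writer,
    Blocks & blocks,
    MPPDataPacketVersion version,
    CompressionMethod method)
{
    // The broadcast-vs-pass-through branch is resolved at compile time
    // instead of once per batch.
    if constexpr (is_broadcast)
        writer->broadcastWrite(blocks, version, method);
    else
        writer->passThroughWrite(blocks, version, method);
}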
13 changes: 12 additions & 1 deletion dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
@@ -22,6 +22,8 @@
namespace DB
{
class DAGContext;
enum class CompressionMethod;
enum MPPDataPacketVersion : int64_t;

template <class ExchangeWriterPtr>
class BroadcastOrPassThroughWriter : public DAGResponseWriter
@@ -30,7 +32,10 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
BroadcastOrPassThroughWriter(
ExchangeWriterPtr writer_,
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
DAGContext & dag_context_,
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
void write(const Block & block) override;
bool isReadyForWrite() const override;
void flush() override;
@@ -43,6 +48,12 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
ExchangeWriterPtr writer;
std::vector<Block> blocks;
size_t rows_in_blocks;
const tipb::ExchangeType exchange_type;

// support data compression
DataTypes expected_types;
MPPDataPacketVersion data_codec_version;
CompressionMethod compression_method{};
};

} // namespace DB
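
The constructor converts the planner-level tipb::CompressionMode into the internal CompressionMethod via ToInternalCompressionMethod, which is not shown in this excerpt. A hedged sketch of the mapping it presumably performs (FAST to LZ4, HIGH_COMPRESSION to ZSTD); treat this as an assumption, not the verbatim implementation:

CompressionMethod ToInternalCompressionMethodSketch(tipb::CompressionMode mode)
{
    switch (mode)
    {
    case tipb::CompressionMode::NONE:
        return CompressionMethod::NONE;
    case tipb::CompressionMode::FAST:
        return CompressionMethod::LZ4; // fast, moderate ratio
    case tipb::CompressionMode::HIGH_COMPRESSION:
        return CompressionMethod::ZSTD; // slower, better ratio
    default:
        return CompressionMethod::NONE;
    }
}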
4 changes: 4 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnelSet.cpp
@@ -92,6 +92,10 @@ void MPPTunnelSetBase<Tunnel>::registerTunnel(const MPPTaskId & receiver_task_id
{
++external_thread_cnt;
}
if (tunnel->isLocal())
{
++local_tunnel_cnt;
}
}

template <typename Tunnel>
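
The new local_tunnel_cnt bookkeeping, together with the *_none_compression_local vs *_none_compression_remote metric types above, reflects that compression only pays off for remote tunnels. A hedged sketch of that decision, inferred from the metric names rather than taken from this diff:

// Local tunnels hand data over in-process, so compressing them would only burn CPU.
CompressionMethod chooseTunnelCompression(bool is_local_tunnel, CompressionMethod requested)
{
    return is_local_tunnel ? CompressionMethod::NONE : requested;
}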