Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exchange receiver decode optimization to do squashing work at the same time #6202

Merged
merged 14 commits into from
Nov 2, 2022
31 changes: 3 additions & 28 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
#include <fmt/core.h>

#include <ext/range.h>


Expand All @@ -31,32 +30,8 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int NOT_IMPLEMENTED;
} // namespace ErrorCodes

namespace
{
// Validate that the number of columns decoded from the stream matches the
// expected schema; a mismatch means the stream is corrupted or the sender's
// schema has drifted from the receiver's.
void checkColumnSize(size_t expected, size_t actual)
{
    if (expected == actual)
        return;
    throw Exception(
        fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual),
        ErrorCodes::LOGICAL_ERROR);
}

// Validate that the serialized type name of column `column_index` is exactly
// the type name the header block expects for that position.
void checkDataTypeName(size_t column_index, const String & expected, const String & actual)
{
    if (expected == actual)
        return;
    throw Exception(
        fmt::format(
            "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}",
            column_index,
            expected,
            actual),
        ErrorCodes::LOGICAL_ERROR);
}
} // namespace

NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_,
UInt64 server_revision_,
Expand Down Expand Up @@ -179,9 +154,9 @@ Block NativeBlockInputStream::readImpl()
}

if (header)
checkColumnSize(header.columns(), columns);
CodecUtils::checkColumnSize(header.columns(), columns);
else if (!output_names.empty())
checkColumnSize(output_names.size(), columns);
CodecUtils::checkColumnSize(output_names.size(), columns);

for (size_t i = 0; i < columns; ++i)
{
Expand All @@ -208,7 +183,7 @@ Block NativeBlockInputStream::readImpl()
readBinary(type_name, istr);
if (header)
{
checkDataTypeName(i, header_datatypes[i].name, type_name);
CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name);
column.type = header_datatypes[i].type;
}
else
Expand Down
15 changes: 2 additions & 13 deletions dbms/src/DataStreams/NativeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <IO/CompressedReadBufferFromFile.h>
#include <Flash/Coprocessor/CodecUtils.h>

namespace DB
{
Expand Down Expand Up @@ -116,19 +117,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream
Block header;
UInt64 server_revision;
bool align_column_name_with_header = false;

struct DataTypeWithTypeName
{
DataTypeWithTypeName(const DataTypePtr & t, const String & n)
: type(t)
, name(n)
{
}

DataTypePtr type;
String name;
};
std::vector<DataTypeWithTypeName> header_datatypes;
std::vector<CodecUtils::DataTypeWithTypeName> header_datatypes;

bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it;
Expand Down
88 changes: 49 additions & 39 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/IChunkDecodeAndSquash.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -64,6 +65,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
// CoprocessorBlockInputStream doesn't take care of this.
size_t stream_id;

std::unique_ptr<IChunkDecodeAndSquash> decoder_ptr;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
Expand All @@ -84,6 +87,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
if (resp.execution_summaries_size() == 0)
return;

if (!execution_summaries_inited[index].load())
{
initRemoteExecutionSummaries(resp, index);
Expand Down Expand Up @@ -128,51 +132,54 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
while (true)
{
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
if (result.meet_error)
{
LOG_WARNING(log, "remote reader meets error: {}", result.error_msg);
throw Exception(result.error_msg);
}
if (result.eof)
return false;
if (result.resp != nullptr && result.resp->has_error())
auto results = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr);
for (auto & result : results)
{
LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString());
throw Exception(result.resp->error().DebugString());
}
/// only the last response contains execution summaries
if (result.resp != nullptr)
{
if constexpr (is_streaming_reader)
if (result.meet_error)
{
addRemoteExecutionSummaries(*result.resp, result.call_index, true);
LOG_WARNING(log, "remote reader meets error: {}", result.error_msg);
throw Exception(result.error_msg);
}
else
if (result.eof)
return false;
if (result.resp != nullptr && result.resp->has_error())
{
addRemoteExecutionSummaries(*result.resp, 0, false);
LOG_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString());
throw Exception(result.resp->error().DebugString());
}
}
/// only the last response contains execution summaries
if (result.resp != nullptr)
{
if constexpr (is_streaming_reader)
{
addRemoteExecutionSummaries(*result.resp, result.call_index, true);
}
else
{
addRemoteExecutionSummaries(*result.resp, 0, false);
}
}

const auto & decode_detail = result.decode_detail;

const auto & decode_detail = result.decode_detail;

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

++connection_profile_infos[index].packets;
connection_profile_infos[index].bytes += decode_detail.packet_bytes;

total_rows += decode_detail.rows;
LOG_TRACE(
log,
"recv {} rows from remote for {}, total recv row num: {}",
decode_detail.rows,
result.req_info,
total_rows);
if (decode_detail.rows > 0)
return true;
// else continue
size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

++connection_profile_infos[index].packets;
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
connection_profile_infos[index].bytes += decode_detail.packet_bytes;

total_rows += decode_detail.rows;
LOG_TRACE(
log,
"recv {} rows from remote for {}, total recv row num: {}",
decode_detail.rows,
result.req_info,
total_rows);
if (decode_detail.rows > 0)
return true;
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
// else continue
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand All @@ -193,6 +200,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
if constexpr (is_streaming_reader)
decoder_ptr = std::make_unique<CHBlockChunkDecodeAndSquash>(sample_block, 8192);
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
}

Block getHeader() const override { return sample_block; }
Expand Down Expand Up @@ -222,6 +231,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr;
}

size_t getTotalRows() const { return total_rows; }
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
size_t getSourceNum() const { return source_num; }
bool isStreamingCall() const { return is_streaming_reader; }
const std::vector<ConnectionProfileInfo> & getConnectionProfileInfos() const { return connection_profile_infos; }
Expand Down
99 changes: 91 additions & 8 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <IO/ReadBufferFromString.h>

#include <algorithm>

namespace DB
{
Expand Down Expand Up @@ -48,6 +49,20 @@ class CHBlockChunkCodecStream : public ChunkCodecStream
DataTypes expected_types;
};

/// Construct a decoder bound to a known header block: each column's type and
/// its serialized type name are cached so decoding can validate the incoming
/// stream against them.
CHBlockChunkCodec::CHBlockChunkCodec(
    const Block & header_)
    : header(header_)
{
    header_datatypes.reserve(header.columns());
    for (const auto & col : header)
        header_datatypes.emplace_back(col.type, col.type->getName());
}

/// Construct a decoder bound to a DAG schema only: column names are taken
/// from the schema, while column types come from the serialized stream.
CHBlockChunkCodec::CHBlockChunkCodec(const DAGSchema & schema)
{
    output_names.reserve(schema.size());
    for (const auto & schema_entry : schema)
        output_names.push_back(schema_entry.first);
}

size_t getExtraInfoSize(const Block & block)
{
size_t size = 64; /// to hold some length of structures, such as column number, row number...
Expand Down Expand Up @@ -83,6 +98,14 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o
type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
}

void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows)
{
IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) {
return &istr;
};
type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {});
}

void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
/// only check block schema in CHBlock codec because for both
Expand Down Expand Up @@ -120,21 +143,81 @@ std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::v
return std::make_unique<CHBlockChunkCodecStream>(field_types);
}

/// Decode one serialized chunk from `istr` into a Block.
/// `reserve_size` > 0 is used by the squashing decoder to pre-allocate column
/// capacity for the whole accumulated block; otherwise each column reserves
/// just this chunk's row count. Returns an empty Block on EOF.
Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr, size_t reserve_size)
{
    Block res;
    if (istr.eof())
    {
        return res;
    }

    /// Dimensions
    size_t columns = 0;
    size_t rows = 0;
    readBlockMeta(istr, columns, rows);

    for (size_t i = 0; i < columns; ++i)
    {
        ColumnWithTypeAndName column;
        readColumnMeta(i, istr, column);

        /// Data
        MutableColumnPtr read_column = column.type->createColumn();
        /// Reserve at least this chunk's rows; when squashing, reserve the
        /// caller-provided target size up-front to avoid re-allocations.
        if (reserve_size > 0)
            read_column->reserve(std::max(rows, reserve_size));
        else if (rows)
            read_column->reserve(rows);

        if (rows) /// If no rows, nothing to read.
            readData(*column.type, *read_column, istr, rows);

        column.column = std::move(read_column);
        res.insert(std::move(column));
    }
    return res;
}
/// Read the chunk dimensions (column count, then row count) from the stream
/// and, when a schema is known, verify the column count against it.
void CHBlockChunkCodec::readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const
{
    readVarUInt(columns, istr);
    readVarUInt(rows, istr);

    if (header)
    {
        CodecUtils::checkColumnSize(header.columns(), columns);
        return;
    }
    if (!output_names.empty())
        CodecUtils::checkColumnSize(output_names.size(), columns);
}

/// Read the metadata (name, type name) of column `i` from the stream.
/// The serialized name/type are always consumed so the stream stays aligned,
/// but when a header or output_names schema is available, they override the
/// serialized values (the type name is validated against the header first).
void CHBlockChunkCodec::readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column)
{
    /// Name — must be consumed even when it is immediately replaced below.
    readBinary(column.name, istr);
    if (header)
        column.name = header.getByPosition(i).name;
    else if (!output_names.empty())
        column.name = output_names[i];

    /// Type
    String type_name;
    readBinary(type_name, istr);
    if (header)
    {
        CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name);
        column.type = header_datatypes[i].type;
    }
    else
    {
        column.type = DataTypeFactory::instance().get(type_name);
    }
}

/// Decode a serialized chunk using only the DAG schema: column names come
/// from the schema, column types from the stream itself.
Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema)
{
    ReadBufferFromString read_buffer(str);
    return CHBlockChunkCodec(schema).decodeImpl(read_buffer);
}

/// Decode a serialized chunk against a known header block: the column count
/// and type names are validated, and column names are aligned with `header`.
Block CHBlockChunkCodec::decode(const String & str, const Block & header)
{
    ReadBufferFromString read_buffer(str);
    return CHBlockChunkCodec(header).decodeImpl(read_buffer);
}

} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,34 @@
#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/CodecUtils.h>

namespace DB
{
class CHBlockChunkDecodeAndSquash;

/// Codec for the native CH block wire format.
/// Besides the public ChunkCodec interface, it exposes (to the squashing
/// decoder only, via friendship) a streaming decode path that can
/// pre-reserve column capacity while accumulating several chunks.
class CHBlockChunkCodec final : public ChunkCodec
{
public:
    CHBlockChunkCodec() = default;
    /// Bind to a header block: decoded columns are validated against it and
    /// take their names/types from it.
    CHBlockChunkCodec(const Block & header_);
    /// Bind to a DAG schema: decoded columns take their names from it; types
    /// are read from the stream.
    CHBlockChunkCodec(const DAGSchema & schema);

    Block decode(const String &, const DAGSchema & schema) override;
    static Block decode(const String &, const Block & header);
    std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;

private:
    friend class CHBlockChunkDecodeAndSquash;
    void readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column);
    void readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const;
    static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows);
    /// 'reserve_size' used for Squash usage, and takes effect when 'reserve_size' > 0
    Block decodeImpl(ReadBuffer & istr, size_t reserve_size = 0);

    Block header;                                              // set only by the Block ctor
    std::vector<CodecUtils::DataTypeWithTypeName> header_datatypes;  // cached from header
    std::vector<String> output_names;                          // set only by the DAGSchema ctor
};

} // namespace DB
Loading