Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exchange receiver decode optimization to do squashing work at the same time #6202

Merged
merged 14 commits into from
Nov 2, 2022
30 changes: 3 additions & 27 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,8 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int NOT_IMPLEMENTED;
} // namespace ErrorCodes

namespace
{
/// Verifies that the number of columns found in the stream matches the number the
/// reader expects.
/// Throws Exception with ErrorCodes::LOGICAL_ERROR when the counts differ; returns
/// normally otherwise.
void checkColumnSize(size_t expected, size_t actual)
{
    if (expected == actual)
        return;

    throw Exception(
        fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual),
        ErrorCodes::LOGICAL_ERROR);
}

/// Verifies that the type name read for column `column_index` equals the expected one.
/// Throws Exception with ErrorCodes::LOGICAL_ERROR (naming the column index and both
/// type names) when they differ; returns normally otherwise.
void checkDataTypeName(size_t column_index, const String & expected, const String & actual)
{
    if (expected == actual)
        return;

    throw Exception(
        fmt::format(
            "NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}",
            column_index,
            expected,
            actual),
        ErrorCodes::LOGICAL_ERROR);
}
} // namespace

NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_,
UInt64 server_revision_,
Expand Down Expand Up @@ -179,9 +155,9 @@ Block NativeBlockInputStream::readImpl()
}

if (header)
checkColumnSize(header.columns(), columns);
CodecUtils::checkColumnSize(header.columns(), columns);
else if (!output_names.empty())
checkColumnSize(output_names.size(), columns);
CodecUtils::checkColumnSize(output_names.size(), columns);

for (size_t i = 0; i < columns; ++i)
{
Expand All @@ -208,7 +184,7 @@ Block NativeBlockInputStream::readImpl()
readBinary(type_name, istr);
if (header)
{
checkDataTypeName(i, header_datatypes[i].name, type_name);
CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name);
column.type = header_datatypes[i].type;
}
else
Expand Down
17 changes: 3 additions & 14 deletions dbms/src/DataStreams/NativeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/PODArray.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <IO/CompressedReadBufferFromFile.h>

namespace DB
Expand Down Expand Up @@ -116,27 +117,15 @@ class NativeBlockInputStream : public IProfilingBlockInputStream
Block header;
UInt64 server_revision;
bool align_column_name_with_header = false;

/// Couples a data type with its name so both are available from a single lookup
/// (readImpl compares `name` with the type name read from the stream and then
/// reuses `type`).
struct DataTypeWithTypeName
{
    DataTypePtr type;
    String name;

    /// Stores copies of the given type pointer `t` and type name `n`.
    DataTypeWithTypeName(const DataTypePtr & t, const String & n)
        : type(t)
        , name(n)
    {
    }
};
std::vector<DataTypeWithTypeName> header_datatypes;
std::vector<CodecUtils::DataTypeWithTypeName> header_datatypes;

bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it;
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;

/// If an index is specified, then `istr` must be CompressedReadBufferFromFile.
CompressedReadBufferFromFile<> * istr_concrete;
CompressedReadBufferFromFile<> * istr_concrete = nullptr;

PODArray<double> avg_value_size_hints;

Expand Down
15 changes: 12 additions & 3 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/IChunkDecodeAndSquash.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -64,6 +65,8 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
// CoprocessorBlockInputStream doesn't take care of this.
size_t stream_id;

std::unique_ptr<IChunkDecodeAndSquash> decoder_ptr;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
Expand All @@ -84,6 +87,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
if (resp.execution_summaries_size() == 0)
return;

if (!execution_summaries_inited[index].load())
{
initRemoteExecutionSummaries(resp, index);
Expand Down Expand Up @@ -128,7 +132,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
while (true)
{
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id, decoder_ptr);
if (result.meet_error)
{
LOG_WARNING(log, "remote reader meets error: {}", result.error_msg);
Expand All @@ -155,21 +159,22 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

const auto & decode_detail = result.decode_detail;
total_rows += decode_detail.rows;

size_t index = 0;
if constexpr (is_streaming_reader)
index = result.call_index;

++connection_profile_infos[index].packets;
connection_profile_infos[index].packets += decode_detail.packets;
connection_profile_infos[index].bytes += decode_detail.packet_bytes;

total_rows += decode_detail.rows;
LOG_TRACE(
log,
"recv {} rows from remote for {}, total recv row num: {}",
decode_detail.rows,
result.req_info,
total_rows);

if (decode_detail.rows > 0)
return true;
// else continue
Expand All @@ -193,6 +198,9 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
constexpr size_t squash_rows_limit = 8192;
if constexpr (is_streaming_reader)
decoder_ptr = std::make_unique<CHBlockChunkDecodeAndSquash>(sample_block, squash_rows_limit);
}

/// Returns a copy of `sample_block`, the block built from the remote reader's output schema.
Block getHeader() const override
{
    return sample_block;
}
Expand Down Expand Up @@ -222,6 +230,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return execution_summaries_inited[index].load() ? &execution_summaries[index] : nullptr;
}

/// Returns `total_rows`, the running sum of `decode_detail.rows` over all results fetched so far.
size_t getTotalRows() const
{
    return total_rows;
}
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
/// Returns `source_num`, the count used to size the per-source execution-summary and
/// connection-profile vectors.
size_t getSourceNum() const
{
    return source_num;
}
/// Returns the compile-time flag `is_streaming_reader`.
bool isStreamingCall() const
{
    return is_streaming_reader;
}
/// Returns a read-only reference to the per-source connection profile entries
/// (packet and byte counters, indexed by source).
const std::vector<ConnectionProfileInfo> & getConnectionProfileInfos() const
{
    return connection_profile_infos;
}
Expand Down
101 changes: 93 additions & 8 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/TiFlashException.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <IO/ReadBufferFromString.h>
Expand Down Expand Up @@ -48,6 +49,20 @@ class CHBlockChunkCodecStream : public ChunkCodecStream
DataTypes expected_types;
};

/// Builds a codec that decodes against a known header.
/// For every header column the type is cached together with its name, so that
/// readColumnMeta can compare the cached name with the type name read from the stream.
CHBlockChunkCodec::CHBlockChunkCodec(
    const Block & header_)
    : header(header_)
{
    /// Exactly one entry per header column is appended; reserve once to avoid regrowth.
    header_datatypes.reserve(header.columns());
    for (const auto & column : header)
        header_datatypes.emplace_back(column.type, column.type->getName());
}

/// Builds a codec without a header: only the column names of `schema` (the first
/// element of each schema entry) are kept, in schema order, and later assigned to the
/// decoded columns by position.
CHBlockChunkCodec::CHBlockChunkCodec(const DAGSchema & schema)
{
    for (const auto & schema_entry : schema)
    {
        output_names.push_back(schema_entry.first);
    }
}

size_t getExtraInfoSize(const Block & block)
{
size_t size = 64; /// to hold some length of structures, such as column number, row number...
Expand Down Expand Up @@ -83,6 +98,14 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o
type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
}

void CHBlockChunkCodec::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows)
{
IDataType::InputStreamGetter input_stream_getter = [&](const IDataType::SubstreamPath &) {
return &istr;
};
type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, 0, false, {});
}

void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
/// only check block schema in CHBlock codec because for both
Expand Down Expand Up @@ -120,21 +143,83 @@ std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::v
return std::make_unique<CHBlockChunkCodecStream>(field_types);
}

/// Decodes one block from `istr`.
/// Returns an empty Block when the buffer is already at EOF.
/// `reserve_size`: when > 0, every column reserves max(rows, reserve_size) rows up front;
/// when 0, only the block's own row count is reserved (nothing for an empty block).
Block CHBlockChunkCodec::decodeImpl(ReadBuffer & istr, size_t reserve_size)
{
    Block decoded;
    if (istr.eof())
        return decoded;

    /// Block dimensions (validated against the header / output names by readBlockMeta).
    size_t column_count = 0;
    size_t row_count = 0;
    readBlockMeta(istr, column_count, row_count);

    /// With reserve_size == 0 this is simply row_count; otherwise the larger of the two.
    const size_t rows_to_reserve = std::max(row_count, reserve_size);

    for (size_t col_idx = 0; col_idx < column_count; ++col_idx)
    {
        ColumnWithTypeAndName col;
        readColumnMeta(col_idx, istr, col);

        MutableColumnPtr data = col.type->createColumn();
        if (rows_to_reserve > 0)
            data->reserve(rows_to_reserve);

        /// An empty block carries no column data, so there is nothing to read.
        if (row_count > 0)
            readData(*col.type, *data, istr, row_count);

        col.column = std::move(data);
        decoded.insert(std::move(col));
    }
    return decoded;
}
/// Reads the block dimensions from `istr` into `columns` and `rows` (in that order),
/// then validates the column count: against the header when one is set, otherwise
/// against `output_names` when it is non-empty. With neither, no check is performed.
/// A mismatch makes CodecUtils::checkColumnSize throw.
void CHBlockChunkCodec::readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const
{
    readVarUInt(columns, istr);
    readVarUInt(rows, istr);

    if (header)
    {
        CodecUtils::checkColumnSize(header.columns(), columns);
        return;
    }

    if (!output_names.empty())
        CodecUtils::checkColumnSize(output_names.size(), columns);
}

void CHBlockChunkCodec::readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column)
{
/// Name
readBinary(column.name, istr);
if (header)
column.name = header.getByPosition(i).name;
else if (!output_names.empty())
column.name = output_names[i];

/// Type
String type_name;
readBinary(type_name, istr);
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
if (header)
{
CodecUtils::checkDataTypeName(i, header_datatypes[i].name, type_name);
column.type = header_datatypes[i].type;
}
else
{
column.type = data_type_factory.get(type_name);
}
}

/// Decodes one block from the serialized bytes in `str`.
/// A temporary codec is built from `schema`, so the decoded columns are named after the
/// schema entries and the column count is checked against the schema size.
Block CHBlockChunkCodec::decode(const String & str, const DAGSchema & schema)
{
    ReadBufferFromString read_buffer(str);
    return CHBlockChunkCodec(schema).decodeImpl(read_buffer);
}

/// Decodes one block from the serialized bytes in `str` against a known `header`.
/// A temporary codec is built from `header`, so column names and types are taken from
/// the header and the serialized column count / type names are checked against it.
Block CHBlockChunkCodec::decode(const String & str, const Block & header)
{
    ReadBufferFromString read_buffer(str);
    return CHBlockChunkCodec(header).decodeImpl(read_buffer);
}

} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Flash/Coprocessor/CHBlockChunkCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,34 @@
#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/CodecUtils.h>

namespace DB
{
class CHBlockChunkDecodeAndSquash;

/// Chunk codec for the CHBlock encoding.
/// A codec instance can be constructed with a header Block (column names and types are
/// taken from the header and the serialized data is checked against it), with a
/// DAGSchema (only the column names are taken from the schema), or with neither.
class CHBlockChunkCodec final : public ChunkCodec
{
public:
    CHBlockChunkCodec() = default;
    /// Single-argument constructors are explicit so that a Block or DAGSchema is never
    /// silently converted into a codec.
    explicit CHBlockChunkCodec(const Block & header_);
    explicit CHBlockChunkCodec(const DAGSchema & schema);

    /// Decodes one block from the serialized string, naming columns after `schema`.
    Block decode(const String &, const DAGSchema & schema) override;
    /// Decodes one block from the serialized string, checked against `header`.
    static Block decode(const String &, const Block & header);
    std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;

private:
    /// CHBlockChunkDecodeAndSquash uses the private read helpers below.
    friend class CHBlockChunkDecodeAndSquash;
    /// Reads name and type of the i-th column into `column`.
    void readColumnMeta(size_t i, ReadBuffer & istr, ColumnWithTypeAndName & column);
    /// Reads column and row counts and validates the column count.
    void readBlockMeta(ReadBuffer & istr, size_t & columns, size_t & rows) const;
    /// Bulk-deserializes column data from `istr` into `column`.
    static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows);
    /// `reserve_size` is intended for squashing: it only takes effect when > 0, in which
    /// case each decoded column reserves max(rows, reserve_size) rows.
    Block decodeImpl(ReadBuffer & istr, size_t reserve_size = 0);

    Block header;
    /// Type and type name of every header column, in header order.
    std::vector<CodecUtils::DataTypeWithTypeName> header_datatypes;
    /// Column names taken from a DAGSchema; used only when no header is set.
    std::vector<String> output_names;
};

} // namespace DB
49 changes: 49 additions & 0 deletions dbms/src/Flash/Coprocessor/CodecUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/CodecUtils.h>
#include <Flash/Coprocessor/DAGUtils.h>

namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

namespace CodecUtils
{
void checkColumnSize(size_t expected, size_t actual)
{
if (expected != actual)
yibin87 marked this conversation as resolved.
Show resolved Hide resolved
throw Exception(
fmt::format("NativeBlockInputStream schema mismatch, expected {}, actual {}.", expected, actual),
ErrorCodes::LOGICAL_ERROR);
}

void checkDataTypeName(size_t column_index, const String & expected, const String & actual)
{
if (expected != actual)
yibin87 marked this conversation as resolved.
Show resolved Hide resolved
throw Exception(
fmt::format(
"NativeBlockInputStream schema mismatch at column {}, expected {}, actual {}",
column_index,
expected,
actual),
ErrorCodes::LOGICAL_ERROR);
}

} // namespace CodecUtils
} // namespace DB
Loading