Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test: Mock Input columns for operator tests #5041

Merged
merged 32 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8e3205a
init.
ywqzzy Jun 1, 2022
a54e510
more apis
ywqzzy Jun 1, 2022
27f98d2
mock exchange receiver data.
ywqzzy Jun 1, 2022
b2bf618
support multiple source, bug not fixed.
ywqzzy Jun 2, 2022
75820cf
join done.
ywqzzy Jun 2, 2022
803f831
remove comments.
ywqzzy Jun 2, 2022
8072210
format.
ywqzzy Jun 2, 2022
200d7b5
Merge branch 'master' of https://github.com/pingcap/tiflash into mock…
ywqzzy Jun 2, 2022
071fcb0
update.
ywqzzy Jun 2, 2022
43db3e9
little refine.
ywqzzy Jun 2, 2022
f6a1ec5
Merge branch 'master' of https://github.com/pingcap/tiflash into mock…
ywqzzy Jun 2, 2022
6259faa
meet a bug.
ywqzzy Jun 6, 2022
75eba4c
try....
ywqzzy Jun 6, 2022
dc88cda
fix mock stream bug.
ywqzzy Jun 6, 2022
8b2e582
test multiple source.
ywqzzy Jun 6, 2022
4379ad3
check source name duplicated.
ywqzzy Jun 6, 2022
fca6c58
meet unstable test
ywqzzy Jun 6, 2022
e20ec44
fix unstable tests.
ywqzzy Jun 7, 2022
3906174
tiny refine
ywqzzy Jun 7, 2022
064254a
format.
ywqzzy Jun 7, 2022
b3eabc2
bug fix, refine.
ywqzzy Jun 7, 2022
900fddd
remove cout.
ywqzzy Jun 7, 2022
6bc6084
rename function
ywqzzy Jun 7, 2022
2da9059
update.
ywqzzy Jun 7, 2022
0dc63ae
address comments.
ywqzzy Jun 7, 2022
b6ede66
fix build fail.
ywqzzy Jun 7, 2022
8d472f0
Address comments.
ywqzzy Jun 7, 2022
61ba6b1
address comments
ywqzzy Jun 7, 2022
cbef9bb
address comments.
ywqzzy Jun 7, 2022
6a4b8c9
address comments
ywqzzy Jun 8, 2022
c64220b
add more comments
ywqzzy Jun 8, 2022
38e6a7a
Merge branch 'master' into mock_stream_inputblock
ywqzzy Jun 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const tipb::Exc
}
}

MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size)
: columns(columns)
, output_index(0)
, max_block_size(max_block_size)
{
rows = 0;
for (const auto & elem : columns)
{
if (elem.column)
{
assert(rows == 0 || rows == elem.column->size());
rows = elem.column->size();
}
}
}

ColumnPtr MockExchangeReceiverInputStream::makeColumn(ColumnWithTypeAndName elem) const
{
auto column = elem.type->createColumn();
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/MockExchangeReceiverInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ class MockExchangeReceiverInputStream : public IProfilingBlockInputStream
{
public:
MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_);
Block getHeader() const override { return Block(columns); }
MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size);
Block getHeader() const override
{
return Block(columns);
}
String getName() const override { return "MockExchangeReceiver"; }
ColumnsWithTypeAndName columns;
size_t output_index;
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,13 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_
return tables_regions_info.getTableRegionInfoByTableID(table_id);
}

ColumnsWithTypeAndName DAGContext::columnsForTest(String executor_id)
{
auto it = columns_for_test_map.find(executor_id);
if (unlikely(it == columns_for_test_map.end()))
{
throw DB::Exception("Don't have columns for mock source executors");
}
return it->second;
}
} // namespace DB
9 changes: 7 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ class DAGContext
}

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);

bool columnsForTestEmpty() { return columns_for_test_map.empty(); }

void cancelAllExchangeReceiver();

Expand All @@ -317,8 +321,8 @@ class DAGContext
Clock::time_point read_wait_index_end_timestamp{Clock::duration::zero()};
String table_scan_executor_id;
String tidb_host = "Unknown";
bool collect_execution_summaries;
bool return_executor_id;
bool collect_execution_summaries{};
bool return_executor_id{};
bool is_mpp_task = false;
bool is_root_mpp_task = false;
bool is_batch_cop = false;
Expand Down Expand Up @@ -372,6 +376,7 @@ class DAGContext
std::vector<SubqueriesForSets> subqueries;

bool is_test = false; /// switch for test, do not use it in production.
std::unordered_map<String, ColumnsWithTypeAndName> columns_for_test_map; /// <exector_id, columns>, for multiple sources
};

} // namespace DB
49 changes: 35 additions & 14 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Mpp/ExchangeReceiver.h>
Expand Down Expand Up @@ -159,13 +160,22 @@ AnalysisResult analyzeExpressions(
// for tests, we need to mock tableScan blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
auto names_and_types = genNamesAndTypes(table_scan);
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(table_scan.getTableScanExecutorID()).empty())
{
auto names_and_types = genNamesAndTypes(table_scan);
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}
}
else
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
auto [names_and_types, mock_table_scan_streams] = mockSourceStream<MockTableScanBlockInputStream>(context, max_streams, log, table_scan.getTableScanExecutorID());
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end());
}
}

Expand Down Expand Up @@ -266,7 +276,8 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
stream->setExtraInfo(
fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
});
executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/true, "for join");
// for test, join executor need the return blocks to output.
executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!dagContext().isTest(), "for join");
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

right_query.source = build_pipeline.firstStream();
right_query.join = join_ptr;
Expand Down Expand Up @@ -491,19 +502,29 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

// for tests, we need to mock ExchangeReceiver blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockExchangeReceiver(DAGPipeline & pipeline)
{
for (size_t i = 0; i < max_streams; ++i)
if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(query_block.source_name).empty())
{
// use max_block_size / 10 to determine the mock block's size
pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
for (size_t i = 0; i < max_streams; ++i)
{
// use max_block_size / 10 to determine the mock block's size
pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
}
NamesAndTypes source_columns;
for (const auto & col : pipeline.firstStream()->getHeader())
{
source_columns.emplace_back(col.name, col.type);
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}
NamesAndTypes source_columns;
for (const auto & col : pipeline.firstStream()->getHeader())
else
{
source_columns.emplace_back(col.name, col.type);
auto [names_and_types, mock_exchange_streams] = mockSourceStream<MockExchangeReceiverInputStream>(context, max_streams, log, query_block.source_name);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
pipeline.streams.insert(pipeline.streams.end(), mock_exchange_streams.begin(), mock_exchange_streams.end());
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection)
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ BlockIO InterpreterDAG::execute()
DAGPipeline pipeline;
pipeline.streams = streams;
/// add union to run in parallel if needed
if (dagContext().isMPPTask())
if (dagContext().isTest())
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test");
else if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, dagContext().log, false, "for non mpp");
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp");
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
Expand Down
59 changes: 59 additions & 0 deletions dbms/src/Flash/Coprocessor/MockSourceStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Common/Exception.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/Context.h>

namespace DB
{
template <typename SourceType>
std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id)
{
ColumnsWithTypeAndName columns_with_type_and_name;
NamesAndTypes names_and_types;
size_t rows = 0;
std::vector<std::shared_ptr<SourceType>> mock_source_streams;
columns_with_type_and_name = context.getDAGContext()->columnsForTest(executor_id);
for (const auto & col : columns_with_type_and_name)
{
if (rows == 0)
rows = col.column->size();
RUNTIME_ASSERT(rows == col.column->size(), log, "each column has same size");
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
names_and_types.push_back({col.name, col.type});
}
size_t row_for_each_stream = rows / max_streams;
size_t rows_left = rows - row_for_each_stream * max_streams;
size_t start = 0;
for (size_t i = 0; i < max_streams; ++i)
{
ColumnsWithTypeAndName columns_for_stream;
size_t row_for_current_stream = row_for_each_stream + (i < rows_left ? 1 : 0);
for (const auto & column_with_type_and_name : columns_with_type_and_name)
{
columns_for_stream.push_back(
ColumnWithTypeAndName(
column_with_type_and_name.column->cut(start, row_for_current_stream),
column_with_type_and_name.type,
column_with_type_and_name.name));
}
start += row_for_current_stream;
mock_source_streams.emplace_back(std::make_shared<SourceType>(columns_for_stream, context.getSettingsRef().max_block_size));
}
return {names_and_types, mock_source_streams};
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
}
} // namespace DB
Loading