Skip to content

Commit

Permalink
Test: Mock Input columns for operator tests (#5041)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored Jun 8, 2022
1 parent 5847f1c commit fdab3f5
Show file tree
Hide file tree
Showing 15 changed files with 652 additions and 129 deletions.
16 changes: 16 additions & 0 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const tipb::Exc
}
}

MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size)
: columns(columns)
, output_index(0)
, max_block_size(max_block_size)
{
rows = 0;
for (const auto & elem : columns)
{
if (elem.column)
{
assert(rows == 0 || rows == elem.column->size());
rows = elem.column->size();
}
}
}

ColumnPtr MockExchangeReceiverInputStream::makeColumn(ColumnWithTypeAndName elem) const
{
auto column = elem.type->createColumn();
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/MockExchangeReceiverInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ class MockExchangeReceiverInputStream : public IProfilingBlockInputStream
{
public:
MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_);
Block getHeader() const override { return Block(columns); }
MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size);
Block getHeader() const override
{
return Block(columns);
}
String getName() const override { return "MockExchangeReceiver"; }
ColumnsWithTypeAndName columns;
size_t output_index;
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,13 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_
return tables_regions_info.getTableRegionInfoByTableID(table_id);
}

ColumnsWithTypeAndName DAGContext::columnsForTest(String executor_id)
{
auto it = columns_for_test_map.find(executor_id);
if (unlikely(it == columns_for_test_map.end()))
{
throw DB::Exception("Don't have columns for mock source executors");
}
return it->second;
}
} // namespace DB
9 changes: 7 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ class DAGContext
}

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);

bool columnsForTestEmpty() { return columns_for_test_map.empty(); }

void cancelAllExchangeReceiver();

Expand All @@ -317,8 +321,8 @@ class DAGContext
Clock::time_point read_wait_index_end_timestamp{Clock::duration::zero()};
String table_scan_executor_id;
String tidb_host = "Unknown";
bool collect_execution_summaries;
bool return_executor_id;
bool collect_execution_summaries{};
bool return_executor_id{};
bool is_mpp_task = false;
bool is_root_mpp_task = false;
bool is_batch_cop = false;
Expand Down Expand Up @@ -372,6 +376,7 @@ class DAGContext
std::vector<SubqueriesForSets> subqueries;

bool is_test = false; /// switch for test, do not use it in production.
std::unordered_map<String, ColumnsWithTypeAndName> columns_for_test_map; /// <exector_id, columns>, for multiple sources
};

} // namespace DB
49 changes: 35 additions & 14 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Mpp/ExchangeReceiver.h>
Expand Down Expand Up @@ -159,13 +160,22 @@ AnalysisResult analyzeExpressions(
// for tests, we need to mock tableScan blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
auto names_and_types = genNamesAndTypes(table_scan);
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(table_scan.getTableScanExecutorID()).empty())
{
auto names_and_types = genNamesAndTypes(table_scan);
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}
}
else
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
auto [names_and_types, mock_table_scan_streams] = mockSourceStream<MockTableScanBlockInputStream>(context, max_streams, log, table_scan.getTableScanExecutorID());
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end());
}
}

Expand Down Expand Up @@ -266,7 +276,8 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
stream->setExtraInfo(
fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
});
executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/true, "for join");
// for test, join executor need the return blocks to output.
executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!dagContext().isTest(), "for join");

right_query.source = build_pipeline.firstStream();
right_query.join = join_ptr;
Expand Down Expand Up @@ -491,19 +502,29 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

// for tests, we need to mock ExchangeReceiver blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockExchangeReceiver(DAGPipeline & pipeline)
{
for (size_t i = 0; i < max_streams; ++i)
if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(query_block.source_name).empty())
{
// use max_block_size / 10 to determine the mock block's size
pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
for (size_t i = 0; i < max_streams; ++i)
{
// use max_block_size / 10 to determine the mock block's size
pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
}
NamesAndTypes source_columns;
for (const auto & col : pipeline.firstStream()->getHeader())
{
source_columns.emplace_back(col.name, col.type);
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}
NamesAndTypes source_columns;
for (const auto & col : pipeline.firstStream()->getHeader())
else
{
source_columns.emplace_back(col.name, col.type);
auto [names_and_types, mock_exchange_streams] = mockSourceStream<MockExchangeReceiverInputStream>(context, max_streams, log, query_block.source_name);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
pipeline.streams.insert(pipeline.streams.end(), mock_exchange_streams.begin(), mock_exchange_streams.end());
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection)
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ BlockIO InterpreterDAG::execute()
DAGPipeline pipeline;
pipeline.streams = streams;
/// add union to run in parallel if needed
if (dagContext().isMPPTask())
if (unlikely(dagContext().isTest()))
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test");
else if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, dagContext().log, false, "for non mpp");
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp");
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
Expand Down
60 changes: 60 additions & 0 deletions dbms/src/Flash/Coprocessor/MockSourceStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Common/Exception.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/Context.h>

namespace DB
{
template <typename SourceType>
std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id)
{
ColumnsWithTypeAndName columns_with_type_and_name;
NamesAndTypes names_and_types;
size_t rows = 0;
std::vector<std::shared_ptr<SourceType>> mock_source_streams;
columns_with_type_and_name = context.getDAGContext()->columnsForTest(executor_id);
for (const auto & col : columns_with_type_and_name)
{
if (rows == 0)
rows = col.column->size();
RUNTIME_ASSERT(rows == col.column->size(), log, "each column must has same size");
names_and_types.push_back({col.name, col.type});
}
size_t row_for_each_stream = rows / max_streams;
size_t rows_left = rows - row_for_each_stream * max_streams;
size_t start = 0;
for (size_t i = 0; i < max_streams; ++i)
{
ColumnsWithTypeAndName columns_for_stream;
size_t row_for_current_stream = row_for_each_stream + (i < rows_left ? 1 : 0);
for (const auto & column_with_type_and_name : columns_with_type_and_name)
{
columns_for_stream.push_back(
ColumnWithTypeAndName(
column_with_type_and_name.column->cut(start, row_for_current_stream),
column_with_type_and_name.type,
column_with_type_and_name.name));
}
start += row_for_current_stream;
mock_source_streams.emplace_back(std::make_shared<SourceType>(columns_for_stream, context.getSettingsRef().max_block_size));
}
RUNTIME_ASSERT(start == rows, log, "mock source streams' total size must same as user input");
return {names_and_types, mock_source_streams};
}
} // namespace DB
Loading

0 comments on commit fdab3f5

Please sign in to comment.