diff --git a/dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp b/dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
index b1de3e23914..3f46bb46cc8 100644
--- a/dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
+++ b/dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
@@ -30,6 +30,22 @@ MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const tipb::Exc
     }
 }
 
+MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size)
+    : columns(columns)
+    , output_index(0)
+    , max_block_size(max_block_size)
+{
+    rows = 0;
+    for (const auto & elem : columns)
+    {
+        if (elem.column)
+        {
+            assert(rows == 0 || rows == elem.column->size());
+            rows = elem.column->size();
+        }
+    }
+}
+
 ColumnPtr MockExchangeReceiverInputStream::makeColumn(ColumnWithTypeAndName elem) const
 {
     auto column = elem.type->createColumn();
diff --git a/dbms/src/DataStreams/MockExchangeReceiverInputStream.h b/dbms/src/DataStreams/MockExchangeReceiverInputStream.h
index 24ae80d4f62..8c0a5b85822 100644
--- a/dbms/src/DataStreams/MockExchangeReceiverInputStream.h
+++ b/dbms/src/DataStreams/MockExchangeReceiverInputStream.h
@@ -26,7 +26,11 @@ class MockExchangeReceiverInputStream : public IProfilingBlockInputStream
 {
 public:
     MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_);
-    Block getHeader() const override { return Block(columns); }
+    MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size);
+    Block getHeader() const override
+    {
+        return Block(columns);
+    }
     String getName() const override { return "MockExchangeReceiver"; }
     ColumnsWithTypeAndName columns;
     size_t output_index;
diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp
index 17fb6553eab..1736e0b6cec 100644
--- a/dbms/src/Flash/Coprocessor/DAGContext.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -271,4 +271,13 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_
     return tables_regions_info.getTableRegionInfoByTableID(table_id);
 }
 
+ColumnsWithTypeAndName DAGContext::columnsForTest(String executor_id)
+{
+    auto it = columns_for_test_map.find(executor_id);
+    if (unlikely(it == columns_for_test_map.end()))
+    {
+        throw DB::Exception("Don't have columns for mock source executors");
+    }
+    return it->second;
+}
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h
index e3e5efdcbc6..c20eb3a367e 100644
--- a/dbms/src/Flash/Coprocessor/DAGContext.h
+++ b/dbms/src/Flash/Coprocessor/DAGContext.h
@@ -298,6 +298,10 @@ class DAGContext
     }
 
     bool isTest() const { return is_test; }
+    void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
+    ColumnsWithTypeAndName columnsForTest(String executor_id);
+
+    bool columnsForTestEmpty() { return columns_for_test_map.empty(); }
 
     void cancelAllExchangeReceiver();
 
@@ -317,8 +321,8 @@ class DAGContext
     Clock::time_point read_wait_index_end_timestamp{Clock::duration::zero()};
     String table_scan_executor_id;
     String tidb_host = "Unknown";
-    bool collect_execution_summaries;
-    bool return_executor_id;
+    bool collect_execution_summaries{};
+    bool return_executor_id{};
     bool is_mpp_task = false;
     bool is_root_mpp_task = false;
     bool is_batch_cop = false;
@@ -372,6 +376,7 @@ class DAGContext
     std::vector<SubqueriesForSets> subqueries;
 
     bool is_test = false; /// switch for test, do not use it in production.
+    std::unordered_map<String, ColumnsWithTypeAndName> columns_for_test_map; /// <executor_id, columns>, for multiple sources
 };
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 5fac49faaed..86d6428c92a 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -42,6 +42,7 @@
 #include
 #include
 #include
+#include <Flash/Coprocessor/MockSourceStream.h>
 #include
 #include
 #include
@@ -159,13 +160,22 @@ AnalysisResult analyzeExpressions(
 // for tests, we need to mock tableScan blockInputStream as the source stream.
 void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
 {
-    auto names_and_types = genNamesAndTypes(table_scan);
-    auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
-    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
-    for (size_t i = 0; i < max_streams; ++i)
+    if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(table_scan.getTableScanExecutorID()).empty())
+    {
+        auto names_and_types = genNamesAndTypes(table_scan);
+        auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
+        analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
+        for (size_t i = 0; i < max_streams; ++i)
+        {
+            auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
+            pipeline.streams.emplace_back(mock_table_scan_stream);
+        }
+    }
+    else
     {
-        auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
-        pipeline.streams.emplace_back(mock_table_scan_stream);
+        auto [names_and_types, mock_table_scan_streams] = mockSourceStream<MockTableScanBlockInputStream>(context, max_streams, log, table_scan.getTableScanExecutorID());
+        analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
+        pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end());
     }
 }
@@ -266,7 +276,8 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
         stream->setExtraInfo(
             fmt::format("join build, build_side_root_executor_id = {}", dagContext().getJoinExecuteInfoMap()[query_block.source_name].build_side_root_executor_id));
     });
-    executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/true, "for join");
+    // for tests, the join executor needs the returned blocks as its output.
+    executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!dagContext().isTest(), "for join");
     right_query.source = build_pipeline.firstStream();
     right_query.join = join_ptr;
@@ -491,19 +502,29 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
     analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
 }
 
+// for tests, we need to mock ExchangeReceiver blockInputStream as the source stream.
 void DAGQueryBlockInterpreter::handleMockExchangeReceiver(DAGPipeline & pipeline)
 {
-    for (size_t i = 0; i < max_streams; ++i)
+    if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(query_block.source_name).empty())
     {
-        // use max_block_size / 10 to determine the mock block's size
-        pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
+        for (size_t i = 0; i < max_streams; ++i)
+        {
+            // use max_block_size / 10 to determine the mock block's size
+            pipeline.streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(query_block.source->exchange_receiver(), context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
+        }
+        NamesAndTypes source_columns;
+        for (const auto & col : pipeline.firstStream()->getHeader())
+        {
+            source_columns.emplace_back(col.name, col.type);
+        }
+        analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
     }
-    NamesAndTypes source_columns;
-    for (const auto & col : pipeline.firstStream()->getHeader())
+    else
     {
-        source_columns.emplace_back(col.name, col.type);
+        auto [names_and_types, mock_exchange_streams] = mockSourceStream<MockExchangeReceiverInputStream>(context, max_streams, log, query_block.source_name);
+        analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
+        pipeline.streams.insert(pipeline.streams.end(), mock_exchange_streams.begin(), mock_exchange_streams.end());
     }
-    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
 }
 
 void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection)
diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
index 741aa7b5e26..a67ebf20aa5 100644
--- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
+++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
@@ -83,11 +83,13 @@ BlockIO InterpreterDAG::execute()
     DAGPipeline pipeline;
     pipeline.streams = streams;
     /// add union to run in parallel if needed
-    if (dagContext().isMPPTask())
+    if (unlikely(dagContext().isTest()))
+        executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test");
+    else if (dagContext().isMPPTask())
         /// MPPTask do not need the returned blocks.
         executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp");
     else
-        executeUnion(pipeline, max_streams, dagContext().log, false, "for non mpp");
+        executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp");
     if (dagContext().hasSubquery())
     {
         const Settings & settings = context.getSettingsRef();
diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.h b/dbms/src/Flash/Coprocessor/MockSourceStream.h
new file mode 100644
index 00000000000..039cba22e3d
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/MockSourceStream.h
@@ -0,0 +1,60 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+template <typename SourceType>
+std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id)
+{
+    ColumnsWithTypeAndName columns_with_type_and_name;
+    NamesAndTypes names_and_types;
+    size_t rows = 0;
+    std::vector<std::shared_ptr<SourceType>> mock_source_streams;
+    columns_with_type_and_name = context.getDAGContext()->columnsForTest(executor_id);
+    for (const auto & col : columns_with_type_and_name)
+    {
+        if (rows == 0)
+            rows = col.column->size();
+        RUNTIME_ASSERT(rows == col.column->size(), log, "each column must have the same size");
+        names_and_types.push_back({col.name, col.type});
+    }
+    size_t row_for_each_stream = rows / max_streams;
+    size_t rows_left = rows - row_for_each_stream * max_streams;
+    size_t start = 0;
+    for (size_t i = 0; i < max_streams; ++i)
+    {
+        ColumnsWithTypeAndName columns_for_stream;
+        size_t row_for_current_stream = row_for_each_stream + (i < rows_left ? 1 : 0);
+        for (const auto & column_with_type_and_name : columns_with_type_and_name)
+        {
+            columns_for_stream.push_back(
+                ColumnWithTypeAndName(
+                    column_with_type_and_name.column->cut(start, row_for_current_stream),
+                    column_with_type_and_name.type,
+                    column_with_type_and_name.name));
+        }
+        start += row_for_current_stream;
+        mock_source_streams.emplace_back(std::make_shared<SourceType>(columns_for_stream, context.getSettingsRef().max_block_size));
+    }
+    RUNTIME_ASSERT(start == rows, log, "mock source streams' total size must be the same as the user input");
+    return {names_and_types, mock_source_streams};
+}
+} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Flash/tests/gtest_executor.cpp b/dbms/src/Flash/tests/gtest_executor.cpp
new file mode 100644
index 00000000000..64c60f14bb6
--- /dev/null
+++ b/dbms/src/Flash/tests/gtest_executor.cpp
@@ -0,0 +1,230 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+
+namespace DB
+{
+namespace tests
+{
+class ExecutorTestRunner : public DB::tests::ExecutorTest
+{
+public:
+    void initializeContext() override
+    {
+        ExecutorTest::initializeContext();
+        context.addMockTable({"test_db", "test_table"},
+                             {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}},
+                             {toNullableVec<String>("s1", {"banana", {}, "banana"}),
+                              toNullableVec<String>("s2", {"apple", {}, "banana"})});
+        context.addExchangeReceiver("exchange1",
+                                    {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}},
+                                    {toNullableVec<String>("s1", {"banana", {}, "banana"}),
+                                     toNullableVec<String>("s2", {"apple", {}, "banana"})});
+
+        context.addExchangeReceiver("exchange_r_table",
+                                    {{"s1", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
+                                    {toNullableVec<String>("s", {"banana", "banana"}),
+                                     toNullableVec<String>("join_c", {"apple", "banana"})});
+
+        context.addExchangeReceiver("exchange_l_table",
+                                    {{"s1", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
+                                    {toNullableVec<String>("s", {"banana", "banana"}),
+                                     toNullableVec<String>("join_c", {"apple", "banana"})});
+
+        context.addMockTable({"test_db", "r_table"},
+                             {{"s", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
+                             {toVec<String>("s", {"banana", "banana"}),
+                              toVec<String>("join_c", {"apple", "banana"})});
+
+        context.addMockTable({"test_db", "r_table_2"},
+                             {{"s", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
+                             {toVec<String>("s", {"banana", "banana", "banana"}),
+                              toVec<String>("join_c", {"apple", "apple", "apple"})});
+
+        context.addMockTable({"test_db", "l_table"},
+                             {{"s", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
+                             {toVec<String>("s", {"banana", "banana"}),
+                              toVec<String>("join_c", {"apple", "banana"})});
+    }
+};
+
+TEST_F(ExecutorTestRunner, Filter)
+try
+{
+    auto request = context
+                       .scan("test_db", "test_table")
+                       .filter(eq(col("s1"), col("s2")))
+                       .build(context);
+    {
+        executeStreams(request,
+                       {toNullableVec<String>({"banana"}),
+                        toNullableVec<String>({"banana"})});
+    }
+
+    request = context.receive("exchange1")
+                  .filter(eq(col("s1"), col("s2")))
+                  .build(context);
+    {
+        executeStreams(request,
+                       {toNullableVec<String>({"banana"}),
+                        toNullableVec<String>({"banana"})});
+    }
+}
+CATCH
+
+TEST_F(ExecutorTestRunner, JoinWithTableScan)
+try
+{
+    auto request = context
+                       .scan("test_db", "l_table")
+                       .join(context.scan("test_db", "r_table"), {col("join_c")}, ASTTableJoin::Kind::Left)
+                       .topN("join_c", false, 2)
+                       .build(context);
+    {
+        String expected = "topn_3 | order_by: {(<1, String>, desc: false)}, limit: 2\n"
+                          " Join_2 | LeftOuterJoin, HashJoin. left_join_keys: {<0, String>}, right_join_keys: {<0, String>}\n"
+                          " table_scan_0 | {<0, String>, <1, String>}\n"
+                          " table_scan_1 | {<0, String>, <1, String>}\n";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})},
+                       2);
+
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})},
+                       5);
+
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})});
+    }
+    request = context
+                  .scan("test_db", "l_table")
+                  .join(context.scan("test_db", "r_table"), {col("join_c")}, ASTTableJoin::Kind::Left)
+                  .project({"s", "join_c"})
+                  .topN("join_c", false, 2)
+                  .build(context);
+    {
+        String expected = "topn_4 | order_by: {(<1, String>, desc: false)}, limit: 2\n"
+                          " project_3 | {<0, String>, <1, String>}\n"
+                          " Join_2 | LeftOuterJoin, HashJoin. left_join_keys: {<0, String>}, right_join_keys: {<0, String>}\n"
+                          " table_scan_0 | {<0, String>, <1, String>}\n"
+                          " table_scan_1 | {<0, String>, <1, String>}\n";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})},
+                       2);
+    }
+
+    request = context
+                  .scan("test_db", "l_table")
+                  .join(context.scan("test_db", "r_table_2"), {col("join_c")}, ASTTableJoin::Kind::Left)
+                  .topN("join_c", false, 4)
+                  .build(context);
+    {
+        String expected = "topn_3 | order_by: {(<1, String>, desc: false)}, limit: 4\n"
+                          " Join_2 | LeftOuterJoin, HashJoin. left_join_keys: {<0, String>}, right_join_keys: {<0, String>}\n"
+                          " table_scan_0 | {<0, String>, <1, String>}\n"
+                          " table_scan_1 | {<0, String>, <1, String>}\n";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana", "banana", "banana"}),
+                        toNullableVec<String>({"apple", "apple", "apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana", "banana", {}}),
+                        toNullableVec<String>({"apple", "apple", "apple", {}})},
+                       2);
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana", "banana", "banana"}),
+                        toNullableVec<String>({"apple", "apple", "apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana", "banana", {}}),
+                        toNullableVec<String>({"apple", "apple", "apple", {}})},
+                       3);
+    }
+}
+CATCH
+
+TEST_F(ExecutorTestRunner, JoinWithExchangeReceiver)
+try
+{
+    auto request = context
+                       .receive("exchange_l_table")
+                       .join(context.receive("exchange_r_table"), {col("join_c")}, ASTTableJoin::Kind::Left)
+                       .topN("join_c", false, 2)
+                       .build(context);
+    {
+        String expected = "topn_3 | order_by: {(<1, String>, desc: false)}, limit: 2\n"
+                          " Join_2 | LeftOuterJoin, HashJoin. left_join_keys: {<0, String>}, right_join_keys: {<0, String>}\n"
+                          " exchange_receiver_0 | type:PassThrough, {<0, String>, <1, String>}\n"
+                          " exchange_receiver_1 | type:PassThrough, {<0, String>, <1, String>}\n";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})},
+                       2);
+
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})},
+                       5);
+
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})});
+    }
+}
+CATCH
+
+TEST_F(ExecutorTestRunner, JoinWithTableScanAndReceiver)
+try
+{
+    auto request = context
+                       .scan("test_db", "l_table")
+                       .join(context.receive("exchange_r_table"), {col("join_c")}, ASTTableJoin::Kind::Left)
+                       .topN("join_c", false, 2)
+                       .build(context);
+    {
+        String expected = "topn_3 | order_by: {(<1, String>, desc: false)}, limit: 2\n"
+                          " Join_2 | LeftOuterJoin, HashJoin. left_join_keys: {<0, String>}, right_join_keys: {<0, String>}\n"
+                          " table_scan_0 | {<0, String>, <1, String>}\n"
+                          " exchange_receiver_1 | type:PassThrough, {<0, String>, <1, String>}\n";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+        executeStreams(request,
+                       {toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"}),
+                        toNullableVec<String>({"banana", "banana"}),
+                        toNullableVec<String>({"apple", "banana"})},
+                       2);
+    }
+}
+CATCH
+
+} // namespace tests
+} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp
index aed9d9e90f9..a6bb8ff1702 100644
--- a/dbms/src/Flash/tests/gtest_interpreter.cpp
+++ b/dbms/src/Flash/tests/gtest_interpreter.cpp
@@ -12,19 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <TestUtils/InterpreterTestUtils.h>
+#include <TestUtils/ExecutorTestUtils.h>
 #include <TestUtils/mockExecutor.h>
 
 namespace DB
 {
 namespace tests
 {
-class InterpreterExecuteTest : public DB::tests::InterpreterTest
+class InterpreterExecuteTest : public DB::tests::ExecutorTest
 {
 public:
     void initializeContext() override
     {
-        InterpreterTest::initializeContext();
+        ExecutorTest::initializeContext();
 
         context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}});
         context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}});
@@ -47,7 +47,7 @@ try
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 SharedQuery x 10:
  Expression:
   MergeSorting, limit = 10
@@ -72,7 +72,7 @@ Union:
     {
         String expected = R"(
-Union:
+Union:
 SharedQuery x 10:
  Limit, limit = 10
  Union:
@@ -100,7 +100,7 @@ try
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 Expression x 10:
  Expression:
   Expression:
@@ -122,7 +122,7 @@ Union:
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 Expression x 10:
  Expression:
   Expression:
@@ -147,7 +147,7 @@ Union:
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 Expression x 10:
  Expression:
   Expression:
@@ -181,7 +181,7 @@ Union:
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 SharedQuery x 10:
  Limit, limit = 10
  Union:
@@ -244,7 +244,7 @@ CreatingSets
    HashJoinProbe:
     Expression:
      MockTableScan
-  Union:
+  Union:
   Expression x 10:
    Expression:
     HashJoinProbe:
@@ -260,7 +260,7 @@ CreatingSets
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 Expression x 10:
  Expression:
   Expression:
@@ -283,7 +283,7 @@ Union:
         .build(context);
     {
         String expected = R"(
-Union:
+Union:
 MockExchangeSender x 10
  Expression:
   Expression:
@@ -331,7 +331,7 @@ CreatingSets
    HashJoinProbe:
     Expression:
      MockExchangeReceiver
-  Union:
+  Union:
   Expression x 10:
    Expression:
    HashJoinProbe:
@@ -373,7 +373,7 @@ CreatingSets
    HashJoinProbe:
     Expression:
      MockExchangeReceiver
-  Union:
+  Union:
   MockExchangeSender x 10
    Expression:
     Expression:
diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp
new file mode 100644
index 00000000000..67a21d12286
--- /dev/null
+++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp
@@ -0,0 +1,133 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+namespace DB::tests
+{
+DAGContext & ExecutorTest::getDAGContext()
+{
+    assert(dag_context_ptr != nullptr);
+    return *dag_context_ptr;
+}
+
+void ExecutorTest::initializeContext()
+{
+    dag_context_ptr = std::make_unique<DAGContext>(1024);
+    context = MockDAGRequestContext(TiFlashTestEnv::getContext());
+    dag_context_ptr->log = Logger::get("executorTest");
+}
+
+void ExecutorTest::SetUpTestCase()
+{
+    try
+    {
+        DB::registerFunctions();
+        DB::registerAggregateFunctions();
+    }
+    catch (DB::Exception &)
+    {
+        // Maybe another test has already registered, ignore exception here.
+    }
+}
+
+void ExecutorTest::initializeClientInfo()
+{
+    context.context.setCurrentQueryId("test");
+    ClientInfo & client_info = context.context.getClientInfo();
+    client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
+    client_info.interface = ClientInfo::Interface::GRPC;
+}
+
+void ExecutorTest::executeInterpreter(const String & expected_string, const std::shared_ptr<tipb::DAGRequest> & request, size_t concurrency)
+{
+    DAGContext dag_context(*request, "interpreter_test", concurrency);
+    context.context.setDAGContext(&dag_context);
+    // Currently, don't care about regions information in interpreter tests.
+    DAGQuerySource dag(context.context);
+    auto res = executeQuery(dag, context.context, false, QueryProcessingStage::Complete);
+    FmtBuffer fb;
+    res.in->dumpTree(fb);
+    ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString()));
+}
+
+namespace
+{
+Block mergeBlocks(Blocks blocks)
+{
+    if (blocks.empty())
+        return {};
+
+    Block sample_block = blocks.back();
+    std::vector<MutableColumnPtr> actual_cols;
+    for (const auto & column : sample_block.getColumnsWithTypeAndName())
+    {
+        actual_cols.push_back(column.type->createColumn());
+    }
+    for (const auto & block : blocks)
+    {
+        for (size_t i = 0; i < block.columns(); ++i)
+        {
+            for (size_t j = 0; j < block.rows(); ++j)
+            {
+                actual_cols[i]->insert((*(block.getColumnsWithTypeAndName())[i].column)[j]);
+            }
+        }
+    }
+
+    ColumnsWithTypeAndName actual_columns;
+    for (size_t i = 0; i < actual_cols.size(); ++i)
+        actual_columns.push_back({std::move(actual_cols[i]), sample_block.getColumnsWithTypeAndName()[i].type, sample_block.getColumnsWithTypeAndName()[i].name, sample_block.getColumnsWithTypeAndName()[i].column_id});
+    return Block(actual_columns);
+}
+
+void readBlock(BlockInputStreamPtr stream, const ColumnsWithTypeAndName & expect_columns)
+{
+    Blocks actual_blocks;
+    Block expect_block(expect_columns);
+    stream->readPrefix();
+    while (auto block = stream->read())
+    {
+        actual_blocks.push_back(block);
+    }
+    stream->readSuffix();
+    Block actual_block = mergeBlocks(actual_blocks);
+    ASSERT_BLOCK_EQ(expect_block, actual_block);
+}
+} // namespace
+
+void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, std::unordered_map<String, ColumnsWithTypeAndName> & source_columns_map, const ColumnsWithTypeAndName & expect_columns, size_t concurrency)
+{
+    DAGContext dag_context(*request, "executor_test", concurrency);
+    dag_context.setColumnsForTest(source_columns_map);
+    context.context.setDAGContext(&dag_context);
+    // Currently, don't care about regions information in tests.
+    DAGQuerySource dag(context.context);
+    readBlock(executeQuery(dag, context.context, false, QueryProcessingStage::Complete).in, expect_columns);
+}
+
+void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & expect_columns, size_t concurrency)
+{
+    executeStreams(request, context.executorIdColumnsMap(), expect_columns, concurrency);
+}
+
+void ExecutorTest::dagRequestEqual(const String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
+{
+    ASSERT_EQ(Poco::trim(expected_string), Poco::trim(ExecutorSerializer().serialize(actual.get())));
+}
+
+} // namespace DB::tests
diff --git a/dbms/src/TestUtils/InterpreterTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h
similarity index 61%
rename from dbms/src/TestUtils/InterpreterTestUtils.h
rename to dbms/src/TestUtils/ExecutorTestUtils.h
index 28d44d3a5f2..977b46abbd2 100644
--- a/dbms/src/TestUtils/InterpreterTestUtils.h
+++ b/dbms/src/TestUtils/ExecutorTestUtils.h
@@ -27,7 +27,7 @@ namespace DB::tests
 {
 void executeInterpreter(const std::shared_ptr<tipb::DAGRequest> & request, Context & context);
 
-class InterpreterTest : public ::testing::Test
+class ExecutorTest : public ::testing::Test
 {
 protected:
     void SetUp() override
@@ -37,7 +37,7 @@ class InterpreterTest : public ::testing::Test
     }
 
 public:
-    InterpreterTest()
+    ExecutorTest()
         : context(TiFlashTestEnv::getContext())
     {}
     static void SetUpTestCase();
@@ -52,6 +52,40 @@ class InterpreterTest : public ::testing::Test
     void executeInterpreter(const String & expected_string, const std::shared_ptr<tipb::DAGRequest> & request, size_t concurrency);
 
+    void executeStreams(
+        const std::shared_ptr<tipb::DAGRequest> & request,
+        std::unordered_map<String, ColumnsWithTypeAndName> & source_columns_map,
+        const ColumnsWithTypeAndName & expect_columns,
+        size_t concurrency = 1);
+    void executeStreams(
+        const std::shared_ptr<tipb::DAGRequest> & request,
+        const ColumnsWithTypeAndName & expect_columns,
+        size_t concurrency = 1);
+
+    template <typename T>
+    ColumnWithTypeAndName toNullableVec(const std::vector<std::optional<typename TypeTraits<T>::FieldType>> & v)
+    {
+        return createColumn<Nullable<T>>(v);
+    }
+
+    template <typename T>
+    ColumnWithTypeAndName toVec(const std::vector<typename TypeTraits<T>::FieldType> & v)
+    {
+        return createColumn<T>(v);
+    }
+
+    template <typename T>
+    ColumnWithTypeAndName toNullableVec(String name, const std::vector<std::optional<typename TypeTraits<T>::FieldType>> & v)
+    {
+        return createColumn<Nullable<T>>(v, name);
+    }
+
+    template <typename T>
+    ColumnWithTypeAndName toVec(String name, const std::vector<typename TypeTraits<T>::FieldType> & v)
+    {
+        return createColumn<T>(v, name);
+    }
+
 protected:
     MockDAGRequestContext context;
     std::unique_ptr<DAGContext> dag_context_ptr;
diff --git a/dbms/src/TestUtils/InterpreterTestUtils.cpp b/dbms/src/TestUtils/InterpreterTestUtils.cpp
deleted file mode 100644
index 2cc096d4095..00000000000
--- a/dbms/src/TestUtils/InterpreterTestUtils.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2022 PingCAP, Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include
-#include
-#include
-#include
-#include
-namespace DB::tests
-{
-DAGContext & InterpreterTest::getDAGContext()
-{
-    assert(dag_context_ptr != nullptr);
-    return *dag_context_ptr;
-}
-
-void InterpreterTest::initializeContext()
-{
-    dag_context_ptr = std::make_unique<DAGContext>(1024);
-    context = MockDAGRequestContext(TiFlashTestEnv::getContext());
-    dag_context_ptr->log = Logger::get("interpreterTest");
-}
-
-void InterpreterTest::SetUpTestCase()
-{
-    try
-    {
-        DB::registerFunctions();
-        DB::registerAggregateFunctions();
-    }
-    catch (DB::Exception &)
-    {
-        // Maybe another test has already registered, ignore exception here.
-    }
-}
-
-void InterpreterTest::initializeClientInfo()
-{
-    context.context.setCurrentQueryId("test");
-    ClientInfo & client_info = context.context.getClientInfo();
-    client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
-    client_info.interface = ClientInfo::Interface::GRPC;
-}
-
-void InterpreterTest::executeInterpreter(const String & expected_string, const std::shared_ptr<tipb::DAGRequest> & request, size_t concurrency)
-{
-    DAGContext dag_context(*request, "interpreter_test", concurrency);
-    context.context.setDAGContext(&dag_context);
-    // Currently, don't care about regions information in interpreter tests.
-    DAGQuerySource dag(context.context);
-    auto res = executeQuery(dag, context.context, false, QueryProcessingStage::Complete);
-    FmtBuffer fb;
-    res.in->dumpTree(fb);
-    ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString()));
-}
-
-void InterpreterTest::dagRequestEqual(const String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
-{
-    ASSERT_EQ(Poco::trim(expected_string), Poco::trim(ExecutorSerializer().serialize(actual.get())));
-}
-
-} // namespace DB::tests
diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp
index 3313aae6a93..af939002cff 100644
--- a/dbms/src/TestUtils/mockExecutor.cpp
+++ b/dbms/src/TestUtils/mockExecutor.cpp
@@ -23,6 +23,8 @@
 #include
 #include
+#include
+
 namespace DB::tests
 {
 ASTPtr buildColumn(const String & column_name)
@@ -274,52 +276,116 @@ DAGRequestBuilder & DAGRequestBuilder::buildAggregation(ASTPtr agg_funcs, ASTPtr
     return *this;
 }
 
-void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoList & columns)
+void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoList & columnInfos)
 {
-    std::vector<MockColumnInfo> v_column_info(columns.size());
+    std::vector<MockColumnInfo> v_column_info(columnInfos.size());
     size_t i = 0;
-    for (const auto & info : columns)
+    for (const auto & info : columnInfos)
     {
         v_column_info[i++] = std::move(info);
     }
     mock_tables[name.first + "." + name.second] = v_column_info;
 }
 
-void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfos & columns)
+void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfos & columnInfos)
 {
-    mock_tables[db + "." + table] = columns;
+    mock_tables[db + "." + table] = columnInfos;
 }
 
-void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfos & columns)
+void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfos & columnInfos)
 {
-    mock_tables[name.first + "." + name.second] = columns;
+    mock_tables[name.first + "." + name.second] = columnInfos;
 }
 
-void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfos & columns)
+void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfos & columnInfos)
 {
-    exchange_schemas[name] = columns;
+    exchange_schemas[name] = columnInfos;
 }
 
-void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoList & columns)
+void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoList & columnInfos)
 {
-    std::vector<MockColumnInfo> v_column_info(columns.size());
+    std::vector<MockColumnInfo> v_column_info(columnInfos.size());
     size_t i = 0;
-    for (const auto & info : columns)
+    for (const auto & info : columnInfos)
     {
         v_column_info[i++] = std::move(info);
     }
     exchange_schemas[name] = v_column_info;
 }
 
+void MockDAGRequestContext::addMockTableColumnData(const String & db, const String & table, ColumnsWithTypeAndName columns)
+{
+    mock_table_columns[db + "." + table] = columns;
+}
+
+void MockDAGRequestContext::addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns)
+{
+    mock_table_columns[name.first + "." + name.second] = columns;
+}
+
+void MockDAGRequestContext::addExchangeReceiverColumnData(const String & name, ColumnsWithTypeAndName columns)
+{
+    mock_exchange_columns[name] = columns;
+}
+
+void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoList & columnInfos, ColumnsWithTypeAndName columns)
+{
+    addMockTable(db, table, columnInfos);
+    addMockTableColumnData(db, table, columns);
+}
+
+void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfos & columnInfos, ColumnsWithTypeAndName columns)
+{
+    addMockTable(db, table, columnInfos);
+    addMockTableColumnData(db, table, columns);
+}
+
+void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoList & columnInfos, ColumnsWithTypeAndName columns)
+{
+    addMockTable(name, columnInfos);
+    addMockTableColumnData(name, columns);
+}
+
+void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfos & columnInfos, ColumnsWithTypeAndName columns)
+{
+    addMockTable(name, columnInfos);
+    addMockTableColumnData(name, columns);
+}
+
+void MockDAGRequestContext::addExchangeReceiver(const String & name, MockColumnInfos columnInfos, ColumnsWithTypeAndName columns)
+{
+    addExchangeRelationSchema(name, columnInfos);
+    addExchangeReceiverColumnData(name, columns);
+}
+
+void MockDAGRequestContext::addExchangeReceiver(const String & name, MockColumnInfoList columnInfos, ColumnsWithTypeAndName columns)
+{
+    addExchangeRelationSchema(name, columnInfos);
+    addExchangeReceiverColumnData(name, columns);
+}
+
 DAGRequestBuilder MockDAGRequestContext::scan(String db_name, String table_name)
 {
-    return DAGRequestBuilder(index).mockTable({db_name, table_name}, mock_tables[db_name + "." + table_name]);
+    auto builder = DAGRequestBuilder(index).mockTable({db_name, table_name}, mock_tables[db_name + "." + table_name]);
+    // If there is no column data for this table, the caller must pass input columns to executeStreams in order to run executor tests.
+    // If the caller only wants to run interpreter tests, it is safe to leave the column data empty.
+    if (mock_table_columns.find(db_name + "." + table_name) != mock_table_columns.end())
+    {
+        executor_id_columns_map[builder.getRoot()->name] = mock_table_columns[db_name + "." + table_name];
+    }
+    return builder;
 }
 
 DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name)
 {
     auto builder = DAGRequestBuilder(index).exchangeReceiver(exchange_schemas[exchange_name]);
     receiver_source_task_ids_map[builder.getRoot()->name] = {};
+    // If there is no column data for this exchange, the caller must pass input columns to executeStreams in order to run executor tests.
+    // If the caller only wants to run interpreter tests, it is safe to leave the column data empty.
+    if (mock_exchange_columns.find(exchange_name) != mock_exchange_columns.end())
+    {
+        executor_id_columns_map[builder.getRoot()->name] = mock_exchange_columns[exchange_name];
+    }
     return builder;
 }
 } // namespace DB::tests
\ No newline at end of file
diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h
index 2f6d3542ebb..88d98158b74 100644
--- a/dbms/src/TestUtils/mockExecutor.h
+++ b/dbms/src/TestUtils/mockExecutor.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -122,11 +123,23 @@ class MockDAGRequestContext
         return DAGRequestBuilder(index);
     }
 
-    void addMockTable(const MockTableName & name, const MockColumnInfoList & columns);
-    void addMockTable(const String & db, const String & table, const MockColumnInfos & columns);
-    void addMockTable(const MockTableName & name, const MockColumnInfos & columns);
-    void addExchangeRelationSchema(String name, const MockColumnInfos & columns);
-    void addExchangeRelationSchema(String name, const MockColumnInfoList & columns);
+    void addMockTable(const MockTableName & name, const MockColumnInfoList & columnInfos);
+    void addMockTable(const String & db, const String & table, const MockColumnInfos & columnInfos);
+    void addMockTable(const MockTableName & name, const MockColumnInfos & columnInfos);
+    void addExchangeRelationSchema(String name, const MockColumnInfos & columnInfos);
+    void addExchangeRelationSchema(String name, const MockColumnInfoList & columnInfos);
+    void addMockTableColumnData(const String & db, const String & table, ColumnsWithTypeAndName columns);
+    void addMockTable(const String & db, const String & table, const MockColumnInfoList & columnInfos, ColumnsWithTypeAndName columns);
+    void addMockTable(const String & db, const String & table, const MockColumnInfos & columnInfos, ColumnsWithTypeAndName columns);
+    void addMockTable(const MockTableName & name, const MockColumnInfoList & columnInfos, ColumnsWithTypeAndName columns);
+    void addMockTable(const MockTableName & name, const MockColumnInfos & columnInfos, ColumnsWithTypeAndName columns);
+    void addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns);
+    void addExchangeReceiverColumnData(const String & name, ColumnsWithTypeAndName columns);
+    void addExchangeReceiver(const String & name, MockColumnInfos columnInfos, ColumnsWithTypeAndName columns);
+    void addExchangeReceiver(const String & name, MockColumnInfoList columnInfos, ColumnsWithTypeAndName columns);
+
+    std::unordered_map<String, ColumnsWithTypeAndName> & executorIdColumnsMap() { return executor_id_columns_map; }
+
     DAGRequestBuilder scan(String db_name, String table_name);
     DAGRequestBuilder receive(String exchange_name);
@@ -134,6 +147,9 @@ class MockDAGRequestContext
     size_t index;
     std::unordered_map<String, MockColumnInfos> mock_tables;
    std::unordered_map<String, MockColumnInfos> exchange_schemas;
+    std::unordered_map<String, ColumnsWithTypeAndName> mock_table_columns;
+    std::unordered_map<String, ColumnsWithTypeAndName> mock_exchange_columns;
+    std::unordered_map<String, ColumnsWithTypeAndName> executor_id_columns_map; /// <executor_id, columns>
 
 public:
     // Currently don't support task_id, so the following to structure is useless,
diff --git a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp
index 6dbf791669f..214148fe47f 100644
--- a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp
+++ b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp
@@ -12,19 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <TestUtils/InterpreterTestUtils.h>
+#include <TestUtils/ExecutorTestUtils.h>
 #include <TestUtils/mockExecutor.h>
 
 namespace DB
 {
 namespace tests
 {
-class MockDAGRequestTest : public DB::tests::InterpreterTest
+class MockDAGRequestTest : public DB::tests::ExecutorTest
 {
 public:
     void initializeContext() override
     {
-        InterpreterTest::initializeContext();
+        ExecutorTest::initializeContext();
 
         context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}});
         context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeLong}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}});