Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interpreter: Serialize executors #4742

Merged
merged 24 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,15 @@ bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t col
auto * meta_string = exchange_sender->add_encoded_task_meta();
meta.AppendToString(meta_string);
}

for (auto & field : output_schema)
{
auto tipb_type = TiDB::columnInfoToFieldType(field.second);
tipb_type.set_collate(collator_id);
auto * field_type = exchange_sender->add_all_field_types();
*field_type = tipb_type;
}

auto * child_executor = exchange_sender->mutable_child();
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}
Expand Down
92 changes: 92 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,98 @@ const String & getFunctionName(const tipb::Expr & expr)
}
}

String getExchangeTypeName(const tipb::ExchangeType & tp)
{
switch (tp)
{
case tipb::ExchangeType::Broadcast:
return "Broadcast";
case tipb::ExchangeType::PassThrough:
return "PassThrough";
case tipb::ExchangeType::Hash:
return "Hash";
default:
throw TiFlashException(fmt::format("Not supported Exchange type: {}", tp), Errors::Coprocessor::Internal);
}
}

String getJoinTypeName(const tipb::JoinType & tp)
{
switch (tp)
{
case tipb::JoinType::TypeAntiLeftOuterSemiJoin:
return "AntiLeftOuterSemiJoin";
case tipb::JoinType::TypeLeftOuterJoin:
return "LeftOuterJoin";
case tipb::JoinType::TypeRightOuterJoin:
return "RightOuterJoin";
case tipb::JoinType::TypeLeftOuterSemiJoin:
return "LeftOuterSemiJoin";
case tipb::JoinType::TypeAntiSemiJoin:
return "AntiSemiJoin";
case tipb::JoinType::TypeInnerJoin:
return "InnerJoin";
case tipb::JoinType::TypeSemiJoin:
return "SemiJoin";
default:
throw TiFlashException(fmt::format("Not supported Join type: {}", tp), Errors::Coprocessor::Internal);
}
}

String getJoinExecTypeName(const tipb::JoinExecType & tp)
{
switch (tp)
{
case tipb::JoinExecType::TypeHashJoin:
return "HashJoin";
default:
throw TiFlashException(fmt::format("Not supported Join exectution type: {}", tp), Errors::Coprocessor::Internal);
}
}

String getFieldTypeName(Int32 tp)
{
switch (tp)
{
case TiDB::TypeTiny:
return "Tiny";
case TiDB::TypeShort:
return "Short";
case TiDB::TypeInt24:
return "Int24";
case TiDB::TypeLong:
return "Long";
case TiDB::TypeLongLong:
return "Longlong";
case TiDB::TypeYear:
return "Year";
case TiDB::TypeDouble:
return "Double";
case TiDB::TypeTime:
return "Time";
case TiDB::TypeDate:
return "Date";
case TiDB::TypeDatetime:
return "Datetime";
case TiDB::TypeNewDate:
return "NewDate";
case TiDB::TypeTimestamp:
return "Timestamp";
case TiDB::TypeFloat:
return "Float";
case TiDB::TypeDecimal:
return "Decimal";
case TiDB::TypeNewDecimal:
return "NewDecimal";
case TiDB::TypeVarchar:
return "Varchar";
case TiDB::TypeString:
return "String";
default:
throw TiFlashException(fmt::format("Not supported field type: {}", tp), Errors::Coprocessor::Internal);
}
}

String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col)
{
FmtBuffer fmt_buf;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>
#include <grpcpp/impl/codegen/status_code_enum.h>
#include <tipb/executor.pb.h>
#include <tipb/select.pb.h>

#include <unordered_map>
Expand All @@ -39,6 +40,10 @@ bool isWindowFunctionExpr(const tipb::Expr & expr);
const String & getFunctionName(const tipb::Expr & expr);
const String & getAggFunctionName(const tipb::Expr & expr);
const String & getWindowFunctionName(const tipb::Expr & expr);
String getExchangeTypeName(const tipb::ExchangeType & tp);
String getJoinTypeName(const tipb::JoinType & tp);
String getFieldTypeName(Int32 tp);
String getJoinExecTypeName(const tipb::JoinExecType & tp);
bool isColumnExpr(const tipb::Expr & expr);
String getColumnNameForColumnExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col);
NameAndTypePair getColumnNameAndTypeForColumnExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col);
Expand Down
21 changes: 2 additions & 19 deletions dbms/src/Flash/Statistics/ExchangeSenderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Statistics/ExchangeSenderImpl.h>

Expand All @@ -30,31 +31,13 @@ String MPPTunnelDetail::toJson() const
bytes);
}

namespace
{
String exchangeTypeToString(const tipb::ExchangeType & exchange_type)
{
switch (exchange_type)
{
case tipb::ExchangeType::PassThrough:
return "PassThrough";
case tipb::ExchangeType::Broadcast:
return "Broadcast";
case tipb::ExchangeType::Hash:
return "Hash";
default:
throw TiFlashException("unknown ExchangeType", Errors::Coprocessor::Internal);
}
}
} // namespace

void ExchangeSenderStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
{
fmt_buffer.fmtAppend(
R"("partition_num":{},"sender_target_task_ids":[{}],"exchange_type":"{}","connection_details":[)",
partition_num,
fmt::join(sender_target_task_ids, ","),
exchangeTypeToString(exchange_type));
getExchangeTypeName(exchange_type));
fmt_buffer.joinStr(
mpp_tunnel_details.cbegin(),
mpp_tunnel_details.cend(),
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ add_headers_and_sources(test_util .)
list(REMOVE_ITEM test_util_sources "bench_dbms_main.cpp" "gtests_dbms_main.cpp")

add_library(test_util_gtest_main ${test_util_headers} ${test_util_sources} gtests_dbms_main.cpp)
target_link_libraries(test_util_gtest_main dbms gtest_main)
target_link_libraries(test_util_gtest_main dbms gtest_main clickhouse_aggregate_functions)

add_library(test_util_bench_main ${test_util_headers} ${test_util_sources} bench_dbms_main.cpp)
target_link_libraries(test_util_bench_main dbms gtest_main benchmark)
Expand Down
73 changes: 20 additions & 53 deletions dbms/src/TestUtils/InterpreterTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,72 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <TestUtils/InterpreterTestUtils.h>
#include <TestUtils/executorSerializer.h>

namespace DB::tests
{
namespace
DAGContext & MockExecutorTest::getDAGContext()
{
String toTreeString(const tipb::Executor & root_executor, size_t level = 0);
assert(dag_context_ptr != nullptr);
return *dag_context_ptr;
}

void MockExecutorTest::initializeContext()
{
dag_context_ptr = std::make_unique<DAGContext>(1024);
context = MockDAGRequestContext(TiFlashTestEnv::getContext());
}

// serialize tipb::DAGRequest, print the executor name in a Tree format.
String toTreeString(std::shared_ptr<tipb::DAGRequest> dag_request)
void MockExecutorTest::SetUpTestCase()
{
assert((dag_request->executors_size() > 0) != dag_request->has_root_executor());
if (dag_request->has_root_executor())
try
{
return toTreeString(dag_request->root_executor());
DB::registerFunctions();
DB::registerAggregateFunctions();
}
else
catch (DB::Exception &)
{
FmtBuffer buffer;
String prefix;
traverseExecutors(dag_request.get(), [&buffer, &prefix](const tipb::Executor & executor) {
assert(executor.has_executor_id());
buffer.fmtAppend("{}{}\n", prefix, executor.executor_id());
prefix.append(" ");
return true;
});
return buffer.toString();
// Maybe another test has already registered, ignore exception here.
}
}

String toTreeString(const tipb::Executor & root_executor, size_t level)
{
FmtBuffer buffer;

auto append_str = [&buffer, &level](const tipb::Executor & executor) {
assert(executor.has_executor_id());

buffer.append(String(level, ' '));
buffer.append(executor.executor_id()).append("\n");
};

traverseExecutorTree(root_executor, [&](const tipb::Executor & executor) {
if (executor.has_join())
{
append_str(executor);
++level;
for (const auto & child : executor.join().children())
buffer.append(toTreeString(child, level));
return false;
}
else
{
append_str(executor);
++level;
return true;
}
});

return buffer.toString();
}
} // namespace

void dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
void MockExecutorTest::dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
{
String actual_string = toTreeString(actual);
ASSERT_EQ(Poco::trimInPlace(expected_string), Poco::trimInPlace(actual_string));
ASSERT_EQ(Poco::trimInPlace(expected_string), Poco::trim(ExecutorSerializer().serialize(actual.get())));
}

} // namespace DB::tests
29 changes: 7 additions & 22 deletions dbms/src/TestUtils/InterpreterTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
#pragma once

#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Functions/registerFunctions.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <TestUtils/TiFlashTestEnv.h>
#include <TestUtils/executorSerializer.h>
#include <TestUtils/mockExecutor.h>
namespace DB::tests
{
void dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual);
class MockExecutorTest : public ::testing::Test
{
protected:
Expand All @@ -37,29 +38,13 @@ class MockExecutorTest : public ::testing::Test
MockExecutorTest()
: context(TiFlashTestEnv::getContext())
{}
static void SetUpTestCase()
{
try
{
DB::registerFunctions();
}
catch (DB::Exception &)
{
// Maybe another test has already registered, ignore exception here.
}
}
static void SetUpTestCase();

virtual void initializeContext()
{
dag_context_ptr = std::make_unique<DAGContext>(1024);
context = MockDAGRequestContext(TiFlashTestEnv::getContext());
}
virtual void initializeContext();

DAGContext & getDAGContext()
{
assert(dag_context_ptr != nullptr);
return *dag_context_ptr;
}
DAGContext & getDAGContext();

static void dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual);

protected:
MockDAGRequestContext context;
Expand Down
Loading