Skip to content

Commit

Permalink
Test: Mock window function, refactor window function tests (#5021)
Browse files Browse the repository at this point in the history
ref #4609, close #5081
  • Loading branch information
ywqzzy authored Jun 13, 2022
1 parent 9c8a588 commit f4c2e01
Show file tree
Hide file tree
Showing 20 changed files with 823 additions and 348 deletions.
18 changes: 18 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/Arena.h>
#include <DataStreams/WindowBlockInputStream.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/WindowDescription.h>
#include <Interpreters/convertFieldToType.h>

namespace DB
Expand Down Expand Up @@ -574,4 +575,21 @@ void WindowBlockInputStream::tryCalculate()
peer_group_number = 1;
}
}

void WindowBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.append(", function: {");
buffer.joinStr(
window_description.window_functions_descriptions.begin(),
window_description.window_functions_descriptions.end(),
[&](const auto & func, FmtBuffer & b) {
b.append(func.window_function->getName());
},
", ");
buffer.fmtAppend(
"}}, frame: {{type: {}, boundary_begin: {}, boundary_end: {}}}",
frameTypeToString(window_description.frame.type),
boundaryTypeToString(window_description.frame.begin_type),
boundaryTypeToString(window_description.frame.end_type));
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FmtUtils.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/WindowDescription.h>
Expand Down Expand Up @@ -169,6 +170,7 @@ class WindowBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

LoggerPtr log;

Expand Down
199 changes: 197 additions & 2 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTSelectQuery.h>
#include <Poco/StringTokenizer.h>
#include <common/logger_useful.h>

namespace DB
{
using ASTPartitionByElement = ASTOrderByElement;
void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & val_field, tipb::Expr * expr, Int32 collator_id)
{
*(expr->mutable_field_type()) = columnInfoToFieldType(ci);
Expand Down Expand Up @@ -190,6 +190,12 @@ std::unordered_map<String, tipb::ExprType> agg_func_name_to_sig({
{"group_concat", tipb::ExprType::GroupConcat},
});

std::unordered_map<String, tipb::ExprType> window_func_name_to_sig({
{"RowNumber", tipb::ExprType::RowNumber},
{"Rank", tipb::ExprType::Rank},
{"DenseRank", tipb::ExprType::DenseRank},
});

DAGColumnInfo toNullableDAGColumnInfo(const DAGColumnInfo & input)
{
DAGColumnInfo output = input;
Expand Down Expand Up @@ -1343,6 +1349,105 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
exchange_map[left_exchange_receiver->name] = std::make_pair(left_exchange_receiver, left_exchange_sender);
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
tipb::Window * window = tipb_executor->mutable_window();
auto & input_schema = children[0]->output_schema;
for (const auto & expr : func_descs)
{
tipb::Expr * window_expr = window->add_func_desc();
const auto * window_func = typeid_cast<const ASTFunction *>(expr.get());
for (const auto & arg : window_func->arguments->children)
{
tipb::Expr * func = window_expr->add_children();
astToPB(input_schema, arg, func, collator_id, context);
}
auto window_sig_it = window_func_name_to_sig.find(window_func->name);
if (window_sig_it == window_func_name_to_sig.end())
throw Exception(fmt::format("Unsupported window function {}", window_func->name), ErrorCodes::LOGICAL_ERROR);
auto window_sig = window_sig_it->second;
window_expr->set_tp(window_sig);
auto * ft = window_expr->mutable_field_type();
// TODO: Maybe more window functions with different field type.
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagBinary);
ft->set_collate(collator_id);
ft->set_flen(21);
ft->set_decimal(-1);
}

for (const auto & child : order_by_exprs)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = window->add_order_by();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context);
}

for (const auto & child : partition_by_exprs)
{
auto * elem = typeid_cast<ASTPartitionByElement *>(child.get());
if (!elem)
throw Exception("Invalid partition by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = window->add_partition_by();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context);
}

if (frame.type.has_value())
{
tipb::WindowFrame * mut_frame = window->mutable_frame();
mut_frame->set_type(frame.type.value());
if (frame.start.has_value())
{
auto * start = mut_frame->mutable_start();
start->set_offset(std::get<2>(frame.start.value()));
start->set_unbounded(std::get<1>(frame.start.value()));
start->set_type(std::get<0>(frame.start.value()));
}

if (frame.end.has_value())
{
auto * end = mut_frame->mutable_end();
end->set_offset(std::get<2>(frame.end.value()));
end->set_unbounded(std::get<1>(frame.end.value()));
end->set_type(std::get<0>(frame.end.value()));
}
}

auto * children_executor = window->mutable_child();
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
tipb::Sort * sort = tipb_executor->mutable_sort();
sort->set_ispartialsort(is_partial_sort);

for (const auto & child : by_exprs)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = sort->add_byitems();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context);
}

auto * children_executor = sort->mutable_child();
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

} // namespace mock

ExecutorPtr compileTableScan(size_t & executor_index, TableInfo & table_info, String & table_alias, bool append_pk_column)
Expand Down Expand Up @@ -1561,11 +1666,101 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti
return exchange_sender;
}


ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema)
{
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema);
return exchange_receiver;
}

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame)
{
std::vector<ASTPtr> partition_columns;
if (partition_by_expr_list != nullptr)
{
for (const auto & child : partition_by_expr_list->children)
{
auto * elem = typeid_cast<ASTPartitionByElement *>(child.get());
if (!elem)
throw Exception("Invalid partition by element", ErrorCodes::LOGICAL_ERROR);
partition_columns.push_back(child);
compileExpr(input->output_schema, elem->children[0]);
}
}

std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
{
for (const auto & child : order_by_expr_list->children)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
order_columns.push_back(child);
compileExpr(input->output_schema, elem->children[0]);
}
}

DAGSchema output_schema;
output_schema.insert(output_schema.end(), input->output_schema.begin(), input->output_schema.end());

std::vector<ASTPtr> window_exprs;
if (func_desc_list != nullptr)
{
for (const auto & expr : func_desc_list->children)
{
const auto * func = typeid_cast<const ASTFunction *>(expr.get());
window_exprs.push_back(expr);
std::vector<TiDB::ColumnInfo> children_ci;
for (const auto & arg : func->arguments->children)
{
children_ci.push_back(compileExpr(input->output_schema, arg));
}
// TODO: add more window functions
TiDB::ColumnInfo ci;
switch (window_func_name_to_sig[func->name])
{
case tipb::ExprType::RowNumber:
case tipb::ExprType::Rank:
case tipb::ExprType::DenseRank:
{
ci.tp = TiDB::TypeLongLong;
ci.flag = TiDB::ColumnFlagBinary;
break;
}
default:
throw Exception(fmt::format("Unsupported window function {}", func->name), ErrorCodes::LOGICAL_ERROR);
}
output_schema.emplace_back(std::make_pair(func->getColumnName(), ci));
}
}

ExecutorPtr window = std::make_shared<mock::Window>(
executor_index,
output_schema,
window_exprs,
std::move(partition_columns),
std::move(order_columns),
frame);
window->children.push_back(input);
return window;
}

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort)
{
std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
{
for (const auto & child : order_by_expr_list->children)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
order_columns.push_back(child);
compileExpr(input->output_schema, elem->children[0]);
}
}
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort);
sort->children.push_back(input);
return sort;
}
} // namespace DB
56 changes: 54 additions & 2 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Debug/MockTiDB.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST.h>
#include <Parsers/ParserSelectQuery.h>
Expand All @@ -28,6 +29,8 @@
#include <Storages/Transaction/Types.h>
#include <tipb/select.pb.h>

#include <optional>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -272,6 +275,54 @@ struct Join : Executor

void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map) override;
};

using MockWindowFrameBound = std::tuple<tipb::WindowBoundType, bool, UInt64>;

struct MockWindowFrame
{
std::optional<tipb::WindowFrameType> type;
std::optional<MockWindowFrameBound> start;
std::optional<MockWindowFrameBound> end;
// TODO: support calcFuncs
};

struct Window : Executor
{
std::vector<ASTPtr> func_descs;
std::vector<ASTPtr> partition_by_exprs;
std::vector<ASTPtr> order_by_exprs;
MockWindowFrame frame;

Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_)
: Executor(index_, "window_" + std::to_string(index_), output_schema_)
, func_descs(std::move(func_descs_))
, partition_by_exprs(std::move(partition_by_exprs_))
, order_by_exprs(order_by_exprs_)
, frame(frame_)
{
}
// Currently only use Window Executor in Unit Test which don't call columnPrume.
// TODO: call columnPrune in unit test and further benchmark test to eliminate compute process.
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};

struct Sort : Executor
{
std::vector<ASTPtr> by_exprs;
bool is_partial_sort;

Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_)
: Executor(index_, "sort_" + std::to_string(index_), output_schema_)
, by_exprs(by_exprs_)
, is_partial_sort(is_partial_sort_)
{
}
// Currently only use Sort Executor in Unit Test which don't call columnPrume.
// TODO: call columnPrune in unit test and further benchmark test to eliminate compute process.
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};
} // namespace mock

using ExecutorPtr = std::shared_ptr<mock::Executor>;
Expand All @@ -294,8 +345,9 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema);

void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id);
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame);

//TODO: add compileWindow
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort);

void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id);
} // namespace DB
4 changes: 1 addition & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,9 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendCastAfterWindow(
const ExpressionActionsPtr & actions,
const tipb::Window & window,
const size_t window_columns_start_index);
size_t window_columns_start_index);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
NamesAndTypes buildOrderColumns(
const ExpressionActionsPtr & actions,
const ::google::protobuf::RepeatedPtrField<tipb::ByItem> & order_by);
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ class DAGQueryBlockInterpreter

BlockInputStreams execute();

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
void executeImpl(DAGPipeline & pipeline);
void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
Expand Down
Loading

0 comments on commit f4c2e01

Please sign in to comment.