Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prewhere with aliased columns. #3037

Merged
merged 11 commits into from
Sep 5, 2018
53 changes: 51 additions & 2 deletions dbms/src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ ExpressionAction ExpressionAction::project(const Names & projected_columns_)
return a;
}

/// Factory for an ADD_ALIASES action.
/// `aliased_columns_` is a list of (column name, alias) pairs; it is stored
/// as the action's projection list and consumed when the action is prepared/executed.
ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_columns_)
{
    ExpressionAction result;
    result.projection = aliased_columns_;
    result.type = ADD_ALIASES;
    return result;
}

ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context)
{
if (array_joined_columns.empty())
Expand Down Expand Up @@ -256,6 +264,8 @@ void ExpressionAction::prepare(Block & sample_block)
const std::string & name = projection[i].first;
const std::string & alias = projection[i].second;
ColumnWithTypeAndName column = sample_block.getByName(name);
if (column.column)
column.column = (*std::move(column.column)).mutate();
if (alias != "")
column.name = alias;
new_block.insert(std::move(column));
Expand All @@ -265,6 +275,19 @@ void ExpressionAction::prepare(Block & sample_block)
break;
}

case ADD_ALIASES:
{
for (size_t i = 0; i < projection.size(); ++i)
{
const std::string & name = projection[i].first;
const std::string & alias = projection[i].second;
const ColumnWithTypeAndName & column = sample_block.getByName(name);
if (alias != "" && !sample_block.has(alias))
sample_block.insert({column.column, column.type, alias});
}
break;
}

case REMOVE_COLUMN:
{
sample_block.erase(source_name);
Expand Down Expand Up @@ -438,6 +461,8 @@ void ExpressionAction::execute(Block & block, std::unordered_map<std::string, si
const std::string & name = projection[i].first;
const std::string & alias = projection[i].second;
ColumnWithTypeAndName column = block.getByName(name);
if (column.column)
column.column = (*std::move(column.column)).mutate();
if (alias != "")
column.name = alias;
new_block.insert(std::move(column));
Expand All @@ -448,6 +473,19 @@ void ExpressionAction::execute(Block & block, std::unordered_map<std::string, si
break;
}

case ADD_ALIASES:
{
for (size_t i = 0; i < projection.size(); ++i)
{
const std::string & name = projection[i].first;
const std::string & alias = projection[i].second;
const ColumnWithTypeAndName & column = block.getByName(name);
if (alias != "" && !block.has(alias))
block.insert({column.column, column.type, alias});
}
break;
}

case REMOVE_COLUMN:
block.erase(source_name);
break;
Expand Down Expand Up @@ -529,8 +567,9 @@ std::string ExpressionAction::toString() const
}
break;

case PROJECT:
ss << "PROJECT ";
case PROJECT: [[fallthrough]];
case ADD_ALIASES:
ss << (type == PROJECT ? "PROJECT " : "ADD_ALIASES ");
for (size_t i = 0; i < projection.size(); ++i)
{
if (i)
Expand Down Expand Up @@ -786,6 +825,16 @@ void ExpressionActions::finalize(const Names & output_columns)
needed_columns = NameSet(in.begin(), in.end());
unmodified_columns.clear();
}
else if (action.type == ExpressionAction::ADD_ALIASES)
{
needed_columns.insert(in.begin(), in.end());
for (auto & name_wit_alias : action.projection)
{
auto it = unmodified_columns.find(name_wit_alias.second);
if (it != unmodified_columns.end())
unmodified_columns.erase(it);
}
}
else if (action.type == ExpressionAction::ARRAY_JOIN)
{
/// Do not ARRAY JOIN columns that are not used anymore.
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Interpreters/ExpressionActions.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ struct ExpressionAction

/// Reorder and rename the columns, delete the extra ones. The same column names are allowed in the result.
PROJECT,
/// Add columns with alias names. These columns are the same as the non-aliased ones; use PROJECT instead if you need to modify them.
ADD_ALIASES,
};

Type type;
Expand Down Expand Up @@ -106,6 +108,7 @@ struct ExpressionAction
static ExpressionAction copyColumn(const std::string & from_name, const std::string & to_name);
static ExpressionAction project(const NamesWithAliases & projected_columns_);
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);
static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context);
static ExpressionAction ordinaryJoin(std::shared_ptr<const Join> join_, const Names & join_key_names_left,
const NamesAndTypesList & columns_added_by_join_);
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2894,7 +2894,7 @@ void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, Express
}


ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result)
{
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(source_columns, settings);
NamesWithAliases result_columns;
Expand All @@ -2911,7 +2911,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
{
std::string name = asts[i]->getColumnName();
std::string alias;
if (project_result)
if (add_aliases)
alias = asts[i]->getAliasOrColumnName();
else
alias = name;
Expand All @@ -2920,11 +2920,15 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
getRootActions(asts[i], false, false, actions);
}

if (project_result)
if (add_aliases)
{
actions->add(ExpressionAction::project(result_columns));
if (project_result)
actions->add(ExpressionAction::project(result_columns));
else
actions->add(ExpressionAction::addAliases(result_columns));
}
else

if (!(add_aliases && project_result))
{
/// We will not delete the original columns.
for (const auto & column_name_type : source_columns)
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ class ExpressionAnalyzer : private boost::noncopyable
void appendProjectResult(ExpressionActionsChain & chain) const;

/// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression.
/// If project_result, only the calculated values in the desired order, renamed to aliases, remain in the output block.
/// If add_aliases, the calculated values are arranged in the desired order and their aliases are added.
/// If project_result is also set, then only the aliases remain in the output block.
/// Otherwise, only temporary columns will be deleted from the block.
ExpressionActionsPtr getActions(bool project_result);
ExpressionActionsPtr getActions(bool add_aliases, bool project_result = true);

/// Actions that can be performed on an empty block: adding constants and applying functions that depend only on constants.
/// Does not execute subqueries.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/ExpressionJIT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,11 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
current_dependents[proj.first].emplace();
break;

case ExpressionAction::ADD_ALIASES:
for (const auto & proj : actions[i].projection)
current_dependents[proj.first].emplace();
break;

case ExpressionAction::ADD_COLUMN:
case ExpressionAction::COPY_COLUMN:
case ExpressionAction::ARRAY_JOIN:
Expand Down
95 changes: 90 additions & 5 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,9 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, const PrewhereInfoPtr & prewhere_info)
{

const Settings & settings = context.getSettingsRef();

/// Actions to calculate ALIAS if required.
ExpressionActionsPtr alias_actions;
/// Are ALIAS columns required for query execution?
Expand All @@ -647,26 +650,108 @@ void InterpreterSelectQuery::executeFetchColumns(

if (alias_columns_required)
{
/// Columns required for prewhere actions.
NameSet required_prewhere_columns;
/// Columns required for prewhere actions which are aliases in storage.
NameSet required_prewhere_aliases;
Block prewhere_actions_result;
if (prewhere_info)
{
auto required_columns = prewhere_info->prewhere_actions->getRequiredColumns();
required_prewhere_columns.insert(required_columns.begin(), required_columns.end());
prewhere_actions_result = prewhere_info->prewhere_actions->getSampleBlock();
}

/// We will create an expression to return all the requested columns, with the calculation of the required ALIAS columns.
auto required_columns_expr_list = std::make_shared<ASTExpressionList>();
/// Separate expression for columns used in prewhere.
auto required_prewhere_columns_expr_list = std::make_shared<ASTExpressionList>();

/// Columns which we will get after prewhere execution.
auto source_columns = storage->getColumns().getAllPhysical();

for (const auto & column : required_columns)
{
ASTPtr column_expr;
const auto default_it = column_defaults.find(column);
if (default_it != std::end(column_defaults) && default_it->second.kind == ColumnDefaultKind::Alias)
required_columns_expr_list->children.emplace_back(setAlias(default_it->second.expression->clone(), column));
bool is_alias = default_it != std::end(column_defaults) && default_it->second.kind == ColumnDefaultKind::Alias;
if (is_alias)
column_expr = setAlias(default_it->second.expression->clone(), column);
else
required_columns_expr_list->children.emplace_back(std::make_shared<ASTIdentifier>(column));
column_expr = std::make_shared<ASTIdentifier>(column);

if (required_prewhere_columns.count(column))
{
required_prewhere_columns_expr_list->children.emplace_back(std::move(column_expr));

if (is_alias)
required_prewhere_aliases.insert(column);
}
else
required_columns_expr_list->children.emplace_back(std::move(column_expr));
}

/// Add columns which will be added by prewhere (otherwise we will remove them in project action).
for (const auto & column : prewhere_actions_result)
{
if (prewhere_info->remove_prewhere_column && column.name == prewhere_info->prewhere_column_name)
continue;

required_columns_expr_list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
source_columns.emplace_back(column.name, column.type);
}

alias_actions = ExpressionAnalyzer(required_columns_expr_list, context, storage).getActions(true);
alias_actions = ExpressionAnalyzer(required_columns_expr_list, context, nullptr, source_columns).getActions(true);

/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
required_columns = alias_actions->getRequiredColumns();

/// Do not remove prewhere filter if it is a column which is used as alias.
if (prewhere_info && prewhere_info->remove_prewhere_column)
if (required_columns.end()
!= std::find(required_columns.begin(), required_columns.end(), prewhere_info->prewhere_column_name))
prewhere_info->remove_prewhere_column = false;

/// Remove columns which will be added by prewhere.
for (size_t i = 0; i < required_columns.size(); ++i)
{
if (!storage->getColumns().hasPhysical(required_columns[i]))
{
std::swap(required_columns[i], required_columns.back());
required_columns.pop_back();
}
}

if (prewhere_info)
{
/// Don't remove columns which are needed to be aliased.
auto new_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions->getRequiredColumnsWithTypes(), settings);
for (const auto & action : prewhere_info->prewhere_actions->getActions())
{
if (action.type != ExpressionAction::REMOVE_COLUMN
|| required_columns.end() == std::find(required_columns.begin(), required_columns.end(), action.source_name))
new_actions->add(action);
}
prewhere_info->prewhere_actions = std::move(new_actions);

prewhere_info->alias_actions = ExpressionAnalyzer(required_prewhere_columns_expr_list, context, storage).getActions(true, false);

/// Add columns required by alias actions.
auto required_aliased_columns = prewhere_info->alias_actions->getRequiredColumns();
for (auto & column : required_aliased_columns)
if (!prewhere_actions_result.has(column))
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);

/// Add columns required by prewhere actions.
for (const auto & column : required_prewhere_columns)
if (required_prewhere_aliases.count(column) == 0)
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);
}
}
}

const Settings & settings = context.getSettingsRef();

/// Limitation on the number of columns to read.
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,28 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (reader->getColumns().empty())
{
task->range_reader = MergeTreeRangeReader(
pre_reader.get(), index_granularity, nullptr, prewhere_info->prewhere_actions,
pre_reader.get(), index_granularity, nullptr,
prewhere_info->alias_actions, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, true);
}
else
{
task->pre_range_reader = MergeTreeRangeReader(
pre_reader.get(), index_granularity, nullptr, prewhere_info->prewhere_actions,
pre_reader.get(), index_granularity, nullptr,
prewhere_info->alias_actions, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, false);

task->range_reader = MergeTreeRangeReader(
reader.get(), index_granularity, &task->pre_range_reader, nullptr,
reader.get(), index_granularity, &task->pre_range_reader, nullptr, nullptr,
nullptr, &task->ordered_names, true, false, true);
}
}
else
{
task->range_reader = MergeTreeRangeReader(
reader.get(), index_granularity, nullptr, nullptr,
reader.get(), index_granularity, nullptr, nullptr, nullptr,
nullptr, &task->ordered_names, task->should_reorder, false, true);
}
}
Expand Down Expand Up @@ -218,6 +220,9 @@ void MergeTreeBaseBlockInputStream::executePrewhereActions(Block & block, const
{
if (prewhere_info)
{
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(block);

prewhere_info->prewhere_actions->execute(block);
if (prewhere_info->remove_prewhere_column)
block.erase(prewhere_info->prewhere_column_name);
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ try

if (prewhere_info)
{
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
else
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();

if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,13 @@ void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)


MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeReader * merge_tree_reader, size_t index_granularity,
MergeTreeRangeReader * prev_reader, ExpressionActionsPtr prewhere_actions,
MergeTreeReader * merge_tree_reader, size_t index_granularity, MergeTreeRangeReader * prev_reader,
ExpressionActionsPtr alias_actions, ExpressionActionsPtr prewhere_actions,
const String * prewhere_column_name, const Names * ordered_names,
bool always_reorder, bool remove_prewhere_column, bool last_reader_in_chain)
: index_granularity(index_granularity), merge_tree_reader(merge_tree_reader)
, prev_reader(prev_reader), prewhere_column_name(prewhere_column_name)
, ordered_names(ordered_names), prewhere_actions(std::move(prewhere_actions))
, ordered_names(ordered_names), alias_actions(alias_actions), prewhere_actions(std::move(prewhere_actions))
, always_reorder(always_reorder), remove_prewhere_column(remove_prewhere_column)
, last_reader_in_chain(last_reader_in_chain), is_initialized(true)
{
Expand Down Expand Up @@ -571,6 +571,9 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (!prewhere_actions)
return;

if (alias_actions)
alias_actions->execute(result.block);

prewhere_actions->execute(result.block);
auto & prewhere_column = result.block.getByName(*prewhere_column_name);
size_t prev_rows = result.block.rows();
Expand Down
Loading