Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix calling virtual function in ctor (#4022) #4029

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/AggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace DB
*/
class AggregatingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Aggregating";

public:
/** keys are taken from the GROUP BY part of the query
* Aggregate functions are searched everywhere in the expression.
Expand All @@ -28,7 +30,7 @@ class AggregatingBlockInputStream : public IProfilingBlockInputStream
const FileProviderPtr & file_provider_,
bool final_,
const LogWithPrefixPtr & log_)
: log(getMPPTaskLog(log_, getName()))
: log(getMPPTaskLog(log_, NAME))
, params(params_)
, aggregator(params, log)
, file_provider{file_provider_}
Expand All @@ -37,7 +39,7 @@ class AggregatingBlockInputStream : public IProfilingBlockInputStream
children.push_back(input);
}

String getName() const override { return "Aggregating"; }
String getName() const override { return NAME; }

Block getHeader() const override;

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/DataStreams/ConcatBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,25 @@ namespace DB
*/
class ConcatBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Concat";

public:
ConcatBlockInputStream(BlockInputStreams inputs_, const LogWithPrefixPtr & log_)
: log(getMPPTaskLog(log_, getName()))
: log(getMPPTaskLog(log_, NAME))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
}

String getName() const override { return "Concat"; }
String getName() const override { return NAME; }

Block getHeader() const override { return children.at(0)->getHeader(); }

protected:
Block readImpl() override
{
FilterPtr filter_;
return readImpl(filter_, false);
FilterPtr filter_ignored;
return readImpl(filter_ignored, false);
}

Block readImpl(FilterPtr & res_filter, bool return_filter) override
Expand Down
7 changes: 1 addition & 6 deletions dbms/src/DataStreams/ExpressionBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@ namespace DB
{
ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const LogWithPrefixPtr & log_)
: expression(expression_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
children.push_back(input);
}

String ExpressionBlockInputStream::getName() const
{
return "Expression";
}

Block ExpressionBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/ExpressionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ class ExpressionBlockInputStream : public IProfilingBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
static constexpr auto NAME = "Expression";

public:
ExpressionBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & expression_,
const LogWithPrefixPtr & log);

String getName() const override;
String getName() const override { return NAME; }
Block getTotals() override;
Block getHeader() const override;

Expand Down
9 changes: 1 addition & 8 deletions dbms/src/DataStreams/FilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ FilterBlockInputStream::FilterBlockInputStream(
const String & filter_column_name,
const LogWithPrefixPtr & log_)
: expression(expression_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
children.push_back(input);

Expand All @@ -45,13 +45,6 @@ FilterBlockInputStream::FilterBlockInputStream(
}
}


String FilterBlockInputStream::getName() const
{
return "Filter";
}


Block FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/FilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class ExpressionActions;
*/
class FilterBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Filter";

private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

Expand All @@ -25,7 +27,7 @@ class FilterBlockInputStream : public IProfilingBlockInputStream
const String & filter_column_name_,
const LogWithPrefixPtr & log_);

String getName() const override;
String getName() const override { return NAME; }
Block getTotals() override;
Block getHeader() const override;

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ namespace DB
{
class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "HashJoinBuildBlockInputStream";

public:
HashJoinBuildBlockInputStream(
const BlockInputStreamPtr & input,
JoinPtr join_,
size_t stream_index_,
const LogWithPrefixPtr & log_)
: stream_index(stream_index_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
children.push_back(input);
join = join_;
}
String getName() const override { return "HashJoinBuildBlockInputStream"; }
String getName() const override { return NAME; }
Block getHeader() const override { return children.back()->getHeader(); }

protected:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ LimitBlockInputStream::LimitBlockInputStream(
: limit(limit_)
, offset(offset_)
, always_read_till_end(always_read_till_end_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
children.push_back(input);
}
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace DB
*/
class LimitBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Limit";

public:
/** If always_read_till_end = false (by default), then after reading enough data,
* returns an empty block, and this causes the query to be canceled.
Expand All @@ -23,7 +25,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
const LogWithPrefixPtr & log_,
bool always_read_till_end_ = false);

String getName() const override { return "Limit"; }
String getName() const override { return NAME; }

Block getHeader() const override { return children.at(0)->getHeader(); }

Expand Down
12 changes: 6 additions & 6 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ MergeSortingBlockInputStream::MergeSortingBlockInputStream(
, limit(limit_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_)
, tmp_path(tmp_path_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
children.push_back(input);
header = children.at(0)->getHeader();
Expand Down Expand Up @@ -184,7 +184,7 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
, description(description_)
, max_merged_block_size(max_merged_block_size_)
, limit(limit_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
Blocks nonempty_blocks;
for (const auto & block : blocks)
Expand All @@ -201,13 +201,13 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(

if (!has_collation)
{
for (size_t i = 0; i < cursors.size(); ++i)
queue.push(SortCursor(&cursors[i]));
for (auto & cursor : cursors)
queue.push(SortCursor(&cursor));
}
else
{
for (size_t i = 0; i < cursors.size(); ++i)
queue_with_collation.push(SortCursorWithCollation(&cursors[i]));
for (auto & cursor : cursors)
queue_with_collation.push(SortCursorWithCollation(&cursor));
}
}

Expand Down
8 changes: 6 additions & 2 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace DB
*/
class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "MergeSortingBlocks";

public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlocksBlockInputStream(
Expand All @@ -33,7 +35,7 @@ class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
size_t max_merged_block_size_,
size_t limit_ = 0);

String getName() const override { return "MergeSortingBlocks"; }
String getName() const override { return NAME; }

bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
Expand Down Expand Up @@ -71,6 +73,8 @@ class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream

class MergeSortingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "MergeSorting";

public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(
Expand All @@ -82,7 +86,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream
const std::string & tmp_path_,
const LogWithPrefixPtr & log_);

String getName() const override { return "MergeSorting"; }
String getName() const override { return NAME; }

bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
size_t max_threads_,
size_t temporary_data_merge_threads_,
const LogWithPrefixPtr & log_)
: log(getMPPTaskLog(log_, getName()))
: log(getMPPTaskLog(log_, NAME))
, params(params_)
, aggregator(params, log)
, file_provider(file_provider_)
Expand Down Expand Up @@ -156,7 +156,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_
if (data.isConvertibleToTwoLevel())
data.convertToTwoLevel();

if (data.size())
if (!data.empty())
parent.aggregator.writeToTemporaryFile(data, parent.file_provider);
}
}
Expand All @@ -172,7 +172,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
if (data->isConvertibleToTwoLevel())
data->convertToTwoLevel();

if (data->size())
if (!data->empty())
parent.aggregator.writeToTemporaryFile(*data, parent.file_provider);
}
}
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace DB
*/
class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "ParallelAggregating";

public:
/** Columns from key_names and arguments of aggregate functions must already be computed.
*/
Expand All @@ -30,7 +32,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
size_t temporary_data_merge_threads_,
const LogWithPrefixPtr & log_);

String getName() const override { return "ParallelAggregating"; }
String getName() const override { return NAME; }

void cancel(bool kill) override;

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/PartialSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace DB
*/
class PartialSortingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "PartialSorting";

public:
/// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.
PartialSortingBlockInputStream(
Expand All @@ -21,12 +23,12 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream
size_t limit_ = 0)
: description(description_)
, limit(limit_)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
{
children.push_back(input_);
}

String getName() const override { return "PartialSorting"; }
String getName() const override { return NAME; }

bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ namespace DB
*/
class SharedQueryBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "SharedQuery";

public:
SharedQueryBlockInputStream(size_t clients, const BlockInputStreamPtr & in_, const LogWithPrefixPtr & log_)
: queue(clients)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, NAME))
, in(in_)
{
children.push_back(in);
Expand All @@ -38,7 +40,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
}

String getName() const override { return "SharedQuery"; }
String getName() const override { return NAME; }

Block getHeader() const override { return children.back()->getHeader(); }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/SquashingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ SquashingBlockInputStream::SquashingBlockInputStream(
size_t min_block_size_rows,
size_t min_block_size_bytes,
const LogWithPrefixPtr & log_)
: log(getMPPTaskLog(log_, getName()))
: log(getMPPTaskLog(log_, NAME))
, transform(min_block_size_rows, min_block_size_bytes, log)
{
children.emplace_back(src);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/SquashingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ namespace DB
*/
class SquashingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Squashing";

public:
SquashingBlockInputStream(
const BlockInputStreamPtr & src,
size_t min_block_size_rows,
size_t min_block_size_bytes,
const LogWithPrefixPtr & log_);

String getName() const override { return "Squashing"; }
String getName() const override { return NAME; }

Block getHeader() const override { return children.at(0)->getHeader(); }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemoteBlockInputStream({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(getMPPTaskLog(log_, getName()))
, log(getMPPTaskLog(log_, name))
, total_rows(0)
{
// generate sample block
Expand Down
Loading