From 14c57400d89cad85753bb2b68bcb2c49854074db Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Mon, 14 Feb 2022 18:09:38 +0800 Subject: [PATCH 1/2] set request schema version when do remote read (#4021) --- dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index b5a1928c063..6de5ba0e44c 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -898,6 +898,7 @@ void DAGQueryBlockInterpreter::executeRemoteQueryImpl( dag_req.SerializeToString(&(req->data)); req->tp = pingcap::coprocessor::ReqType::DAG; req->start_ts = context.getSettingsRef().read_tso; + req->schema_version = context.getSettingsRef().schema_version; bool has_enforce_encode_type = dag_req.has_force_encode_type() && dag_req.force_encode_type(); pingcap::kv::Cluster * cluster = context.getTMTContext().getKVCluster(); From 6d05b3a1c25a8a171ba0a57d1e6b057829d8c951 Mon Sep 17 00:00:00 2001 From: JaySon Date: Mon, 14 Feb 2022 19:27:38 +0800 Subject: [PATCH 2/2] Fix calling virtual function in ctor (#4022) --- dbms/src/DataStreams/AggregatingBlockInputStream.h | 8 ++++---- dbms/src/DataStreams/ConcatBlockInputStream.h | 12 ++++++------ dbms/src/DataStreams/ExpressionBlockInputStream.cpp | 7 +------ dbms/src/DataStreams/ExpressionBlockInputStream.h | 3 ++- dbms/src/DataStreams/FilterBlockInputStream.cpp | 9 +-------- dbms/src/DataStreams/FilterBlockInputStream.h | 4 +++- .../src/DataStreams/HashJoinBuildBlockInputStream.h | 7 ++++--- dbms/src/DataStreams/LimitBlockInputStream.cpp | 2 +- dbms/src/DataStreams/LimitBlockInputStream.h | 4 +++- .../DataStreams/MergeSortingBlockInputStream.cpp | 12 ++++++------ dbms/src/DataStreams/MergeSortingBlockInputStream.h | 8 ++++++-- .../ParallelAggregatingBlockInputStream.cpp | 6 +++--- .../ParallelAggregatingBlockInputStream.h | 4 +++- .../DataStreams/PartialSortingBlockInputStream.h | 8 ++++---- dbms/src/DataStreams/SharedQueryBlockInputStream.h | 6 ++++-- dbms/src/DataStreams/SquashingBlockInputStream.cpp | 2 +- dbms/src/DataStreams/SquashingBlockInputStream.h | 4 +++- dbms/src/DataStreams/UnionBlockInputStream.h | 5 +++-- .../DeltaMerge/DMSegmentThreadInputStream.h | 13 ++++++++----- 19 files changed, 66 insertions(+), 58 deletions(-) diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.h b/dbms/src/DataStreams/AggregatingBlockInputStream.h index 560aa86b560..51b2e16a7fc 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.h @@ -17,6 +17,8 @@ namespace DB */ class AggregatingBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "Aggregating"; + public: /** keys are taken from the GROUP BY part of the query * Aggregate functions are searched everywhere in the expression. @@ -28,7 +30,7 @@ class AggregatingBlockInputStream : public IProfilingBlockInputStream const FileProviderPtr & file_provider_, bool final_, const LogWithPrefixPtr & log_) - : log(getMPPTaskLog(log_, name)) + : log(getMPPTaskLog(log_, NAME)) , params(params_) , aggregator(params, log) , file_provider{file_provider_} @@ -37,9 +39,7 @@ class AggregatingBlockInputStream : public IProfilingBlockInputStream children.push_back(input); } - static constexpr auto name = "Aggregating"; - - String getName() const override { return name; } + String getName() const override { return NAME; } Block getHeader() const override; diff --git a/dbms/src/DataStreams/ConcatBlockInputStream.h b/dbms/src/DataStreams/ConcatBlockInputStream.h index 270daeb6c08..f8fa022b610 100644 --- a/dbms/src/DataStreams/ConcatBlockInputStream.h +++ b/dbms/src/DataStreams/ConcatBlockInputStream.h @@ -12,25 +12,25 @@ namespace DB */ class ConcatBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "Concat"; + public: ConcatBlockInputStream(BlockInputStreams inputs_, const LogWithPrefixPtr & log_) - : log(getMPPTaskLog(log_, name)) + : log(getMPPTaskLog(log_, NAME)) { children.insert(children.end(), inputs_.begin(), inputs_.end()); current_stream = children.begin(); } - static constexpr auto name = "Concat"; - - String getName() const override { return name; } + String getName() const override { return NAME; } Block getHeader() const override { return children.at(0)->getHeader(); } protected: Block readImpl() override { - FilterPtr filter_; - return readImpl(filter_, false); + FilterPtr filter_ignored; + return readImpl(filter_ignored, false); } Block readImpl(FilterPtr & res_filter, bool return_filter) override diff --git a/dbms/src/DataStreams/ExpressionBlockInputStream.cpp b/dbms/src/DataStreams/ExpressionBlockInputStream.cpp index 56ce9acae9a..d88913950df 100644 --- a/dbms/src/DataStreams/ExpressionBlockInputStream.cpp +++ b/dbms/src/DataStreams/ExpressionBlockInputStream.cpp @@ -7,16 +7,11 @@ namespace DB { ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const LogWithPrefixPtr & log_) : expression(expression_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { children.push_back(input); } -String ExpressionBlockInputStream::getName() const -{ - return "Expression"; -} - Block ExpressionBlockInputStream::getTotals() { if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) diff --git a/dbms/src/DataStreams/ExpressionBlockInputStream.h b/dbms/src/DataStreams/ExpressionBlockInputStream.h index 06b2707e938..954610e9b2a 100644 --- a/dbms/src/DataStreams/ExpressionBlockInputStream.h +++ b/dbms/src/DataStreams/ExpressionBlockInputStream.h @@ -16,6 +16,7 @@ class ExpressionBlockInputStream : public IProfilingBlockInputStream { private: using ExpressionActionsPtr = std::shared_ptr; + static constexpr auto NAME = "Expression"; public: ExpressionBlockInputStream( @@ -23,7 +24,7 @@ class ExpressionBlockInputStream : public IProfilingBlockInputStream const ExpressionActionsPtr & expression_, const LogWithPrefixPtr & log); - String getName() const override; + String getName() const override { return NAME; } Block getTotals() override; Block getHeader() const override; diff --git a/dbms/src/DataStreams/FilterBlockInputStream.cpp b/dbms/src/DataStreams/FilterBlockInputStream.cpp index e78e899153d..47d85d882a2 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/FilterBlockInputStream.cpp @@ -22,7 +22,7 @@ FilterBlockInputStream::FilterBlockInputStream( const String & filter_column_name, const LogWithPrefixPtr & log_) : expression(expression_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { children.push_back(input); @@ -45,13 +45,6 @@ FilterBlockInputStream::FilterBlockInputStream( } } - -String FilterBlockInputStream::getName() const -{ - return "Filter"; -} - - Block FilterBlockInputStream::getTotals() { if (IProfilingBlockInputStream * child = dynamic_cast(&*children.back())) diff --git a/dbms/src/DataStreams/FilterBlockInputStream.h b/dbms/src/DataStreams/FilterBlockInputStream.h index 11762282370..a609011dee3 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.h +++ b/dbms/src/DataStreams/FilterBlockInputStream.h @@ -15,6 +15,8 @@ class ExpressionActions; */ class FilterBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "Filter"; + private: using ExpressionActionsPtr = std::shared_ptr; @@ -25,7 +27,7 @@ class FilterBlockInputStream : public IProfilingBlockInputStream const String & filter_column_name_, const LogWithPrefixPtr & log_); - String getName() const override; + String getName() const override { return NAME; } Block getTotals() override; Block getHeader() const override; diff --git a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h index 8f1f0acb941..25c97c60aaa 100644 --- a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.h @@ -9,6 +9,8 @@ namespace DB { class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "HashJoinBuildBlockInputStream"; + public: HashJoinBuildBlockInputStream( const BlockInputStreamPtr & input, @@ -16,13 +18,12 @@ class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream size_t stream_index_, const LogWithPrefixPtr & log_) : stream_index(stream_index_) - , log(getMPPTaskLog(log_, name)) + , log(getMPPTaskLog(log_, NAME)) { children.push_back(input); join = join_; } - static constexpr auto name = "HashJoinBuildBlockInputStream"; - String getName() const override { return name; } + String getName() const override { return NAME; } Block getHeader() const override { return children.back()->getHeader(); } protected: diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index 403272f16c0..c205aa91982 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -14,7 +14,7 @@ LimitBlockInputStream::LimitBlockInputStream( : limit(limit_) , offset(offset_) , always_read_till_end(always_read_till_end_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { children.push_back(input); } diff --git a/dbms/src/DataStreams/LimitBlockInputStream.h b/dbms/src/DataStreams/LimitBlockInputStream.h index ff149267320..dee386f2f78 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.h +++ b/dbms/src/DataStreams/LimitBlockInputStream.h @@ -10,6 +10,8 @@ namespace DB */ class LimitBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "Limit"; + public: /** If always_read_till_end = false (by default), then after reading enough data, * returns an empty block, and this causes the query to be canceled. @@ -23,7 +25,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream const LogWithPrefixPtr & log_, bool always_read_till_end_ = false); - String getName() const override { return "Limit"; } + String getName() const override { return NAME; } Block getHeader() const override { return children.at(0)->getHeader(); } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 873d70e6c8a..bc85ed71b41 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -74,7 +74,7 @@ MergeSortingBlockInputStream::MergeSortingBlockInputStream( , limit(limit_) , max_bytes_before_external_sort(max_bytes_before_external_sort_) , tmp_path(tmp_path_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { children.push_back(input); header = children.at(0)->getHeader(); @@ -184,7 +184,7 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( , description(description_) , max_merged_block_size(max_merged_block_size_) , limit(limit_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { Blocks nonempty_blocks; for (const auto & block : blocks) @@ -201,13 +201,13 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( if (!has_collation) { - for (size_t i = 0; i < cursors.size(); ++i) - queue.push(SortCursor(&cursors[i])); + for (auto & cursor : cursors) + queue.push(SortCursor(&cursor)); } else { - for (size_t i = 0; i < cursors.size(); ++i) - queue_with_collation.push(SortCursorWithCollation(&cursors[i])); + for (auto & cursor : cursors) + queue_with_collation.push(SortCursorWithCollation(&cursor)); } } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index ccfaafa5199..a643b595ae3 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -24,6 +24,8 @@ namespace DB */ class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "MergeSortingBlocks"; + public: /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlocksBlockInputStream( @@ -33,7 +35,7 @@ class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream size_t max_merged_block_size_, size_t limit_ = 0); - String getName() const override { return "MergeSortingBlocks"; } + String getName() const override { return NAME; } bool isGroupedOutput() const override { return true; } bool isSortedOutput() const override { return true; } @@ -71,6 +73,8 @@ class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream class MergeSortingBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "MergeSorting"; + public: /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlockInputStream( @@ -82,7 +86,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream const std::string & tmp_path_, const LogWithPrefixPtr & log_); - String getName() const override { return "MergeSorting"; } + String getName() const override { return NAME; } bool isGroupedOutput() const override { return true; } bool isSortedOutput() const override { return true; } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index e0f66883851..7705ef8847a 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -21,7 +21,7 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream( size_t max_threads_, size_t temporary_data_merge_threads_, const LogWithPrefixPtr & log_) - : log(getMPPTaskLog(log_, getName())) + : log(getMPPTaskLog(log_, NAME)) , params(params_) , aggregator(params, log) , file_provider(file_provider_) @@ -156,7 +156,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_ if (data.isConvertibleToTwoLevel()) data.convertToTwoLevel(); - if (data.size()) + if (!data.empty()) parent.aggregator.writeToTemporaryFile(data, parent.file_provider); } } @@ -172,7 +172,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() if (data->isConvertibleToTwoLevel()) data->convertToTwoLevel(); - if (data->size()) + if (!data->empty()) parent.aggregator.writeToTemporaryFile(*data, parent.file_provider); } } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 5f539005a1f..02be1aeff5b 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -17,6 +17,8 @@ namespace DB */ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "ParallelAggregating"; + public: /** Columns from key_names and arguments of aggregate functions must already be computed. */ @@ -30,7 +32,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream size_t temporary_data_merge_threads_, const LogWithPrefixPtr & log_); - String getName() const override { return "ParallelAggregating"; } + String getName() const override { return NAME; } void cancel(bool kill) override; diff --git a/dbms/src/DataStreams/PartialSortingBlockInputStream.h b/dbms/src/DataStreams/PartialSortingBlockInputStream.h index 06e8fa2f98a..b556b6791cf 100644 --- a/dbms/src/DataStreams/PartialSortingBlockInputStream.h +++ b/dbms/src/DataStreams/PartialSortingBlockInputStream.h @@ -12,6 +12,8 @@ namespace DB */ class PartialSortingBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "PartialSorting"; + public: /// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order. PartialSortingBlockInputStream( @@ -21,14 +23,12 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream size_t limit_ = 0) : description(description_) , limit(limit_) - , log(getMPPTaskLog(log_, name)) + , log(getMPPTaskLog(log_, NAME)) { children.push_back(input_); } - static constexpr auto name = "PartialSorting"; - - String getName() const override { return name; } + String getName() const override { return NAME; } bool isGroupedOutput() const override { return true; } bool isSortedOutput() const override { return true; } diff --git a/dbms/src/DataStreams/SharedQueryBlockInputStream.h b/dbms/src/DataStreams/SharedQueryBlockInputStream.h index e3091958487..10fd43d5ec2 100644 --- a/dbms/src/DataStreams/SharedQueryBlockInputStream.h +++ b/dbms/src/DataStreams/SharedQueryBlockInputStream.h @@ -16,10 +16,12 @@ namespace DB */ class SharedQueryBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "SharedQuery"; + public: SharedQueryBlockInputStream(size_t clients, const BlockInputStreamPtr & in_, const LogWithPrefixPtr & log_) : queue(clients) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) , in(in_) { children.push_back(in); @@ -38,7 +40,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream } } - String getName() const override { return "SharedQuery"; } + String getName() const override { return NAME; } Block getHeader() const override { return children.back()->getHeader(); } diff --git a/dbms/src/DataStreams/SquashingBlockInputStream.cpp b/dbms/src/DataStreams/SquashingBlockInputStream.cpp index bf0ff127e72..4623aff9a9e 100644 --- a/dbms/src/DataStreams/SquashingBlockInputStream.cpp +++ b/dbms/src/DataStreams/SquashingBlockInputStream.cpp @@ -9,7 +9,7 @@ SquashingBlockInputStream::SquashingBlockInputStream( size_t min_block_size_rows, size_t min_block_size_bytes, const LogWithPrefixPtr & log_) - : log(getMPPTaskLog(log_, getName())) + : log(getMPPTaskLog(log_, NAME)) , transform(min_block_size_rows, min_block_size_bytes, log) { children.emplace_back(src); diff --git a/dbms/src/DataStreams/SquashingBlockInputStream.h b/dbms/src/DataStreams/SquashingBlockInputStream.h index e510220c0fe..3f3ff0349a5 100644 --- a/dbms/src/DataStreams/SquashingBlockInputStream.h +++ b/dbms/src/DataStreams/SquashingBlockInputStream.h @@ -10,6 +10,8 @@ namespace DB */ class SquashingBlockInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "Squashing"; + public: SquashingBlockInputStream( const BlockInputStreamPtr & src, @@ -17,7 +19,7 @@ class SquashingBlockInputStream : public IProfilingBlockInputStream size_t min_block_size_bytes, const LogWithPrefixPtr & log_); - String getName() const override { return "Squashing"; } + String getName() const override { return NAME; } Block getHeader() const override { return children.at(0)->getHeader(); } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 3aac82f8ada..7677d9c2b62 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -76,6 +76,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream private: using Self = UnionBlockInputStream; + static constexpr auto NAME = "Union"; public: UnionBlockInputStream( @@ -88,7 +89,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream , handler(*this) , processor(inputs, additional_input_at_end, max_threads, handler) , exception_callback(exception_callback_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { children = inputs; if (additional_input_at_end) @@ -103,7 +104,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream } } - String getName() const override { return "Union"; } + String getName() const override { return NAME; } ~UnionBlockInputStream() override { diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index c08edca263f..87e2ddb6ab4 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -21,6 +22,8 @@ using RSOperatorPtr = std::shared_ptr; class DMSegmentThreadInputStream : public IProfilingBlockInputStream { + static constexpr auto NAME = "DeltaMergeSegmentThread"; + public: /// If handle_real_type_ is empty, means do not convert handle column back to real type. DMSegmentThreadInputStream( @@ -44,18 +47,18 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream , expected_block_size(expected_block_size_) , is_raw(is_raw_) , do_range_filter_for_raw(do_range_filter_for_raw_) - , log(getMPPTaskLog(log_, getName())) + , log(getMPPTaskLog(log_, NAME)) { } - String getName() const override { return "DeltaMergeSegmentThread"; } + String getName() const override { return NAME; } Block getHeader() const override { return header; } protected: Block readImpl() override { - FilterPtr filter_; - return readImpl(filter_, false); + FilterPtr filter_ignored; + return readImpl(filter_ignored, false); } Block readImpl(FilterPtr & res_filter, bool return_filter) override @@ -88,7 +91,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream task->ranges, filter, max_version, - std::max(expected_block_size, (size_t)(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows))); + std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows))); } LOG_FMT_TRACE(log, "Start to read segment [{}]", cur_segment->segmentId()); }