Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interpreter: Print BlockInputStream #4911

Merged
merged 55 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
8e17622
init.
ywqzzy Apr 27, 2022
c8fd369
fix.
ywqzzy Apr 27, 2022
4a29a27
Merge branch 'master' of https://github.com/pingcap/tiflash into run_…
ywqzzy Apr 28, 2022
491dafd
fix restoreConcurrency.
ywqzzy Apr 28, 2022
fbd66bb
some tests
ywqzzy Apr 28, 2022
b5e5614
format.
ywqzzy Apr 28, 2022
99830e3
fix build fail.
ywqzzy Apr 28, 2022
347bf8a
format, more tests.
ywqzzy Apr 28, 2022
a7d4602
clean
ywqzzy Apr 28, 2022
9a7b988
refactor
ywqzzy Apr 28, 2022
781489e
getColumnsFromTableScan utils and clean code.
ywqzzy Apr 28, 2022
283639e
refactor gencolumn related functions.
ywqzzy Apr 29, 2022
7dbd5c5
remove useless cout.
ywqzzy Apr 29, 2022
120a983
all user choose stream size.
ywqzzy Apr 29, 2022
0ce3668
address comments.
ywqzzy Apr 29, 2022
02f3509
rename.
ywqzzy Apr 29, 2022
3dad372
rename.
ywqzzy May 5, 2022
4910d32
address comments.
ywqzzy May 5, 2022
cc81990
clean
ywqzzy May 5, 2022
fc46cd5
address comments.
ywqzzy May 5, 2022
b23f514
address comments
ywqzzy May 5, 2022
0216daa
Merge branch 'master' of https://github.com/pingcap/tiflash into run_…
ywqzzy May 5, 2022
927234c
address comments.
ywqzzy May 5, 2022
2639b7f
format.
ywqzzy May 5, 2022
eca8746
address comment.
ywqzzy May 5, 2022
eb50f1e
Merge branch 'master' of https://github.com/pingcap/tiflash into run_…
ywqzzy May 5, 2022
589263d
fix.
ywqzzy May 6, 2022
caf6b73
init.
ywqzzy May 9, 2022
0874ad5
update.
ywqzzy May 9, 2022
dc82bcb
update.
ywqzzy May 17, 2022
a701427
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 17, 2022
beb504c
update
ywqzzy May 17, 2022
e69cd5c
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 17, 2022
a5cc584
refine.
ywqzzy May 17, 2022
948d42d
tiny refine.
ywqzzy May 17, 2022
3263384
update.
ywqzzy May 18, 2022
6364a96
tiny refine.
ywqzzy May 18, 2022
baa5708
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 18, 2022
ec10a47
update.
ywqzzy May 19, 2022
5876a9b
little refine.
ywqzzy May 19, 2022
abf9c72
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 20, 2022
83bb9d1
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 23, 2022
428f147
address comments.
ywqzzy May 23, 2022
2c569c4
fix clang-tidy.
ywqzzy May 24, 2022
a2b3ab1
Merge branch 'master' into print_inputstream
ywqzzy May 24, 2022
c537d6e
fix clang-tidy.
ywqzzy May 24, 2022
cadb179
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 24, 2022
c1460d0
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 24, 2022
6f7f79c
address comments.
ywqzzy May 24, 2022
e35bb19
Merge branch 'master' into print_inputstream
ywqzzy May 25, 2022
985be0f
address comments
ywqzzy May 25, 2022
7c24b38
refine.
ywqzzy May 25, 2022
1a913f0
address comments.
ywqzzy May 25, 2022
757d130
Merge branch 'master' of https://github.com/pingcap/tiflash into prin…
ywqzzy May 25, 2022
e8fda6f
fix bug.
ywqzzy May 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/FilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ FilterBlockInputStream::FilterBlockInputStream(

Block FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
expression->executeOnTotals(totals);
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.


#include <Common/FmtUtils.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
namespace DB
{
Expand All @@ -25,4 +26,26 @@ Block HashJoinBuildBlockInputStream::readImpl()
return block;
}

void HashJoinBuildBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
static const std::unordered_map<ASTTableJoin::Kind, String> join_type_map{
{ASTTableJoin::Kind::Inner, "Inner"},
{ASTTableJoin::Kind::Left, "Left"},
{ASTTableJoin::Kind::Right, "Right"},
{ASTTableJoin::Kind::Full, "Full"},
{ASTTableJoin::Kind::Cross, "Cross"},
{ASTTableJoin::Kind::Comma, "Comma"},
{ASTTableJoin::Kind::Anti, "Anti"},
{ASTTableJoin::Kind::LeftSemi, "Left_Semi"},
{ASTTableJoin::Kind::LeftAnti, "Left_Anti"},
{ASTTableJoin::Kind::Cross_Left, "Cross_Left"},
{ASTTableJoin::Kind::Cross_Right, "Cross_Right"},
{ASTTableJoin::Kind::Cross_Anti, "Cross_Anti"},
{ASTTableJoin::Kind::Cross_LeftSemi, "Cross_LeftSemi"},
{ASTTableJoin::Kind::Cross_LeftAnti, "Cross_LeftAnti"}};
auto join_type_it = join_type_map.find(join->getKind());
if (join_type_it == join_type_map.end())
throw TiFlashException("Unknown join type", Errors::Coprocessor::Internal);
buffer.fmtAppend(", join_kind = {}", join_type_it->second);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
JoinPtr join;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/IBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
return res + 1;
}


void IBlockInputStream::dumpTree(FmtBuffer & buffer, size_t indent, size_t multiplier)
{
// todo append getHeader().dumpStructure()
buffer.fmtAppend(
"{}{}{}\n",
"{}{}{}",
String(indent, ' '),
getName(),
multiplier > 1 ? fmt::format(" x {}", multiplier) : "");
if (!extra_info.empty())
buffer.fmtAppend(": <{}>", extra_info);
appendInfo(buffer);
buffer.append("\n");
++indent;

/// If the subtree is repeated several times, then we output it once with the multiplier.
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class IBlockInputStream : private boost::noncopyable
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

void setExtraInfo(String info) { extra_info = info; }

template <typename F>
void forEachChild(F && f)
Expand Down Expand Up @@ -176,6 +177,8 @@ class IBlockInputStream : private boost::noncopyable
}
}

virtual void appendInfo(FmtBuffer & /*buffer*/) const {};

protected:
BlockInputStreams children;
mutable std::shared_mutex children_mutex;
Expand All @@ -188,6 +191,9 @@ class IBlockInputStream : private boost::noncopyable
mutable std::mutex tree_id_mutex;
mutable String tree_id;

/// The info that hints why the inputStream is needed to run.
String extra_info;

/// Get text with names of this source and the entire subtree, this function should only be called after the
/// InputStream tree is constructed.
String getTreeID() const;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ Block LimitBlockInputStream::readImpl()
return res;
}

void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
size_t limit;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,5 +287,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
return blocks[0].cloneWithColumns(std::move(merged_columns));
}

void MergeSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
SortDescription description;
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// limitations under the License.

#include <Common/ClickHouseRevision.h>
#include <Common/FmtUtils.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
Expand Down Expand Up @@ -275,4 +278,9 @@ void ParallelAggregatingBlockInputStream::execute()
no_more_keys);
}

void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", max_threads: {}, final: {}", max_threads, final ? "true" : "false");
}

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream

Block getHeader() const override;

virtual void collectNewThreadCountOfThisLevel(int & cnt) override
void collectNewThreadCountOfThisLevel(int & cnt) override
{
cnt += processor.getMaxThreads();
}
Expand All @@ -62,6 +62,8 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
}

Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;


private:
const LoggerPtr log;
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/DataStreams/PartialSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/sortBlock.h>

#include <Common/FmtUtils.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <Interpreters/sortBlock.h>


namespace DB
{


Block PartialSortingBlockInputStream::readImpl()
{
Block res = children.back()->read();
sortBlock(res, description, limit);
return res;
}


void PartialSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(": limit = {}", limit);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/PartialSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
SortDescription description;
Expand Down
28 changes: 21 additions & 7 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FmtUtils.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
Expand Down Expand Up @@ -60,11 +61,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (auto & execution_summary : resp.execution_summaries())
for (const auto & execution_summary : resp.execution_summaries())
{
if (execution_summary.has_executor_id())
{
auto & executor_id = execution_summary.executor_id();
const auto & executor_id = execution_summary.executor_id();
execution_summaries[index][executor_id].time_processed_ns = execution_summary.time_processed_ns();
execution_summaries[index][executor_id].num_produced_rows = execution_summary.num_produced_rows();
execution_summaries[index][executor_id].num_iterations = execution_summary.num_iterations();
Expand All @@ -84,11 +85,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return;
}
auto & execution_summaries_map = execution_summaries[index];
for (auto & execution_summary : resp.execution_summaries())
for (const auto & execution_summary : resp.execution_summaries())
{
if (execution_summary.has_executor_id())
{
auto & executor_id = execution_summary.executor_id();
const auto & executor_id = execution_summary.executor_id();
if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end()))
{
LOG_FMT_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id);
Expand Down Expand Up @@ -224,12 +225,12 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
bool isStreamingCall() const { return is_streaming_reader; }
const std::vector<ConnectionProfileInfo> & getConnectionProfileInfos() const { return connection_profile_infos; }

virtual void collectNewThreadCountOfThisLevel(int & cnt) override
void collectNewThreadCountOfThisLevel(int & cnt) override
{
remote_reader->collectNewThreadCount(cnt);
}

virtual void resetNewThreadCountCompute() override
void resetNewThreadCountCompute() override
{
if (collected)
{
Expand All @@ -239,11 +240,24 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

protected:
virtual void readSuffixImpl() override
void readSuffixImpl() override
{
LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows);
remote_reader->close();
}

void appendInfo(FmtBuffer & buffer) const override
{
buffer.append(": schema: {");
buffer.joinStr(
sample_block.begin(),
sample_block.end(),
[](const auto & arg, FmtBuffer & fb) {
fb.fmtAppend("<{}, {}>", arg.name, arg.type->getName());
},
", ");
buffer.append("}");
}
};

using ExchangeReceiverInputStream = TiRemoteBlockInputStream<ExchangeReceiver>;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class DAGContext
explicit DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, initialize_concurrency(concurrency)
, is_mpp_task(false)
, is_mpp_task(true)
, is_root_mpp_task(false)
, tunnel_set(nullptr)
, log(Logger::get(log_identifier))
Expand Down
Loading