Skip to content

Commit e2a9897

Browse files
authored
[refactor](result sink) refactor result writers (apache#48368)
Construct a common buffer which is used to serialize a result block by normal queries / arrow flight queries / point queries.
1 parent 69a6a71 commit e2a9897

26 files changed

+1726
-954
lines changed

be/src/pipeline/exec/result_file_sink_operator.cpp

+5-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
#include "pipeline/exec/exchange_sink_buffer.h"
2424
#include "pipeline/exec/operator.h"
2525
#include "pipeline/exec/result_sink_operator.h"
26-
#include "runtime/buffer_control_block.h"
26+
#include "runtime/result_block_buffer.h"
2727
#include "runtime/result_buffer_mgr.h"
2828
#include "vec/sink/vdata_stream_sender.h"
2929

@@ -75,8 +75,8 @@ Status ResultFileSinkOperatorX::prepare(RuntimeState* state) {
7575
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::prepare(state));
7676
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
7777
if (state->query_options().enable_parallel_outfile) {
78-
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(), _buf_size,
79-
&_sender, state));
78+
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
79+
state->query_id(), _buf_size, &_sender, state, false, nullptr));
8080
}
8181
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
8282
}
@@ -94,7 +94,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
9494
_sender = _parent->cast<ResultFileSinkOperatorX>()._sender;
9595
} else {
9696
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
97-
state->fragment_instance_id(), p._buf_size, &_sender, state));
97+
state->fragment_instance_id(), p._buf_size, &_sender, state, false, nullptr));
9898
}
9999
_sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this());
100100

@@ -130,9 +130,8 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
130130
// close sender, this is normal path end
131131
if (_sender) {
132132
int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows();
133-
_sender->update_return_rows(written_rows);
134133
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
135-
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status));
134+
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
136135
}
137136
state->exec_env()->result_mgr()->cancel_at_time(
138137
time(nullptr) + config::result_buffer_cancelled_interval_time,

be/src/pipeline/exec/result_file_sink_operator.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ResultFileSinkLocalState final
4444
private:
4545
friend class ResultFileSinkOperatorX;
4646

47-
std::shared_ptr<BufferControlBlock> _sender;
47+
std::shared_ptr<ResultBlockBufferBase> _sender;
4848

4949
std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
5050
int _sender_id;
@@ -86,7 +86,7 @@ class ResultFileSinkOperatorX final : public DataSinkOperatorX<ResultFileSinkLoc
8686
std::string _header_type;
8787

8888
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
89-
std::shared_ptr<BufferControlBlock> _sender = nullptr;
89+
std::shared_ptr<ResultBlockBufferBase> _sender = nullptr;
9090
};
9191

9292
#include "common/compile_check_end.h"

be/src/pipeline/exec/result_sink_operator.cpp

+35-32
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
#include "common/config.h"
2626
#include "exec/rowid_fetcher.h"
2727
#include "pipeline/exec/operator.h"
28-
#include "runtime/buffer_control_block.h"
2928
#include "runtime/exec_env.h"
29+
#include "runtime/result_block_buffer.h"
3030
#include "runtime/result_buffer_mgr.h"
3131
#include "util/arrow/row_batch.h"
3232
#include "vec/exprs/vexpr.h"
@@ -48,24 +48,23 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
4848
auto fragment_instance_id = state->fragment_instance_id();
4949

5050
auto& p = _parent->cast<ResultSinkOperatorX>();
51-
if (state->query_options().enable_parallel_result_sink) {
52-
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
53-
} else {
54-
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
55-
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state));
56-
}
57-
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
58-
5951
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
6052
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
6153
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
6254
}
63-
if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
55+
if (state->query_options().enable_parallel_result_sink) {
56+
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
57+
} else {
6458
std::shared_ptr<arrow::Schema> arrow_schema;
65-
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
66-
state->timezone()));
67-
_sender->register_arrow_schema(arrow_schema);
59+
if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
60+
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
61+
state->timezone()));
62+
}
63+
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
64+
fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state,
65+
p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL, arrow_schema));
6866
}
67+
_sender->set_dependency(fragment_instance_id, _dependency->shared_from_this());
6968
return Status::OK();
7069
}
7170

@@ -79,16 +78,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
7978
case TResultSinkType::MYSQL_PROTOCAL: {
8079
if (state->mysql_row_binary_format()) {
8180
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>(
82-
_sender.get(), _output_vexpr_ctxs, _profile));
81+
_sender, _output_vexpr_ctxs, _profile));
8382
} else {
8483
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<false>(
85-
_sender.get(), _output_vexpr_ctxs, _profile));
84+
_sender, _output_vexpr_ctxs, _profile));
8685
}
8786
break;
8887
}
8988
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
9089
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
91-
_sender.get(), _output_vexpr_ctxs, _profile));
90+
_sender, _output_vexpr_ctxs, _profile));
9291
break;
9392
}
9493
default:
@@ -102,18 +101,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
102101
ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
103102
const std::vector<TExpr>& t_output_expr,
104103
const TResultSink& sink)
105-
: DataSinkOperatorX(operator_id, 0, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {
106-
if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) {
107-
_sink_type = TResultSinkType::MYSQL_PROTOCAL;
108-
} else {
109-
_sink_type = sink.type;
110-
}
111-
if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
112-
_result_sink_buffer_size_rows = config::arrow_flight_result_sink_buffer_size_rows;
113-
} else {
114-
_result_sink_buffer_size_rows = RESULT_SINK_BUFFER_SIZE;
115-
}
116-
_fetch_option = sink.fetch_option;
104+
: DataSinkOperatorX(operator_id, 0, 0),
105+
_sink_type(!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL
106+
? TResultSinkType::MYSQL_PROTOCAL
107+
: sink.type),
108+
_result_sink_buffer_size_rows(_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL
109+
? config::arrow_flight_result_sink_buffer_size_rows
110+
: RESULT_SINK_BUFFER_SIZE),
111+
_row_desc(row_desc),
112+
_t_output_expr(t_output_expr),
113+
_fetch_option(sink.fetch_option) {
117114
_name = "ResultSink";
118115
}
119116

@@ -132,8 +129,14 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
132129
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
133130

134131
if (state->query_options().enable_parallel_result_sink) {
132+
std::shared_ptr<arrow::Schema> arrow_schema;
133+
if (_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
134+
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
135+
state->timezone()));
136+
}
135137
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
136-
state->query_id(), _result_sink_buffer_size_rows, &_sender, state));
138+
state->query_id(), _result_sink_buffer_size_rows, &_sender, state,
139+
_sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL, arrow_schema));
137140
}
138141
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
139142
}
@@ -197,13 +200,13 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
197200

198201
// close sender, this is normal path end
199202
if (_sender) {
203+
int64_t written_rows = 0;
200204
if (_writer) {
201-
int64_t written_rows = _writer->get_written_rows();
202-
_sender->update_return_rows(written_rows);
205+
written_rows = _writer->get_written_rows();
203206
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(
204207
written_rows);
205208
}
206-
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status));
209+
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
207210
}
208211
state->exec_env()->result_mgr()->cancel_at_time(
209212
time(nullptr) + config::result_buffer_cancelled_interval_time,

be/src/pipeline/exec/result_sink_operator.h

+7-7
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
#include <stdint.h>
2222

2323
#include "operator.h"
24-
#include "runtime/buffer_control_block.h"
24+
#include "runtime/result_block_buffer.h"
2525
#include "runtime/result_writer.h"
2626

2727
namespace doris {
2828
#include "common/compile_check_begin.h"
29-
class BufferControlBlock;
29+
class ResultBlockBufferBase;
3030

3131
namespace pipeline {
3232

@@ -135,7 +135,7 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedSta
135135

136136
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
137137

138-
std::shared_ptr<BufferControlBlock> _sender = nullptr;
138+
std::shared_ptr<ResultBlockBufferBase> _sender = nullptr;
139139
std::shared_ptr<ResultWriter> _writer = nullptr;
140140

141141
RuntimeProfile::Counter* _fetch_row_id_timer = nullptr;
@@ -154,8 +154,8 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState>
154154
friend class ResultSinkLocalState;
155155

156156
Status _second_phase_fetch_data(RuntimeState* state, vectorized::Block* final_block);
157-
TResultSinkType::type _sink_type;
158-
int _result_sink_buffer_size_rows;
157+
const TResultSinkType::type _sink_type;
158+
const int _result_sink_buffer_size_rows;
159159
// set file options when sink type is FILE
160160
std::unique_ptr<ResultFileOptions> _file_opts = nullptr;
161161

@@ -167,9 +167,9 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState>
167167
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
168168

169169
// for fetch data by rowids
170-
TFetchOption _fetch_option;
170+
const TFetchOption _fetch_option;
171171

172-
std::shared_ptr<BufferControlBlock> _sender = nullptr;
172+
std::shared_ptr<ResultBlockBufferBase> _sender = nullptr;
173173
};
174174

175175
} // namespace pipeline

0 commit comments

Comments
 (0)