diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 0cd14899f524fb..029bea7494ef58 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -99,7 +99,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i if (p._is_top_sink) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout())); + state->fragment_instance_id(), p._buf_size, &_sender, state->execution_timeout(), + state->batch_size())); // create writer _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( p._file_opts.get(), p._storage_type, state->fragment_instance_id(), @@ -175,7 +176,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) // close sender, this is normal path end if (_sender) { _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); - RETURN_IF_ERROR(_sender->close(final_status)); + RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status)); } state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index 4fa31f615ceb90..7623dae7fea09a 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -107,7 +107,7 @@ class ResultFileSinkOperatorX final : public DataSinkOperatorXcast()._sender; } else { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender, - state->execution_timeout())); + fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(), + state->batch_size())); } - _sender->set_dependency(_dependency->shared_from_this()); + _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); return Status::OK(); } @@ -122,7 +122,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) { if (state->query_options().enable_parallel_result_sink) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout())); + state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout(), + state->batch_size())); } return Status::OK(); } @@ -139,7 +140,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); } - RETURN_IF_ERROR(local_state._writer->write(*block)); + RETURN_IF_ERROR(local_state._writer->write(state, *block)); if (_fetch_option.use_two_phase_fetch) { // Block structure may be changed by calling _second_phase_fetch_data(). // So we should clear block in case of unmatched columns @@ -185,7 +186,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_writer) { _sender->update_return_rows(_writer->get_written_rows()); } - RETURN_IF_ERROR(_sender->close(final_status)); + RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status)); } state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 0ccb7f4946b9b7..1d2490f486d92f 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -104,7 +104,7 @@ struct ResultFileOptions { } }; -constexpr int RESULT_SINK_BUFFER_SIZE = 4096; +constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8; class ResultSinkLocalState final : public PipelineXSinkLocalState { ENABLE_FACTORY_CREATOR(ResultSinkLocalState); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 51d2c8268e71f7..a8dc13438c1f3b 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -200,8 +200,9 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); + size_t allocated_bytes = new_block.allocated_bytes(); if (_data_queue[channel_id].enqueue(std::move(new_block))) { + local_state._shared_state->add_mem_usage(channel_id, allocated_bytes); local_state._shared_state->set_ready_to_read(channel_id); } @@ -220,25 +221,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - if (_free_block_limit == 0 || - _free_blocks.size_approx() < _free_block_limit * _num_sources) { - _free_blocks.enqueue(std::move(next_block)); - } - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); + local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } - local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 8ef23265e3f2c2..a1a83b22840b2b 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -31,7 +31,7 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" -#include "pipeline/exec/result_sink_operator.h" +#include "pipeline/dependency.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/thrift_util.h" @@ -85,13 +85,14 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul delete this; } -BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) +BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size) : _fragment_id(id), _is_close(false), _is_cancelled(false), _buffer_rows(0), _buffer_limit(buffer_size), - _packet_num(0) { + _packet_num(0), + _batch_size(batch_size) { _query_statistics = std::make_unique(); } @@ -103,165 +104,153 @@ Status BufferControlBlock::init() { return Status::OK(); } -Status BufferControlBlock::add_batch(std::unique_ptr& result) { - { - std::unique_lock l(_lock); - - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } - - int num_rows = result->result_batch.rows.size(); - - while ((!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit) && - !_is_cancelled) { - _data_removal.wait_for(l, std::chrono::seconds(1)); - } +Status BufferControlBlock::add_batch(RuntimeState* state, + std::unique_ptr& result) { + std::unique_lock l(_lock); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - if (_waiting_rpc.empty()) { - // Merge result into batch to reduce rpc times - if (!_fe_result_batch_queue.empty() && - ((_fe_result_batch_queue.back()->result_batch.rows.size() + num_rows) < - _buffer_limit) && - !result->eos) { - std::vector& back_rows = - _fe_result_batch_queue.back()->result_batch.rows; - std::vector& result_rows = result->result_batch.rows; - back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), - std::make_move_iterator(result_rows.end())); - } else { - _fe_result_batch_queue.push_back(std::move(result)); - } - _buffer_rows += num_rows; + int num_rows = result->result_batch.rows.size(); + if (_waiting_rpc.empty()) { + // Merge result into batch to reduce rpc times + if (!_fe_result_batch_queue.empty() && + ((_fe_result_batch_queue.back()->result_batch.rows.size() + num_rows) < + _buffer_limit) && + !result->eos) { + std::vector& back_rows = _fe_result_batch_queue.back()->result_batch.rows; + std::vector& result_rows = result->result_batch.rows; + back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), + std::make_move_iterator(result_rows.end())); } else { - auto* ctx = _waiting_rpc.front(); - _waiting_rpc.pop_front(); - ctx->on_data(result, _packet_num); - _packet_num++; + _instance_rows_in_queue.emplace_back(); + _fe_result_batch_queue.push_back(std::move(result)); } + _buffer_rows += num_rows; + _instance_rows[state->fragment_instance_id()] += num_rows; + _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + } else { + auto* ctx = _waiting_rpc.front(); + _waiting_rpc.pop_front(); + ctx->on_data(result, _packet_num); + _packet_num++; } + _update_dependency(); return Status::OK(); } -Status BufferControlBlock::add_arrow_batch(std::shared_ptr& result) { - { - std::unique_lock l(_lock); - - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } +Status BufferControlBlock::add_arrow_batch(RuntimeState* state, + std::shared_ptr& result) { + std::unique_lock l(_lock); - int num_rows = result->num_rows(); + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > _buffer_limit) && - !_is_cancelled) { - _data_removal.wait_for(l, std::chrono::seconds(1)); - } + int num_rows = result->num_rows(); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - // TODO: merge RocordBatch, ToStructArray -> Make again + // TODO: merge RocordBatch, ToStructArray -> Make again - _arrow_flight_batch_queue.push_back(std::move(result)); - _buffer_rows += num_rows; - _data_arrival.notify_one(); - } + _arrow_flight_batch_queue.push_back(std::move(result)); + _buffer_rows += num_rows; + _instance_rows_in_queue.emplace_back(); + _instance_rows[state->fragment_instance_id()] += num_rows; + _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; _update_dependency(); return Status::OK(); } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { - { - std::lock_guard l(_lock); - if (!_status.ok()) { - ctx->on_failure(_status); - _update_dependency(); - return; - } - if (_is_cancelled) { - ctx->on_failure(Status::Cancelled("Cancelled")); - _update_dependency(); - return; - } - if (!_fe_result_batch_queue.empty()) { - // get result - std::unique_ptr result = std::move(_fe_result_batch_queue.front()); - _fe_result_batch_queue.pop_front(); - _buffer_rows -= result->result_batch.rows.size(); - _data_removal.notify_one(); - - ctx->on_data(result, _packet_num); - _packet_num++; - _update_dependency(); - return; - } - if (_is_close) { - ctx->on_close(_packet_num, _query_statistics.get()); - _update_dependency(); - return; + std::lock_guard l(_lock); + if (!_status.ok()) { + ctx->on_failure(_status); + _update_dependency(); + return; + } + if (_is_cancelled) { + ctx->on_failure(Status::Cancelled("Cancelled")); + _update_dependency(); + return; + } + if (!_fe_result_batch_queue.empty()) { + // get result + std::unique_ptr result = std::move(_fe_result_batch_queue.front()); + _fe_result_batch_queue.pop_front(); + _buffer_rows -= result->result_batch.rows.size(); + for (auto it : _instance_rows_in_queue.front()) { + _instance_rows[it.first] -= it.second; } - // no ready data, push ctx to waiting list - _waiting_rpc.push_back(ctx); + _instance_rows_in_queue.pop_front(); + + ctx->on_data(result, _packet_num); + _packet_num++; + _update_dependency(); + return; + } + if (_is_close) { + ctx->on_close(_packet_num, _query_statistics.get()); + _update_dependency(); + return; } + // no ready data, push ctx to waiting list + _waiting_rpc.push_back(ctx); _update_dependency(); } Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result) { - { - std::unique_lock l(_lock); - if (!_status.ok()) { - return _status; - } - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } - - while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { - _data_arrival.wait_for(l, std::chrono::seconds(1)); - } + std::unique_lock l(_lock); + if (!_status.ok()) { + return _status; + } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } + if (_is_cancelled) { + return Status::Cancelled("Cancelled"); + } - if (!_arrow_flight_batch_queue.empty()) { - *result = std::move(_arrow_flight_batch_queue.front()); - _arrow_flight_batch_queue.pop_front(); - _buffer_rows -= (*result)->num_rows(); - _data_removal.notify_one(); - _packet_num++; - _update_dependency(); - return Status::OK(); + if (!_arrow_flight_batch_queue.empty()) { + *result = std::move(_arrow_flight_batch_queue.front()); + _arrow_flight_batch_queue.pop_front(); + _buffer_rows -= (*result)->num_rows(); + for (auto it : _instance_rows_in_queue.front()) { + _instance_rows[it.first] -= it.second; } + _instance_rows_in_queue.pop_front(); + _packet_num++; + _update_dependency(); + return Status::OK(); + } - // normal path end - if (_is_close) { - _update_dependency(); - return Status::OK(); - } + // normal path end + if (_is_close) { + _update_dependency(); + return Status::OK(); } return Status::InternalError("Abnormal Ending"); } -Status BufferControlBlock::close(Status exec_status) { +Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { std::unique_lock l(_lock); - close_cnt++; - if (close_cnt < _result_sink_dependencys.size()) { + auto it = _result_sink_dependencys.find(id); + if (it != _result_sink_dependencys.end()) { + it->second->set_always_ready(); + _result_sink_dependencys.erase(it); + } + if (!_result_sink_dependencys.empty()) { return Status::OK(); } _is_close = true; _status = exec_status; - // notify blocked get thread - _data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { @@ -280,8 +269,6 @@ Status BufferControlBlock::close(Status exec_status) { void BufferControlBlock::cancel() { std::unique_lock l(_lock); _is_cancelled = true; - _data_removal.notify_all(); - _data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } @@ -290,18 +277,25 @@ void BufferControlBlock::cancel() { } void BufferControlBlock::set_dependency( - std::shared_ptr result_sink_dependency) { - _result_sink_dependencys.push_back(result_sink_dependency); + const TUniqueId& id, std::shared_ptr result_sink_dependency) { + std::unique_lock l(_lock); + _result_sink_dependencys[id] = result_sink_dependency; + _update_dependency(); } void BufferControlBlock::_update_dependency() { - if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) { - for (auto dependency : _result_sink_dependencys) { - dependency->set_ready(); + if (_is_cancelled) { + for (auto it : _result_sink_dependencys) { + it.second->set_ready(); } - } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled) { - for (auto dependency : _result_sink_dependencys) { - dependency->block(); + return; + } + + for (auto it : _result_sink_dependencys) { + if (_instance_rows[it.first] > _batch_size) { + it.second->block(); + } else { + it.second->set_ready(); } } } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index c8c240f928a8d2..1296f2c606b328 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -27,15 +27,16 @@ #include #include #include +#include #include "common/status.h" #include "runtime/query_statistics.h" +#include "runtime/runtime_state.h" +#include "util/hash_util.hpp" -namespace google { -namespace protobuf { +namespace google::protobuf { class Closure; -} -} // namespace google +} // namespace google::protobuf namespace arrow { class RecordBatch; @@ -71,19 +72,19 @@ struct GetResultBatchCtx { // buffer used for result customer and producer class BufferControlBlock { public: - BufferControlBlock(const TUniqueId& id, int buffer_size); + BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size); ~BufferControlBlock(); Status init(); - Status add_batch(std::unique_ptr& result); - Status add_arrow_batch(std::shared_ptr& result); + Status add_batch(RuntimeState* state, std::unique_ptr& result); + Status add_arrow_batch(RuntimeState* state, std::shared_ptr& result); void get_batch(GetResultBatchCtx* ctx); Status get_arrow_batch(std::shared_ptr* result); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. - Status close(Status exec_status); + Status close(const TUniqueId& id, Status exec_status); // this is called by RPC, called from coordinator void cancel(); @@ -98,7 +99,8 @@ class BufferControlBlock { } } - void set_dependency(std::shared_ptr result_sink_dependency); + void set_dependency(const TUniqueId& id, + std::shared_ptr result_sink_dependency); protected: void _update_dependency(); @@ -121,18 +123,17 @@ class BufferControlBlock { // protects all subsequent data in this block std::mutex _lock; - // signal arrival of new batch or the eos/cancelled condition - std::condition_variable _data_arrival; - // signal removal of data by stream consumer - std::condition_variable _data_removal; std::deque _waiting_rpc; // only used for FE using return rows to check limit std::unique_ptr _query_statistics; - std::atomic_bool _batch_queue_empty = false; - std::vector> _result_sink_dependencys; - size_t close_cnt = 0; + // instance id to dependency + std::unordered_map> _result_sink_dependencys; + std::unordered_map _instance_rows; + std::list> _instance_rows_in_queue; + + int _batch_size; }; } // namespace doris diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 23f440d1909f90..ccbf0c3ff6729e 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -67,8 +67,8 @@ Status ResultBufferMgr::init() { } Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr* sender, - int exec_timout) { + std::shared_ptr* sender, int exec_timout, + int batch_size) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size std::shared_ptr control_block = nullptr; - control_block = std::make_shared(query_id, buffer_size); + control_block = std::make_shared(query_id, buffer_size, batch_size); { std::unique_lock wlock(_buffer_map_lock); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 30b1b61eb7dbea..8bac69c23ac522 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -58,7 +58,8 @@ class ResultBufferMgr { // the returned sender do not need release // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, - std::shared_ptr* sender, int exec_timeout); + std::shared_ptr* sender, int exec_timeout, + int batch_size); // fetch data result to FE void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx); diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index 78082956d0ed66..df1b7a808d9c8a 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -47,7 +47,7 @@ class ResultWriter { [[nodiscard]] bool output_object_data() const { return _output_object_data; } // Write is sync, it will do real IO work. - virtual Status write(vectorized::Block& block) = 0; + virtual Status write(RuntimeState* state, vectorized::Block& block) = 0; void set_output_object_data(bool output_object_data) { _output_object_data = output_object_data; diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index d4d20ea5a489c9..8078467d5cae29 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -432,7 +432,7 @@ Status PointQueryExecutor::_lookup_row_data() { _reusable->get_col_default_values(), _reusable->include_col_uids()); } if (!_reusable->missing_col_uids().empty()) { - if (!_reusable->runtime_state().enable_short_circuit_query_access_column_store()) { + if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) { std::string missing_columns; for (int cid : _reusable->missing_col_uids()) { missing_columns += _tablet->tablet_schema()->column_by_uid(cid).name() + ","; @@ -487,10 +487,10 @@ Status PointQueryExecutor::_lookup_row_data() { } template -Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block, - PTabletKeyLookupResponse* response) { +Status serialize_block(RuntimeState* state, MysqlWriter& mysql_writer, vectorized::Block& block, + PTabletKeyLookupResponse* response) { block.clear_names(); - RETURN_IF_ERROR(mysql_writer.write(block)); + RETURN_IF_ERROR(mysql_writer.write(state, block)); assert(mysql_writer.results().size() == 1); uint8_t* buf = nullptr; uint32_t len = 0; @@ -508,11 +508,13 @@ Status PointQueryExecutor::_output_data() { if (_binary_row_format) { vectorized::VMysqlResultWriter mysql_writer(nullptr, _reusable->output_exprs(), nullptr); - RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); + RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), mysql_writer, + *_result_block, _response)); } else { vectorized::VMysqlResultWriter mysql_writer(nullptr, _reusable->output_exprs(), nullptr); - RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response)); + RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), mysql_writer, + *_result_block, _response)); } VLOG_DEBUG << "dump block " << _result_block->dump_data(); } else { diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 1bed53891c3973..f374e0948062ab 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -98,7 +98,7 @@ class Reusable { const std::unordered_set include_col_uids() const { return _include_col_uids; } - const RuntimeState& runtime_state() const { return *_runtime_state; } + RuntimeState* runtime_state() { return _runtime_state.get(); } private: // caching TupleDescriptor, output_expr, etc... diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index d646cf66f347e5..b23d1668465bbd 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -53,7 +53,7 @@ void VArrowFlightResultWriter::_init_profile() { _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); } -Status VArrowFlightResultWriter::write(Block& input_block) { +Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { @@ -80,7 +80,7 @@ Status VArrowFlightResultWriter::write(Block& input_block) { SCOPED_TIMER(_result_send_timer); // If this is a dry run task, no need to send data block if (!_is_dry_run) { - status = _sinker->add_arrow_batch(result); + status = _sinker->add_arrow_batch(state, result); } if (status.ok()) { _written_rows += num_rows; diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h index 862b074cb35569..ab2578421c80bc 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.h +++ b/be/src/vec/sink/varrow_flight_result_writer.h @@ -44,7 +44,7 @@ class VArrowFlightResultWriter final : public ResultWriter { Status init(RuntimeState* state) override; - Status write(Block& block) override; + Status write(RuntimeState* state, Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 804f50f0fc8f06..45941173d4d0c0 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -102,7 +102,7 @@ void VMysqlResultWriter::_init_profile() { } template -Status VMysqlResultWriter::write(Block& input_block) { +Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { @@ -194,7 +194,7 @@ Status VMysqlResultWriter::write(Block& input_block) { // If this is a dry run task, no need to send data block if (!_is_dry_run) { if (_sinker) { - status = _sinker->add_batch(result); + status = _sinker->add_batch(state, result); } else { _results.push_back(std::move(result)); } diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index da3cdcf0690ec8..306d062a6be682 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -47,7 +47,7 @@ class VMysqlResultWriter final : public ResultWriter { Status init(RuntimeState* state) override; - Status write(Block& block) override; + Status write(RuntimeState* state, Block& block) override; Status close(Status status) override; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 814d1b754c43a7..42fd8468e86987 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -140,7 +140,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } auto block = _get_block_from_queue(); - auto status = write(*block); + auto status = write(state, *block); if (!status.ok()) [[unlikely]] { std::unique_lock l(_m); _writer_status.update(status); diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 2703330406cbe2..fc8aacdbfa11db 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() { return partition_columns; } -Status VIcebergTableWriter::write(vectorized::Block& block) { +Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { SCOPED_RAW_TIMER(&_send_data_ns); if (block.rows() == 0) { return Status::OK(); diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h index 35e71d1960f400..e2e582e04ad8fd 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h @@ -54,7 +54,7 @@ class VIcebergTableWriter final : public AsyncResultWriter { Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status write(vectorized::Block& block) override; + Status write(RuntimeState* state, vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index 96c4edc82b55dc..c897892cbfc401 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -194,7 +194,7 @@ std::string VFileResultWriter::_file_format_to_name() { } } -Status VFileResultWriter::write(Block& block) { +Status VFileResultWriter::write(RuntimeState* state, Block& block) { if (block.rows() == 0) { return Status::OK(); } @@ -291,7 +291,8 @@ Status VFileResultWriter::_send_result() { attach_infos.insert(std::make_pair("URL", file_url)); result->result_batch.__set_attached_infos(attach_infos); - RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result"); + RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result), + "failed to send outfile result"); return Status::OK(); } diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 44b0695505fecf..42753a5e261cb5 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -60,7 +60,7 @@ class VFileResultWriter final : public AsyncResultWriter { VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - Status write(Block& block) override; + Status write(RuntimeState* state, Block& block) override; Status close(Status exec_status) override; diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index 0e64060eb0baae..f90c7134ccd1cf 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { return Status::OK(); } -Status VHiveTableWriter::write(vectorized::Block& block) { +Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { SCOPED_RAW_TIMER(&_send_data_ns); if (block.rows() == 0) { diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index 4989ba443c7e20..6c8b972f280f48 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -41,13 +41,13 @@ class VHiveTableWriter final : public AsyncResultWriter { public: VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - ~VHiveTableWriter() = default; + ~VHiveTableWriter() override = default; Status init_properties(ObjectPool* pool); Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status write(vectorized::Block& block) override; + Status write(RuntimeState* state, vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp index 9493bfbf0723b0..b7c8d1f78dd2ff 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp @@ -60,7 +60,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs) : AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {} -Status VJdbcTableWriter::write(vectorized::Block& block) { +Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h index a683259c992485..b8216c3bcd6ca2 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.h +++ b/be/src/vec/sink/writer/vjdbc_table_writer.h @@ -44,7 +44,7 @@ class VJdbcTableWriter final : public AsyncResultWriter, public JdbcConnector { return init_to_write(profile); } - Status write(vectorized::Block& block) override; + Status write(RuntimeState* state, vectorized::Block& block) override; Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); } diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp index d9ca6d96f99b53..45afe8ce01930f 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.cpp +++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp @@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { return Status::OK(); } -Status VMysqlTableWriter::write(vectorized::Block& block) { +Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h index 856d0a21ec5147..072885b176b72f 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.h +++ b/be/src/vec/sink/writer/vmysql_table_writer.h @@ -51,7 +51,7 @@ class VMysqlTableWriter final : public AsyncResultWriter { // connect to mysql server Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status write(vectorized::Block& block) override; + Status write(RuntimeState* state, vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp index da068c3d677884..c70bdd4ca19a14 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp @@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs) : AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {} -Status VOdbcTableWriter::write(vectorized::Block& block) { +Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h index 687b5106a8babf..fa4dc47b77f1a1 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.h +++ b/be/src/vec/sink/writer/vodbc_table_writer.h @@ -44,7 +44,7 @@ class VOdbcTableWriter final : public AsyncResultWriter, public ODBCConnector { return init_to_write(profile); } - Status write(vectorized::Block& block) override; + Status write(RuntimeState* state, vectorized::Block& block) override; Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 2151696714bd7f..b31796fe72405c 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1368,7 +1368,7 @@ Status VTabletWriter::_send_new_partition_batch() { // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); - RETURN_IF_ERROR(this->write(tmp_block)); + RETURN_IF_ERROR(this->write(_state, tmp_block)); _row_distribution._batching_block->set_mutable_columns( tmp_block.mutate_columns()); // Recovery back _row_distribution._batching_block->clear_column_data(); @@ -1675,7 +1675,7 @@ void VTabletWriter::_generate_index_channels_payloads( } } -Status VTabletWriter::write(doris::vectorized::Block& input_block) { +Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 21d7b1c9f178ce..b9fbc4d0873042 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -544,7 +544,7 @@ class VTabletWriter final : public AsyncResultWriter { public: VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - Status write(Block& block) override; + Status write(RuntimeState* state, Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 3c8dede657fd80..9bd154ce212cac 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -373,7 +373,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, return Status::OK(); } -Status VTabletWriterV2::write(Block& input_block) { +Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -502,7 +502,7 @@ Status VTabletWriterV2::_send_new_partition_batch() { // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); - RETURN_IF_ERROR(this->write(tmp_block)); + RETURN_IF_ERROR(this->write(_state, tmp_block)); _row_distribution._batching_block->set_mutable_columns( tmp_block.mutate_columns()); // Recovery back _row_distribution._batching_block->clear_column_data(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index ff31e1552dd7d8..363dea54c3b0e9 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -106,7 +106,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { ~VTabletWriterV2() override; - Status write(Block& block) override; + Status write(RuntimeState* state, Block& block) override; Status open(RuntimeState* state, RuntimeProfile* profile) override; diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp index 5ba8af8b81f46d..97e78f05c549a8 100644 --- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp @@ -317,7 +317,7 @@ void serialize_and_deserialize_mysql_test() { // mysql_writer init vectorized::VMysqlResultWriter mysql_writer(nullptr, _output_vexpr_ctxs, nullptr); - Status st = mysql_writer.write(block); + Status st = mysql_writer.write(&runtime_stat, block); EXPECT_TRUE(st.ok()); }