diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1d8178ceec2e86..6713cf7e851af6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -782,7 +782,7 @@ DEFINE_Int32(load_stream_messages_in_batch, "128"); // brpc streaming StreamWait seconds on EAGAIN DEFINE_Int32(load_stream_eagain_wait_seconds, "60"); // max tasks per flush token in load stream -DEFINE_Int32(load_stream_flush_token_max_tasks, "5"); +DEFINE_Int32(load_stream_flush_token_max_tasks, "15"); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 307cd4ef30b161..836d4da147afbb 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -43,6 +43,9 @@ namespace doris { +bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); +bvar::Adder g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); + TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) : _id(id), @@ -130,6 +133,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data butil::IOBuf buf = data->movable(); auto flush_func = [this, new_segid, eos, buf, header]() { signal::set_signal_task_id(_load_id); + g_load_stream_flush_running_threads << -1; auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf); if (eos && st.ok()) { st = _load_stream_writer->close_segment(new_segid); @@ -140,9 +144,15 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data } }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; + MonotonicStopWatch timer; + timer.start(); while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) { - bthread_usleep(10 * 1000); // 10ms + bthread_usleep(2 * 1000); // 2ms } + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; + g_load_stream_flush_wait_ms << time_ms; + g_load_stream_flush_running_threads << 1; return flush_token->submit_func(flush_func); }