From c54a7469495eda47cfebd4504ebba3e819a962a1 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 12 Sep 2024 11:55:53 +0800 Subject: [PATCH] 2 --- be/src/cloud/cloud_tablet.cpp | 14 ++--- be/src/cloud/cloud_warm_up_manager.cpp | 14 ++--- .../io/cache/block_file_cache_downloader.cpp | 9 ++- be/src/io/cache/cached_remote_file_reader.cpp | 17 +++--- be/src/io/cache/cached_remote_file_reader.h | 2 +- be/src/io/cache/file_block.cpp | 4 +- be/src/io/cache/file_block.h | 2 +- be/src/io/cache/file_cache_storage.h | 3 +- be/src/io/cache/fs_file_cache_storage.cpp | 5 +- be/src/io/cache/fs_file_cache_storage.h | 3 +- be/src/io/fs/broker_file_reader.cpp | 6 +- be/src/io/fs/buffered_reader.cpp | 47 +++++++++------- be/src/io/fs/buffered_reader.h | 23 ++++---- be/src/io/fs/hdfs_file_reader.cpp | 5 +- be/src/io/fs/local_file_reader.cpp | 5 +- be/src/io/fs/s3_file_reader.cpp | 5 +- be/src/io/io_common.h | 56 +++++++++++++++++++ be/src/olap/base_tablet.cpp | 4 +- .../segment_v2/indexed_column_reader.cpp | 4 +- .../rowset/segment_v2/ordinal_page_index.cpp | 4 +- be/src/olap/rowset/segment_v2/page_io.cpp | 9 +++ be/src/olap/rowset/segment_v2/segment.cpp | 11 +++- be/src/pipeline/exec/hashjoin_build_sink.h | 5 +- 23 files changed, 176 insertions(+), 81 deletions(-) diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 7433b781c65d99f..9921dd2244756ae 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -233,28 +233,26 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ ? 
0 : rowset_meta->newest_write_timestamp() + _tablet_meta->ttl_seconds(); + io::IOContext io_ctx; + io_ctx.expiration_time = expiration_time; _engine.file_cache_block_downloader().submit_download_task( io::DownloadFileMeta { .path = storage_resource.value()->remote_segment_path( *rowset_meta, seg_id), .file_size = rs->rowset_meta()->segment_file_size(seg_id), .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx, .download_done {}, }); + io::IOContext io_ctx2; + io_ctx2.expiration_time = expiration_time; auto download_idx_file = [&](const io::Path& idx_path) { io::DownloadFileMeta meta { .path = idx_path, .file_size = -1, .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx2, .download_done {}, }; _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 07beeaeb078a464..bd92dd29f480a05 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -106,16 +106,15 @@ void CloudWarmUpManager::handle_jobs() { } wait->add_count(); + io::IOContext io_ctx; + io_ctx.expiration_time = expiration_time; _engine.file_cache_block_downloader().submit_download_task( io::DownloadFileMeta { .path = storage_resource.value()->remote_segment_path(*rs, seg_id), .file_size = rs->segment_file_size(seg_id), .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx, .download_done = [wait](Status st) { if (!st) { @@ -125,15 +124,14 @@ void CloudWarmUpManager::handle_jobs() { }, }); + io::IOContext io_ctx2; + io_ctx2.expiration_time = expiration_time; auto download_idx_file = [&](const io::Path& idx_path) { io::DownloadFileMeta meta { .path = idx_path, .file_size = -1, .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = 
expiration_time, - }, + .ctx = io_ctx2, .download_done = [wait](Status st) { if (!st) { diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index 026f7e2a01741da..90909d8965ccb6e 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -178,6 +178,9 @@ void FileCacheBlockDownloader::download_file_cache_block( } }; + IOContext io_ctx; + io_ctx.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX; + io_ctx.expiration_time = meta.expiration_time(); DownloadFileMeta download_meta { .path = storage_resource.value()->remote_segment_path(*find_it->second, meta.segment_id()), @@ -186,11 +189,7 @@ void FileCacheBlockDownloader::download_file_cache_block( .offset = meta.offset(), .download_size = meta.size(), .file_system = storage_resource.value()->fs, - .ctx = - { - .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX, - .expiration_time = meta.expiration_time(), - }, + .ctx = io_ctx, .download_done = std::move(download_done), }; download_segment_file(download_meta); diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index a0618a8bec897bd..b9a16800d8d93ce 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -126,8 +126,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* // If cache is not enabled, just call inner file reader to read data if (_cache_type == FileCachePolicy::NO_CACHE) { Status st = _remote_file_reader->read_at(offset, result, bytes_read, io_ctx); - stats.hit_cache = false; - stats.bytes_read += *bytes_read; + // stats.hit_cache = false; + // stats.bytes_read += *bytes_read; return st; } @@ -143,7 +143,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* *bytes_read = 0; return Status::OK(); } - stats.bytes_read += bytes_req; + // 
stats.bytes_read += bytes_req; if (config::enable_read_cache_file_directly) { // read directly size_t need_read_size = bytes_req; @@ -167,7 +167,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* SCOPED_RAW_TIMER(&stats.local_read_timer); if (!iter->second ->read(Slice(result.data + (cur_offset - offset), reserve_bytes), - file_offset) + file_offset, io_ctx) .ok()) { break; } @@ -301,7 +301,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* size_t file_offset = current_offset - left; SCOPED_RAW_TIMER(&stats.local_read_timer); st = block->read(Slice(result.data + (current_offset - offset), read_size), - file_offset); + file_offset, io_ctx); } if (!st || block_state != FileBlock::State::DOWNLOADED) { size_t bytes_read {0}; @@ -310,7 +310,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* SCOPED_RAW_TIMER(&stats.remote_read_timer); RETURN_IF_ERROR(_remote_file_reader->read_at( current_offset, Slice(result.data + (current_offset - offset), read_size), - &bytes_read)); + &bytes_read, io_ctx)); DCHECK(bytes_read == read_size); } } @@ -328,10 +328,10 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, } if (read_stats.hit_cache) { statis->num_local_io_total++; - statis->bytes_read_from_local += read_stats.bytes_read; + // statis->bytes_read_from_local += read_stats.bytes_read; } else { statis->num_remote_io_total++; - statis->bytes_read_from_remote += read_stats.bytes_read; + // statis->bytes_read_from_remote += read_stats.bytes_read; } statis->remote_io_timer += read_stats.remote_read_timer; statis->local_io_timer += read_stats.local_read_timer; @@ -340,7 +340,6 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, statis->write_cache_io_timer += read_stats.local_write_timer; g_skip_cache_num << read_stats.skip_cache; - g_skip_cache_sum << read_stats.skip_cache; } } // namespace doris::io diff --git 
a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 979530cf81ea084..772a490153a49bd 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -71,7 +71,7 @@ class CachedRemoteFileReader final : public FileReader { struct ReadStatistics { bool hit_cache = true; bool skip_cache = false; - int64_t bytes_read = 0; + // int64_t bytes_read = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; int64_t local_read_timer = 0; diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 6586dcf589bddee..35ec5d234415a15 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -157,8 +157,8 @@ Status FileBlock::finalize() { return st; } -Status FileBlock::read(Slice buffer, size_t read_offset) { - return _mgr->_storage->read(_key, read_offset, buffer); +Status FileBlock::read(Slice buffer, size_t read_offset, const IOContext* io_ctx) { + return _mgr->_storage->read(_key, read_offset, buffer, io_ctx); } Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) { diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h index b4044786dc73df0..ee5d4217edf878e 100644 --- a/be/src/io/cache/file_block.h +++ b/be/src/io/cache/file_block.h @@ -95,7 +95,7 @@ class FileBlock { [[nodiscard]] Status append(Slice data); // read data from cache file - [[nodiscard]] Status read(Slice buffer, size_t read_offset); + [[nodiscard]] Status read(Slice buffer, size_t read_offset, const IOContext* io_ctx); // finish write, release the file writer [[nodiscard]] Status finalize(); diff --git a/be/src/io/cache/file_cache_storage.h b/be/src/io/cache/file_cache_storage.h index 64639356f148bfd..dd49a96c517bcee 100644 --- a/be/src/io/cache/file_cache_storage.h +++ b/be/src/io/cache/file_cache_storage.h @@ -36,7 +36,8 @@ class FileCacheStorage { // finalize the block virtual Status finalize(const FileCacheKey& key) = 
0; // read the block - virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result) = 0; + virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result, + const IOContext* io_ctx) = 0; // remove the block virtual Status remove(const FileCacheKey& key) = 0; // change the block meta diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 34e62d6fe6f6727..9a2a43530a0566e 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -152,7 +152,8 @@ Status FSFileCacheStorage::finalize(const FileCacheKey& key) { return fs->rename(file_writer->path(), true_file); } -Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer) { +Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer, + const IOContext* io_ctx) { AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset); FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key); if (!file_reader) { @@ -163,7 +164,7 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl FDCache::instance()->insert_file_reader(fd_key, file_reader); } size_t bytes_read = 0; - RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read)); + RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read, io_ctx)); DCHECK(bytes_read == buffer.get_size()); return Status::OK(); } diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h index d3299c6af0e99fe..d2590c2802a96fa 100644 --- a/be/src/io/cache/fs_file_cache_storage.h +++ b/be/src/io/cache/fs_file_cache_storage.h @@ -63,7 +63,8 @@ class FSFileCacheStorage : public FileCacheStorage { Status init(BlockFileCache* _mgr) override; Status append(const FileCacheKey& key, const Slice& value) override; Status finalize(const FileCacheKey& key) override; - Status read(const FileCacheKey& key, size_t 
value_offset, Slice buffer) override; + Status read(const FileCacheKey& key, size_t value_offset, Slice buffer, + const IOContext* io_ctx) override; Status remove(const FileCacheKey& key) override; Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) override; void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key, diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 102ea3e247778ac..ef75404f7bfa582 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -32,6 +32,7 @@ #include "common/logging.h" #include "common/status.h" #include "io/fs/broker_file_system.h" +#include "io/io_common.h" #include "util/doris_metrics.h" namespace doris::io { @@ -92,7 +93,7 @@ Status BrokerFileReader::close() { } Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: ", _path.native()); } @@ -145,6 +146,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes *bytes_read = response.data.size(); memcpy(to, response.data.data(), *bytes_read); + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 43445ed42efd3b9..15d8f45349b0fed 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -391,7 +391,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz // there exists occasions where the buffer is already closed but // some prior tasks are still queued in thread pool, so we have to check whether // the buffer is closed each time the condition variable is notified. 
-void PrefetchBuffer::reset_offset(size_t offset) { +void PrefetchBuffer::reset_offset(size_t offset, const IOContext* io_ctx) { { std::unique_lock lck {_lock}; if (!_prefetched.wait_for( @@ -415,6 +415,16 @@ void PrefetchBuffer::reset_offset(size_t offset) { } else { _exceed = false; } + // First update and reset file cache stats + if (io_ctx) { + if (io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->update(_owned_cache_stats); + _owned_cache_stats.reset(); + } + _owned_io_ctx = *io_ctx; + _owned_io_ctx.file_cache_stats = &_owned_cache_stats; + } + _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } @@ -455,7 +465,8 @@ void PrefetchBuffer::prefetch_buffer() { { SCOPED_RAW_TIMER(&_statis.read_time); - s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx); + // Use owned io_ctx here + s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, &_owned_io_ctx); } if (UNLIKELY(s.ok() && buf_size != _len)) { // This indicates that the data size returned by S3 object storage is smaller than what we requested, @@ -542,15 +553,14 @@ size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const { return _size - remaining; } -Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, - size_t* bytes_read) { +Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, size_t* bytes_read, + const IOContext* io_ctx) { if (UNLIKELY(off >= _file_range.end_offset)) { - // Reader can read out of [start_offset, end_offset) by synchronous method. 
- return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx); + return _reader->read_at(off, Slice {out, buf_len}, bytes_read, io_ctx); } if (_exceed) { - reset_offset((off / _size) * _size); - return read_buffer(off, out, buf_len, bytes_read); + reset_offset((off / _size) * _size, io_ctx); + return read_buffer(off, out, buf_len, bytes_read, io_ctx); } auto start = std::chrono::steady_clock::now(); // The baseline time is calculated by dividing the size of each buffer by MB/s. @@ -581,8 +591,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, // there is only parquet would do not sequence read // it would read the end of the file first if (UNLIKELY(!contains(off))) { - reset_offset((off / _size) * _size); - return read_buffer(off, out, buf_len, bytes_read); + reset_offset((off / _size) * _size, io_ctx); + return read_buffer(off, out, buf_len, bytes_read, io_ctx); } if (UNLIKELY(0 == _len || _offset + _len < off)) { return Status::OK(); @@ -601,7 +611,7 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, _statis.request_bytes += read_len; } if (off + *bytes_read == _offset + _len) { - reset_offset(_offset + _whole_buffer_size); + reset_offset(_offset + _whole_buffer_size, io_ctx); } return Status::OK(); } @@ -627,9 +637,8 @@ void PrefetchBuffer::_collect_profile_before_close() { // buffered reader PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader, - PrefetchRange file_range, const IOContext* io_ctx, - int64_t buffer_size) - : _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx) { + PrefetchRange file_range, int64_t buffer_size) + : _reader(std::move(reader)), _file_range(file_range) { if (buffer_size == -1L) { buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; } @@ -664,7 +673,7 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File // to make sure the buffer reader will start to 
read at right position. for (int i = 0; i < buffer_num; i++) { _pre_buffers.emplace_back(std::make_shared( - _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(), _io_ctx, + _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(), sync_buffer)); } } @@ -689,9 +698,9 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* while (actual_bytes_read < nbytes && offset < size()) { size_t read_num = 0; auto buffer_pos = get_buffer_pos(offset); - RETURN_IF_ERROR( - _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read, - nbytes - actual_bytes_read, &read_num)); + RETURN_IF_ERROR(_pre_buffers[buffer_pos]->read_buffer( + offset, result.get_data() + actual_bytes_read, nbytes - actual_bytes_read, + &read_num, io_ctx)); actual_bytes_read += read_num; offset += read_num; } @@ -855,7 +864,7 @@ Result DelegateReader::create_file_reader( if (is_thread_safe) { // PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently. 
return std::make_shared( - profile, std::move(reader), file_range, io_ctx); + profile, std::move(reader), file_range); } } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 7afb8cfec22de86..100f664101eeb72 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -278,13 +278,11 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED }; PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size, - io::FileReader* reader, const IOContext* io_ctx, - std::function sync_profile) + io::FileReader* reader, std::function sync_profile) : _file_range(file_range), _size(buffer_size), _whole_buffer_size(whole_buffer_size), _reader(reader), - _io_ctx(io_ctx), _buf(new char[buffer_size]), _sync_profile(std::move(sync_profile)) {} @@ -295,7 +293,6 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro _size(other._size), _whole_buffer_size(other._whole_buffer_size), _reader(other._reader), - _io_ctx(other._io_ctx), _buf(std::move(other._buf)), _sync_profile(std::move(other._sync_profile)) {} @@ -311,7 +308,12 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t _len {0}; size_t _whole_buffer_size; io::FileReader* _reader = nullptr; - const IOContext* _io_ctx = nullptr; + // PrefetchBuffer is running in separate thread. + // MUST use self owned FileCacheStatistics to avoid stack-use-after-scope error. + // And after reading finish, the caller should update the parent's + // FileCacheStatistics by using stats from this one. 
+ FileCacheStatistics _owned_cache_stats; + IOContext _owned_io_ctx; std::unique_ptr _buf; BufferStatus _buffer_status {BufferStatus::RESET}; std::mutex _lock; @@ -331,7 +333,7 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro // @brief: reset the start offset of this buffer to offset // @param: the new start offset for this buffer - void reset_offset(size_t offset); + void reset_offset(size_t offset, const IOContext* io_ctx); // @brief: start to fetch the content between [_offset, _offset + _size) void prefetch_buffer(); // @brief: used by BufferedReader to read the prefetched data @@ -339,7 +341,8 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro // @param[buf] buffer to put the actual content // @param[buf_len] maximum len trying to read // @param[bytes_read] actual bytes read - Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read); + Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read, + const IOContext* io_ctx); // @brief: shut down the buffer until the prior prefetching task is done void close(); // @brief: to detect whether this buffer contains off @@ -378,8 +381,7 @@ constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB class PrefetchBufferedReader final : public io::FileReader { public: PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader, - PrefetchRange file_range, const IOContext* io_ctx = nullptr, - int64_t buffer_size = -1L); + PrefetchRange file_range, int64_t buffer_size = -1L); ~PrefetchBufferedReader() override; Status close() override; @@ -416,14 +418,13 @@ class PrefetchBufferedReader final : public io::FileReader { int64_t cur_pos = position + i * s_max_pre_buffer_size; int cur_buf_pos = get_buffer_pos(cur_pos); // reset would do all the prefetch work - _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos)); + _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos), nullptr); } } 
io::FileReaderSPtr _reader; PrefetchRange _file_range; const std::vector* _random_access_ranges = nullptr; - const IOContext* _io_ctx = nullptr; std::vector> _pre_buffers; int64_t _whole_pre_buffer_size; bool _initialized = false; diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index d43cfae1c28228a..831db6bec1f1d78 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -116,7 +116,7 @@ Status HdfsFileReader::close() { #ifdef USE_HADOOP_HDFS Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: {}", _path.native()); } @@ -163,6 +163,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r *bytes_read = has_read; hdfs_bytes_read_total << *bytes_read; hdfs_bytes_per_read << *bytes_read; + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp index b4f144a633048e4..93ab4f6f638b188 100644 --- a/be/src/io/fs/local_file_reader.cpp +++ b/be/src/io/fs/local_file_reader.cpp @@ -117,7 +117,7 @@ Status LocalFileReader::close() { } Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_impl", Status::IOError("inject io error")); if (closed()) [[unlikely]] { @@ -152,6 +152,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_ *bytes_read += res; } } + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read; + } DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); return Status::OK(); } diff --git 
a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 86590d91632162a..7d9860d3b69d5e2 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -103,7 +103,7 @@ Status S3FileReader::close() { } Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); if (offset > _file_size) { return Status::InternalError( @@ -168,6 +168,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping", _path.native(), retry_count, total_sleep_time); } + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } return Status::InternalError("failed to read from s3, exceeded maximum retries"); diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 80a594473dc376a..9f4788f22fa7801 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -45,6 +45,30 @@ struct FileCacheStatistics { int64_t write_cache_io_timer = 0; int64_t bytes_write_into_cache = 0; int64_t num_skip_cache_io_total = 0; + + void update(const FileCacheStatistics& other) { + num_local_io_total += other.num_local_io_total; + num_remote_io_total += other.num_remote_io_total; + local_io_timer += other.local_io_timer; + bytes_read_from_local += other.bytes_read_from_local; + bytes_read_from_remote += other.bytes_read_from_remote; + remote_io_timer += other.remote_io_timer; + write_cache_io_timer += other.write_cache_io_timer; + bytes_write_into_cache += other.bytes_write_into_cache; + num_skip_cache_io_total += other.num_skip_cache_io_total; + } + + void reset() { + num_local_io_total = 0; + num_remote_io_total = 0; + local_io_timer = 0; + bytes_read_from_local = 0; + bytes_read_from_remote = 0; + remote_io_timer = 0; + write_cache_io_timer = 0; + bytes_write_into_cache = 0; + 
num_skip_cache_io_total = 0; + } }; struct IOContext { @@ -60,6 +85,37 @@ struct IOContext { int64_t expiration_time = 0; const TUniqueId* query_id = nullptr; // Ref FileCacheStatistics* file_cache_stats = nullptr; // Ref + + IOContext() = default; + + IOContext(const IOContext& other) { + reader_type = other.reader_type; + is_disposable = other.is_disposable; + is_index_data = other.is_index_data; + read_file_cache = other.read_file_cache; + is_persistent = other.is_persistent; + should_stop = other.should_stop; + expiration_time = other.expiration_time; + query_id = other.query_id; + file_cache_stats = other.file_cache_stats; + } + + IOContext& operator=(const IOContext& other) { + if (this == &other) { + return *this; + } + + reader_type = other.reader_type; + is_disposable = other.is_disposable; + is_index_data = other.is_index_data; + read_file_cache = other.read_file_cache; + is_persistent = other.is_persistent; + should_stop = other.should_stop; + expiration_time = other.expiration_time; + query_id = other.query_id; + file_cache_stats = other.file_cache_stats; + return *this; + } }; } // namespace io diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 1fd3b785b9072f0..4581440a79a8439 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -75,11 +75,13 @@ Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t } segment_v2::SegmentSharedPtr segment = *it; RETURN_IF_ERROR(segment->new_column_iterator(target_column, column_iterator, nullptr)); + io::IOContext ctx; + ctx.reader_type = ReaderType::READER_QUERY; segment_v2::ColumnIteratorOptions opt { .use_page_cache = !config::disable_storage_page_cache, .file_reader = segment->file_reader().get(), .stats = stats, - .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY}, + .io_ctx = ctx, }; RETURN_IF_ERROR((*column_iterator)->init(opt)); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp 
b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 59251b5595dd074..89315835bed92de 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -115,6 +115,8 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, PageFooterPB* footer, PageTypePB type, BlockCompressionCodec* codec, bool pre_decode) const { OlapReaderStatistics tmp_stats; + io::IOContext ctx; + ctx.is_index_data = true; PageReadOptions opts { .use_page_cache = _use_page_cache, .kept_in_memory = _kept_in_memory, @@ -125,7 +127,7 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, .codec = codec, .stats = &tmp_stats, .encoding_info = _encoding_info, - .io_ctx = io::IOContext {.is_index_data = true}, + .io_ctx = ctx, }; if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 24b2e3379963bc2..dde191cccb22ddf 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -90,6 +90,8 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, } // need to read index page OlapReaderStatistics tmp_stats; + io::IOContext ctx; + ctx.is_index_data = true; PageReadOptions opts { .use_page_cache = use_page_cache, .kept_in_memory = kept_in_memory, @@ -99,7 +101,7 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, // ordinal index page uses NO_COMPRESSION right now .codec = nullptr, .stats = &tmp_stats, - .io_ctx = io::IOContext {.is_index_data = true}, + .io_ctx = ctx, }; // read index page diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 07d5656ee8a44b1..247dcf57d091ff4 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -40,6 
+40,7 @@ #include "util/crc32c.h" #include "util/faststring.h" #include "util/runtime_profile.h" +#include "util/stack_util.h" namespace doris { namespace segment_v2 { @@ -154,6 +155,14 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle &opts.io_ctx)); DCHECK_EQ(bytes_read, page_size); opts.stats->compressed_bytes_read += page_size; + LOG(INFO) << "yy debug read_and_decompress_page_: " + << (opts.io_ctx.file_cache_stats == nullptr) << ", bytes_read: " << bytes_read + << ", compressed_bytes_read: " << opts.stats->compressed_bytes_read + << get_stack_trace(); + if (opts.io_ctx.file_cache_stats != nullptr) { + LOG(INFO) << "yy debug: " << opts.io_ctx.file_cache_stats->bytes_read_from_local << ", " + << opts.io_ctx.file_cache_stats->bytes_read_from_remote; + } } if (opts.verify_checksum) { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 0354cd4e3e7b5ae..fe393d093d0c84f 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -306,7 +306,8 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) { uint8_t fixed_buf[12]; size_t bytes_read = 0; // TODO(plat1ko): Support session variable `enable_file_cache` - io::IOContext io_ctx {.is_index_data = true}; + io::IOContext io_ctx; + io_ctx.is_index_data = true; RETURN_IF_ERROR( _file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx)); DCHECK_EQ(bytes_read, 12); @@ -411,6 +412,8 @@ Status Segment::_load_index_impl() { } else { // read and parse short key index page OlapReaderStatistics tmp_stats; + io::IOContext ctx; + ctx.is_index_data = true; PageReadOptions opts { .use_page_cache = true, .type = INDEX_PAGE, @@ -419,7 +422,7 @@ Status Segment::_load_index_impl() { // short key index page uses NO_COMPRESSION for now .codec = nullptr, .stats = &tmp_stats, - .io_ctx = io::IOContext {.is_index_data = true}, + .io_ctx = ctx, }; Slice body; PageFooterPB footer; @@ 
-980,11 +983,13 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto std::unique_ptr& iterator_hint) { StorageReadOptions storage_read_opt; storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY; + io::IOContext ctx; + ctx.reader_type = ReaderType::READER_QUERY; segment_v2::ColumnIteratorOptions opt { .use_page_cache = !config::disable_storage_page_cache, .file_reader = file_reader().get(), .stats = &stats, - .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY}, + .io_ctx = ctx, }; std::vector single_row_loc {row_id}; if (!slot->column_paths().empty()) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index b7ae612510fcb46..cf677833fb5b64e 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -132,9 +132,8 @@ class HashJoinBuildSinkOperatorX final if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } else if (_is_broadcast_join) { - return _child->ignore_data_distribution() - ? DataDistribution(ExchangeType::PASS_TO_ONE) - : DataDistribution(ExchangeType::NOOP); + return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASS_TO_ONE) + : DataDistribution(ExchangeType::NOOP); } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE