Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Sep 12, 2024
1 parent ff92417 commit c54a746
Show file tree
Hide file tree
Showing 23 changed files with 176 additions and 81 deletions.
14 changes: 6 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,28 +233,26 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
io::IOContext io_ctx;
io_ctx.expiration_time = expiration_time;
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(
*rowset_meta, seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx,
.download_done {},
});

io::IOContext io_ctx2;
io_ctx2.expiration_time = expiration_time;
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx2,
.download_done {},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
Expand Down
14 changes: 6 additions & 8 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,15 @@ void CloudWarmUpManager::handle_jobs() {
}

wait->add_count();
io::IOContext io_ctx;
io_ctx.expiration_time = expiration_time;
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rs,
seg_id),
.file_size = rs->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx,
.download_done =
[wait](Status st) {
if (!st) {
Expand All @@ -125,15 +124,14 @@ void CloudWarmUpManager::handle_jobs() {
},
});

io::IOContext io_ctx2;
io_ctx2.expiration_time = expiration_time;
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx2,
.download_done =
[wait](Status st) {
if (!st) {
Expand Down
9 changes: 4 additions & 5 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ void FileCacheBlockDownloader::download_file_cache_block(
}
};

IOContext io_ctx;
io_ctx.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX;
io_ctx.expiration_time = meta.expiration_time();
DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(*find_it->second,
meta.segment_id()),
Expand All @@ -186,11 +189,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.offset = meta.offset(),
.download_size = meta.size(),
.file_system = storage_resource.value()->fs,
.ctx =
{
.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
.expiration_time = meta.expiration_time(),
},
.ctx = io_ctx,
.download_done = std::move(download_done),
};
download_segment_file(download_meta);
Expand Down
17 changes: 8 additions & 9 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
// If cache is not enabled, just call inner file reader to read data
if (_cache_type == FileCachePolicy::NO_CACHE) {
Status st = _remote_file_reader->read_at(offset, result, bytes_read, io_ctx);
stats.hit_cache = false;
stats.bytes_read += *bytes_read;
// stats.hit_cache = false;
// stats.bytes_read += *bytes_read;
return st;
}

Expand All @@ -143,7 +143,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
*bytes_read = 0;
return Status::OK();
}
stats.bytes_read += bytes_req;
// stats.bytes_read += bytes_req;
if (config::enable_read_cache_file_directly) {
// read directly
size_t need_read_size = bytes_req;
Expand All @@ -167,7 +167,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
SCOPED_RAW_TIMER(&stats.local_read_timer);
if (!iter->second
->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
file_offset)
file_offset, io_ctx)
.ok()) {
break;
}
Expand Down Expand Up @@ -301,7 +301,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
size_t file_offset = current_offset - left;
SCOPED_RAW_TIMER(&stats.local_read_timer);
st = block->read(Slice(result.data + (current_offset - offset), read_size),
file_offset);
file_offset, io_ctx);
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
size_t bytes_read {0};
Expand All @@ -310,7 +310,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
SCOPED_RAW_TIMER(&stats.remote_read_timer);
RETURN_IF_ERROR(_remote_file_reader->read_at(
current_offset, Slice(result.data + (current_offset - offset), read_size),
&bytes_read));
&bytes_read, io_ctx));
DCHECK(bytes_read == read_size);
}
}
Expand All @@ -328,10 +328,10 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
// statis->bytes_read_from_local += read_stats.bytes_read;
} else {
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
// statis->bytes_read_from_remote += read_stats.bytes_read;
}
statis->remote_io_timer += read_stats.remote_read_timer;
statis->local_io_timer += read_stats.local_read_timer;
Expand All @@ -340,7 +340,6 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
statis->write_cache_io_timer += read_stats.local_write_timer;

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
}

} // namespace doris::io
2 changes: 1 addition & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class CachedRemoteFileReader final : public FileReader {
struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
int64_t bytes_read = 0;
// int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ Status FileBlock::finalize() {
return st;
}

Status FileBlock::read(Slice buffer, size_t read_offset) {
return _mgr->_storage->read(_key, read_offset, buffer);
Status FileBlock::read(Slice buffer, size_t read_offset, const IOContext* io_ctx) {
return _mgr->_storage->read(_key, read_offset, buffer, io_ctx);
}

Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class FileBlock {
[[nodiscard]] Status append(Slice data);

// read data from cache file
[[nodiscard]] Status read(Slice buffer, size_t read_offset);
[[nodiscard]] Status read(Slice buffer, size_t read_offset, const IOContext* io_ctx);

// finish write, release the file writer
[[nodiscard]] Status finalize();
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class FileCacheStorage {
// finalize the block
virtual Status finalize(const FileCacheKey& key) = 0;
// read the block
virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result) = 0;
virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result,
const IOContext* io_ctx) = 0;
// remove the block
virtual Status remove(const FileCacheKey& key) = 0;
// change the block meta
Expand Down
5 changes: 3 additions & 2 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
return fs->rename(file_writer->path(), true_file);
}

Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer) {
Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer,
const IOContext* io_ctx) {
AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
if (!file_reader) {
Expand All @@ -163,7 +164,7 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl
FDCache::instance()->insert_file_reader(fd_key, file_reader);
}
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read, io_ctx));
DCHECK(bytes_read == buffer.get_size());
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class FSFileCacheStorage : public FileCacheStorage {
Status init(BlockFileCache* _mgr) override;
Status append(const FileCacheKey& key, const Slice& value) override;
Status finalize(const FileCacheKey& key) override;
Status read(const FileCacheKey& key, size_t value_offset, Slice buffer) override;
Status read(const FileCacheKey& key, size_t value_offset, Slice buffer,
const IOContext* io_ctx) override;
Status remove(const FileCacheKey& key) override;
Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta) override;
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
Expand Down
6 changes: 5 additions & 1 deletion be/src/io/fs/broker_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/broker_file_system.h"
#include "io/io_common.h"
#include "util/doris_metrics.h"

namespace doris::io {
Expand Down Expand Up @@ -92,7 +93,7 @@ Status BrokerFileReader::close() {
}

Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
if (closed()) [[unlikely]] {
return Status::InternalError("read closed file: ", _path.native());
}
Expand Down Expand Up @@ -145,6 +146,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes

*bytes_read = response.data.size();
memcpy(to, response.data.data(), *bytes_read);
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}

Expand Down
47 changes: 28 additions & 19 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
// there exists occasions where the buffer is already closed but
// some prior tasks are still queued in thread pool, so we have to check whether
// the buffer is closed each time the condition variable is notified.
void PrefetchBuffer::reset_offset(size_t offset) {
void PrefetchBuffer::reset_offset(size_t offset, const IOContext* io_ctx) {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(
Expand All @@ -415,6 +415,16 @@ void PrefetchBuffer::reset_offset(size_t offset) {
} else {
_exceed = false;
}
// First update and reset file cache stats
if (io_ctx) {
if (io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->update(_owned_cache_stats);
_owned_cache_stats.reset();
}
_owned_io_ctx = *io_ctx;
_owned_io_ctx.file_cache_stats = &_owned_cache_stats;
}

_prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
[buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
}
Expand Down Expand Up @@ -455,7 +465,8 @@ void PrefetchBuffer::prefetch_buffer() {

{
SCOPED_RAW_TIMER(&_statis.read_time);
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx);
// Use owned io_ctx here
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, &_owned_io_ctx);
}
if (UNLIKELY(s.ok() && buf_size != _len)) {
// This indicates that the data size returned by S3 object storage is smaller than what we requested,
Expand Down Expand Up @@ -542,15 +553,14 @@ size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const {
return _size - remaining;
}

Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
size_t* bytes_read) {
Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, size_t* bytes_read,
const IOContext* io_ctx) {
if (UNLIKELY(off >= _file_range.end_offset)) {
// Reader can read out of [start_offset, end_offset) by synchronous method.
return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx);
return _reader->read_at(off, Slice {out, buf_len}, bytes_read, io_ctx);
}
if (_exceed) {
reset_offset((off / _size) * _size);
return read_buffer(off, out, buf_len, bytes_read);
reset_offset((off / _size) * _size, io_ctx);
return read_buffer(off, out, buf_len, bytes_read, io_ctx);
}
auto start = std::chrono::steady_clock::now();
// The baseline time is calculated by dividing the size of each buffer by MB/s.
Expand Down Expand Up @@ -581,8 +591,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
// there is only parquet would do not sequence read
// it would read the end of the file first
if (UNLIKELY(!contains(off))) {
reset_offset((off / _size) * _size);
return read_buffer(off, out, buf_len, bytes_read);
reset_offset((off / _size) * _size, io_ctx);
return read_buffer(off, out, buf_len, bytes_read, io_ctx);
}
if (UNLIKELY(0 == _len || _offset + _len < off)) {
return Status::OK();
Expand All @@ -601,7 +611,7 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
_statis.request_bytes += read_len;
}
if (off + *bytes_read == _offset + _len) {
reset_offset(_offset + _whole_buffer_size);
reset_offset(_offset + _whole_buffer_size, io_ctx);
}
return Status::OK();
}
Expand All @@ -627,9 +637,8 @@ void PrefetchBuffer::_collect_profile_before_close() {

// buffered reader
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
PrefetchRange file_range, const IOContext* io_ctx,
int64_t buffer_size)
: _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx) {
PrefetchRange file_range, int64_t buffer_size)
: _reader(std::move(reader)), _file_range(file_range) {
if (buffer_size == -1L) {
buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
}
Expand Down Expand Up @@ -664,7 +673,7 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
// to make sure the buffer reader will start to read at right position.
for (int i = 0; i < buffer_num; i++) {
_pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
_file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(), _io_ctx,
_file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(),
sync_buffer));
}
}
Expand All @@ -689,9 +698,9 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t*
while (actual_bytes_read < nbytes && offset < size()) {
size_t read_num = 0;
auto buffer_pos = get_buffer_pos(offset);
RETURN_IF_ERROR(
_pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read,
nbytes - actual_bytes_read, &read_num));
RETURN_IF_ERROR(_pre_buffers[buffer_pos]->read_buffer(
offset, result.get_data() + actual_bytes_read, nbytes - actual_bytes_read,
&read_num, io_ctx));
actual_bytes_read += read_num;
offset += read_num;
}
Expand Down Expand Up @@ -855,7 +864,7 @@ Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
if (is_thread_safe) {
// PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently.
return std::make_shared<io::PrefetchBufferedReader>(
profile, std::move(reader), file_range, io_ctx);
profile, std::move(reader), file_range);
}
}

Expand Down
Loading

0 comments on commit c54a746

Please sign in to comment.