Skip to content

Commit

Permalink
(cloud-merge) Fix the coredump because of change_cache_type to origin…
Browse files Browse the repository at this point in the history
…_type (apache#38518)

When call method `change_cache_type_by_mgr`, if the cache_type is same
as change_type, it will return true directly and execute the next codes
and break the old assumptions.
  • Loading branch information
Lchangliang authored and feiniaofeiafei committed Aug 9, 2024
1 parent 297cd67 commit 057106b
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 30 deletions.
66 changes: 42 additions & 24 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
DCHECK(!file_blocks.empty());
// change to ttl if the blocks aren't ttl
if (context.cache_type == FileCacheType::TTL && _key_to_time.find(hash) == _key_to_time.end()) {
for (auto& [_, cell] : file_blocks) {
Status st = cell.file_block->update_expiration_time(context.expiration_time);
if (!st.ok()) {
LOG_WARNING("Failed to change key meta").error(st);
}
}
for (auto& [_, cell] : file_blocks) {
FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
Expand All @@ -295,9 +301,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
} else {
cell.queue_iterator.reset();
}
st = cell.file_block->update_expiration_time(context.expiration_time);
}
if (!st.ok()) {
} else {
LOG_WARNING("Failed to change key meta").error(st);
}
}
Expand All @@ -324,7 +328,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
}
if (context.expiration_time == 0) {
for (auto& [_, cell] : file_blocks) {
if (cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL)) {
auto cache_type = cell.file_block->cache_type();
if (cache_type != FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
Expand All @@ -333,6 +340,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
cell.queue_iterator =
queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
} else {
LOG_WARNING("Failed to change key meta").error(st);
}
}
_key_to_time.erase(iter);
Expand Down Expand Up @@ -681,22 +690,30 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha
<< ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);

auto& offsets = _files[hash];
DCHECK((context.expiration_time == 0 && context.cache_type != FileCacheType::TTL) ||
(context.cache_type == FileCacheType::TTL && context.expiration_time != 0))
<< fmt::format("expiration time {}, cache type {}", context.expiration_time,
context.cache_type);

FileCacheKey key;
key.hash = hash;
key.offset = offset;
key.meta.type = context.cache_type;
key.meta.expiration_time = context.expiration_time;
FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);
if (context.cache_type != FileCacheType::TTL || config::enable_ttl_cache_evict_using_lru) {
auto& queue = get_queue(context.cache_type);
Status st;
if (context.expiration_time == 0 && context.cache_type == FileCacheType::TTL) {
st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
} else if (context.cache_type != FileCacheType::TTL && context.expiration_time != 0) {
st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
}
if (!st.ok()) {
LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
<< " cache_type=" << cache_type_to_string(context.cache_type)
<< " error=" << st.msg();
}
if (cell.file_block->cache_type() != FileCacheType::TTL ||
config::enable_ttl_cache_evict_using_lru) {
auto& queue = get_queue(cell.file_block->cache_type());
cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
}
if (context.cache_type == FileCacheType::TTL) {
if (cell.file_block->cache_type() == FileCacheType::TTL) {
if (_key_to_time.find(hash) == _key_to_time.end()) {
_key_to_time[hash] = context.expiration_time;
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
Expand Down Expand Up @@ -1005,19 +1022,18 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b
}
}
for (auto& [_, cell] : _files[file_key]) {
if (cell.file_block->cache_type() == FileCacheType::TTL) {
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
}
auto& queue = get_queue(FileCacheType::NORMAL);
cell.queue_iterator = queue.add(
cell.file_block->get_hash_value(), cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
} else {
LOG_WARNING("Failed to change cache type to normal").error(st);
if (cell.file_block->cache_type() == FileCacheType::NORMAL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
}
auto& queue = get_queue(FileCacheType::NORMAL);
cell.queue_iterator =
queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(),
cell.file_block->range().size(), cache_lock);
} else {
LOG_WARNING("Failed to change cache type to normal").error(st);
}
}
} else {
Expand Down Expand Up @@ -1579,6 +1595,7 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
for (auto& [_, cell] : _files[hash]) {
Status st = cell.file_block->update_expiration_time(new_expiration_time);
if (!st.ok()) {
LOG_WARNING("Failed to modify expiration time").error(st);
}
}

Expand All @@ -1588,12 +1605,13 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
if (auto iter = _files.find(hash); iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
Status st = cell.file_block->update_expiration_time(new_expiration_time);
if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
for (auto& [_, cell] : iter->second) {
FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
Expand Down
14 changes: 9 additions & 5 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <thread>

#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"

namespace doris {
Expand Down Expand Up @@ -162,14 +163,14 @@ Status FileBlock::read(Slice buffer, size_t read_offset) {

Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) {
std::lock_guard block_lock(_mutex);
if (new_type == _key.meta.type) {
return Status::OK();
}
DCHECK(new_type != _key.meta.type);
if (_download_state == State::DOWNLOADED) {
KeyMeta new_meta;
new_meta.expiration_time = _key.meta.expiration_time;
new_meta.type = new_type;
RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta));
auto st = _mgr->_storage->change_key_meta(_key, new_meta);
TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
if (!st.ok()) return st;
}
_key.meta.type = new_type;
return Status::OK();
Expand Down Expand Up @@ -198,7 +199,10 @@ Status FileBlock::update_expiration_time(uint64_t expiration_time) {
KeyMeta new_meta;
new_meta.expiration_time = expiration_time;
new_meta.type = _key.meta.type;
RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta));
auto st = _mgr->_storage->change_key_meta(_key, new_meta);
if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
return st;
}
}
_key.meta.expiration_time = expiration_time;
return Status::OK();
Expand Down
Loading

0 comments on commit 057106b

Please sign in to comment.