Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](schema-change) Cloud schema change cleans up when a job fails #48426

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 37 additions & 43 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,59 +268,53 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
// No need to check whether the delete succeeded:
// if the delete failed, creating the rollup will fail as well
TTabletId new_tablet_id = 0;
if (status.ok()) {
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
engine.memory_limitation_bytes_per_thread_for_schema_change()));
SCOPED_ATTACH_TASK(mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = Status::OK();
try {
LOG_INFO("start {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.tag("mem_limit",
engine.memory_limitation_bytes_per_thread_for_schema_change());
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
CloudSchemaChangeJob job(engine,
std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
agent_task_req.alter_tablet_req_v2.expiration);
status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
} catch (const Exception& e) {
status = e.to_status();
}
if (!status.ok()) {
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
}
}
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
engine.memory_limitation_bytes_per_thread_for_schema_change()));
SCOPED_ATTACH_TASK(mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);

LOG_INFO("start {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
agent_task_req.alter_tablet_req_v2.expiration);
status = [&]() {
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
[&](const doris::Exception& ex) {
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
job.clean_up_on_failed();
});
return Status::OK();
}();

if (status.ok()) {
increase_report_version();
LOG_INFO("successfully {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id);
} else {
LOG_WARNING("failed to {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.error(status);
}

// Return result to fe
finish_task_request->__set_backend(BackendOptions::get_local_backend());
finish_task_request->__set_report_version(s_report_version);
finish_task_request->__set_task_type(task_type);
finish_task_request->__set_signature(signature);

if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
LOG_WARNING("failed to {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id)
.error(status);
} else {
LOG_INFO("successfully {}", process_name)
.tag("signature", agent_task_req.signature)
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
.tag("new_tablet_id", new_tablet_id);
}
finish_task_request->__set_task_status(status.to_thrift());
}

Expand Down
12 changes: 12 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,16 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
return Status::OK();
}

// Best-effort cleanup after a failed schema change job: evict the cached data
// (opened segments, inverted index cache, file cache) of the rowsets this job
// produced, so a failed job does not leave stale entries behind.
void CloudSchemaChangeJob::clean_up_on_failed() {
    for (const auto& output_rs : _output_rowsets) {
        // NOTE(review): the `> 2` threshold presumably accounts for two internal
        // owners of the shared_ptr (e.g. _output_rowsets plus one other holder)
        // — TODO confirm; a higher count means a reader (e.g. a running query)
        // still uses this rowset, so its cache must not be dropped now.
        if (output_rs.use_count() > 2) {
            LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has "
                         << output_rs.use_count()
                         << " references. File Cache won't be recycled when query is using it.";
            // Skip only this rowset. The remaining output rowsets are
            // independent, so their caches should still be recycled —
            // an early `return` here would silently leak them all.
            continue;
        }
        output_rs->clear_cache();
    }
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_schema_change_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class CloudSchemaChangeJob {
// This method is idempotent for a same request.
Status process_alter_tablet(const TAlterTabletReqV2& request);

void clean_up_on_failed();

private:
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
cloud::TabletJobInfoPB& job);
Expand Down
28 changes: 8 additions & 20 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/config.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
Expand Down Expand Up @@ -452,27 +453,14 @@ void CloudTablet::clear_cache() {
}

void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
for (auto& rs : rowsets) {
// Clear cached opened segments and inverted index cache in memory
rs->clear_cache();
}

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
continue;
}
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
// TODO: Segment::file_cache_key
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached_async(file_key);
}
for (const auto& rs : rowsets) {
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
if (rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
return;
}
rs->clear_cache();
}
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <gen_cpp/olap_file.pb.h>

#include "common/config.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/olap_define.h"
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -118,6 +120,13 @@ void Rowset::clear_cache() {
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
clear_inverted_index_cache();
}
if (config::enable_file_cache) {
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached_async(file_key);
}
}
}

Result<std::string> Rowset::segment_path(int64_t seg_id) {
Expand Down
Loading