Skip to content

Commit 51ccdfa

Browse files
branch-3.0: [enhancement](schema-change) Cloud schema change do clean up when job failed apache#48426 (apache#48897)
Cherry-picked from apache#48426 Co-authored-by: Siyang Tang <tangsiyang2001@foxmail.com>
1 parent cd4b866 commit 51ccdfa

File tree

5 files changed

+68
-63
lines changed

5 files changed

+68
-63
lines changed

be/src/agent/task_worker_pool.cpp

+37-43
Original file line numberDiff line numberDiff line change
@@ -268,59 +268,53 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
268268
// Do not need to adjust delete success or not
269269
// Because if delete failed create rollup will failed
270270
TTabletId new_tablet_id = 0;
271-
if (status.ok()) {
272-
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
273-
auto mem_tracker = MemTrackerLimiter::create_shared(
274-
MemTrackerLimiter::Type::SCHEMA_CHANGE,
275-
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
276-
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
277-
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
278-
engine.memory_limitation_bytes_per_thread_for_schema_change()));
279-
SCOPED_ATTACH_TASK(mem_tracker);
280-
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
281-
Status res = Status::OK();
282-
try {
283-
LOG_INFO("start {}", process_name)
284-
.tag("signature", agent_task_req.signature)
285-
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
286-
.tag("new_tablet_id", new_tablet_id)
287-
.tag("mem_limit",
288-
engine.memory_limitation_bytes_per_thread_for_schema_change());
289-
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
290-
CloudSchemaChangeJob job(engine,
291-
std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
292-
agent_task_req.alter_tablet_req_v2.expiration);
293-
status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
294-
} catch (const Exception& e) {
295-
status = e.to_status();
296-
}
297-
if (!status.ok()) {
298-
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
299-
}
300-
}
271+
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
272+
auto mem_tracker = MemTrackerLimiter::create_shared(
273+
MemTrackerLimiter::Type::SCHEMA_CHANGE,
274+
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
275+
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
276+
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
277+
engine.memory_limitation_bytes_per_thread_for_schema_change()));
278+
SCOPED_ATTACH_TASK(mem_tracker);
279+
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
280+
281+
LOG_INFO("start {}", process_name)
282+
.tag("signature", agent_task_req.signature)
283+
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
284+
.tag("new_tablet_id", new_tablet_id)
285+
.tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
286+
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
287+
CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
288+
agent_task_req.alter_tablet_req_v2.expiration);
289+
status = [&]() {
290+
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
291+
job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
292+
[&](const doris::Exception& ex) {
293+
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
294+
job.clean_up_on_failed();
295+
});
296+
return Status::OK();
297+
}();
301298

302299
if (status.ok()) {
303300
increase_report_version();
301+
LOG_INFO("successfully {}", process_name)
302+
.tag("signature", agent_task_req.signature)
303+
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
304+
.tag("new_tablet_id", new_tablet_id);
305+
} else {
306+
LOG_WARNING("failed to {}", process_name)
307+
.tag("signature", agent_task_req.signature)
308+
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
309+
.tag("new_tablet_id", new_tablet_id)
310+
.error(status);
304311
}
305312

306313
// Return result to fe
307314
finish_task_request->__set_backend(BackendOptions::get_local_backend());
308315
finish_task_request->__set_report_version(s_report_version);
309316
finish_task_request->__set_task_type(task_type);
310317
finish_task_request->__set_signature(signature);
311-
312-
if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
313-
LOG_WARNING("failed to {}", process_name)
314-
.tag("signature", agent_task_req.signature)
315-
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
316-
.tag("new_tablet_id", new_tablet_id)
317-
.error(status);
318-
} else {
319-
LOG_INFO("successfully {}", process_name)
320-
.tag("signature", agent_task_req.signature)
321-
.tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
322-
.tag("new_tablet_id", new_tablet_id);
323-
}
324318
finish_task_request->__set_task_status(status.to_thrift());
325319
}
326320

be/src/cloud/cloud_schema_change_job.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -495,4 +495,16 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
495495
return Status::OK();
496496
}
497497

498+
void CloudSchemaChangeJob::clean_up_on_failed() {
499+
for (const auto& output_rs : _output_rowsets) {
500+
if (output_rs.use_count() > 2) {
501+
LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has "
502+
<< output_rs.use_count()
503+
<< " references. File Cache won't be recycled when query is using it.";
504+
return;
505+
}
506+
output_rs->clear_cache();
507+
}
508+
}
509+
498510
} // namespace doris

be/src/cloud/cloud_schema_change_job.h

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class CloudSchemaChangeJob {
3636
// This method is idempotent for a same request.
3737
Status process_alter_tablet(const TAlterTabletReqV2& request);
3838

39+
void clean_up_on_failed();
40+
3941
private:
4042
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
4143
cloud::TabletJobInfoPB& job);

be/src/cloud/cloud_tablet.cpp

+8-20
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "cloud/cloud_meta_mgr.h"
3434
#include "cloud/cloud_storage_engine.h"
3535
#include "cloud/cloud_tablet_mgr.h"
36+
#include "common/config.h"
3637
#include "common/logging.h"
3738
#include "io/cache/block_file_cache_downloader.h"
3839
#include "io/cache/block_file_cache_factory.h"
@@ -452,27 +453,14 @@ void CloudTablet::clear_cache() {
452453
}
453454

454455
void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
455-
for (auto& rs : rowsets) {
456-
// Clear cached opened segments and inverted index cache in memory
457-
rs->clear_cache();
458-
}
459-
460-
if (config::enable_file_cache) {
461-
for (const auto& rs : rowsets) {
462-
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
463-
if (rs.use_count() > 2) {
464-
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
465-
<< rs.use_count()
466-
<< " references. File Cache won't be recycled when query is using it.";
467-
continue;
468-
}
469-
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
470-
// TODO: Segment::file_cache_key
471-
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
472-
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
473-
file_cache->remove_if_cached_async(file_key);
474-
}
456+
for (const auto& rs : rowsets) {
457+
// rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
458+
if (rs.use_count() > 2) {
459+
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
460+
<< " references. File Cache won't be recycled when query is using it.";
461+
return;
475462
}
463+
rs->clear_cache();
476464
}
477465
}
478466

be/src/olap/rowset/rowset.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include <gen_cpp/olap_file.pb.h>
2121

22+
#include "common/config.h"
23+
#include "io/cache/block_file_cache_factory.h"
2224
#include "olap/olap_define.h"
2325
#include "olap/segment_loader.h"
2426
#include "olap/tablet_schema.h"
@@ -120,6 +122,13 @@ void Rowset::clear_cache() {
120122
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
121123
clear_inverted_index_cache();
122124
}
125+
if (config::enable_file_cache) {
126+
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
127+
auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
128+
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
129+
file_cache->remove_if_cached_async(file_key);
130+
}
131+
}
123132
}
124133

125134
Result<std::string> Rowset::segment_path(int64_t seg_id) {

0 commit comments

Comments
 (0)