Skip to content

Commit ff30024

Browse files
LchangliangTangSiyang2001
authored andcommitted
(cloud-merge) Support shadow tablet to do cumulative compaction in cloud mode (apache#37293)
In cloud mode, when do schema change, shadow tablet encounters -235 because it cant do cumulative compaction in the case of a large number of loads. And it will prevents the user from continuing to loads. Implementation details: 1. When start schema change, record the end convert rowset version `alter_version` into SchemaChangeJob. 2. For origin tablet, only can do base compaction in [0, `alter_version`] and do cumulative compaction in (`alter_version`, N]. can not do compaction across `alter_verison` such as compaction [a, `alter_version` + n]. 3. For shadow tablet, cannot do base compaction and and do cumulative compaction in (`alter_version`, N]. 4. When the schema change failed because FE or BE coredump, it will retry. When retry the schema change, it will get the `alter_version` from meta_serive, and continue to do it. 5. When finish the schema change job or cancel it, we need to clear the schema change job. Before this pr, it will cover by next schema change.
1 parent d3d3584 commit ff30024

39 files changed

+3419
-142
lines changed

be/src/cloud/cloud_base_compaction.cpp

+37-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "service/backend_options.h"
3030
#include "util/thread.h"
3131
#include "util/uuid_generator.h"
32+
#include "vec/runtime/vdatetime_value.h"
3233

3334
namespace doris {
3435
using namespace ErrorCode;
@@ -82,21 +83,40 @@ Status CloudBaseCompaction::prepare_compact() {
8283
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
8384
compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
8485
compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
86+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
87+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
8588
using namespace std::chrono;
8689
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
8790
_expiration = now + config::compaction_timeout_seconds;
8891
compaction_job->set_expiration(_expiration);
8992
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
9093
cloud::StartTabletJobResponse resp;
91-
//auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
9294
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
95+
if (resp.has_alter_version()) {
96+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
97+
}
9398
if (!st.ok()) {
9499
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
95100
// set last_sync_time to 0 to force sync tablet next time
96101
cloud_tablet()->last_sync_time_s = 0;
97102
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
98103
// tablet not found
99104
cloud_tablet()->clear_cache();
105+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
106+
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
107+
std::stringstream ss;
108+
ss << "failed to prepare cumu compaction. Check compaction input versions "
109+
"failed in schema change. The input version end must "
110+
"less than or equal to alter_version."
111+
"current alter version in BE is not correct."
112+
"input_version_start="
113+
<< compaction_job->input_versions(0)
114+
<< " input_version_end=" << compaction_job->input_versions(1)
115+
<< " current alter_version=" << cloud_tablet->alter_version()
116+
<< " schema_change_alter_version=" << resp.alter_version();
117+
std::string msg = ss.str();
118+
LOG(WARNING) << msg;
119+
return Status::InternalError(msg);
100120
}
101121
return st;
102122
}
@@ -320,6 +340,22 @@ Status CloudBaseCompaction::modify_rowsets() {
320340
if (!st.ok()) {
321341
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
322342
cloud_tablet()->clear_cache();
343+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
344+
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
345+
std::stringstream ss;
346+
ss << "failed to prepare cumu compaction. Check compaction input versions "
347+
"failed in schema change. The input version end must "
348+
"less than or equal to alter_version."
349+
"current alter version in BE is not correct."
350+
"input_version_start="
351+
<< compaction_job->input_versions(0)
352+
<< " input_version_end=" << compaction_job->input_versions(1)
353+
<< " current alter_version=" << cloud_tablet->alter_version()
354+
<< " schema_change_alter_version=" << resp.alter_version();
355+
std::string msg = ss.str();
356+
LOG(WARNING) << msg;
357+
cloud_tablet->set_alter_version(resp.alter_version());
358+
return Status::InternalError(msg);
323359
}
324360
return st;
325361
}

be/src/cloud/cloud_cumulative_compaction.cpp

+38-8
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
4848
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
4949

5050
Status CloudCumulativeCompaction::prepare_compact() {
51-
if (_tablet->tablet_state() != TABLET_RUNNING) {
51+
if (_tablet->tablet_state() != TABLET_RUNNING &&
52+
(!config::enable_new_tablet_do_compaction ||
53+
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
5254
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
5355
}
5456

@@ -110,11 +112,11 @@ Status CloudCumulativeCompaction::prepare_compact() {
110112
_expiration = now + config::compaction_timeout_seconds;
111113
compaction_job->set_expiration(_expiration);
112114
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
113-
if (config::enable_parallel_cumu_compaction) {
114-
// Set input version range to let meta-service judge version range conflict
115-
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
116-
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
117-
}
115+
116+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
117+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
118+
// Set input version range to let meta-service check version range conflict
119+
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
118120
cloud::StartTabletJobResponse resp;
119121
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
120122
if (!st.ok()) {
@@ -141,6 +143,18 @@ Status CloudCumulativeCompaction::prepare_compact() {
141143
.tag("msg", resp.status().msg());
142144
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
143145
}
146+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
147+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
148+
std::stringstream ss;
149+
ss << "failed to prepare cumu compaction. Check compaction input versions "
150+
"failed in schema change. "
151+
"input_version_start="
152+
<< compaction_job->input_versions(0)
153+
<< " input_version_end=" << compaction_job->input_versions(1)
154+
<< " schema_change_alter_version=" << resp.alter_version();
155+
std::string msg = ss.str();
156+
LOG(WARNING) << msg;
157+
return Status::InternalError(msg);
144158
}
145159
return st;
146160
}
@@ -262,12 +276,27 @@ Status CloudCumulativeCompaction::modify_rowsets() {
262276

263277
cloud::FinishTabletJobResponse resp;
264278
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
279+
if (resp.has_alter_version()) {
280+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
281+
}
265282
if (!st.ok()) {
266283
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
267284
cloud_tablet()->clear_cache();
285+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
286+
std::stringstream ss;
287+
ss << "failed to prepare cumu compaction. Check compaction input versions "
288+
"failed in schema change. "
289+
"input_version_start="
290+
<< compaction_job->input_versions(0)
291+
<< " input_version_end=" << compaction_job->input_versions(1)
292+
<< " schema_change_alter_version=" << resp.alter_version();
293+
std::string msg = ss.str();
294+
LOG(WARNING) << msg;
295+
return Status::InternalError(msg);
268296
}
269297
return st;
270298
}
299+
271300
auto& stats = resp.stats();
272301
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
273302
{
@@ -350,8 +379,9 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
350379
std::shared_lock rlock(_tablet->get_header_lock());
351380
_base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
352381
_cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt();
353-
int64_t candidate_version =
354-
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1);
382+
int64_t candidate_version = std::max(
383+
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1),
384+
cloud_tablet()->alter_version() + 1);
355385
// Get all rowsets whose version >= `candidate_version` as candidate rowsets
356386
cloud_tablet()->traverse_rowsets(
357387
[&candidate_rowsets, candidate_version](const RowsetSharedPtr& rs) {

be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp

+10-8
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
154154
};
155155
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
156156
auto sync_st = tablet->sync_rowsets();
157-
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
158-
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
159-
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
160-
"tablet_id: "
161-
<< _tablet_id << " txn_id: " << _transaction_id
162-
<< ", request_version=" << _version;
163-
return sync_st;
164-
}
165157
if (!sync_st.ok()) {
166158
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
167159
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
168160
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
169161
return sync_st;
170162
}
163+
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
164+
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
165+
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
166+
"tablet_id: "
167+
<< _tablet_id << " txn_id: " << _transaction_id
168+
<< ", request_version=" << _version;
169+
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
170+
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
171+
tablet->tablet_id());
172+
}
171173
}
172174
auto sync_rowset_time_us = MonotonicMicros() - t2;
173175
max_version = tablet->max_version_unlocked();

be/src/cloud/cloud_meta_mgr.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
451451
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
452452
tablet->last_sync_time_s = now;
453453

454-
if (tablet->enable_unique_key_merge_on_write()) {
454+
// If is mow, the tablet has no delete bitmap in base rowsets.
455+
// So dont need to sync it.
456+
if (tablet->enable_unique_key_merge_on_write() &&
457+
tablet->tablet_state() == TABLET_RUNNING) {
455458
DeleteBitmap delete_bitmap(tablet_id);
456459
int64_t old_max_version = req.start_version() - 1;
457460
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),

0 commit comments

Comments
 (0)