Skip to content

Commit b58b9e4

Browse files
authored
(cloud-merge) Support shadow tablet to do cumulative compaction in cloud mode (#37293)
In cloud mode, when do schema change, shadow tablet encounters -235 because it cant do cumulative compaction in the case of a large number of loads. And it will prevents the user from continuing to loads. Implementation details: 1. When start schema change, record the end convert rowset version `alter_version` into SchemaChangeJob. 2. For origin tablet, only can do base compaction in [0, `alter_version`] and do cumulative compaction in (`alter_version`, N]. can not do compaction across `alter_verison` such as compaction [a, `alter_version` + n]. 3. For shadow tablet, cannot do base compaction and and do cumulative compaction in (`alter_version`, N]. 4. When the schema change failed because FE or BE coredump, it will retry. When retry the schema change, it will get the `alter_version` from meta_serive, and continue to do it. 5. When finish the schema change job or cancel it, we need to clear the schema change job. Before this pr, it will cover by next schema change.
1 parent 2459a3e commit b58b9e4

39 files changed

+3419
-142
lines changed

be/src/cloud/cloud_base_compaction.cpp

+37-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "service/backend_options.h"
3030
#include "util/thread.h"
3131
#include "util/uuid_generator.h"
32+
#include "vec/runtime/vdatetime_value.h"
3233

3334
namespace doris {
3435
using namespace ErrorCode;
@@ -82,21 +83,40 @@ Status CloudBaseCompaction::prepare_compact() {
8283
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
8384
compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
8485
compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
86+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
87+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
8588
using namespace std::chrono;
8689
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
8790
_expiration = now + config::compaction_timeout_seconds;
8891
compaction_job->set_expiration(_expiration);
8992
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
9093
cloud::StartTabletJobResponse resp;
91-
//auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
9294
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
95+
if (resp.has_alter_version()) {
96+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
97+
}
9398
if (!st.ok()) {
9499
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
95100
// set last_sync_time to 0 to force sync tablet next time
96101
cloud_tablet()->last_sync_time_s = 0;
97102
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
98103
// tablet not found
99104
cloud_tablet()->clear_cache();
105+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
106+
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
107+
std::stringstream ss;
108+
ss << "failed to prepare cumu compaction. Check compaction input versions "
109+
"failed in schema change. The input version end must "
110+
"less than or equal to alter_version."
111+
"current alter version in BE is not correct."
112+
"input_version_start="
113+
<< compaction_job->input_versions(0)
114+
<< " input_version_end=" << compaction_job->input_versions(1)
115+
<< " current alter_version=" << cloud_tablet->alter_version()
116+
<< " schema_change_alter_version=" << resp.alter_version();
117+
std::string msg = ss.str();
118+
LOG(WARNING) << msg;
119+
return Status::InternalError(msg);
100120
}
101121
return st;
102122
}
@@ -314,6 +334,22 @@ Status CloudBaseCompaction::modify_rowsets() {
314334
if (!st.ok()) {
315335
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
316336
cloud_tablet()->clear_cache();
337+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
338+
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
339+
std::stringstream ss;
340+
ss << "failed to prepare cumu compaction. Check compaction input versions "
341+
"failed in schema change. The input version end must "
342+
"less than or equal to alter_version."
343+
"current alter version in BE is not correct."
344+
"input_version_start="
345+
<< compaction_job->input_versions(0)
346+
<< " input_version_end=" << compaction_job->input_versions(1)
347+
<< " current alter_version=" << cloud_tablet->alter_version()
348+
<< " schema_change_alter_version=" << resp.alter_version();
349+
std::string msg = ss.str();
350+
LOG(WARNING) << msg;
351+
cloud_tablet->set_alter_version(resp.alter_version());
352+
return Status::InternalError(msg);
317353
}
318354
return st;
319355
}

be/src/cloud/cloud_cumulative_compaction.cpp

+38-8
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
4848
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
4949

5050
Status CloudCumulativeCompaction::prepare_compact() {
51-
if (_tablet->tablet_state() != TABLET_RUNNING) {
51+
if (_tablet->tablet_state() != TABLET_RUNNING &&
52+
(!config::enable_new_tablet_do_compaction ||
53+
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
5254
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
5355
}
5456

@@ -110,11 +112,11 @@ Status CloudCumulativeCompaction::prepare_compact() {
110112
_expiration = now + config::compaction_timeout_seconds;
111113
compaction_job->set_expiration(_expiration);
112114
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
113-
if (config::enable_parallel_cumu_compaction) {
114-
// Set input version range to let meta-service judge version range conflict
115-
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
116-
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
117-
}
115+
116+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
117+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
118+
// Set input version range to let meta-service check version range conflict
119+
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
118120
cloud::StartTabletJobResponse resp;
119121
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
120122
if (!st.ok()) {
@@ -141,6 +143,18 @@ Status CloudCumulativeCompaction::prepare_compact() {
141143
.tag("msg", resp.status().msg());
142144
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
143145
}
146+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
147+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
148+
std::stringstream ss;
149+
ss << "failed to prepare cumu compaction. Check compaction input versions "
150+
"failed in schema change. "
151+
"input_version_start="
152+
<< compaction_job->input_versions(0)
153+
<< " input_version_end=" << compaction_job->input_versions(1)
154+
<< " schema_change_alter_version=" << resp.alter_version();
155+
std::string msg = ss.str();
156+
LOG(WARNING) << msg;
157+
return Status::InternalError(msg);
144158
}
145159
return st;
146160
}
@@ -256,12 +270,27 @@ Status CloudCumulativeCompaction::modify_rowsets() {
256270

257271
cloud::FinishTabletJobResponse resp;
258272
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
273+
if (resp.has_alter_version()) {
274+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
275+
}
259276
if (!st.ok()) {
260277
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
261278
cloud_tablet()->clear_cache();
279+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
280+
std::stringstream ss;
281+
ss << "failed to prepare cumu compaction. Check compaction input versions "
282+
"failed in schema change. "
283+
"input_version_start="
284+
<< compaction_job->input_versions(0)
285+
<< " input_version_end=" << compaction_job->input_versions(1)
286+
<< " schema_change_alter_version=" << resp.alter_version();
287+
std::string msg = ss.str();
288+
LOG(WARNING) << msg;
289+
return Status::InternalError(msg);
262290
}
263291
return st;
264292
}
293+
265294
auto& stats = resp.stats();
266295
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
267296
{
@@ -344,8 +373,9 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
344373
std::shared_lock rlock(_tablet->get_header_lock());
345374
_base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
346375
_cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt();
347-
int64_t candidate_version =
348-
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1);
376+
int64_t candidate_version = std::max(
377+
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1),
378+
cloud_tablet()->alter_version() + 1);
349379
// Get all rowsets whose version >= `candidate_version` as candidate rowsets
350380
cloud_tablet()->traverse_rowsets(
351381
[&candidate_rowsets, candidate_version](const RowsetSharedPtr& rs) {

be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp

+10-8
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
154154
};
155155
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
156156
auto sync_st = tablet->sync_rowsets();
157-
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
158-
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
159-
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
160-
"tablet_id: "
161-
<< _tablet_id << " txn_id: " << _transaction_id
162-
<< ", request_version=" << _version;
163-
return sync_st;
164-
}
165157
if (!sync_st.ok()) {
166158
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
167159
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
168160
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
169161
return sync_st;
170162
}
163+
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
164+
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
165+
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
166+
"tablet_id: "
167+
<< _tablet_id << " txn_id: " << _transaction_id
168+
<< ", request_version=" << _version;
169+
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
170+
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
171+
tablet->tablet_id());
172+
}
171173
}
172174
auto sync_rowset_time_us = MonotonicMicros() - t2;
173175
max_version = tablet->max_version_unlocked();

be/src/cloud/cloud_meta_mgr.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
448448
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
449449
tablet->last_sync_time_s = now;
450450

451-
if (tablet->enable_unique_key_merge_on_write()) {
451+
// If is mow, the tablet has no delete bitmap in base rowsets.
452+
// So dont need to sync it.
453+
if (tablet->enable_unique_key_merge_on_write() &&
454+
tablet->tablet_state() == TABLET_RUNNING) {
452455
DeleteBitmap delete_bitmap(tablet_id);
453456
int64_t old_max_version = req.start_version() - 1;
454457
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),

0 commit comments

Comments
 (0)