Skip to content

Commit 65ae146

Browse files
committed
tmp
1 parent 84c862f commit 65ae146

16 files changed

+339
-36
lines changed

be/src/cloud/cloud_base_compaction.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ Status CloudBaseCompaction::prepare_compact() {
9292
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
9393
cloud::StartTabletJobResponse resp;
9494
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
95+
if (resp.has_alter_version()) {
96+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
97+
}
9598
if (!st.ok()) {
9699
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
97100
// set last_sync_time to 0 to force sync tablet next time
@@ -113,7 +116,6 @@ Status CloudBaseCompaction::prepare_compact() {
113116
<< " schema_change_alter_version=" << resp.alter_version();
114117
std::string msg = ss.str();
115118
LOG(WARNING) << msg;
116-
cloud_tablet->set_alter_version(resp.alter_version());
117119
return Status::InternalError(msg);
118120
}
119121
return st;

be/src/cloud/cloud_cumulative_compaction.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {
270270

271271
cloud::FinishTabletJobResponse resp;
272272
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
273+
if (resp.has_alter_version()) {
274+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
275+
}
273276
if (!st.ok()) {
274277
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
275278
cloud_tablet()->clear_cache();
276279
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
277-
(dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
278280
std::stringstream ss;
279281
ss << "failed to prepare cumu compaction. Check compaction input versions "
280282
"failed in schema change. "
@@ -288,6 +290,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
288290
}
289291
return st;
290292
}
293+
291294
auto& stats = resp.stats();
292295
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
293296
{

cloud/src/meta-service/meta_service_job.cpp

+25-18
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,10 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
919919
txn->put(job_key, job_val);
920920
INSTANCE_LOG(INFO) << "remove compaction job tabelt_id=" << tablet_id
921921
<< " key=" << hex(job_key);
922-
922+
response->set_alter_version(recorded_job.has_schema_change() &&
923+
recorded_job.schema_change().has_alter_version()
924+
? recorded_job.schema_change().alter_version()
925+
: -1);
923926
need_commit = true;
924927
}
925928

@@ -1007,9 +1010,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
10071010
}
10081011

10091012
// MUST check initiator to let the retried BE commit this schema_change job.
1010-
if (request->action() == FinishTabletJobRequest::COMMIT &&
1011-
(schema_change.id() != recorded_schema_change.id() ||
1012-
schema_change.initiator() != recorded_schema_change.initiator())) {
1013+
if (schema_change.id() != recorded_schema_change.id() ||
1014+
schema_change.initiator() != recorded_schema_change.initiator()) {
10131015
SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id()
10141016
<< " given_id=" << schema_change.id()
10151017
<< " recorded_job=" << proto_to_json(recorded_schema_change)
@@ -1031,21 +1033,22 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
10311033
{instance_id, new_table_id, new_index_id, new_partition_id, new_tablet_id});
10321034

10331035
std::string new_tablet_job_val;
1036+
TabletJobInfoPB new_recorded_job;
10341037
err = txn->get(new_tablet_job_key, &new_tablet_job_val);
1035-
if (err != TxnErrorCode::TXN_OK) {
1036-
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
1038+
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1039+
SS << "internal error,"
10371040
<< " instance_id=" << instance_id << " tablet_id=" << new_tablet_id
10381041
<< " job=" << proto_to_json(request->job()) << " err=" << err;
10391042
msg = ss.str();
10401043
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
10411044
: cast_as<ErrCategory::READ>(err);
10421045
return;
1043-
}
1044-
TabletJobInfoPB new_recorded_job;
1045-
if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
1046-
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
1047-
msg = "malformed new tablet recorded job";
1048-
return;
1046+
} else if (err == TxnErrorCode::TXN_OK) {
1047+
if (!new_recorded_job.ParseFromString(new_tablet_job_val)) {
1048+
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
1049+
msg = "malformed new tablet recorded job";
1050+
return;
1051+
}
10491052
}
10501053

10511054
//==========================================================================
@@ -1058,11 +1061,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
10581061
recorded_schema_change.new_tablet_idx().tablet_id()) {
10591062
// remove schema change
10601063
recorded_job.clear_schema_change();
1061-
new_recorded_job.clear_schema_change();
10621064
auto job_val = recorded_job.SerializeAsString();
1063-
new_tablet_job_val = new_recorded_job.SerializeAsString();
10641065
txn->put(job_key, job_val);
1065-
txn->put(new_tablet_job_key, new_tablet_job_val);
1066+
if (!new_tablet_job_val.empty()) {
1067+
new_recorded_job.clear_schema_change();
1068+
new_tablet_job_val = new_recorded_job.SerializeAsString();
1069+
txn->put(new_tablet_job_key, new_tablet_job_val);
1070+
}
10661071
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
10671072
<< " key=" << hex(job_key);
10681073

@@ -1226,11 +1231,13 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
12261231
// remove schema_change job
12271232
//==========================================================================
12281233
recorded_job.clear_schema_change();
1229-
new_recorded_job.clear_schema_change();
12301234
auto job_val = recorded_job.SerializeAsString();
12311235
txn->put(job_key, job_val);
1232-
new_tablet_job_val = new_recorded_job.SerializeAsString();
1233-
txn->put(new_tablet_job_key, new_tablet_job_val);
1236+
if (!new_tablet_job_val.empty()) {
1237+
new_recorded_job.clear_schema_change();
1238+
new_tablet_job_val = new_recorded_job.SerializeAsString();
1239+
txn->put(new_tablet_job_key, new_tablet_job_val);
1240+
}
12341241
INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id
12351242
<< " key=" << hex(job_key);
12361243

cloud/test/meta_service_job_test.cpp

+7-3
Original file line numberDiff line numberDiff line change
@@ -687,8 +687,11 @@ TEST(MetaServiceJobTest, ProcessSchemaChangeArguments) {
687687
recorded_sc->set_id("sc1");
688688
recorded_sc->set_initiator("BE1");
689689
job_val = recorded_job.SerializeAsString();
690+
auto new_job_key =
691+
job_tablet_key({instance_id, table_id, new_index_id, partition_id, new_tablet_id});
690692
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
691693
txn->put(job_key, job_val);
694+
txn->put(new_job_key, job_val);
692695
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
693696
meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
694697
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg();
@@ -2342,12 +2345,12 @@ TEST(MetaServiceJobTest, DoCompactionWhenSC) {
23422345
StartTabletJobResponse res;
23432346
start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
23442347
TabletCompactionJobPB::CUMULATIVE, res, {7, 10});
2345-
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
2348+
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
23462349
res.Clear();
23472350

23482351
start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
23492352
TabletCompactionJobPB::BASE, res, {0, 10});
2350-
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION_FAIL);
2353+
ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_CHECK_ALTER_VERSION);
23512354
res.Clear();
23522355

23532356
start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 7,
@@ -2499,7 +2502,8 @@ TEST(MetaServiceJobTest, CancelSC) {
24992502
FinishTabletJobResponse finish_res;
25002503
finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job_sc", "BE1", {},
25012504
finish_res, FinishTabletJobRequest::ABORT);
2502-
ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK);
2505+
ASSERT_NE(finish_res.status().msg().find("unmatched job id or initiator"),
2506+
std::string::npos);
25032507
}
25042508
{
25052509
std::unique_ptr<Transaction> txn;

fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ protected void onCancel() {
131131
Long rollupTabletId = tabletEntry.getKey();
132132
Long baseTabletId = tabletEntry.getValue();
133133
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
134-
.removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
134+
.removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId,
135135
partitionId, baseTabletId, rollupTabletId);
136136
}
137-
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms." +
138-
"dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
137+
LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
138+
+ "dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
139139
dbId, tableId, rollupIndexId, partitionId, rollupTabletIdToBaseTabletId.size());
140140
}
141141
break;

fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,11 @@ protected void onCancel() {
134134
Long shadowTabletId = entry.getKey();
135135
Long originTabletId = entry.getValue();
136136
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
137-
.removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
137+
.removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId,
138138
partitionId, originTabletId, shadowTabletId);
139139
}
140-
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms." +
141-
"dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
140+
LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms."
141+
+ "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
142142
dbId, tableId, originIndexId, partitionId, shadowTabletIdToOriginTabletId.size());
143143
}
144144
break;

fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -874,8 +874,8 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long ne
874874
newtabletIndexPBBuilder.setTabletId(newTabletId);
875875
final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build();
876876
schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex);
877-
final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
878-
schemaChangeJobPBBuilder.build();
877+
final Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb =
878+
schemaChangeJobPBBuilder.build();
879879

880880
tabletJobInfoPBBuilder.setSchemaChange(tabletSchemaChangeJobPb);
881881

gensrc/proto/cloud.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,7 @@ enum MetaServiceCode {
13401340
JOB_ALREADY_SUCCESS = 5002;
13411341
ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
13421342
ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
1343-
JOB_CHECK_ALTER_VERSION_FAIL = 5005;
1343+
JOB_CHECK_ALTER_VERSION = 5005;
13441344

13451345
// Rate limit
13461346
MAX_QPS_LIMIT = 6001;

regression-test/pipeline/cloud_p0/conf/be_custom.conf

+1
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ save_load_error_log_to_s3 = true
3333
enable_stream_load_record = true
3434
stream_load_record_batch_size = 500
3535
webserver_num_workers = 128
36+
enable_new_tablet_do_compaction = true

regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ suite('test_schema_change_with_compaction10') {
2525
options.cloudMode = true
2626
options.enableDebugPoints()
2727
options.beConfigs += [ "enable_java_support=false" ]
28+
options.beConfigs += [ "enable_new_tablet_do_compaction=true" ]
2829
options.beConfigs += [ "disable_auto_compaction=true" ]
2930
options.beNum = 1
3031
docker(options) {

0 commit comments

Comments
 (0)