Skip to content

Commit 145683c

Browse files
authored
[improvement](group commit) make get column function more reliable when replaying wal (#28900)
1 parent 7107415 commit 145683c

File tree

4 files changed

+53
-39
lines changed

4 files changed

+53
-39
lines changed

be/src/olap/wal_table.cpp

+28-18
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,17 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
204204
if (!st.ok()) {
205205
LOG(WARNING) << "abort txn " << wal_id << " fail";
206206
}
207-
RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
207+
auto get_st = _get_column_info(_db_id, _table_id);
208+
if (!get_st.ok()) {
209+
if (get_st.is<ErrorCode::NOT_FOUND>()) {
210+
{
211+
std::lock_guard<std::mutex> lock(_replay_wal_lock);
212+
_replay_wal_map.erase(wal);
213+
}
214+
RETURN_IF_ERROR(_delete_wal(wal_id));
215+
}
216+
return get_st;
217+
}
208218
#endif
209219
RETURN_IF_ERROR(_send_request(wal_id, wal, label));
210220
return Status::OK();
@@ -354,8 +364,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
354364
}
355365
} else {
356366
LOG(INFO) << "success to replay wal =" << wal;
357-
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
358-
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
367+
RETURN_IF_ERROR(_delete_wal(wal_id));
359368
std::lock_guard<std::mutex> lock(_replay_wal_lock);
360369
if (_replay_wal_map.erase(wal)) {
361370
LOG(INFO) << "erase " << wal << " from _replay_wal_map";
@@ -414,26 +423,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
414423
[&request, &result](FrontendServiceConnection& client) {
415424
client->getColumnInfo(result, request);
416425
}));
417-
std::string columns_str = result.column_info;
418-
std::vector<std::string> column_element;
419-
doris::vectorized::WalReader::string_split(columns_str, ",", column_element);
426+
status = Status::create(result.status);
427+
if (!status.ok()) {
428+
return status;
429+
}
430+
std::vector<TColumnInfo> column_element = result.columns;
420431
int64_t column_index = 1;
421432
_column_id_name_map.clear();
422433
_column_id_index_map.clear();
423434
for (auto column : column_element) {
424-
auto pos = column.find(":");
425-
try {
426-
auto column_name = column.substr(0, pos);
427-
int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10);
428-
_column_id_name_map.emplace(column_id, column_name);
429-
_column_id_index_map.emplace(column_id, column_index);
430-
column_index++;
431-
} catch (const std::invalid_argument& e) {
432-
return Status::InvalidArgument("Invalid format, {}", e.what());
433-
}
435+
auto column_name = column.columnName;
436+
auto column_id = column.columnId;
437+
_column_id_name_map.emplace(column_id, column_name);
438+
_column_id_index_map.emplace(column_id, column_index);
439+
column_index++;
434440
}
435-
436-
status = Status::create(result.status);
437441
}
438442
return status;
439443
}
@@ -447,4 +451,10 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu
447451
return Status::OK();
448452
}
449453

454+
// Removes the WAL identified by `wal_id`: first asks the WAL manager to delete it, then
// erases its entry from the WAL manager's status queue for this table (`_table_id`).
// Returns Status::OK() when both calls succeed.
// NOTE(review): RETURN_IF_ERROR presumably returns early with the failing status, in which
// case the status-queue entry is left in place when delete_wal() fails — confirm against
// the macro's definition.
Status WalTable::_delete_wal(int64_t wal_id) {
455+
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
456+
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
457+
return Status::OK();
458+
}
459+
450460
} // namespace doris

be/src/olap/wal_table.h

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class WalTable {
5050
Status _read_wal_header(const std::string& wal, std::string& columns);
5151
bool _need_replay(const replay_wal_info& info);
5252
Status _replay_wal_internal(const std::string& wal);
53+
Status _delete_wal(int64_t wal_id);
5354

5455
private:
5556
ExecEnv* _exec_env;

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

+18-20
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.apache.doris.thrift.TCheckAuthResult;
115115
import org.apache.doris.thrift.TColumnDef;
116116
import org.apache.doris.thrift.TColumnDesc;
117+
import org.apache.doris.thrift.TColumnInfo;
117118
import org.apache.doris.thrift.TCommitTxnRequest;
118119
import org.apache.doris.thrift.TCommitTxnResult;
119120
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
@@ -240,7 +241,6 @@
240241
import java.util.HashSet;
241242
import java.util.List;
242243
import java.util.Map;
243-
import java.util.NoSuchElementException;
244244
import java.util.Set;
245245
import java.util.concurrent.ConcurrentHashMap;
246246
import java.util.concurrent.ExecutionException;
@@ -3280,40 +3280,38 @@ private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp)
32803280
/**
 * Returns the name and unique id of every column in the base schema of the requested table.
 *
 * As rendered here this is a diff: a line following an "NNNN-" marker belongs to the removed
 * version and a line following an "NNNN+" marker belongs to the added version. The notes
 * below describe the added version.
 *
 * The status starts as OK and is attached to the result up front, so every early return
 * carries a status: NOT_MASTER when this frontend is not the master, NOT_FOUND when the
 * database or the table cannot be looked up by id.
 *
 * @param request carries the database id and table id to look up
 * @return the result holding the status and, on success, one TColumnInfo per column
 */
@Override
32813281
public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
32823282
TGetColumnInfoResult result = new TGetColumnInfoResult();
3283-
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
3283+
TStatus status = new TStatus(TStatusCode.OK);
3284+
result.setStatus(status);
32843285
long dbId = request.getDbId();
32853286
long tableId = request.getTableId();
32863287
if (!Env.getCurrentEnv().isMaster()) {
3287-
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
3288-
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
3288+
status.setStatusCode(TStatusCode.NOT_MASTER);
3289+
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
32893290
LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG);
32903291
return result;
32913292
}
32923293

3293-
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
3294+
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
32943295
if (db == null) {
3295-
errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
3296-
result.setStatus(errorStatus);
3296+
status.setStatusCode(TStatusCode.NOT_FOUND);
3297+
status.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
32973298
return result;
32983299
}
3299-
3300-
Table table;
3301-
try {
3302-
table = db.getTable(tableId).get();
3303-
} catch (NoSuchElementException e) {
3304-
errorStatus.setErrorMsgs(
3300+
Table table = db.getTableNullable(tableId);
3301+
if (table == null) {
3302+
status.setStatusCode(TStatusCode.NOT_FOUND);
3303+
status.setErrorMsgs(
33053304
(Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId))));
3306-
result.setStatus(errorStatus);
33073305
return result;
33083306
}
3309-
StringBuilder sb = new StringBuilder();
3307+
List<TColumnInfo> columnsResult = Lists.newArrayList();
33103308
for (Column column : table.getBaseSchema(true)) {
3311-
sb.append(column.getName() + ":" + column.getUniqueId() + ",");
3309+
final TColumnInfo info = new TColumnInfo();
3310+
info.setColumnName(column.getName());
3311+
info.setColumnId(column.getUniqueId());
3312+
columnsResult.add(info);
33123313
}
3313-
String columnInfo = sb.toString();
3314-
columnInfo = columnInfo.substring(0, columnInfo.length() - 1);
3315-
result.setStatus(new TStatus(TStatusCode.OK));
3316-
result.setColumnInfo(columnInfo);
3314+
result.setColumns(columnsResult);
33173315
return result;
33183316
}
33193317

gensrc/thrift/FrontendService.thrift

+6-1
Original file line numberDiff line numberDiff line change
@@ -1302,14 +1302,19 @@ struct TGetBackendMetaResult {
13021302
3: optional Types.TNetworkAddress master_address
13031303
}
13041304

1305+
// One table column as carried in TGetColumnInfoResult.columns: its name paired with its id.
struct TColumnInfo {
1306+
1: optional string columnName
1307+
// The frontend fills this from Column.getUniqueId().
2: optional i64 columnId
1308+
}
1309+
13051310
// Request for getColumnInfo: the database id and table id of the table whose columns are wanted.
struct TGetColumnInfoRequest {
13061311
1: optional i64 db_id
13071312
2: optional i64 table_id
13081313
}
13091314

13101315
// Reply for getColumnInfo: a status plus the table's columns.
// NOTE(review): this diff reuses field id 2 with a different type (string column_info ->
// list<TColumnInfo> columns). Presumably a peer generated from the old IDL cannot read the
// new field — confirm behavior when frontend and backend versions are mixed during an upgrade.
struct TGetColumnInfoResult {
13111316
1: optional Status.TStatus status
1312-
2: optional string column_info
1317+
2: optional list<TColumnInfo> columns
13131318
}
13141319

13151320
service FrontendService {

0 commit comments

Comments
 (0)