Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](group commit) make get column function more reliable when replaying wal #28900

Merged
merged 5 commits into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions be/src/olap/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,17 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
if (!st.ok()) {
LOG(WARNING) << "abort txn " << wal_id << " fail";
}
RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
auto get_st = _get_column_info(_db_id, _table_id);
if (!get_st.ok()) {
if (get_st.is<ErrorCode::NOT_FOUND>()) {
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
_replay_wal_map.erase(wal);
}
RETURN_IF_ERROR(_delete_wal(wal_id));
}
return get_st;
}
#endif
RETURN_IF_ERROR(_send_request(wal_id, wal, label));
return Status::OK();
Expand Down Expand Up @@ -354,8 +364,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
}
} else {
LOG(INFO) << "success to replay wal =" << wal;
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
RETURN_IF_ERROR(_delete_wal(wal_id));
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (_replay_wal_map.erase(wal)) {
LOG(INFO) << "erase " << wal << " from _replay_wal_map";
Expand Down Expand Up @@ -414,26 +423,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
[&request, &result](FrontendServiceConnection& client) {
client->getColumnInfo(result, request);
}));
std::string columns_str = result.column_info;
std::vector<std::string> column_element;
doris::vectorized::WalReader::string_split(columns_str, ",", column_element);
status = Status::create(result.status);
if (!status.ok()) {
return status;
}
std::vector<TColumnInfo> column_element = result.columns;
int64_t column_index = 1;
_column_id_name_map.clear();
_column_id_index_map.clear();
for (auto column : column_element) {
auto pos = column.find(":");
try {
auto column_name = column.substr(0, pos);
int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10);
_column_id_name_map.emplace(column_id, column_name);
_column_id_index_map.emplace(column_id, column_index);
column_index++;
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
auto column_name = column.columnName;
auto column_id = column.columnId;
_column_id_name_map.emplace(column_id, column_name);
_column_id_index_map.emplace(column_id, column_index);
column_index++;
}

status = Status::create(result.status);
}
return status;
}
Expand All @@ -447,4 +451,10 @@ Status WalTable::_read_wal_header(const std::string& wal_path, std::string& colu
return Status::OK();
}

Status WalTable::_delete_wal(int64_t wal_id) {
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
return Status::OK();
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/wal_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class WalTable {
Status _read_wal_header(const std::string& wal, std::string& columns);
bool _need_replay(const replay_wal_info& info);
Status _replay_wal_internal(const std::string& wal);
Status _delete_wal(int64_t wal_id);

private:
ExecEnv* _exec_env;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.doris.thrift.TCheckAuthResult;
import org.apache.doris.thrift.TColumnDef;
import org.apache.doris.thrift.TColumnDesc;
import org.apache.doris.thrift.TColumnInfo;
import org.apache.doris.thrift.TCommitTxnRequest;
import org.apache.doris.thrift.TCommitTxnResult;
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
Expand Down Expand Up @@ -240,7 +241,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -3280,40 +3280,38 @@ private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp)
@Override
public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
TGetColumnInfoResult result = new TGetColumnInfoResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
long dbId = request.getDbId();
long tableId = request.getTableId();
if (!Env.getCurrentEnv().isMaster()) {
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG);
return result;
}

Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
result.setStatus(errorStatus);
status.setStatusCode(TStatusCode.NOT_FOUND);
status.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
return result;
}

Table table;
try {
table = db.getTable(tableId).get();
} catch (NoSuchElementException e) {
errorStatus.setErrorMsgs(
Table table = db.getTableNullable(tableId);
if (table == null) {
status.setStatusCode(TStatusCode.NOT_FOUND);
status.setErrorMsgs(
(Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId))));
result.setStatus(errorStatus);
return result;
}
StringBuilder sb = new StringBuilder();
List<TColumnInfo> columnsResult = Lists.newArrayList();
for (Column column : table.getBaseSchema(true)) {
sb.append(column.getName() + ":" + column.getUniqueId() + ",");
final TColumnInfo info = new TColumnInfo();
info.setColumnName(column.getName());
info.setColumnId(column.getUniqueId());
columnsResult.add(info);
}
String columnInfo = sb.toString();
columnInfo = columnInfo.substring(0, columnInfo.length() - 1);
result.setStatus(new TStatus(TStatusCode.OK));
result.setColumnInfo(columnInfo);
result.setColumns(columnsResult);
return result;
}

Expand Down
7 changes: 6 additions & 1 deletion gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1302,14 +1302,19 @@ struct TGetBackendMetaResult {
3: optional Types.TNetworkAddress master_address
}

struct TColumnInfo {
1: optional string columnName
2: optional i64 columnId
}

struct TGetColumnInfoRequest {
1: optional i64 db_id
2: optional i64 table_id
}

struct TGetColumnInfoResult {
1: optional Status.TStatus status
2: optional string column_info
2: optional list<TColumnInfo> columns
}

service FrontendService {
Expand Down