Skip to content

Commit 2c970d9

Browse files
committed
fix
1 parent 272e687 commit 2c970d9

File tree

3 files changed

+61
-11
lines changed

3 files changed

+61
-11
lines changed

be/src/runtime/group_commit_mgr.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,13 @@ Status GroupCommitTable::get_first_block_load_queue(
260260
DCHECK(table_id == _table_id);
261261
std::unique_lock l(_lock);
262262
auto try_to_get_matched_queue = [&]() -> Status {
263+
DBUG_EXECUTE_IF("GroupCommitTable.get_first_block_load_queue.can_not_get_a_block_queue", {
264+
if (dp->param<int64_t>("table_id", -1) == table_id) {
265+
LOG(INFO) << "debug promise set: can not get a block queue for table=" << table_id;
266+
return Status::InternalError<false>("can not get a block queue for table_id: " +
267+
std::to_string(table_id));
268+
}
269+
};);
263270
for (const auto& [_, inner_block_queue] : _load_block_queues) {
264271
if (inner_block_queue->contain_load_id(load_id)) {
265272
load_block_queue = inner_block_queue;

fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java

+33
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,13 @@
7272
import org.apache.thrift.TException;
7373
import org.apache.thrift.TSerializer;
7474

75+
import java.net.HttpURLConnection;
76+
import java.net.URL;
7577
import java.util.ArrayList;
7678
import java.util.Collection;
7779
import java.util.List;
7880
import java.util.Map;
81+
import java.util.Map.Entry;
7982
import java.util.Optional;
8083
import java.util.concurrent.ExecutionException;
8184
import java.util.concurrent.Future;
@@ -328,4 +331,34 @@ private void setReturnInfo(ConnectContext ctx, boolean reuse, PGroupCommitInsert
328331
// update it, so that user can get loaded rows in fe.audit.log
329332
ctx.updateReturnRows((int) loadedRows);
330333
}
334+
335+
public static void removeBeDebugPoint() {
336+
try {
337+
for (Entry<Long, Backend> entry : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().entrySet()) {
338+
Backend backend = entry.getValue();
339+
if (!backend.isAlive()) {
340+
continue;
341+
}
342+
String host = backend.getHost();
343+
int port = backend.getBePort();
344+
345+
String urlStr = String.format(
346+
"http://%s:%d/api/debug_point/remove/GroupCommitTable.get_first_block_load_queue.can_not_get_a_block_queue",
347+
host, port);
348+
URL url = new URL(urlStr);
349+
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
350+
try {
351+
conn.setRequestMethod("POST");
352+
int responseCode = conn.getResponseCode();
353+
LOG.warn("remove debug point for backend {}:{}, response code: {}", host, port, responseCode);
354+
} catch (Exception e) {
355+
LOG.warn("failed to remove debug point for backend {}:{}", host, port, e);
356+
} finally {
357+
conn.disconnect();
358+
}
359+
}
360+
} catch (Exception e) {
361+
LOG.warn("failed to remove debug point for backends", e);
362+
}
363+
}
331364
}

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

+21-11
Original file line numberDiff line numberDiff line change
@@ -2443,23 +2443,33 @@ private void handleInsertStmt() throws Exception {
24432443
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
24442444
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
24452445
ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
2446-
if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
2447-
.contains("schema version not match")) {
2446+
boolean schemaVersionNotMatch = code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty()
2447+
&& errorMsgsList.get(0).contains("schema version not match");
2448+
boolean canNotGetBlockQueue = code != TStatusCode.OK && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
2449+
.contains("can not get a block queue");
2450+
if (schemaVersionNotMatch || canNotGetBlockQueue) {
24482451
LOG.info("group commit insert failed. stmt: {}, query_id: {}, db_id: {}, table_id: {}"
24492452
+ ", schema version: {}, backend_id: {}, status: {}, retry: {}",
24502453
insertStmt.getOrigStmt().originStmt, DebugUtil.printId(context.queryId()), dbId, tableId,
24512454
nativeInsertStmt.getBaseSchemaVersion(), groupCommitPlanner.getBackendId(),
24522455
response.getStatus(), i);
24532456
if (i < maxRetry) {
2454-
List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable());
2455-
tables.sort((Comparator.comparing(TableIf::getId)));
2456-
MetaLockUtils.readLockTables(tables);
2457-
try {
2458-
insertStmt.reset();
2459-
analyzer = new Analyzer(context.getEnv(), context);
2460-
analyzeAndGenerateQueryPlan(context.getSessionVariable().toThrift());
2461-
} finally {
2462-
MetaLockUtils.readUnlockTables(tables);
2457+
if (schemaVersionNotMatch) {
2458+
List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable());
2459+
tables.sort((Comparator.comparing(TableIf::getId)));
2460+
MetaLockUtils.readLockTables(tables);
2461+
try {
2462+
insertStmt.reset();
2463+
analyzer = new Analyzer(context.getEnv(), context);
2464+
analyzeAndGenerateQueryPlan(context.getSessionVariable().toThrift());
2465+
} finally {
2466+
MetaLockUtils.readUnlockTables(tables);
2467+
}
2468+
}
2469+
long debugTableId = DebugPointUtil.getDebugParamOrDefault("GroupCommitInsert.retry", 0);
2470+
if (debugTableId == tableId) {
2471+
// Thread.sleep(10000);
2472+
GroupCommitPlanner.removeBeDebugPoint();
24632473
}
24642474
continue;
24652475
} else {

0 commit comments

Comments
 (0)