diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java index 1c13bd27..bfa887fa 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java @@ -323,6 +323,7 @@ private boolean asyncFlush() throws Exception { stopScheduler(); LOG.info(String.format("Async stream load: db[%s] table[%s] rows[%d] bytes[%d] label[%s].", flushData.getDatabase(), flushData.getTable(), flushData.getBatchCount(), flushData.getBatchSize(), flushData.getLabel())); long startWithRetries = System.nanoTime(); + Exception firstException = null; for (int i = 0; i <= sinkOptions.getSinkMaxRetries(); i++) { try { long start = System.nanoTime(); @@ -349,8 +350,12 @@ private boolean asyncFlush() throws Exception { totalFlushFailedTimes.inc(); } LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e); + if (firstException == null) { + firstException = e; + } + if (i >= sinkOptions.getSinkMaxRetries()) { - throw e; + throw firstException; } if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException)e).needReCreateLabel()) { String oldLabel = flushData.getLabel(); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 1d6626b6..fa6ce88d 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -72,6 +72,9 @@ enum State { private volatile int numRetries; private volatile long lastFailTimeMs; + // First exception if retry many times + private volatile Throwable firstException; + public TransactionTableRegion(String uniqueKey, String database, String table, @@ -270,10 +273,17 @@ public boolean commit() { @Override public void fail(Throwable e) { + if (firstException == null) { + firstException = e; + } + if (numRetries >= maxRetries || !isRetryable(e)) { - manager.callback(e); + LOG.error("Failed to flush data for db: {}, table: {} after {} times retry, the last exception is", + database, table, numRetries, e); + manager.callback(firstException); return; } + responseFuture = null; numRetries += 1; lastFailTimeMs = System.currentTimeMillis(); @@ -291,6 +301,7 @@ public void complete(StreamLoadResponse response) { response.setFlushRows(chunk.numRows()); manager.callback(response); numRetries = 0; + firstException = null; LOG.info("Stream load flushed, db: {}, table: {}, label : {}", database, table, label); if (!inactiveChunks.isEmpty()) {