From 09a7633d80785726ee187ad0f43b25febd5addca Mon Sep 17 00:00:00 2001 From: rickyma Date: Wed, 17 Jan 2024 17:19:45 +0800 Subject: [PATCH] [#1464] fix(client): Improve the error log message for checkBlockSendResult --- .../spark/shuffle/writer/RssShuffleWriter.java | 7 +++++-- .../spark/shuffle/writer/RssShuffleWriter.java | 8 ++++++-- .../apache/uniffle/common/ShuffleServerInfo.java | 13 ++----------- .../uniffle/common/ShuffleServerInfoTest.java | 12 ++---------- 4 files changed, 15 insertions(+), 25 deletions(-) diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 4c9a62d1c0..62e37dcb4b 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -364,7 +364,9 @@ protected void sendCommit() { protected void checkBlockSendResult(Set blockIds) { long start = System.currentTimeMillis(); while (true) { - Set failedBlockIds = shuffleManager.getFailedBlockIds(taskId); + Map> failedBlockIdsWithShuffleServer = + shuffleManager.getFailedBlockIdsWithShuffleServer(taskId); + Set failedBlockIds = failedBlockIdsWithShuffleServer.keySet(); Set successBlockIds = shuffleManager.getSuccessBlockIds(taskId); // if failed when send data to shuffle server, mark task as failed if (failedBlockIds.size() > 0) { @@ -373,7 +375,8 @@ protected void checkBlockSendResult(Set blockIds) { + taskId + "] failed because " + failedBlockIds.size() - + " blocks can't be sent to shuffle server."; + + " blocks can't be sent to shuffle server: " + + failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet()); LOG.error(errorMsg); throw new RssSendFailedException(errorMsg); } diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 2c0977071e..100c7f092a 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import scala.Function1; import scala.Option; @@ -388,7 +389,9 @@ protected void checkBlockSendResult(Set blockIds) { } private void checkIfBlocksFailed() { - Set failedBlockIds = shuffleManager.getFailedBlockIds(taskId); + Map> failedBlockIdsWithShuffleServer = + shuffleManager.getFailedBlockIdsWithShuffleServer(taskId); + Set failedBlockIds = failedBlockIdsWithShuffleServer.keySet(); if (!failedBlockIds.isEmpty()) { String errorMsg = "Send failed: Task[" @@ -396,7 +399,8 @@ private void checkIfBlocksFailed() { + "]" + " failed because " + failedBlockIds.size() - + " blocks can't be sent to shuffle server."; + + " blocks can't be sent to shuffle server: " + + failedBlockIdsWithShuffleServer.values().stream().collect(Collectors.toSet()); LOG.error(errorMsg); throw new RssSendFailedException(errorMsg); } diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java index bfe99eafaf..5b9a6fbe2e 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java @@ -91,9 +91,7 @@ public boolean equals(Object obj) { @Override public String toString() { if (nettyPort > 0) { - return "ShuffleServerInfo{id[" - + id - + "], host[" + return "ShuffleServerInfo{host[" + host + "]," + " grpc port[" @@ -102,14 +100,7 @@ public String toString() { + nettyPort + "]}"; } else { - return "ShuffleServerInfo{id[" - + id - + "], host[" - + host - + "]," - + " grpc port[" - + grpcPort - + "]}"; + return "ShuffleServerInfo{host[" + host + "]," + " grpc port[" + grpcPort + "]}"; } } diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java index 850a3bac91..feba4a4f57 100644 --- a/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java +++ b/common/src/test/java/org/apache/uniffle/common/ShuffleServerInfoTest.java @@ -47,19 +47,11 @@ public void testEquals() { public void testToString() { ShuffleServerInfo info = new ShuffleServerInfo("1", "localhost", 1234); assertEquals( - "ShuffleServerInfo{id[" - + info.getId() - + "], host[" - + info.getHost() - + "], grpc port[" - + info.getGrpcPort() - + "]}", + "ShuffleServerInfo{host[" + info.getHost() + "], grpc port[" + info.getGrpcPort() + "]}", info.toString()); ShuffleServerInfo newInfo = new ShuffleServerInfo("1", "localhost", 1234, 5678); assertEquals( - "ShuffleServerInfo{id[" - + info.getId() - + "], host[" + "ShuffleServerInfo{host[" + newInfo.getHost() + "], grpc port[" + newInfo.getGrpcPort()