From c6d9ee9ffb4721ed1a04c7f68bf872abcc3d83f7 Mon Sep 17 00:00:00 2001 From: Bin Fan Date: Fri, 26 May 2023 10:31:41 +0800 Subject: [PATCH 1/2] Fix a bug that creates 0 byte block file mistakenly --- .../java/alluxio/worker/block/MonoBlockStore.java | 14 +++++++++----- .../alluxio/underfs/hdfs/HdfsUnderFileSystem.java | 6 +++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java index 26b2757c885c..3b2f4a17e568 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java @@ -183,18 +183,18 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset, positionShort, options); BlockReader blockReader = new DelegatingBlockReader(reader, - () -> closeUfsBlock(sessionId, blockId)); + () -> closeUfsBlock(sessionId, blockId, true)); Metrics.WORKER_ACTIVE_CLIENTS.inc(); return blockReader; } catch (Exception e) { try { - closeUfsBlock(sessionId, blockId); + closeUfsBlock(sessionId, blockId, false); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); } String errorMessage = format("Failed to read from UFS, sessionId=%d, " + "blockId=%d, offset=%d, positionShort=%s, options=%s: %s", - sessionId, blockId, offset, positionShort, options, e); + sessionId, blockId, offset, positionShort, options, e.toString()); if (e instanceof FileNotFoundException) { throw new NotFoundException(errorMessage, e); } @@ -202,13 +202,17 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse } } - private void closeUfsBlock(long sessionId, long blockId) + private void closeUfsBlock(long sessionId, long blockId, boolean successful) throws IOException { try { mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { - commitBlock(sessionId, blockId, false); + if (successful) { + commitBlock(sessionId, blockId, false); + } else { + abortBlock(sessionId, blockId); + } } else { // When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH. // Counter will not be decrement in the commitblock(). diff --git a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java index 2c876fa0d979..8367724fa6b2 100755 --- a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java +++ b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java @@ -684,7 +684,7 @@ public InputStream open(String path, OpenOptions options) throws IOException { LOG.debug("Using original API to HDFS"); return new HdfsUnderFileInputStream(inputStream); } catch (IOException e) { - LOG.warn("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); + LOG.debug("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); te = e; if (options.getRecoverFailedOpen() && dfs != null && e.getMessage().toLowerCase() .startsWith("cannot obtain block length for")) { @@ -711,6 +711,10 @@ public InputStream open(String path, OpenOptions options) throws IOException { } } } + if (te != null) { + LOG.error("{} failed attempts to open \"{}\" with last error:", + retryPolicy.getAttemptCount(), path, te); + } throw te; } From 173d64bf47abbc3fcecf2cd5129123f7f0329e84 Mon Sep 17 00:00:00 2001 From: Beinan Date: Thu, 25 May 2023 20:58:28 -0700 Subject: [PATCH 2/2] Refine code to pass the check of findbugs --- .../main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java index 8367724fa6b2..aa672d4dd248 100755 --- a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java +++ b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java @@ -714,8 +714,9 @@ public InputStream open(String path, OpenOptions options) throws IOException { if (te != null) { LOG.error("{} failed attempts to open \"{}\" with last error:", retryPolicy.getAttemptCount(), path, te); + throw te; } - throw te; + throw new IllegalStateException("Exceeded the number of retry attempts with no exception"); } @Override