From 0c5cf688690bacc662a015b1bff0ecb556314b90 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Mon, 4 Dec 2023 20:29:55 +0530
Subject: [PATCH] TEZ-4522: Use OpenFile where FileStatus is available. (#318)
 (Ayush Saxena reviewed by Laszlo Bodor)

---
 .../org/apache/tez/common/TezCommonUtils.java      |  6 ++++--
 .../org/apache/tez/dag/app/RecoveryParser.java     |  8 ++++----
 .../mapreduce/split/SplitMetaInfoReaderTez.java    |  3 ++-
 .../library/common/sort/impl/TezSpillRecord.java   | 16 +++++++---------
 .../org/apache/tez/tools/TFileRecordReader.java    |  8 +++++---
 5 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index dd28eed395..28799c1192 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -293,9 +294,10 @@ public static Path getSummaryRecoveryPath(Path attemptRecoverPath) {
   public static void mkDirForAM(FileSystem fs, Path dir) throws IOException {
     FsPermission perm = new FsPermission(TEZ_AM_DIR_PERMISSION);
     fs.mkdirs(dir, perm);
-    if (!fs.getFileStatus(dir).getPermission().equals(perm)) {
+    FileStatus fileStatus = fs.getFileStatus(dir);
+    if (!fileStatus.getPermission().equals(perm)) {
       LOG.warn("Directory " + dir.toString() + " created with unexpected permissions : "
-          + fs.getFileStatus(dir).getPermission() + ". Fixing permissions to correct value : "
+          + fileStatus.getPermission() + ". Fixing permissions to correct value : "
           + perm.toString());
       fs.setPermission(dir, perm);
     }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 656f38fb10..0f40700cf3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.TezCommonUtils;
@@ -430,12 +431,11 @@ private Path getSummaryPath(Path attemptRecoveryDataDir) {
     return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
   }

-  private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException {
+  private FSDataInputStream getSummaryStream(Path summaryPath, FileStatus summaryFileStatus) throws IOException {
     try {
-      return recoveryFS.open(summaryPath, recoveryBufferSize);
+      return FutureIO.awaitFuture(recoveryFS.openFile(summaryPath).withFileStatus(summaryFileStatus).build());
     } catch (FileNotFoundException fnf) {
       return null;
-    }
   }
@@ -667,7 +667,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
           + ", len=" + summaryFileStatus.getLen()
           + ", lastModTime=" + summaryFileStatus.getModificationTime());
       FSDataInputStream summaryStream = getSummaryStream(
-          summaryFile);
+          summaryFile, summaryFileStatus);
       while (true) {
         RecoveryProtos.SummaryEventProto proto;
         try {
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 394c871ab9..d69d21127b 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.functional.FutureIO;

 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -78,7 +79,7 @@ private static FSDataInputStream getFSDataIS(Configuration conf,
       throw new IOException("Split metadata size exceeded " + maxMetaInfoSize + ". Aborting job ");
     }
-    in = fs.open(metaSplitFile);
+    in = FutureIO.awaitFuture(fs.openFile(metaSplitFile).withFileStatus(fStatus).build());
     byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
     in.readFully(header);
     if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index 1c9edeead7..feed70f496 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -28,11 +28,13 @@
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.tez.runtime.library.common.Constants;

 public class TezSpillRecord {
@@ -66,11 +68,10 @@ public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
                         String expectedIndexOwner) throws IOException {
-    final FSDataInputStream in = rfs.open(indexFileName);
-    try {
-      final long length = rfs.getFileStatus(indexFileName).getLen();
-      final int partitions =
-          (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    FileStatus fileStatus = rfs.getFileStatus(indexFileName);
+    final long length = fileStatus.getLen();
+    try (FSDataInputStream in = FutureIO.awaitFuture(rfs.openFile(indexFileName).withFileStatus(fileStatus).build())) {
+      final int partitions = (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
       final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;

       buf = ByteBuffer.allocate(size);
@@ -79,15 +80,12 @@ public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
         CheckedInputStream chk = new CheckedInputStream(in, crc);
         IOUtils.readFully(chk, buf.array(), 0, size);
         if (chk.getChecksum().getValue() != in.readLong()) {
-          throw new ChecksumException("Checksum error reading spill index: " +
-              indexFileName, -1);
+          throw new ChecksumException("Checksum error reading spill index: " + indexFileName, -1);
         }
       } else {
         IOUtils.readFully(in, buf.array(), 0, size);
       }
       entries = buf.asLongBuffer();
-    } finally {
-      in.close();
     }
   }
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
index 41744676e7..90dc729628 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.functional.FutureIO;

 import java.io.BufferedReader;
 import java.io.EOFException;
@@ -74,9 +76,9 @@ public class TFileRecordReader extends RecordReader {
     FileSystem fs = fileSplit.getPath().getFileSystem(context.getConfiguration());
     splitPath = fileSplit.getPath();
-    fin = fs.open(splitPath);
-    reader = new TFile.Reader(fin, fs.getFileStatus(splitPath).getLen(),
-        context.getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(splitPath);
+    fin = FutureIO.awaitFuture(fs.openFile(splitPath).withFileStatus(fileStatus).build());
+    reader = new TFile.Reader(fin, fileStatus.getLen(), context.getConfiguration());
     scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
   }